Syméon del Marmol
09/19/2025, 1:38 PM
Is it possible to use submit to stay inside the flow process, and then inside that task, schedule a background task with delay?
When I do so, it fails during task deserialization with the following error:
[error ] Failed to read parameters for task run UUID('6f8cc87e-ecd3-4e43-b750-42a6602bfcde') [prefect.task_worker]
Traceback (most recent call last):
  File "/app/venv/lib/python3.11/site-packages/prefect/task_worker.py", line 322, in _submit_scheduled_task_run
    run_data: dict[str, Any] = await read_parameters(store, parameters_id)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.11/site-packages/prefect/task_worker.py", line 567, in read_parameters
    record: ResultRecord[Any] = ResultRecord[Any].deserialize(
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.11/site-packages/prefect/_result_records.py", line 212, in deserialize
    instance.result = instance.serializer.loads(instance.result.encode())
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.11/site-packages/prefect/serializers.py", line 163, in loads
    return pickler.loads(base64.decodebytes(blob))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'parsing'
Marvin
09/19/2025, 1:46 PM
Yes, you can combine submit with background delay — even calling .delay() from inside a task submitted with submit. The error you’re seeing is almost certainly an import/packaging issue on the task worker at parameter deserialization time.
What’s happening
- submit runs the task inside your flow run’s process (type=process), so it has your code on sys.path.
- delay schedules a background task that will be executed by a task worker. When the worker pulls the task, it reads the stored parameters and deserializes them (the stack trace shows Prefect’s Pickle serializer). The combined pattern is sketched right after this list.
- ModuleNotFoundError: No module named 'parsing' means the worker cannot import a module referenced by your task parameters (or by the task function’s closure), e.g. a local project module named parsing.
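If it helps, here is a minimal sketch of that pattern. All names are illustrative (not from your code), and it assumes a task worker is serving background_work, e.g. via prefect task serve -m my_project.tasks:

from prefect import flow, task

@task
def background_work(payload: dict) -> dict:
    # Runs later in a separate task worker process, which must be able to
    # import this module and anything the payload references.
    return {"ok": True, **payload}

@task
def orchestrate(item: dict) -> dict:
    # Runs inside the flow run's process, so local imports resolve here...
    future = background_work.delay(item)  # ...but this run happens on a worker
    return future.result()

@flow
def my_flow() -> dict:
    # submit keeps orchestrate in the flow process; delay crosses the boundary
    return orchestrate.submit({"id": 1}).result()

if __name__ == "__main__":
    my_flow()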
Why this bites only with delay
- In-process submit does not cross process/machine boundaries and therefore sees your code.
- Background tasks run in a separate process (a task worker), which must be able to import:
  - the task function itself
  - any custom classes/types used in the task’s parameters/results
  - any modules referenced by those objects
- If the worker environment doesn’t have your project code (or third-party deps), unpickling fails with ModuleNotFoundError (a standalone repro follows this list).
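To see the mechanism in isolation, here is a standalone (non-Prefect) sketch: pickle records each object's defining module path, and unpickling re-imports it by that name. The parsing module and ParsedDoc class here are hypothetical stand-ins:

import pickle
import sys
import types

class ParsedDoc:
    pass

# Simulate a project module "parsing" that exists in the flow process.
parsing = types.ModuleType("parsing")
ParsedDoc.__module__ = "parsing"          # pickle will record "parsing.ParsedDoc"
parsing.ParsedDoc = ParsedDoc
sys.modules["parsing"] = parsing

blob = pickle.dumps(parsing.ParsedDoc())  # works: "parsing" is importable here

# Simulate the task worker, where no module named "parsing" exists.
del sys.modules["parsing"]
try:
    pickle.loads(blob)
except ModuleNotFoundError as exc:
    print(exc)  # -> No module named 'parsing'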
How to fix
- Ensure the task worker can import your project code and dependencies:
  - Use the same Docker image/venv for both your flow runner and your task workers.
  - Package your project (e.g., pip install -e .) into the environment where the task worker runs.
  - If you deploy flows from source, ensure task workers also run with that source available/importable.
- Start a task worker that imports your tasks. If you’re self-hosting task workers, run:
  prefect task serve ./path/to/tasks.py:your_task ./path/to/other_tasks.py:another_task
  or
  prefect task serve -m your_project.tasks
  Keep this (or multiple) workers running wherever you want the background tasks executed.
- Prefer passing simple, JSON-serializable primitives to background tasks. If you pass custom objects (e.g., MyModel from parsing.py), the worker must be able to import parsing.MyModel. Otherwise, convert to plain dicts/lists before calling .delay(...), as in the sketch after this list.
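For that last point, a minimal sketch of the "pass primitives" fix (MyModel is a hypothetical stand-in for whatever your parsing module defines, and a task worker is assumed to be serving process_doc):

from dataclasses import dataclass, asdict
from prefect import task

@dataclass
class MyModel:
    doc_id: str
    text: str

@task
def process_doc(doc: dict) -> int:
    # The worker only needs to understand a plain dict, not your custom class.
    return len(doc["text"])

if __name__ == "__main__":
    model = MyModel(doc_id="a1", text="hello")
    # Instead of process_doc.delay(model), which would pickle the custom class:
    future = process_doc.delay(asdict(model))  # JSON-serializable payload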
Notes for your setup
- Dynamically naming tasks via .with_options(name=...) before .delay() is fine and not related to this error.
- Since you already gather results with .result_async() via asyncio.gather, the above is compatible — just make sure a task worker is up and can import your code (a gathering sketch follows this list).
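For reference, a sketch of that gathering pattern, assuming the .result_async() API you are already using (task names and counts are illustrative):

import asyncio
from prefect import flow, task

@task
def fetch(i: int) -> int:
    return i * 2

@flow
async def gather_flow() -> list[int]:
    futures = [
        fetch.with_options(name=f"fetch-{i}").delay(i)  # dynamic names are fine
        for i in range(5)
    ]
    # Await all background results concurrently; each runs on a task worker.
    return list(await asyncio.gather(*(f.result_async() for f in futures)))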
Helpful references
- Background tasks overview: Background tasks (Prefect 3)
- CLI for serving task workers: prefect task serve docs
If you can share:
- how you’re running the task worker (command, image/env), and
- where the parsing module lives, and
- the argument types you pass to the delayed task,
I can suggest the quickest packaging path (e.g., specific Dockerfile or command tweaks).
Syméon del Marmol
09/19/2025, 4:18 PM
Marvin
09/19/2025, 4:19 PM
The root issue is that the task worker cannot import a module under the name that was recorded when the parameters were pickled (here, parsing).
Common reasons this happens (and how to fix):
- Different import name between processes
  - Example: the flow imports parsing as a top-level module (because CWD is the project root), so the pickle records parsing.*. The task worker can only import it as myapp.parsing (or vice versa), so parsing isn’t found. A quick check for this is sketched after this list.
  - Fix: install your project as a package in the image and always import with the same package-qualified name (e.g., myapp.parsing). Avoid relying on CWD for top-level imports.
- Different working directory or PYTHONPATH
  - Your two long-lived commands may start from different CWDs, so '' in sys.path points to different locations.
  - Fix: set a consistent WORKDIR and ensure both commands run from it, or set PYTHONPATH to your project root.
- Code available to flow runs but not to the task worker
  - If your flow is deployed “from source”, Prefect fetches the repo for the flow runner at runtime into an ephemeral dir. The task worker does not automatically get that code.
  - Fix: make your code available to the task worker too (bake it into the image, mount the repo, or clone on startup) and ensure it’s importable under the same name as when the flow pickled the params.
- Non-JSON parameters force Pickle
  - If you pass custom classes (e.g., objects from parsing.py) to .delay(), Prefect will pickle them. Unpickling requires the same import path to exist on the worker.
  - Fix: pass only JSON-serializable primitives to .delay() (convert objects to dicts/lists), or ensure the module is importable identically in both processes.
What to check quickly
- Confirm both processes see the same paths:
  - Log os.getcwd() and sys.path from a task run and from a (minimal) delayed task (see the sketch after this list).
- Ensure consistent imports:
  - Don’t import parsing as a top-level module in one place and from myapp import parsing in another. Pick one fully-qualified path and stick with it.
- Make sure your task worker actually imports your tasks and their dependencies. Start it with something like:
  prefect task serve -m myapp.tasks
  and ensure myapp is installed in the container.
- If you’re using from_source for flows, also provide the same source to the task worker (image bake, volume, or clone) — it won’t inherit the flow runner’s ephemeral checkout.
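A minimal diagnostic task for that first check (assuming a task worker is serving it): run it once with .submit() to see the flow process’s view and once with .delay() to see the worker’s, then compare the two outputs.

import os
import sys
from prefect import task

@task
def debug_env() -> dict:
    # These must line up between the flow process and the task worker for
    # pickled parameters to resolve the same module names.
    return {"cwd": os.getcwd(), "sys_path": list(sys.path)}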
If you can share:
- how you launch each thread/command (the exact commands),
- where parsing.py lives and how it’s imported in your code,
- and a sample of what you pass to .delay(),
I can point to the minimal change (package name/import fix vs. Dockerfile tweak vs. parameter shape) to make this work.
References
- Background tasks: Background tasks (Prefect 3)
- CLI for task workers: prefect task serve