# ask-marvin
@Marvin From within a flow process (deployed with "type=process"), can I mix calls of tasks using `submit` to stay inside the flow process, and then inside that task, schedule a background task with `delay`? When I do so, it fails during task deserialization with the following error:
```
[error    ] Failed to read parameters for task run UUID('6f8cc87e-ecd3-4e43-b750-42a6602bfcde') [prefect.task_worker]
Traceback (most recent call last):
  File "/app/venv/lib/python3.11/site-packages/prefect/task_worker.py", line 322, in _submit_scheduled_task_run
    run_data: dict[str, Any] = await read_parameters(store, parameters_id)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.11/site-packages/prefect/task_worker.py", line 567, in read_parameters
    record: ResultRecord[Any] = ResultRecord[Any].deserialize(
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.11/site-packages/prefect/_result_records.py", line 212, in deserialize
    instance.result = instance.serializer.loads(instance.result.encode())
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.11/site-packages/prefect/serializers.py", line 163, in loads
    return pickler.loads(base64.decodebytes(blob))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'parsing'
```
thought for 469.9 seconds
Short answer: Yes, you can mix in-process `submit` with background `delay`, even calling `.delay()` from inside a task submitted with `submit`. The error you're seeing is almost certainly an import/packaging issue on the task worker at parameter deserialization time.

**What's happening**
- `submit` runs the task inside your flow run's process (type=process), so it has your code on `sys.path`.
- `delay` schedules a background task that will be executed by a task worker. When the worker pulls the task, it reads the stored parameters and deserializes them (the stack trace shows Prefect's Pickle serializer).
- `ModuleNotFoundError: No module named 'parsing'` means the worker cannot import a module referenced by your task parameters (or by the task function's closure), e.g. a local project module named `parsing`.
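For illustration, a minimal sketch of this pattern; it assumes Prefect 3 and a running task worker, and the task/flow names are made up rather than taken from your code:

```python
from prefect import flow, task


@task
def background_task(payload: dict) -> None:
    # Executed later by a separate task worker process, which must be able to
    # import this module and everything referenced by `payload`.
    print("processing", payload)


@task
def in_process_task(item: str) -> None:
    # Runs inside the flow run process because it is called via .submit().
    # Scheduling a background task from here is allowed.
    background_task.delay({"item": item})


@flow
def my_flow(items: list[str]) -> None:
    futures = [in_process_task.submit(item) for item in items]
    for future in futures:
        future.result()
```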
**Why this bites only with delay**
- In-process `submit` does not cross process/machine boundaries and therefore sees your code.
- Background tasks run in a separate process (a task worker), which must be able to import:
  - the task function itself
  - any custom classes/types used in the task's parameters/results
  - any modules referenced by those objects
- If the worker environment doesn't have your project code (or third-party deps), unpickling fails with `ModuleNotFoundError`.

**How to fix**
- Ensure the task worker can import your project code and dependencies:
  - Use the same Docker image/venv for both your flow runner and your task workers.
  - Package your project (e.g., `pip install -e .`) into the environment where the task worker runs.
  - If you deploy flows from source, ensure task workers also run with that source available/importable.
- Start a task worker that imports your tasks. If you're self-hosting task workers, run:
```
prefect task serve ./path/to/tasks.py:your_task ./path/to/other_tasks.py:another_task
```
or
```
prefect task serve -m your_project.tasks
```
- Keep this (or multiple) workers running wherever you want the background tasks executed.
- Prefer passing simple, JSON-serializable primitives to background tasks. If you pass custom objects (e.g., `MyModel` from `parsing.py`), the worker must be able to import `parsing.MyModel`. Otherwise, convert to plain dicts/lists before calling `.delay(...)` (see the sketch below).
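A hedged example of that last point (`ParsedDoc` and `process_doc` are hypothetical stand-ins, not names from your project):

```python
from dataclasses import asdict, dataclass

from prefect import task


@dataclass
class ParsedDoc:
    # Stand-in for a custom class defined in a local parsing.py
    title: str
    pages: int


@task
def process_doc(doc: dict) -> None:
    print("processing", doc)


doc = ParsedDoc(title="report", pages=12)

# Risky: passing the object forces pickling, so the task worker must be able
# to import the module that defines ParsedDoc in order to deserialize it.
# process_doc.delay(doc)

# Safer: send plain JSON-serializable data and rebuild the object in the task
# if needed.
process_doc.delay(asdict(doc))
```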
**Notes for your setup**
- Dynamically naming tasks via `.with_options(name=...)` before `.delay()` is fine and not related to this error.
- Since you already gather results with `.result_async()` via `asyncio.gather`, the above is compatible; just make sure a task worker is up and can import your code.

**Helpful references**
- Background tasks overview: Background tasks (Prefect 3)
- CLI for serving task workers: prefect task serve docs

If you can share:
- how you're running the task worker (command, image/env),
- where the `parsing` module lives, and
- the argument types you pass to the delayed task,

I can suggest the quickest packaging path (e.g., specific Dockerfile or command tweaks).
@Marvin I'm currently using a single container that starts two threads: one serves deployments of flows and the other serves the background tasks. Why and how can the module not be found?
thought for 80.1 seconds
Even in one container, your "flow runner" process and your "task worker" process can see different Python import paths and working directories. When your flow pickles parameters for a delayed task, the pickle stores the module path for any non-JSON objects it contains. The task worker must be able to import that exact module path to unpickle them. If it can't, you get `ModuleNotFoundError` (here: `parsing`).
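A quick way to see which module path the pickle will record: it is the `__module__` of the object's class. The `myapp`/`MyModel` names below are assumptions for illustration, not taken from your project:

```python
from myapp import parsing  # however your project actually exposes the module

obj = parsing.MyModel()  # hypothetical object that ends up in .delay() parameters

# Pickle stores this module path; the task worker must be able to import it.
# A bare "parsing" in one process and "myapp.parsing" in the other is exactly
# the mismatch that produces ModuleNotFoundError on the worker.
print(type(obj).__module__)
```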
**Common reasons this happens (and how to fix)**
- Different import name between processes
  - Example: the flow imports `parsing` as a top-level module (because CWD is the project root), so the pickle records `parsing.*`. The task worker can only import it as `myapp.parsing` (or vice versa), so `parsing` isn't found.
  - Fix: install your project as a package in the image and always import with the same package-qualified name (e.g., `myapp.parsing`). Avoid relying on CWD for top-level imports.
- Different working directory or PYTHONPATH
  - Your two long-lived commands may start from different CWDs, so `''` in `sys.path` points to different locations.
  - Fix: set a consistent `WORKDIR` and ensure both commands run from it, or set `PYTHONPATH` to your project root.
- Code available to flow runs but not to the task worker
  - If your flow is deployed "from source", Prefect fetches the repo for the flow runner at runtime into an ephemeral dir. The task worker does not automatically get that code.
  - Fix: make your code available to the task worker too (bake it into the image, mount the repo, or clone on startup) and ensure it's importable under the same name as when the flow pickled the params.
- Non-JSON parameters force Pickle
  - If you pass custom classes (e.g., objects from `parsing.py`) to `.delay()`, Prefect will pickle them. Unpickling requires the same import path to exist on the worker.
  - Fix: pass only JSON-serializable primitives to `.delay()` (convert objects to dicts/lists), or ensure the module is importable identically in both processes.
**What to check quickly**
- Confirm both processes see the same paths: log `os.getcwd()` and `sys.path` from a task run and from a (minimal) delayed task (see the sketch after this list).
- Ensure consistent imports: don't `import parsing` as a top-level module in one place and `from myapp import parsing` in another. Pick one fully-qualified path and stick with it.
- Make sure your task worker actually imports your tasks and their dependencies. Start it with something like:

  ```
  prefect task serve -m myapp.tasks
  ```

  and ensure `myapp` is installed in the container.
- If you're using `from_source` for flows, also provide the same source to the task worker (image bake, volume, or clone); it won't inherit the flow runner's ephemeral checkout.
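For the first check, a minimal diagnostic sketch (assuming a task worker is serving this task):

```python
import os
import sys

from prefect import task


@task
def where_am_i() -> None:
    # Printed by whichever process runs the task; compare the task worker's
    # output with the same two prints made inside the flow run process.
    print("cwd:", os.getcwd())
    print("sys.path:", sys.path)


# From the flow run process:
#   print("cwd:", os.getcwd()); print("sys.path:", sys.path)
#   where_am_i.delay()  # the worker's view shows up in its own logs
```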
If you can share:
- how you launch each thread/command (the exact commands),
- where `parsing.py` lives and how it's imported in your code,
- and a sample of what you pass to `.delay()`,

I can point to the minimal change (package name/import fix vs. Dockerfile tweak vs. parameter shape) to make this work.

**References**
- Background tasks: Background tasks (Prefect 3)
- CLI for task workers: prefect task serve