Syméon del Marmol
09/04/2025, 11:13 AMtask.delay()
within another task or from a flow called by a background task, the serialization fails with the following error:
Task run encountered an exception TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer'
Here is a very simple example that reproduce the issue when calling "dummy_task":
@task
def dummy_task(input: str):
print(f"Running dummy task with input: {input}")
time.sleep(1)
return f"processed: {input}"
@task
def background_task(input: str):
print(f"Running background task with input: {input}")
sub_task = dummy_task.delay(input)
return sub_task.result()
if __name__ == "__main__":
# This serves both tasks (for testing)
serve(background_task, dummy_task, limit=10)
And the task initiator executes:
task = background_task.delay("test")
result = task.result()
print(result)
What is wrong with this code ? What are the other ways to distribute the load among multiple workers when calling tasks from a background task ?Marvin
09/04/2025, 11:13 AMMarvin
09/04/2025, 11:23 AMtask.delay()
from inside a running task), or when a flow is executed from a background task and then calls .delay()
. The error you posted matches what we’ve seen elsewhere:
- Launching a task from a task causes 'MockValSer' serialization error
- Related context for the same Pydantic error in other nested/edge cases: serialization on cancel and flow-run cleanup/cancel
What’s “wrong” with the code
- Nothing logically — it’s hitting the nested-background-task serialization bug.
- In Prefect 3.x today, background task submissions should be initiated from a flow (not from another task). Submitting background tasks from inside tasks is not reliable due to the serialization of the run context.
Recommended patterns that work today
1) Move background orchestration into a flow
- Have a flow submit background tasks via .delay()
and await with .result_async()
(as you’re already doing in other places).
- Serve only the leaf tasks with the Task Worker.
Working version of your example:
from prefect import flow, task
from prefect.task_worker import serve
import time
import asyncio
@task
def dummy_task(input: str):
print(f"Running dummy task with input: {input}")
time.sleep(1)
return f"processed: {input}"
@flow
async def background_flow(input: str):
sub_future = dummy_task.with_options(name=f"dummy-{input}").delay(input)
result = await sub_future.result_async()
return result
if __name__ == "__main__":
# Serve only the task(s)
serve(dummy_task, limit=10)
And from your initiator:
# Run as a flow (ideally via a deployment / worker)
result = background_flow("test")
print(result)
2) If you must start from a “task”, call a flow from the task and do .delay()
inside that flow
- Not my first choice, but works more reliably because the flow boundary owns orchestration:
from prefect import flow, task
@flow
async def orchestrate(input: str):
fut = dummy_task.delay(input)
return await fut.result_async()
@task
def background_task(input: str):
# Call the flow instead of calling .delay() directly here
return orchestrate(input)
- Serve only dummy_task
with the Task Worker; run the flow via a standard Prefect worker/deployment.
3) Fan-out directly from flows (preferred for load distribution)
- From a flow, launch many background tasks across your Task Workers:
@flow
async def router(inputs: list[str]):
futures = [
dummy_task.with_options(name=f"dummy-{i}").delay(x)
for i, x in enumerate(inputs)
]
results = await asyncio.gather(*[f.result_async() for f in futures])
return results
- Run multiple Task Worker processes, each serve
-ing the specific tasks they can handle. Prefect will assign pending background tasks to any worker serving that task. Keep using .with_options(name=...)
for dynamic run names.
Notes on routing and distribution
- Task Workers pick up only the tasks they are serving (i.e., the functions you pass to serve(...)
). To route work, split task functions by capability and start workers that serve different sets. Running multiple workers that serve the same task horizontally scales throughput.
- Tags are great for concurrency limits but they do not route background tasks to specific Task Workers; routing is based on which task function a worker serves.
- Use limit
in serve(..., limit=N)
to cap concurrency per worker.
Next steps and questions
- Are you able to move the nested .delay()
to a flow boundary as shown above? That should remove the serialization error.Marvin
09/04/2025, 11:23 AMNate
09/04/2025, 11:25 AMNate
09/04/2025, 11:26 AM@task
def background_task(input: str):
print(f"Running background task with input: {input}")
sub_task = dummy_task.delay(input)
return sub_task.result()
why delay
dummy_task if you block and resolve it immediately with .result()
?Syméon del Marmol
09/04/2025, 11:27 AM.delay()
not the .result()
Nate
09/04/2025, 11:28 AMSyméon del Marmol
09/04/2025, 11:30 AMSyméon del Marmol
09/04/2025, 11:32 AMNate
09/04/2025, 11:33 AMprefect version
output? not immediately able to reproduce
#[6]
@task
def dummy_task(input: str):
print(f"Running dummy task with input: {input}")
time.sleep(1)
return f"processed: {input}"
@task
def background_task(input: str):
print(f"Running background task with input: {input}")
sub_task = dummy_task.delay(input)
return sub_task.result()
if __name__ == "__main__":
# This serves both tasks (for testing)
background_task.delay("test")
serve(background_task, dummy_task, limit=10)
06:31:33.130 | INFO | prefect.task_worker - Starting task worker...
06:31:33.130 | INFO | prefect.task_worker - Subscribing to runs of task(s): background_task | dummy_task
06:31:33.388 | INFO | prefect.task_worker - Received task run: 068b9789-50c6-7f94-8000-0818d1ec377e - background_task
06:31:33.842 | INFO | prefect.task_worker - Received task run: 068b9789-5bb1-726e-8000-e18155edd6bf - dummy_task
06:31:34.979 | INFO | Task run 'dummy_task' - Finished in state Completed()
06:31:35.320 | INFO | Task run 'background_task' - Finished in state Completed()
Syméon del Marmol
09/04/2025, 11:33 AMSyméon del Marmol
09/04/2025, 11:34 AMNate
09/04/2025, 11:35 AMprefect config view
will tell you what PREFECT_API_URL
you're using, which might be cloud even if you have a server process running on your machineSyméon del Marmol
09/04/2025, 11:36 AMSyméon del Marmol
09/04/2025, 11:42 AMNate
09/04/2025, 11:47 AMSyméon del Marmol
09/04/2025, 11:47 AMSyméon del Marmol
09/04/2025, 11:48 AMSyméon del Marmol
09/04/2025, 11:50 AMSyméon del Marmol
09/04/2025, 11:51 AMPYTHONPATH=./ uv run python dummy_task.py
and PYTHONPATH=./ uv run python call_task.py
in my projectNate
09/04/2025, 12:59 PMSyméon del Marmol
09/04/2025, 1:02 PMNate
09/04/2025, 1:02 PMNate
09/04/2025, 1:03 PMNate
09/04/2025, 1:03 PMSyméon del Marmol
09/04/2025, 1:04 PMSyméon del Marmol
09/04/2025, 1:07 PMNate
09/04/2025, 1:25 PMSyméon del Marmol
09/04/2025, 1:25 PMNate
09/04/2025, 1:42 PMSyméon del Marmol
09/04/2025, 1:44 PMNate
09/04/2025, 1:45 PMSyméon del Marmol
09/04/2025, 1:45 PMNate
09/04/2025, 9:09 PMSyméon del Marmol
09/05/2025, 5:35 AMNate
09/05/2025, 1:09 PM