# ask-marvin
s
@Marvin I'm working with background tasks to run flows in workers from a backend. Within these flows, I call other background tasks to distribute the load and forward it to potentially other worker processes, based on who serves what. However, when I call task.delay() within another task, or from a flow called by a background task, the serialization fails with the following error:
Task run encountered an exception TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer'
Here is a very simple example that reproduces the issue when calling "dummy_task":
from prefect import task
from prefect.task_worker import serve
import time

@task
def dummy_task(input: str):
    print(f"Running dummy task with input: {input}")
    time.sleep(1)
    return f"processed: {input}"

@task
def background_task(input: str):
    print(f"Running background task with input: {input}")
    sub_task = dummy_task.delay(input)
    return sub_task.result()

if __name__ == "__main__":
    # This serves both tasks (for testing)
    serve(background_task, dummy_task, limit=10)
And the task initiator executes:
task = background_task.delay("test")
result = task.result()
print(result)
What is wrong with this code? What are the other ways to distribute the load among multiple workers when calling tasks from a background task?
m
thought for 611.0 seconds
Thanks for the clear MRE; your code is conceptually fine. The failure you're seeing is due to a known serialization bug that appears when a task tries to launch another background task (i.e., calling task.delay() from inside a running task), or when a flow is executed from a background task and then calls .delay(). The error you posted matches what we've seen elsewhere:
- Launching a task from a task causes the 'MockValSer' serialization error
- Related context for the same Pydantic error in other nested/edge cases: serialization on cancel and flow-run cleanup/cancel
What's "wrong" with the code
- Nothing logically; it's hitting the nested-background-task serialization bug.
- In Prefect 3.x today, background task submissions should be initiated from a flow (not from another task). Submitting background tasks from inside tasks is not reliable due to the serialization of the run context.
Recommended patterns that work today
1) Move background orchestration into a flow
- Have a flow submit background tasks via .delay() and await with .result_async() (as you're already doing in other places).
- Serve only the leaf tasks with the Task Worker.
Working version of your example:
from prefect import flow, task
from prefect.task_worker import serve
import time
import asyncio

@task
def dummy_task(input: str):
    print(f"Running dummy task with input: {input}")
    time.sleep(1)
    return f"processed: {input}"

@flow
async def background_flow(input: str):
    sub_future = dummy_task.with_options(name=f"dummy-{input}").delay(input)
    result = await sub_future.result_async()
    return result

if __name__ == "__main__":
    # Serve only the task(s)
    serve(dummy_task, limit=10)
And from your initiator:
# Run as a flow (ideally via a deployment / worker)
result = background_flow("test")
print(result)
2) If you must start from a "task", call a flow from the task and do .delay() inside that flow
- Not my first choice, but works more reliably because the flow boundary owns orchestration:
from prefect import flow, task

@flow
async def orchestrate(input: str):
    fut = dummy_task.delay(input)
    return await fut.result_async()

@task
def background_task(input: str):
    # Call the flow instead of calling .delay() directly here
    return orchestrate(input)
- Serve only dummy_task with the Task Worker; run the flow via a standard Prefect worker/deployment.
3) Fan-out directly from flows (preferred for load distribution)
- From a flow, launch many background tasks across your Task Workers:
@flow
async def router(inputs: list[str]):
    futures = [
        dummy_task.with_options(name=f"dummy-{i}").delay(x)
        for i, x in enumerate(inputs)
    ]
    results = await asyncio.gather(*[f.result_async() for f in futures])
    return results
- Run multiple Task Worker processes, each serve-ing the specific tasks they can handle. Prefect will assign pending background tasks to any worker serving that task. Keep using .with_options(name=...) for dynamic run names.
Notes on routing and distribution
- Task Workers pick up only the tasks they are serving (i.e., the functions you pass to serve(...)). To route work, split task functions by capability and start workers that serve different sets (see the sketch below). Running multiple workers that serve the same task horizontally scales throughput.
- Tags are great for concurrency limits but they do not route background tasks to specific Task Workers; routing is based on which task function a worker serves.
- Use limit in serve(..., limit=N) to cap concurrency per worker.
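To make "split task functions by capability" concrete, here is a minimal sketch of two worker entry points serving disjoint sets of tasks; the file and task names (gpu_worker.py, io_worker.py, embed_documents, fetch_page) are illustrative, not from your code:
# gpu_worker.py - only this process will receive embed_documents runs
from prefect import task
from prefect.task_worker import serve

@task
def embed_documents(batch: list[str]) -> list[list[float]]:
    ...  # heavy, GPU-bound work

if __name__ == "__main__":
    serve(embed_documents, limit=2)
And, as a separate process:
# io_worker.py - start several copies of this process to scale throughput
from prefect import task
from prefect.task_worker import serve

@task
def fetch_page(url: str) -> str:
    ...  # light, IO-bound work

if __name__ == "__main__":
    serve(fetch_page, limit=50)
A flow can then .delay() either task and the run will be routed to whichever worker serves that function.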
Next steps and questions
- Are you able to move the nested .delay() to a flow boundary as shown above? That should remove the serialization error.
- If you need more explicit routing rules ("who serves what"), can you share how you decide which worker should get which jobs? We can suggest a task sharding pattern that aligns with Task Workers' "serve specific functions" model.
References
- Background tasks docs: Background tasks in Prefect 3
- Known issue for nested task submission: Issue #16422
n
i dont know about this answer from Marvin
im also curious about the use case
@task
def background_task(input: str):
    print(f"Running background task with input: {input}")
    sub_task = dummy_task.delay(input)
    return sub_task.result()
why delay dummy_task if you block and resolve it immediately with .result()?
s
@Nate This is not what I use for real: I'm not awaiting directly. That was just to recreate a very simple case. But the error is raised when calling .delay(), not .result().
n
okay makes sense. ill take a look
s
But this is clearly an issue that pops up because of the "serialize_as_any=True" here: https://github.com/PrefectHQ/prefect/blob/main/src/prefect/context.py#L472 I tried without, and it doesn't crash like that
Interestingly, this is not set to True for flow context serialization..
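For context, here's a minimal Pydantic-only sketch (nothing Prefect-specific) of what serialize_as_any changes, i.e. whether a field is serialized against its declared type or its runtime (duck-typed) type:
from pydantic import BaseModel

class Base(BaseModel):
    x: int = 1

class Child(Base):
    y: int = 2

class Holder(BaseModel):
    item: Base

holder = Holder(item=Child())

# Default: serialized against the declared field type (Base) -> {'item': {'x': 1}}
print(holder.model_dump())

# With serialize_as_any=True: serialized against the runtime type (Child)
# -> {'item': {'x': 1, 'y': 2}}
print(holder.model_dump(serialize_as_any=True))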
n
can you share your prefect version output? not immediately able to reproduce
#[6]
@task
def dummy_task(input: str):
    print(f"Running dummy task with input: {input}")
    time.sleep(1)
    return f"processed: {input}"

@task
def background_task(input: str):
    print(f"Running background task with input: {input}")
    sub_task = dummy_task.delay(input)
    return sub_task.result()

if __name__ == "__main__":
    # This serves both tasks (for testing)
    background_task.delay("test")
    serve(background_task, dummy_task, limit=10)

06:31:33.130 | INFO    | prefect.task_worker - Starting task worker...
06:31:33.130 | INFO    | prefect.task_worker - Subscribing to runs of task(s): background_task | dummy_task
06:31:33.388 | INFO    | prefect.task_worker - Received task run: 068b9789-50c6-7f94-8000-0818d1ec377e - background_task
06:31:33.842 | INFO    | prefect.task_worker - Received task run: 068b9789-5bb1-726e-8000-e18155edd6bf - dummy_task
06:31:34.979 | INFO    | Task run 'dummy_task' - Finished in state Completed()
06:31:35.320 | INFO    | Task run 'background_task' - Finished in state Completed()
🫥 1
s
Version: 3.4.15
API version: 0.8.4
Python version: 3.12.8
Git commit: 95f11540
Built: Fri, Aug 29, 2025 05:13 PM
OS/Arch: darwin/arm64
Profile: prefect-cloud
Server type: cloud
Pydantic version: 2.11.7
I don't know why it says "prefect-cloud": I'm running the server locally.
n
well
prefect config view will tell you what PREFECT_API_URL you're using, which might be cloud even if you have a server process running on your machine
s
Ok then, actually I'm overriding the env variable via Docker; that's why it says that locally
What is quite weird is that I had already used a very similar pattern in my code and it worked quite well: delay -> background_task -> flow -> mix of background tasks / submitted tasks / simple calls. But now I just get that error whatever I do, which has been driving me crazy for 24h 😅
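Roughly, that working pattern looks like the sketch below (task names here are made up, just to show the shape of it):
from prefect import flow, task

@task
def fetch(x: str) -> str:
    return x  # e.g. pull data

@task
def transform(x: str) -> str:
    return x.upper()  # e.g. CPU-bound processing

@task
def store(x: str) -> None:
    print(f"storing {x}")  # e.g. persist results

@flow
def pipeline(x: str):
    raw = fetch(x)                # plain in-process call
    fut = transform.submit(raw)   # submitted to the flow's task runner
    store.delay(fut.result())     # handed off to a Task Worker as a background task

@task
def background_task(x: str):
    # triggered from the backend via background_task.delay(x)
    return pipeline(x)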
n
does the snippet you shared above actually produce the error for you? or does your non-simplified version of that snippet throw
s
Let me retry
Yes it does throw
let me send you the files:
running them with PYTHONPATH=./ uv run python dummy_task.py and PYTHONPATH=./ uv run python call_task.py in my project
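(The attachments aren't reproduced here, but the split is essentially the earlier snippet spread over two files, roughly like this:)
# dummy_task.py - the task worker process
from prefect import task
from prefect.task_worker import serve
import time

@task
def dummy_task(input: str):
    print(f"Running dummy task with input: {input}")
    time.sleep(1)
    return f"processed: {input}"

@task
def background_task(input: str):
    print(f"Running background task with input: {input}")
    sub_task = dummy_task.delay(input)
    return sub_task.result()

if __name__ == "__main__":
    serve(background_task, dummy_task, limit=10)
and
# call_task.py - the initiator process (import path assumed from the file name)
from dummy_task import background_task

task = background_task.delay("test")
result = task.result()
print(result)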
n
ah i reproduced - it apparently only happens when you have separate files
🤯 1
s
That's interesting! So what should be in the same file? The entry points of both the worker and the background task caller processes?
n
well, no i think its a bug. im just saying that i didnt reproduce at first because i was calling .delay right before .serve so i could try to repro right in ipython
but the bug only manifests when you do the normal/correct thing and have separate files
i should have a PR to fix this shortly
🎉 1
s
Great thanks a lot
If I may abuse your help a bit more: this bug, marked as solved, was not properly fixed and the race condition is still there. I reproduce the issue a lot, randomly. I wrote a comment to explain my point, but I'm not sure anyone looks at closed bugs or whether I should open a new one: https://github.com/PrefectHQ/prefect/issues/18278#issuecomment-3244381547 This is quite annoying, again when using background tasks 🙂
n
ive reopened the issue, we'll take a look, thanks
s
Thanks a lot 🙂 !
s
What's the release cycle at Prefect?
n
thursdays (today)
s
Alright! Great, thanks for being so responsive
catjam 1
n
that race condition should also be fixed here. and fyi we're going to delay the release to tomorrow morning - sorry for any inconvenience
s
No problem, thanks for the follow-up. It's even better than expected to have both fixes on such short notice. I looked at the fix you made: it looks way more event-driven. I like that! 🔥
n
catjam