Georg Raiser
08/30/2025, 1:58 PM
`prefect.deployments.run_deployment` inside an `asyncio.create_task()` - nothing is ever deployed. Am I not getting something here? Are there any pitfalls of using `run_deployment` in conjunction with `asyncio` that I should be aware of?
Marvin
08/30/2025, 1:58 PM
Marvin
08/30/2025, 2:03 PM
- `prefect.deployments.run_deployment` is async. You must await it (directly or via `asyncio.gather`). If you use `asyncio.create_task`, you still need to keep the event loop alive and await those tasks before your function exits.
- By default, `run_deployment` will wait for the triggered flow run to finish (it polls). Set `timeout=0` if you want a fire-and-forget trigger that returns immediately.
- A worker must be running for the deployment’s work pool. If no worker is polling that pool, you’ll create a flow run but it will never start.
- `as_subflow` defaults to True when called from within a flow; this only affects UI parent/child linkage, not execution. Set `as_subflow=False` if you want it to appear as a separate top-level run.
Common pitfalls with `asyncio.create_task`:
- Creating tasks but never awaiting them. If the event loop shuts down before they finish (for example, your script exits right after the function returns), pending tasks are cancelled and the API call may never execute. The loop also keeps only weak references to tasks, so hold on to each task until it is done.
- Swallowed exceptions. If you don’t await tasks, exceptions inside them are easy to miss. Use `await asyncio.gather(..., return_exceptions=True)` or add a done-callback that logs exceptions.
- No event loop running (e.g., calling `asyncio.create_task` in a synchronous `@flow` without an active loop).
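To illustrate the done-callback idea from the list above, here is a minimal sketch in plain asyncio. `fake_trigger` is a hypothetical stand-in for `run_deployment` (it is not a Prefect API), and the `background_tasks` set and `failures` list are illustrative names only.

```python
import asyncio
import logging

logger = logging.getLogger("trigger")

# Strong references keep tasks from being garbage-collected before they finish.
background_tasks: set[asyncio.Task] = set()
failures: list[str] = []


async def fake_trigger(i: int) -> int:
    """Hypothetical stand-in for run_deployment; fails for odd inputs."""
    await asyncio.sleep(0)
    if i % 2:
        raise RuntimeError(f"trigger {i} failed")
    return i


def on_done(task: asyncio.Task) -> None:
    """Drop the reference and record any exception instead of losing it."""
    background_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        failures.append(str(exc))
        logger.error("background trigger failed: %s", exc)


async def main() -> None:
    for i in range(4):
        task = asyncio.create_task(fake_trigger(i))
        background_tasks.add(task)
        task.add_done_callback(on_done)
    # Still wait for everything before leaving the coroutine.
    await asyncio.gather(*background_tasks, return_exceptions=True)


asyncio.run(main())
```

The callback only makes failures visible; it does not replace awaiting the tasks, which is why `main` still gathers them before returning.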
Minimal working examples
1) Fire-and-forget triggers (don’t wait for completion), safely awaited so the API calls are made:
    from prefect import flow
    from prefect.deployments.flow_runs import run_deployment
    import asyncio

    @flow
    async def trigger_many():
        tasks = [
            asyncio.create_task(
                run_deployment(
                    name="my-flow/my-deployment",
                    parameters={"i": i},
                    timeout=0,  # return immediately after creating the run
                    as_subflow=False,  # optional; just affects UI linkage
                )
            )
            for i in range(5)
        ]
        # Ensure the API calls actually run and surface any errors
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

2) Wait for completion of all triggered runs:
    from prefect import flow
    from prefect.deployments.flow_runs import run_deployment
    import asyncio

    @flow
    async def trigger_and_wait():
        tasks = [
            asyncio.create_task(
                run_deployment(
                    name="my-flow/my-deployment",
                    parameters={"i": i},
                    # timeout=None (default) means wait indefinitely for completion
                )
            )
            for i in range(3)
        ]
        runs = await asyncio.gather(*tasks)
        return runs

Checklist if “nothing is ever deployed”:
- Verify the deployment name string is exact: "flow-name/deployment-name".
- Make sure you await the tasks or gather them before your function ends.
- Ensure a worker is running and polling the correct work pool for that deployment.
- Check for exceptions in your tasks; use `return_exceptions=True` or add done-callbacks to log failures.
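For the last checklist item, a small sketch of how the results of `asyncio.gather(..., return_exceptions=True)` can be split into successes and failures. It uses plain asyncio; `fake_trigger` is a hypothetical stand-in for `run_deployment`, and the failure for input 2 is simulated.

```python
import asyncio


async def fake_trigger(i: int) -> str:
    """Hypothetical stand-in for run_deployment; input 2 simulates a bad name."""
    await asyncio.sleep(0)
    if i == 2:
        raise ValueError(f"deployment for {i} not found")
    return f"run-{i}"


async def trigger_all(n: int) -> tuple[list[str], list[BaseException]]:
    tasks = [asyncio.create_task(fake_trigger(i)) for i in range(n)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # With return_exceptions=True, failures come back as exception objects
    # in the same positions as their tasks instead of being raised.
    ok = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, BaseException)]
    return ok, failed


ok, failed = asyncio.run(trigger_all(4))
print(f"{len(ok)} triggered, {len(failed)} failed")
for exc in failed:
    print(f"  {type(exc).__name__}: {exc}")
```

If the failed list is non-empty, the exception type and message usually point at the cause (for example a wrong deployment name or an unreachable API).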
Docs:
- How-to: Run deployments programmatically
- API reference: prefect.deployments.flow_runs.run_deployment
If you can share a small snippet of how you’re calling `create_task` (and whether you’re inside a Prefect flow or a plain async function), I can point out the exact fix. Also, do you have a worker running for the target work pool?
Georg Raiser
08/30/2025, 2:52 PM
Marvin
08/30/2025, 2:53 PM
Marvin
08/30/2025, 2:53 PM

    runs = []
    for i in range(5):
        # Each await finishes before the next call starts, so the runs are triggered one at a time
        run = await run_deployment(name="flow/deployment", parameters={"i": i})
        runs.append(run)

Correct concurrent pattern:

    tasks = [
        asyncio.create_task(
            run_deployment(name="flow/deployment", parameters={"i": i})
        )
        for i in range(5)
    ]
    runs = await asyncio.gather(*tasks, return_exceptions=True)

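The difference between the two patterns can be checked without Prefect. The sketch below counts how many calls overlap at once; `fake_trigger` is a hypothetical stand-in for `run_deployment`, and the 10 ms sleep is an assumed placeholder for the API round trip.

```python
import asyncio

in_flight = 0
peak = 0


async def fake_trigger(i: int) -> int:
    """Hypothetical stand-in for run_deployment that tracks overlapping calls."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)  # simulated API round trip
    in_flight -= 1
    return i


async def sequential(n: int) -> int:
    global peak
    peak = 0
    for i in range(n):
        await fake_trigger(i)  # next call starts only after this one returns
    return peak


async def concurrent(n: int) -> int:
    global peak
    peak = 0
    tasks = [asyncio.create_task(fake_trigger(i)) for i in range(n)]
    await asyncio.gather(*tasks)
    return peak


sequential_peak = asyncio.run(sequential(5))
concurrent_peak = asyncio.run(concurrent(5))
print(sequential_peak, concurrent_peak)
```

In the loop version at most one call is ever in flight, while the task-based version has all five in flight together.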
2) If you wrap run_deployment in another coroutine, still gather at the top
This is fine (a “nested await”), so long as you still await all tasks:

    async def trigger_one(i):
        return await run_deployment(name="flow/deployment", parameters={"i": i})

    tasks = [asyncio.create_task(trigger_one(i)) for i in range(5)]
    results = await asyncio.gather(*tasks, return_exceptions=True)

3) Ensure you’re not “fire-and-forget” without keeping the loop alive
Anti-pattern:

    for i in range(5):
        asyncio.create_task(run_deployment(name="flow/deployment", parameters={"i": i}))
    # function returns here; tasks may be cancelled before they finish

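To see why this pattern drops work, here is a plain-asyncio sketch. `fake_trigger` is a hypothetical stand-in for the API call made by `run_deployment`; the first run mirrors the anti-pattern and the second gathers the tasks.

```python
import asyncio

completed: list[int] = []


async def fake_trigger(i: int) -> None:
    """Hypothetical stand-in for run_deployment's API call."""
    await asyncio.sleep(0.01)  # simulated network round trip
    completed.append(i)


async def forget(n: int) -> None:
    for i in range(n):
        asyncio.create_task(fake_trigger(i))
    # Returns immediately; asyncio.run() then cancels whatever is pending.


async def gathered(n: int) -> None:
    tasks = [asyncio.create_task(fake_trigger(i)) for i in range(n)]
    await asyncio.gather(*tasks)


asyncio.run(forget(5))
after_forget = list(completed)

completed.clear()
asyncio.run(gathered(5))
after_gather = sorted(completed)

print(after_forget, after_gather)
```

In the first run the tasks are cancelled while still waiting on their simulated round trip, so nothing is recorded; in the second all five complete.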
4) Decide whether to wait for completion
- By default, `run_deployment` will poll and wait for the run to finish (it can sit “forever” if no worker starts it).
- Use `timeout=0` if you only want to trigger runs and return immediately:

    await run_deployment(name="flow/deployment", parameters={"i": 1}, timeout=0)

This is useful for confirming that the trigger happens; you can then check the UI or logs.
5) Verify a worker is polling the correct work pool
If no worker is running, your code will create flow runs but they’ll never start. In that case, an `await run_deployment(...)` with the default timeout will just sit there.
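Separately from `run_deployment`'s own `timeout` parameter, a stalled wait can also be bounded with plain `asyncio.wait_for`. The sketch below is a generic asyncio technique, not something taken from the Prefect docs; `wait_for_run` is a hypothetical stand-in for a call that polls until a run finishes.

```python
import asyncio
from typing import Optional


async def wait_for_run(will_start: bool) -> str:
    """Hypothetical stand-in for a call that polls until a flow run finishes."""
    if not will_start:
        await asyncio.Event().wait()  # never set: mimics a run no worker picks up
    await asyncio.sleep(0.01)
    return "Completed"


async def bounded(will_start: bool, limit: float) -> Optional[str]:
    try:
        return await asyncio.wait_for(wait_for_run(will_start), timeout=limit)
    except asyncio.TimeoutError:
        # The wait was abandoned; the simulated run itself is untouched.
        return None


picked_up = asyncio.run(bounded(True, limit=1.0))
stalled = asyncio.run(bounded(False, limit=0.05))
print(picked_up, stalled)
```

A `None` result here only says the wait gave up; whether a worker is polling the pool still has to be checked separately.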
6) UI linkage vs execution
`as_subflow=True` (default when called from a flow) only changes how it’s linked in the UI. The deployment still runs as its own flow run picked up by a worker. Set `as_subflow=False` if you want it to show as a top-level run.
Docs:
- How-to: Run deployments programmatically
- API reference: prefect.deployments.flow_runs.run_deployment
If you can share a small snippet of how you’re nesting the awaits (and whether you’re inside an async `@flow`), I can point out exactly where it’s getting serialized or dropped. Also, do you have a worker running for the target work pool?
Georg Raiser
08/30/2025, 2:59 PM
Marvin
08/30/2025, 2:59 PM
Marvin
08/30/2025, 2:59 PM
- If you call `asyncio.run()` (or create a new loop and `run_until_complete`) inside an async `@flow`, you’ll hit errors like “asyncio.run() cannot be called from a running event loop,” or you’ll disrupt Prefect’s cancellation and context handling.
- In sync flows, `asyncio.run()` might seem to work locally, but it can interfere with Prefect’s engine (cancellation, logging, contextvars) and can break when the code runs in different environments (e.g., threads/executors).
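The running-loop error can be reproduced with plain asyncio, no Prefect involved. In this sketch `inner` and `outer` are hypothetical names; `outer` plays the role of an async flow body that wrongly calls `asyncio.run()`.

```python
import asyncio


async def inner() -> str:
    return "done"


async def outer() -> str:
    coro = inner()
    try:
        # A loop is already running this coroutine, so asyncio.run() refuses.
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        message = str(exc)
    else:
        message = "no error"
    # Inside async code, await the coroutine directly instead.
    result = await inner()
    return f"{message} | {result}"


report = asyncio.run(outer())
print(report)
```

The same coroutine that `asyncio.run()` rejects works fine when awaited, which is the fix for the async case.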
Anti-patterns
1) Inside an async flow:

    from prefect import flow
    from prefect.deployments.flow_runs import run_deployment
    import asyncio

    @flow
    async def parent():
        # BAD: there's already a running loop
        asyncio.run(run_deployment(name="x/y"))

2) Inside a sync flow:

    from prefect import flow
    from prefect.deployments.flow_runs import run_deployment
    import asyncio

    @flow
    def parent():
        # Risky: may fight the engine's runtime/cancellation
        asyncio.run(run_deployment(name="x/y"))

The right patterns
- If your flow is async, just await or use asyncio utilities directly:

    from prefect import flow
    from prefect.deployments.flow_runs import run_deployment
    import asyncio

    @flow
    async def parent():
        # Await directly
        fr = await run_deployment(name="x/y", timeout=0)
        # Or concurrently
        tasks = [
            asyncio.create_task(run_deployment(name="x/y", parameters={"i": i}, timeout=0))
            for i in range(5)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

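- If your flow is sync, don’t start your own loop. `run_deployment` is sync-compatible, so from a sync flow you can call it directly and Prefect bridges to async internally; keep `run_coro_as_sync` from `prefect.utilities.asyncutils` for coroutines that have no sync wrapper:

    from prefect import flow
    from prefect.deployments.flow_runs import run_deployment

    @flow
    def parent():
        # Called without await: in a sync flow the sync-compatible wrapper
        # runs the coroutine to completion and returns the flow run
        fr = run_deployment(name="x/y", timeout=0)
        return fr
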
- Running blocking sync code from an async flow? Use:
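
    from prefect import flow
    from prefect.utilities.asyncutils import run_sync_in_worker_thread

    @flow
    async def parent():
        # blocking_fn, args, and kwargs are placeholders for your own blocking callable and its arguments
        result = await run_sync_in_worker_thread(blocking_fn, *args, **kwargs)
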
- Calling async code from a worker thread? Use:
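
    from prefect.deployments.flow_runs import run_deployment
    from prefect.utilities.asyncutils import run_async_from_worker_thread

    def in_thread():
        # Bridge back to the flow's loop safely. Pass the async function and its
        # arguments separately; the helper calls it on the event loop thread.
        return run_async_from_worker_thread(run_deployment, name="x/y", timeout=0)
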
Other symptoms to watch for
- “asyncio.run() cannot be called from a running event loop” → you’re in an async flow or async context already.
- Tasks silently never run → you created tasks but didn’t await them and your function returned; or you started your own loop that conflicts with Prefect’s.
- Indefinite waiting → a worker isn’t polling the deployment’s work pool; `run_deployment` is waiting for completion (unless you set `timeout=0`).
Docs:
- Run deployments programmatically: How to run deployments
- API: prefect.deployments.flow_runs.run_deployment
If you can share whether your flow is `async def` or `def`, and a short snippet of where you’re creating/awaiting tasks, I can point to the exact fix.