Giovanni Furetti
10/30/2025, 10:33 AM@Marvin Explain to me how the,submit, andmapmethods work when the task is asynchronous. Provide some examples as well.wait
Marvin
10/30/2025, 10:33 AMMarvin
10/30/2025, 10:41 AMsubmit, map, and wait. The key idea is that these methods behave the same for sync and async tasks — Prefect handles the async execution for you and always gives you Prefect futures to coordinate concurrency.
High-level behavior
- `task.submit(...)`: schedules one task run and returns a PrefectFuture immediately. This is synchronous (not awaitable), even if your task is async def.
- `task.map(...)`: schedules multiple task runs (one per element of at least one iterable argument) and returns a PrefectFutureList. Use prefect.unmapped to pass constants.
- `future.wait(timeout=None)`: block until that task finishes.
- `future.result(timeout=None, raise_on_failure=True)`: wait (if needed) and return the task’s Python value, re-raising the task error by default.
- For many-to-one orchestration, you don’t need to call .result() when passing a future into another task — Prefect resolves it automatically. Only call `.wait()`/`.result()` when you need the raw value in local Python code (e.g., printing, non-Prefect functions, conditional logic in the flow, etc.).
Async-specific notes
- You can write tasks as @task async def .... Prefect’s task engine runs the coroutine correctly under the hood.
- In an async flow (@flow async def ...), Prefect futures are still not awaitables; you should use `.wait()`/`.result()` or utilities like prefect.futures.as_completed(...).
Examples
1) Async task + submit + wait/result (sync flow)
from prefect import flow, task
@task
async def double_async(x: int) -> int:
# Simulate async I/O
import anyio
await anyio.sleep(0.2)
return x * 2
@flow
def run_once():
fut = double_async.submit(21) # returns a PrefectFuture immediately
# ... do other work here ...
fut.wait() # blocks until the task run completes
value = fut.result() # the actual Python value (42)
print("result:", value)
if __name__ == "__main__":
run_once()
2) Map over an async task (sync flow)
from prefect import flow, task, unmapped
@task
async def add_async(x: int, y: int) -> int:
import anyio
await anyio.sleep(0.1)
return x + y
@flow
def map_example():
xs = [1, 2, 3, 4]
# Broadcast a constant with `unmapped`
futures = add_async.map(xs, y=unmapped(10)) # PrefectFutureList
futures.wait() # wait for all mapped runs to complete
results = futures.result() # list of Python results
print(results) # [11, 12, 13, 14]
if __name__ == "__main__":
map_example()
3) Process results as they finish with as_completed
from prefect import flow, task
from prefect.futures import as_completed
@task
async def work(i: int) -> str:
import anyio, random
await anyio.sleep(random.random()) # variable completion time
return f"done {i}"
@flow
def stream_results():
futures = work.map(range(5))
for fut in as_completed(futures):
print(fut.result()) # prints in order of completion
if __name__ == "__main__":
stream_results()
4) Async flow driving async tasks
from prefect import flow, task
from prefect.futures import as_completed
@task
async def fetch(i: int) -> int:
import anyio
await anyio.sleep(0.1)
return i * i
@flow
async def async_flow():
f1 = fetch.submit(3)
f2 = fetch.submit(5)
# Prefer streaming with as_completed to avoid a single blocking call
for fut in as_completed([f1, f2]):
print("got:", fut.result())
# Or if you just want the values:
vals = [f1.result(), f2.result()]
return vals
if __name__ == "__main__":
import anyio
anyio.run(async_flow)
5) Using wait_for to express dependencies without passing data
```
from prefect import flow, task
@task
async def setup():
...
@task
async def step_a():
...
@task
async def step_b():
...
@task
async def finalize():
...
@flow
def dependency_example():Marvin
10/30/2025, 10:41 AMawait fut is not supported; use fut.wait() or fut.result().
- If you pass a future into another task, Prefect waits automatically – no manual .result() needed.
- For large maps, combine with concurrency limits using tags if needed: see “Tag-based concurrency limits” in the docs.
- return_state=True on `submit`/`map` returns State objects instead of futures; useful for inspecting states directly but not required for normal dataflow.
Docs and references
- Run work concurrently (submit/map): How to run work concurrently
- Prefect futures API: Prefect futures API
- Task runners (threads/processes/distributed): Task runners
- Tasks concept: Tasks
If you share a small snippet of your current flow, I can tailor these patterns to your exact use case (e.g., mapping async HTTP calls, streaming results, or controlling concurrency).