&gt; <@ULVA73B9P> Explain to me how the `submit`, ...
# marvin-ai
g
@Marvin Explain to me how the
submit
,
map
, and
wait
methods work when the task is asynchronous. Provide some examples as well.
m
thought for 436.1 seconds
Great question! In Prefect 3.x, async tasks work seamlessly with
submit
,
map
, and
wait
. The key idea is that these methods behave the same for sync and async tasks — Prefect handles the async execution for you and always gives you Prefect futures to coordinate concurrency. High-level behavior - `task.submit(...)`: schedules one task run and returns a
PrefectFuture
immediately. This is synchronous (not awaitable), even if your task is
async def
. - `task.map(...)`: schedules multiple task runs (one per element of at least one iterable argument) and returns a
PrefectFutureList
. Use
prefect.unmapped
to pass constants. - `future.wait(timeout=None)`: block until that task finishes. - `future.result(timeout=None, raise_on_failure=True)`: wait (if needed) and return the task’s Python value, re-raising the task error by default. - For many-to-one orchestration, you don’t need to call
.result()
when passing a future into another task — Prefect resolves it automatically. Only call `.wait()`/`.result()` when you need the raw value in local Python code (e.g., printing, non-Prefect functions, conditional logic in the flow, etc.). Async-specific notes - You can write tasks as
@task async def ...
. Prefect’s task engine runs the coroutine correctly under the hood. - In an async flow (
@flow async def ...
), Prefect futures are still not awaitables; you should use `.wait()`/`.result()` or utilities like
prefect.futures.as_completed(...)
. Examples 1) Async task + submit + wait/result (sync flow)
Copy code
from prefect import flow, task

@task
async def double_async(x: int) -> int:
    # Simulate async I/O
    import anyio
    await anyio.sleep(0.2)
    return x * 2

@flow
def run_once():
    fut = double_async.submit(21)  # returns a PrefectFuture immediately
    # ... do other work here ...
    fut.wait()                     # blocks until the task run completes
    value = fut.result()           # the actual Python value (42)
    print("result:", value)

if __name__ == "__main__":
    run_once()
2) Map over an async task (sync flow)
Copy code
from prefect import flow, task, unmapped

@task
async def add_async(x: int, y: int) -> int:
    import anyio
    await anyio.sleep(0.1)
    return x + y

@flow
def map_example():
    xs = [1, 2, 3, 4]
    # Broadcast a constant with `unmapped`
    futures = add_async.map(xs, y=unmapped(10))  # PrefectFutureList
    futures.wait()               # wait for all mapped runs to complete
    results = futures.result()   # list of Python results
    print(results)               # [11, 12, 13, 14]

if __name__ == "__main__":
    map_example()
3) Process results as they finish with as_completed
Copy code
from prefect import flow, task
from prefect.futures import as_completed

@task
async def work(i: int) -> str:
    import anyio, random
    await anyio.sleep(random.random())  # variable completion time
    return f"done {i}"

@flow
def stream_results():
    futures = work.map(range(5))
    for fut in as_completed(futures):
        print(fut.result())  # prints in order of completion

if __name__ == "__main__":
    stream_results()
4) Async flow driving async tasks
Copy code
from prefect import flow, task
from prefect.futures import as_completed

@task
async def fetch(i: int) -> int:
    import anyio
    await anyio.sleep(0.1)
    return i * i

@flow
async def async_flow():
    f1 = fetch.submit(3)
    f2 = fetch.submit(5)
    # Prefer streaming with as_completed to avoid a single blocking call
    for fut in as_completed([f1, f2]):
        print("got:", fut.result())
    # Or if you just want the values:
    vals = [f1.result(), f2.result()]
    return vals

if __name__ == "__main__":
    import anyio
    anyio.run(async_flow)
5) Using wait_for to express dependencies without passing data ``` from prefect import flow, task @task async def setup(): ... @task async def step_a(): ... @task async def step_b(): ... @task async def finalize(): ... @flow def dependency_example():
s = setup.submit() a = step_a.submit(wait_for=[s]) b = step_b.submit(wait_for=[s]) # finalize runs only after both a and b complete done = finalize.submit(wait_for=[a, b]) return done.result() ``` Gotchas and tips - Don’t await futures:
await fut
is not supported; use
fut.wait()
or
fut.result()
. - If you pass a future into another task, Prefect waits automatically – no manual
.result()
needed. - For large maps, combine with concurrency limits using tags if needed: see “Tag-based concurrency limits” in the docs. -
return_state=True
on `submit`/`map` returns State objects instead of futures; useful for inspecting states directly but not required for normal dataflow. Docs and references - Run work concurrently (submit/map): How to run work concurrently - Prefect futures API: Prefect futures API - Task runners (threads/processes/distributed): Task runners - Tasks concept: Tasks If you share a small snippet of your current flow, I can tailor these patterns to your exact use case (e.g., mapping async HTTP calls, streaming results, or controlling concurrency).