oleander-whirlwind
10/20/2025, 11:01 AM
Marvin
10/20/2025, 11:07 AM
@task
def do(x): ...
f = do.submit(1, wait_for=[other_future], return_state=False)
- Task.map(...)
  - Expands your task across iterables, scheduling one run per item.
  - Zips across multiple iterables; use unmapped(...) to broadcast constants.
  - Returns a PrefectFutureList preserving input order.
  - Example (Prefect 3.x):
from prefect import task, unmapped
@task
def add(x, y): ...
futures = add.map([1,2,3], [10,20,30]) # zipped
futures = add.map([1,2,3], y=unmapped(100)) # broadcast constant
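To make the zip/broadcast rule concrete, here is a minimal plain-Python model of how mapped arguments could expand into one call per item. It is a sketch for intuition only, not Prefect's implementation: `Broadcast` and `expand_calls` are hypothetical names standing in for unmapped(...) and for the expansion that .map() performs, and the model assumes every non-broadcast argument is an iterable.

```python
from typing import Any


class Broadcast:
    """Hypothetical stand-in for unmapped(...): marks a value as a constant."""

    def __init__(self, value: Any) -> None:
        self.value = value


def expand_calls(**kwargs: Any) -> list[dict[str, Any]]:
    """Expand keyword arguments into one kwargs dict per mapped item."""
    # Split arguments into mapped iterables and broadcast constants.
    mapped = {k: list(v) for k, v in kwargs.items() if not isinstance(v, Broadcast)}
    constants = {k: v.value for k, v in kwargs.items() if isinstance(v, Broadcast)}

    # All mapped iterables must agree on length, otherwise zipping is ambiguous.
    lengths = {len(v) for v in mapped.values()}
    if len(lengths) > 1:
        raise ValueError(f"mapped iterables differ in length: {sorted(lengths)}")

    n = lengths.pop() if lengths else 0
    # Item i of every mapped iterable goes to call i; constants go to every call.
    return [
        {**{k: v[i] for k, v in mapped.items()}, **constants}
        for i in range(n)
    ]


zipped = expand_calls(x=[1, 2, 3], y=[10, 20, 30])
broadcast = expand_calls(x=[1, 2, 3], y=Broadcast(100))
```

Each dict in the returned list corresponds to one child run, in the same order as the inputs.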
Concurrency and performance
- Both .submit() and .map() schedule work to the flow’s task runner; they can run concurrently subject to your task runner and infra limits.
- Configure concurrency via the task runner on the flow:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner
@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def main(): ...
- Use ThreadPoolTaskRunner for I/O-bound work; ProcessPoolTaskRunner for CPU-bound work.
- .map() is mostly ergonomic sugar for many .submit() calls; you can loop with .submit() if you want finer-grained control or batching.
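The "loop with .submit() for batching" idea can be sketched with the standard library's ThreadPoolExecutor standing in for a flow's task runner. This is an analogy rather than Prefect code: `process` and `run_in_batches` are hypothetical names, and the batch size and worker count are arbitrary.

```python
from concurrent.futures import ThreadPoolExecutor


def process(x: int) -> int:
    """Hypothetical unit of work."""
    return x * 2


def run_in_batches(items: list[int], batch_size: int, max_workers: int = 4) -> list[int]:
    """Submit items one batch at a time, waiting for each batch before the next."""
    results: list[int] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for start in range(0, len(items), batch_size):
            batch = items[start:start + batch_size]
            futures = [pool.submit(process, x) for x in batch]
            # Collect in submission order, so results line up with the inputs.
            results.extend(f.result() for f in futures)
    return results


batched = run_in_batches(list(range(10)), batch_size=3)
```

Inside a flow, the same loop shape should carry over with a task's .submit(...) in place of pool.submit(...), which caps how many runs are in flight at once.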
Argument expansion and ordering
- .map() zips over any iterable args/kwargs; all mapped iterables must be the same length or you’ll get a MappingLengthMismatch error.
- Use unmapped(...) to pass a constant alongside mapped iterables.
- Results from .map() are ordered to match the input iterables; completion order is not guaranteed. To process as tasks finish:
from prefect.futures import as_completed
for fut in as_completed(futures):
    print(fut.result())
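The difference between input order and completion order is easy to see with a standard-library analogy. The sketch below uses concurrent.futures rather than Prefect, and `slow_echo` is a hypothetical task whose delays are chosen so that the items finish in a different order than they were submitted.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_echo(delay: float) -> float:
    """Sleep for `delay` seconds, then return it."""
    time.sleep(delay)
    return delay


delays = [0.6, 0.2, 0.4]
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(slow_echo, d) for d in delays]
    # Iterating with as_completed yields futures as they finish.
    completion_order = [f.result() for f in as_completed(futures)]
    # Iterating the list itself preserves the order of the inputs.
    input_order = [f.result() for f in futures]
```

Use the ordered list when results must line up with inputs, and as_completed when you want to start handling results as early as possible.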
Dependencies and barriers
- Both support wait_for to enforce upstream dependencies:
# submit
step2 = task2.submit(wait_for=[step1])
# map (barrier before all children start)
children = task.map(items, wait_for=[upstream])
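As a rough illustration of the barrier semantics, the sketch below uses the standard library instead of Prefect: no child starts until the upstream step has finished. `upstream`, `child`, and `events` are hypothetical names. Note that wait_for expresses the dependency declaratively, while this analogy blocks explicitly, so it only models the ordering guarantee.

```python
from concurrent.futures import ThreadPoolExecutor, wait

events: list[str] = []  # records the order in which steps ran


def upstream() -> None:
    events.append("upstream")


def child(i: int) -> int:
    events.append(f"child-{i}")
    return i * i


with ThreadPoolExecutor(max_workers=4) as pool:
    up = pool.submit(upstream)
    # Barrier: block until the upstream step finishes before fanning out.
    wait([up])
    up.result()  # re-raises here if upstream failed
    children = [pool.submit(child, i) for i in range(3)]
    squares = [c.result() for c in children]
```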
Error handling and retries
- Retries, timeouts, caching, etc. are per-task settings and apply to each mapped child independently.
- If one mapped child fails, others still run; failures surface when you call .result() on that specific future.
- To continue downstream even if some upstreams failed, wrap with allow_failure:
from prefect import allow_failure
safe_children = allow_failure(futures) # works with single futures or lists
downstream = next_task.map(safe_children)
- You can also set return_state=True to get State objects directly instead of futures.
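The "one child fails, the others still run" behavior can be modeled with standard-library futures. This is an analogy, not Prefect code: `parse` is a hypothetical child task, and inspecting `fut.exception()` plays roughly the role that inspecting a State or using allow_failure plays in a flow.

```python
from concurrent.futures import ThreadPoolExecutor


def parse(value: str) -> int:
    """Hypothetical child task: fails on non-numeric input."""
    return int(value)


inputs = ["1", "oops", "3"]
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(parse, v) for v in inputs]

successes: list[int] = []
failures: list[tuple[str, str]] = []
for value, fut in zip(inputs, futures):
    err = fut.exception()  # None if the child succeeded
    if err is None:
        successes.append(fut.result())
    else:
        # Record the failure instead of letting it abort the whole fan-in.
        failures.append((value, type(err).__name__))
```

Calling `fut.result()` directly on the failed future would raise instead, which mirrors how a failure surfaces only on the specific future you resolve.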
When to choose which
- Choose .submit() when:
  - You’re submitting a single call
  - You want explicit, stepwise DAG construction
  - You need fine-grained control over dependencies or batching (e.g., loop and batch with .submit())
- Choose .map() when:
  - You have a clear fan-out over an iterable (e.g., process N files/partitions)
  - You want easy zipped/broadcasted argument handling and an ordered list of futures
  - You plan to fan-in later by collecting results or iterating with as_completed
Example comparison
- Using .map():
```
from prefect import flow, task, unmapped

@task(retries=2)
def transform(x, y):
    return x + y

@flow
def main():
    xs = [1, 2, 3]
    ys = [10, 20, 30]
    futs = transform.map(xs, ys)  # zipped
    futs2 = transform.map(xs, y=unmapped(100))  # broadcast
    # Resolve the futures before the flow returns.
    results = [f.result() for f in futs]
    results2 = [f.result() for f in futs2]
```
- Equivalent with .submit():
```
from prefect import flow, task

@task
def transform(x, y):
    return x + y

@flow
def main():
    xs = [1, 2, 3]
    ys = [10, 20, 30]
    # One explicit submit per zipped pair.
    futs = [transform.submit(x, y) for x, y in zip(xs, ys)]
    results = [f.result() for f in futs]
```
Notes on versions
- This guidance is for Prefect 3.x. The concepts carry over from 2.x: if you’re on 2.x and considering an upgrade, 3.x retains the .map() and .submit() patterns, with streamlined APIs and internals.
If you can share your use case (size of the iterable, CPU vs I/O, batching needs, error tolerance), I can suggest a concrete pattern and task runner settings.