@Marvin what is the difference between .map()...
# ask-marvin
@Marvin what is the difference between .map() and .submit() and when to use which?
thought for 332.3 seconds
Great question!

Short answer:
- Use .submit() to kick off a single task run asynchronously and get back one PrefectFuture.
- Use .map() to fan out a task across an iterable, scheduling many task runs at once, and get back a PrefectFutureList (ordered like the input).

Details and when to use each

What they do
- Task.submit(...)
  - Schedules one task run and returns a PrefectFuture.
  - Good for chaining steps and building a DAG one node at a time.
  - Lets you explicitly manage dependencies via wait_for.
  - Signature (Prefect 3.x):
```
from prefect import task

@task
def do(x): ...

# Called from within a flow; other_future comes from an earlier .submit()
f = do.submit(1, wait_for=[other_future], return_state=False)
```
- Task.map(...)
  - Expands your task across iterables, scheduling one run per item.
  - Zips across multiple iterables; use unmapped(...) to broadcast constants.
  - Returns a PrefectFutureList preserving input order.
  - Signature (Prefect 3.x):
```
from prefect import task, unmapped

@task
def add(x, y): ...

futures = add.map([1, 2, 3], [10, 20, 30])       # zipped
futures = add.map([1, 2, 3], y=unmapped(100))    # broadcast constant
```
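If it helps to see the zip vs. broadcast behaviour without Prefect in the picture, here is a rough plain-Python model of the argument expansion. It is only a sketch: `Unmapped` and `expand_calls` are hypothetical names made up for illustration, not Prefect's classes or internals, and the length check raises a plain `ValueError` rather than Prefect's own exception.

```python
from itertools import repeat


class Unmapped:
    """Hypothetical broadcast marker (a stand-in, not Prefect's unmapped)."""

    def __init__(self, value):
        self.value = value


def expand_calls(*args):
    """Return one argument tuple per call: zip iterables, repeat Unmapped values."""
    lengths = {len(a) for a in args if not isinstance(a, Unmapped)}
    if len(lengths) > 1:
        raise ValueError(f"mapped iterables differ in length: {sorted(lengths)}")
    n = lengths.pop() if lengths else 1
    columns = [repeat(a.value, n) if isinstance(a, Unmapped) else a for a in args]
    return list(zip(*columns))


zipped = expand_calls([1, 2, 3], [10, 20, 30])
broadcast = expand_calls([1, 2, 3], Unmapped(100))
print(zipped)     # [(1, 10), (2, 20), (3, 30)]
print(broadcast)  # [(1, 100), (2, 100), (3, 100)]
```

Each tuple corresponds to one child run, in input order.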
Concurrency and performance
- Both .submit() and .map() schedule work to the flow’s task runner; they can run concurrently subject to your task runner and infra limits.
- Configure concurrency via the task runner on the flow:
```
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def main(): ...
```
- Use ThreadPoolTaskRunner for I/O-bound work; ProcessPoolTaskRunner for CPU-bound work.
- .map() is mostly ergonomic sugar for many .submit() calls; you can loop with .submit() if you want finer-grained control or batching.

Argument expansion and ordering
- .map() zips over any iterable args/kwargs; all mapped iterables must be the same length or you’ll get a MappingLengthMismatch error.
- Use unmapped(...) to pass a constant alongside mapped iterables.
- Results from .map() are ordered to match the input iterables; completion order is not guaranteed. To process as tasks finish:
```
from prefect.futures import as_completed

for fut in as_completed(futures):
    print(fut.result())
```
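To make the input-order vs. completion-order distinction concrete, here is a small sketch using the standard library's `concurrent.futures` as a stand-in (it is not Prefect, but its futures and `as_completed` follow the same idea). The `slow_square` function and the delays are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(x, delay):
    """Sleep for `delay` seconds, then return x squared."""
    time.sleep(delay)
    return x * x


inputs = [1, 2, 3]
delays = [0.3, 0.2, 0.1]  # the first input is deliberately the slowest

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(slow_square, x, d) for x, d in zip(inputs, delays)]
    # Yields futures as they finish, so the order depends on timing
    finished_order = [f.result() for f in as_completed(futures)]
    # Iterating the list itself always follows the input order
    input_order = [f.result() for f in futures]

print(input_order)  # [1, 4, 9]
print(finished_order)
```

Iterating the list of futures gives results in input order no matter which run finished first; `as_completed` trades that ordering for getting results as early as possible.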
Dependencies and barriers
- Both support wait_for to enforce upstream dependencies:
```
# submit
step2 = task2.submit(wait_for=[step1])

# map (barrier before all children start)
children = child_task.map(items, wait_for=[upstream])
```
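As a loose analogy for the barrier, here is a standard-library sketch with `concurrent.futures`. It is not how Prefect implements `wait_for`: in this model the caller blocks explicitly before submitting the children, whereas with `wait_for` you declare the dependency and keep submitting. The `prepare` and `child` functions are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, wait

events = []  # list.append is atomic in CPython, so threads can share this list


def prepare():
    events.append("prepare")


def child(i):
    events.append(f"child-{i}")


with ThreadPoolExecutor(max_workers=4) as pool:
    upstream = pool.submit(prepare)
    wait([upstream])  # barrier: no child is submitted until upstream is done
    children = [pool.submit(child, i) for i in range(3)]

print(events[0])  # prepare
```

The children may finish in any order relative to each other, but none of them starts before the upstream step has completed.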
Error handling and retries
- Retries, timeouts, caching, etc. are per-task settings and apply to each mapped child independently.
- If one mapped child fails, others still run; failures surface when you call .result() on that specific future.
- To continue downstream even if some upstreams failed, wrap with allow_failure:
```
from prefect import allow_failure

safe_children = allow_failure(futures)   # works with single futures or lists
downstream = next_task.map(safe_children)
```
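The "one child fails, the others still run" behaviour can be seen in miniature with standard-library futures, which surface exceptions in the same place (when you call `.result()`). This is only an illustrative stand-in for Prefect, with a made-up `parse` function and inputs.

```python
from concurrent.futures import ThreadPoolExecutor


def parse(value):
    """Convert a string to int; raises ValueError for non-numeric input."""
    return int(value)


raw = ["1", "oops", "3"]

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(parse, v) for v in raw]

outcomes = []
for value, fut in zip(raw, futures):
    try:
        # The exception is raised here, only for the future that failed
        outcomes.append(("ok", fut.result()))
    except ValueError:
        outcomes.append(("failed", value))

print(outcomes)  # [('ok', 1), ('failed', 'oops'), ('ok', 3)]
```

Handling each future separately like this lets you collect the successes and decide what to do with the failures afterwards.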
- You can also set return_state=True to get State objects directly instead of futures.

When to choose which
- Choose .submit() when:
  - You’re submitting a single call
  - You want explicit, stepwise DAG construction
  - You need fine-grained control over dependencies or batching (e.g., loop and batch with .submit())
- Choose .map() when:
  - You have a clear fan-out over an iterable (e.g., process N files/partitions)
  - You want easy zipped/broadcasted argument handling and an ordered list of futures
  - You plan to fan-in later by collecting results or iterating with as_completed

Example comparison
- Using .map():
```
from prefect import flow, task, unmapped
from prefect.futures import as_completed

@task(retries=2)
def transform(x, y):
    return x + y

@flow
def main():
    xs = [1, 2, 3]
    ys = [10, 20, 30]
    futs = transform.map(xs, ys)                 # zipped
    futs2 = transform.map(xs, y=unmapped(100))   # broadcast

    results = [f.result() for f in futs]         # preserves input order
    for f in as_completed(futs2):                # process as they finish
        print(f.result())
```
- Equivalent with .submit():
```
from prefect import flow, task

@task
def transform(x, y):
    return x + y

@flow
def main():
    xs = [1, 2, 3]
    ys = [10, 20, 30]
    futs = [transform.submit(x, y) for x, y in zip(xs, ys)]
    results = [f.result() for f in futs]
```

Notes on versions
- This guidance is for Prefect 3.x. The concepts are similar to 2.x, but 3.x APIs and internals are streamlined. If you’re on 2.x and considering an upgrade, 3.x retains .map() and .submit() patterns with better ergonomics.

If you can share your use case (size of the iterable, CPU vs I/O, batching needs, error tolerance), I can suggest a concrete pattern and task runner settings.