<@ULVA73B9P> When I pass an Iterator to a flow, it...
# ask-marvin
n
@Marvin When I pass an Iterator to a flow, it is empty. When removing the flow decorator, everything works as expected.
m
thought for 206.3 seconds
Short answer: generators/iterators are single-use, and the Prefect flow wrapper inspects/traverses your arguments before your function body runs. That early traversal iterates over the iterator to materialize/validate/log parameters, which exhausts it—so by the time your flow code executes, it’s empty. Removing the
@flow
decorator skips that pre-processing, so it “works.” Why this happens - Prefect visits/traverses flow inputs (e.g., to validate, build cache/concurrency keys, replace futures, etc.). Traversal uses utilities like
visit_collection
, which will iterate over iterables to inspect contents—consuming one-shot iterators/generators in the process. - For deployed or API-triggered runs, parameters must be JSON-serializable anyway; iterators cannot be serialized. - Docs/code refs: - prefect.utilities.collections (visit_collection) - visit_collection in source Recommended workarounds - Materialize before passing (best for small/known-size inputs):
Copy code
@flow
  def my_flow(items: list[int]):
      for x in items:
          ...

  my_flow(list(my_generator()))
- Pass a factory/callable and construct the iterator inside the flow (safe for large/streaming):
Copy code
from typing import Callable, Iterator

  def make_items() -> Iterator[int]:
      for i in range(10):
          yield i

  @flow
  def my_flow(factory: Callable[[], Iterator[int]]):
      it = factory()
      for x in it:
          ...

  my_flow(make_items)
- Pass parameters to reconstruct the iterator inside the flow:
Copy code
@flow
  def my_flow(start: int, stop: int):
      for i in range(start, stop):
          ...

  my_flow(0, 10)
- Pass a reference to data (e.g., a path, S3 URL, query) and open/stream within the flow or a task, rather than passing the iterator itself. Caution on alternatives -
itertools.tee
won’t help if Prefect consumes the one you pass before the flow runs. - Wrapping in a custom iterable may still be traversed; prefer passing a factory (callable) or materialized data. If you can share a small snippet (your generator creation and the flow signature), I can confirm precisely where it’s getting consumed and suggest the cleanest pattern for your case.