Nils
09/03/2025, 1:57 PM
Marvin
09/03/2025, 1:57 PM
Marvin
09/03/2025, 2:00 PM
`@flow` decorator skips that pre-processing, so it “works.”
Why this happens
- Prefect visits/traverses flow inputs before the flow body runs (e.g., to validate them, build cache/concurrency keys, replace futures). Traversal uses utilities like `visit_collection`, which iterate over iterables to inspect their contents, consuming one-shot iterators and generators in the process.
- For deployed or API-triggered runs, parameters must be JSON-serializable anyway; iterators cannot be serialized.
- Docs/code refs:
  - prefect.utilities.collections (`visit_collection`)
  - `visit_collection` in source
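The failure mode can be reproduced in plain Python. In the sketch below, `inspect_inputs` is a hypothetical stand-in for any pre-run step that iterates over an argument (it is not Prefect's actual code), and `flow_body` stands in for the flow function. Once the "pre-processing" has iterated the generator, the flow body gets nothing. The last part shows why iterators can't be deployment parameters: `json.dumps` rejects a generator outright.

```python
import json
from typing import Iterable, Iterator


def numbers(n: int) -> Iterator[int]:
    """A one-shot generator, like the one passed to the flow."""
    for i in range(n):
        yield i


def inspect_inputs(value: Iterable[int]) -> int:
    """Hypothetical stand-in for pre-run processing that walks an argument.

    Iterating here is what exhausts a generator before the flow body sees it.
    """
    return sum(1 for _ in value)


def flow_body(items: Iterable[int]) -> list[int]:
    """Stand-in for the flow body: collects whatever items are left."""
    return list(items)


gen = numbers(3)
seen = inspect_inputs(gen)  # consumes all three items
left = flow_body(gen)       # nothing left to iterate
print(seen, left)  # 3 []

# Generators are not JSON-serializable, so they can't be stored
# as parameters of a deployed or API-triggered run.
try:
    json.dumps({"items": numbers(3)})
    serializable = True
except TypeError as exc:
    serializable = False
    print("not serializable:", exc)
```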
Recommended workarounds
- Materialize before passing (best for small/known-size inputs):
```python
from prefect import flow

@flow
def my_flow(items: list[int]):
    for x in items:
        ...

# my_generator is your existing generator function
my_flow(list(my_generator()))
```
- Pass a factory/callable and construct the iterator inside the flow (safe for large/streaming):
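```python
from typing import Callable, Iterator

from prefect import flow

def make_items() -> Iterator[int]:
    for i in range(10):
        yield i

@flow
def my_flow(factory: Callable[[], Iterator[int]]):
    # The generator is created here, after Prefect's input pre-processing
    it = factory()
    for x in it:
        ...

my_flow(make_items)
```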
- Pass parameters to reconstruct the iterator inside the flow:
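```python
from prefect import flow

@flow
def my_flow(start: int, stop: int):
    # Rebuild the sequence from plain, JSON-serializable parameters
    for i in range(start, stop):
        ...

my_flow(0, 10)
```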
- Pass a reference to data (e.g., a path, S3 URL, query) and open/stream within the flow or a task, rather than passing the iterator itself.
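Here is a minimal sketch of passing a reference, assuming the data lives in a local text file. `iter_lines` and `count_records` are hypothetical names. `count_records` stands in for the flow body and is written as a plain function so it runs without Prefect; in practice you would decorate it with `@flow` (or move the streaming into a task). Only the path string crosses the flow boundary, so the parameter is JSON-serializable and the generator is created inside the function, after any input pre-processing.

```python
import json
import tempfile
from pathlib import Path
from typing import Iterator


def iter_lines(path: str) -> Iterator[str]:
    """Stream non-empty lines from a file without loading it all into memory."""
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                yield line


def count_records(path: str) -> int:
    """Stand-in for the flow body; with Prefect this would carry @flow."""
    # The generator is created here, so nothing upstream can consume it first.
    return sum(1 for _ in iter_lines(path))


# Usage: write a small sample file, then pass only its path.
sample = Path(tempfile.mkdtemp()) / "records.txt"
sample.write_text("a\nb\n\nc\n", encoding="utf-8")

params = {"path": str(sample)}
print(json.dumps(params))  # a plain string parameter serializes fine
n = count_records(**params)
print(n)  # 3
```

The same shape works for an S3 URL or a query string: the parameter is a small serializable value, and the flow or task owns the open/stream step.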
Caution on alternatives
- `itertools.tee` won't help if Prefect consumes the copy you pass before the flow runs; the flow still receives an exhausted iterator.
- A custom iterable wrapper may still be traversed; prefer passing a factory (callable) or materialized data.
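The `itertools.tee` caveat in a few lines: tee'd copies share one underlying iterator, so exhausting the copy you hand to the flow leaves that copy empty, while the copy you kept holds every item in an in-memory buffer. `consume` is a hypothetical stand-in for pre-run traversal, not Prefect code.

```python
from itertools import tee
from typing import Iterable, Iterator


def numbers(n: int) -> Iterator[int]:
    yield from range(n)


def consume(it: Iterable[int]) -> None:
    """Hypothetical stand-in for pre-run traversal that iterates an argument."""
    for _ in it:
        pass


passed, kept = tee(numbers(3))
consume(passed)  # pre-processing exhausts the copy handed to the flow

flow_sees = list(passed)
outside = list(kept)
print(flow_sees)  # [] -> the flow body would see nothing
print(outside)    # [0, 1, 2] -> only the copy outside the flow still has data
```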
If you can share a small snippet (your generator creation and the flow signature), I can confirm precisely where it’s getting consumed and suggest the cleanest pattern for your case.