Daniel Leicht
09/08/2025, 6:42 AM
state = asyncio.run(poll_flow_state(flow_run))
if state.is_completed() or state.is_failed():
    logger.info(f"Flow {flow_run.name} finished with state: {state.type}")
else:
    still_running.append(flow_run)
Marvin
09/08/2025, 6:45 AM
Use prefect.states.get_state_result(state) to resolve the subflow’s return value from the State you polled.
Example adapting your snippet:
from prefect.states import get_state_result, get_state_exception
state = asyncio.run(poll_flow_state(flow_run))
if state.is_completed():
    # If get_state_result is a coroutine function in your Prefect version, await it
    # or wrap the call in asyncio.run(...), as with poll_flow_state above.
    value = get_state_result(state)  # resolves and returns the subflow's return value
    logger.info(f"Subflow {flow_run.name} returned: {value!r}")
elif state.is_failed():
    # get_state_result raises by default on failure
    try:
        get_state_result(state)  # raises the original exception
    except Exception as exc:
        logger.exception(f"Subflow {flow_run.name} failed", exc_info=exc)
else:
    still_running.append(flow_run)
Notes:
- get_state_result raises on FAILED/CRASHED/CANCELLED by default. If you prefer not to raise, call get_state_result(state, raise_on_failure=False) and/or inspect get_state_exception(state) for details.
- If you’re calling a subflow from inside another flow, you can usually just call the subflow function directly and use its return value without polling.
Refs: see get_state_result and related helpers in prefect.states.
Quick question to ensure we point you to the best approach: are you polling these subflows from outside of a Prefect flow (e.g., a controller script), or from inside a parent flow?
Daniel Leicht
09/09/2025, 4:26 AM
TypeError: 'coroutine' object is not iterable
When checking the return result
Marvin
09/09/2025, 4:30 AM
future.result()
- For collections/nested structures of futures: use prefect.futures.resolve_futures_to_results(...)
- To manage many futures: use prefect.futures.wait(...) or as_completed(...)
Docs and API
- Prefect Futures overview: prefect.futures docs
- Source for futures helpers: prefect.futures.py
Common causes of "coroutine object is not iterable" in flows
- Calling a bare async function (not a task) inside a sync flow and then iterating it:
  - Problem:
      @flow
      def main():
          data = get_rows()  # get_rows is async def
          for r in data:  # TypeError: 'coroutine' object is not iterable
              ...
  - Fix 1 (await it in an async flow):
      @flow
      async def main():
          data = await get_rows()
          for r in data:
              ...
  - Fix 2 (wrap it as a Prefect task and resolve the future):
      from prefect import flow, task
      @task
      async def get_rows():
          ...
      @flow
      def main():
          rows_fut = get_rows.submit()  # PrefectFuture
          rows = rows_fut.result()  # concrete list
          for r in rows:
              ...
- Iterating over a task call directly instead of its resolved result:
  - Problem:
      items = produce_items.submit()  # returns PrefectFuture
      for x in items:  # TypeError: 'PrefectFuture' is not iterable (or similar)
          ...
  - Fix:
      items = produce_items.submit().result()
      for x in items:
          ...
- Returning a coroutine from a task/flow by accident (e.g., returning an un-awaited async call) and consuming it upstream.
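The first cause can be reproduced without Prefect at all. This is a minimal sketch using only asyncio; get_rows and its data are made up for illustration. Calling an async function only creates a coroutine object, and iterating that object raises the TypeError. Running the coroutine to completion first yields a real list.

```python
import asyncio


async def get_rows():
    # Hypothetical stand-in for an async data loader.
    await asyncio.sleep(0)
    return [1, 2, 3]


def iterate_without_awaiting():
    coro = get_rows()  # coroutine object; nothing has run yet
    try:
        for _ in coro:  # a coroutine object cannot be iterated
            pass
    except TypeError as exc:
        return str(exc)
    finally:
        coro.close()  # avoid a "never awaited" RuntimeWarning
    return None


def iterate_after_running():
    rows = asyncio.run(get_rows())  # drive the coroutine to completion
    return [r * 2 for r in rows]


error_message = iterate_without_awaiting()
doubled = iterate_after_running()
```

Here error_message holds the same "'coroutine' object is not iterable" text reported above, while doubled is an ordinary list.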
Working with lists of tasks
- Map across data:
    from prefect import flow, task
    @task
    def double(x):
        return x * 2
    @flow
    def main():
        futs = double.map(range(5))  # PrefectFutureList
        # Option A: resolve all results
        results = [f.result() for f in futs]
        # Option B: resolve nested/structured futures
        # from prefect.futures import resolve_futures_to_results
        # results = resolve_futures_to_results(futs)
        return results
- Wait for many futures or process as they complete:
    from prefect.futures import wait, as_completed
    done, not_done = wait(futs)
    all_results = [f.result() for f in done]
    for f in as_completed(futs):
        val = f.result()
        ...
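The standard library's concurrent.futures offers the same two patterns, which gives a Prefect-free way to see the difference. Treat this only as an analogy: it assumes nothing about how Prefect implements its own helpers, and double here is a plain function, not a task. wait(...) blocks until the futures finish and returns done/not_done sets, while as_completed(...) yields each future as soon as it finishes.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed, wait


def double(x):
    return x * 2


with ThreadPoolExecutor(max_workers=4) as pool:
    futs = [pool.submit(double, i) for i in range(5)]

    # Block until every future has finished, then read the results.
    done, not_done = wait(futs)
    all_results = sorted(f.result() for f in done)

    # Or handle each result as its future finishes (completion order is not guaranteed).
    streamed = sorted(f.result() for f in as_completed(futs))
```

Both approaches end with the same values; the choice is whether you need everything at once or want to start processing early.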
Quick checklist
- Are you calling an async function directly in a sync flow? Make the flow async and await it, or wrap it as a task.
- Are you iterating a PrefectFuture or coroutine instead of its resolved result? Call .result() or use resolve_futures_to_results(...).
- When mapping, remember task.map(...) returns a list of futures; resolve them before iterating over the concrete values.
If you can share a minimal snippet of your flow/task and where you check the return result, I can point to the exact line that needs changing.
Daniel Leicht
09/09/2025, 4:52 AM
07:49:39.246 | INFO | Flow run 'ingenious-orangutan' - Finished in state Failed('Flow run encountered an exception: ValueError: Path C:\\root\\.prefect\\storage\\1d89e688de5d4f64b1bf0696b7dbc548 does not exist.')
Traceback (most recent call last):
File "C:\Projects\BullzAI\NewETLPilot\flows\silver.py", line 162, in <module>
bronze_to_silver()
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\flows.py", line 1699, in __call__
return run_flow(
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\flow_engine.py", line 1530, in run_flow
ret_val = run_flow_sync(**kwargs)
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\flow_engine.py", line 1375, in run_flow_sync
return engine.state if return_type == "state" else engine.result()
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\flow_engine.py", line 351, in result
raise self._raised
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\flow_engine.py", line 765, in run_context
yield self
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\flow_engine.py", line 1373, in run_flow_sync
engine.call_flow_fn()
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\flow_engine.py", line 785, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\utilities\callables.py", line 210, in call_with_parameters
return fn(*args, **kwargs)
File "C:\Projects\BullzAI\NewETLPilot\flows\silver.py", line 153, in bronze_to_silver
flow_return_value = asyncio.run(get_state_result(state))
File "C:\Users\User\AppData\Local\Programs\Python\Python39\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\Users\User\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 642, in run_until_complete
return future.result()
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\states.py", line 85, in get_state_result
return await _get_state_result(
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\states.py", line 160, in _get_state_result
result = await _get_state_result_data_with_retries(
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\states.py", line 117, in _get_state_result_data_with_retries
record = await ResultStore._from_metadata(state.data)
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\results.py", line 371, in _from_metadata
result = await store.aread(metadata.storage_key)
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\results.py", line 655, in aread
return await self._read(key=key, holder=holder, _sync=False)
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\results.py", line 606, in _read
content = await call_explicitly_async_block_method(
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\_internal\compatibility\blocks.py", line 21, in call_explicitly_async_block_method
return await getattr(block, method).aio(block, *args, **kwargs)
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\filesystems.py", line 220, in read_path
raise ValueError(f"Path {path} does not exist.")
ValueError: Path C:\root\.prefect\storage\1d89e688de5d4f64b1bf0696b7dbc548 does not exist.
Process finished with exit code 1
Marvin
09/09/2025, 4:56 AM
from prefect import flow
from prefect.filesystems import RemoteFileSystem
from prefect.serializers import JSONSerializer  # or PickleSerializer
remote_store = RemoteFileSystem(
    basepath="s3://my-bucket/prefect-results",  # or "gs://...", "az://...", "smb://server/share/prefect-results"
    # settings={"key": "...", "secret": "..."}  # or rely on environment/instance creds
)
@flow(
    persist_result=True,
    result_storage=remote_store,
    result_serializer=JSONSerializer()  # optional; Pickle is fine for Python objects
)
def bronze_to_silver(...):
    ...
    return some_value
Option 2: Make it your default (recommended if all flows should share the same store)
- Configure a default result storage and persistence in your Prefect profile so you don’t have to set it on every flow. See docs below for how to set this in your profile/settings.
Notes
- Re-run flows after you configure shared storage; existing runs point to the old local path and can’t be recovered from other hosts.
- For large/complex Python objects, consider your serializer and storage carefully. JSON is portable for simple types; Pickle can handle arbitrary objects but is Python-specific.
- Reading state results from another host will always use the configured result storage; in-memory caching only helps within the same process/machine.
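To make the serializer trade-off concrete, here is a small sketch using only the standard library's json and pickle modules. The values are invented for illustration, and Prefect's own JSONSerializer may accept more types than the plain stdlib encoder shown here, so read this as the general shape of the trade-off rather than Prefect's exact behavior.

```python
import json
import pickle
from datetime import datetime

# Hypothetical flow return values, for illustration only.
simple_value = {"table": "silver.orders", "rows": 42}
rich_value = {
    "loaded_at": datetime(2025, 9, 9, 4, 52),
    "tables": {"orders", "customers"},
}

# JSON round-trips plain dicts, lists, strings and numbers.
json_roundtrip = json.loads(json.dumps(simple_value))

# The stdlib JSON encoder rejects datetimes and sets.
try:
    json.dumps(rich_value)
    json_accepts_rich = True
except TypeError:
    json_accepts_rich = False

# Pickle round-trips the richer object, but only Python can read the bytes back.
pickle_roundtrip = pickle.loads(pickle.dumps(rich_value))
```

If other tools need to read the stored results, keeping return values JSON-friendly is usually the simpler path; if only Python flows consume them, Pickle avoids the conversion work.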
References
- Results overview: Results (how persistence works)
- API: prefect.results
- Filesystems (RemoteFileSystem, SMB, etc.): prefect.filesystems
If you share:
- which backend you prefer (S3/GCS/Azure/SMB),
- whether you want per-flow config or a profile-wide default,
I can give you a concrete snippet and any credential setup tips for your environment.