<@ULVA73B9P> How can I get the subflow return valu...
# ask-marvin
d
@Marvin How can I get the subflow return value in the example below?
state = asyncio.run(poll_flow_state(flow_run))
if state.is_completed() or state.is_failed():
    logger.info(f"Flow {flow_run.name} finished with state: {state.type}")
else:
    still_running.append(flow_run)
m
thought for 118.3 seconds
Use prefect.states.get_state_result(state) to resolve the subflow’s return value from the State you polled. Example adapting your snippet:
import asyncio

from prefect.states import get_state_result, get_state_exception

state = asyncio.run(poll_flow_state(flow_run))

if state.is_completed():
    # get_state_result is a coroutine function in Prefect 3.x, so run it to completion
    value = asyncio.run(get_state_result(state))  # the subflow's return value
    logger.info(f"Subflow {flow_run.name} returned: {value!r}")

elif state.is_failed():
    # get_state_result raises by default on failure
    try:
        asyncio.run(get_state_result(state))  # raises the original exception
    except Exception as exc:
        logger.exception(f"Subflow {flow_run.name} failed", exc_info=exc)

else:
    still_running.append(flow_run)
Notes:
- get_state_result raises on FAILED/CRASHED/CANCELLED by default. If you prefer not to raise, call get_state_result(state, raise_on_failure=False) and/or inspect get_state_exception(state) for details.
- If you’re calling a subflow from inside another flow, you can usually just call the subflow function directly and use its return value without polling.
Refs: see get_state_result and related helpers in prefect.states helpers.
Quick question to ensure we point you to the best approach: are you polling these subflows from outside of a Prefect flow (e.g., a controller script), or from inside a parent flow?
d
@Marvin
TypeError: 'coroutine' object is not iterable
When checking the return result
m
thought for 155.6 seconds
That error usually means something in your flow/task is returning a coroutine and you’re trying to iterate over it (or treat it like a list/dict) without awaiting or resolving it.
In Prefect 3.x:
- Calling a task directly runs it and returns its value (or a coroutine to await, for an async task); task.submit(...) and task.map(...) return PrefectFutures instead of the actual values.
- To get the value from a future:
  - In any flow: call future.result()
  - For collections/nested structures of futures: use prefect.futures.resolve_futures_to_results(...)
- To manage many futures: use prefect.futures.wait(...) or as_completed(...)
Docs and API
- Prefect Futures overview: prefect.futures docs
- Source for futures helpers: prefect.futures.py
Common causes of "coroutine object is not iterable" in flows
- Calling a bare async function (not a task) inside a sync flow and then iterating it:
  - Problem:
@flow
def main():
    data = get_rows()  # get_rows is async def
    for r in data:     # TypeError: 'coroutine' object is not iterable
        ...
  - Fix 1 (await it in an async flow):
@flow
async def main():
    data = await get_rows()
    for r in data:
        ...
  - Fix 2 (wrap it as a Prefect task, submit it, and resolve the future):
from prefect import flow, task

@task
async def get_rows():
    ...

@flow
def main():
    rows_fut = get_rows.submit()  # PrefectFuture
    rows = rows_fut.result()      # concrete list
    for r in rows:
        ...
- Iterating over a submitted task's future instead of its resolved result:
  - Problem:
items = produce_items.submit()  # returns PrefectFuture
for x in items:                 # TypeError: 'PrefectFuture' object is not iterable (or similar)
    ...
  - Fix:
items = produce_items.submit().result()
for x in items:
    ...
- Returning a coroutine from a task/flow by accident (e.g., returning an un-awaited async call) and consuming it upstream.
Working with lists of tasks
- Map across data:
from prefect import flow, task

@task
def double(x): return x * 2

@flow
def main():
    futs = double.map(range(5))     # PrefectFutureList
    # Option A: resolve all results
    results = [f.result() for f in futs]
    # Option B: resolve nested/structured futures
    # from prefect.futures import resolve_futures_to_results
    # results = resolve_futures_to_results(futs)
    return results
- Wait for many futures or process as they complete:
from prefect.futures import wait, as_completed

done, not_done = wait(futs)
all_results = [f.result() for f in done]

for f in as_completed(futs):
    val = f.result()
    ...
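For comparison, the standard library's concurrent.futures module has wait and as_completed functions with a similar shape. The sketch below uses only the standard library as an analogy; it is not Prefect's API, and double is a hypothetical worker function.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed, wait


def double(x):
    # Hypothetical worker; stands in for a submitted task.
    return x * 2


with ThreadPoolExecutor(max_workers=4) as pool:
    futs = [pool.submit(double, i) for i in range(5)]

    # Block until every future has finished, then collect the values.
    done, not_done = wait(futs)
    all_results = sorted(f.result() for f in done)

    # Or handle each value as its future completes (completion order is not guaranteed).
    streamed = []
    for f in as_completed(futs):
        streamed.append(f.result())

print(all_results)
print(sorted(streamed))
```

Both approaches yield the same values; the difference is only whether you wait for everything first or react to each result as it arrives.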
Quick checklist
- Are you calling an async function directly in a sync flow? Make the flow async and await it, or wrap it as a task, submit it, and call .result() on the future.
- Are you iterating a PrefectFuture or coroutine instead of its resolved result? Call .result() on the future (or await the coroutine), or use resolve_futures_to_results(...).
- When mapping, remember task.map(...) returns a list of futures; resolve them before iterating over the concrete values.
If you can share a minimal snippet of your flow/task and where you check the return result, I can point to the exact line that needs changing.
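To make the first checklist item concrete, here is a minimal sketch that uses only the standard library, with no Prefect involved. fetch_rows is a hypothetical stand-in for any async def function: calling it without awaiting only creates a coroutine object, and iterating that object raises the TypeError from the question.

```python
import asyncio


async def fetch_rows():
    # Hypothetical async function standing in for any coroutine function.
    await asyncio.sleep(0)
    return [1, 2, 3]


def iterate_without_awaiting():
    # Calling an async function only creates a coroutine object; nothing runs yet.
    coro = fetch_rows()
    try:
        for _ in coro:
            pass
    except TypeError as exc:
        return str(exc)
    finally:
        coro.close()  # avoids the "coroutine was never awaited" warning
    return None


def iterate_after_resolving():
    # From synchronous code, asyncio.run drives the coroutine to completion.
    rows = asyncio.run(fetch_rows())
    return [r * 2 for r in rows]


error_message = iterate_without_awaiting()
doubled = iterate_after_resolving()
print(error_message)
print(doubled)
```

Inside an async function you would write await fetch_rows() instead of asyncio.run(...), since asyncio.run cannot be called from a running event loop.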
d
@Marvin Does setting persist_result to True save the result locally? When trying to access the result of a flow from a worker that runs on another host, I get the error below:
07:49:39.246 | INFO    | Flow run 'ingenious-orangutan' - Finished in state Failed('Flow run encountered an exception: ValueError: Path C:\\root\\.prefect\\storage\\1d89e688de5d4f64b1bf0696b7dbc548 does not exist.')
Traceback (most recent call last):
File "C:\Projects\BullzAI\NewETLPilot\flows\silver.py", line 162, in <module>
bronze_to_silver()
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\flows.py", line 1699, in __call__
return run_flow(
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\flow_engine.py", line 1530, in run_flow
ret_val = run_flow_sync(**kwargs)
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\flow_engine.py", line 1375, in run_flow_sync
return engine.state if return_type == "state" else engine.result()
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\flow_engine.py", line 351, in result
raise self._raised
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\flow_engine.py", line 765, in run_context
yield self
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\flow_engine.py", line 1373, in run_flow_sync
engine.call_flow_fn()
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\flow_engine.py", line 785, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\utilities\callables.py", line 210, in call_with_parameters
return fn(*args, **kwargs)
File "C:\Projects\BullzAI\NewETLPilot\flows\silver.py", line 153, in bronze_to_silver
flow_return_value = asyncio.run(get_state_result(state))
File "C:\Users\User\AppData\Local\Programs\Python\Python39\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\Users\User\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 642, in run_until_complete
return future.result()
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\states.py", line 85, in get_state_result
return await _get_state_result(
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\states.py", line 160, in _get_state_result
result = await _get_state_result_data_with_retries(
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\states.py", line 117, in _get_state_result_data_with_retries
record = await ResultStore._from_metadata(state.data)
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\results.py", line 371, in _from_metadata
result = await store.aread(metadata.storage_key)
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\results.py", line 655, in aread
return await self._read(key=key, holder=holder, _sync=False)
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\results.py", line 606, in _read
content = await call_explicitly_async_block_method(
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\_internal\compatibility\blocks.py", line 21, in call_explicitly_async_block_method
return await getattr(block, method).aio(block, *args, **kwargs)
File "C:\Projects\BullzAI\NewETLPilot\venv\lib\site-packages\prefect\filesystems.py", line 220, in read_path
raise ValueError(f"Path {path} does not exist.")
ValueError: Path C:\root\.prefect\storage\1d89e688de5d4f64b1bf0696b7dbc548 does not exist.
Process finished with exit code 1
m
thought for 222.6 seconds
Short answer: persist_result=True persists to the configured result storage. By default that storage is the local filesystem on the machine that ran the flow/task (e.g., C:\root\.prefect\storage on Windows). It is not shared across hosts. That’s why another worker/host can’t read the result and you see “Path ... does not exist.”
What’s happening
- Your flow wrote its return value to the default local result store on Host A.
- Later, another host (Host B) tries to resolve the state’s result; Prefect looks at the stored metadata and attempts to read C:\root\.prefect\storage\<key> on Host B.
- That path/file doesn’t exist on Host B, so result resolution fails.
How to fix it
Pick a shared/remote result store and configure your flows/tasks to use it so any host can read results.
Option 1: Set per-flow
from prefect import flow
from prefect.filesystems import RemoteFileSystem
from prefect.serializers import JSONSerializer  # or PickleSerializer

remote_store = RemoteFileSystem(
    basepath="s3://my-bucket/prefect-results",  # or "gs://...", "az://...", "smb://server/share/prefect-results"
    # settings={"key": "...", "secret": "..."}  # or rely on environment/instance creds
)

@flow(
    persist_result=True,
    result_storage=remote_store,
    result_serializer=JSONSerializer()  # optional; Pickle is fine for Python objects
)
def bronze_to_silver(...):
    ...
    return some_value
Option 2: Make it your default (recommended if all flows should share the same store)
- Configure a default result storage and persistence in your Prefect profile so you don’t have to set it on every flow. See docs below for how to set this in your profile/settings.
Notes
- Re-run flows after you configure shared storage; existing runs point to the old local path and can’t be recovered from other hosts.
- For large/complex Python objects, consider your serializer and storage carefully. JSON is portable for simple types; Pickle can handle arbitrary objects but is Python-specific.
- Reading state results from another host will always use the configured result storage; in-memory caching only helps within the same process/machine.
References
- Results overview: Results (how persistence works)
- API: prefect.results
- Filesystems (RemoteFileSystem, SMB, etc.): prefect.filesystems
If you share:
- which backend you prefer (S3/GCS/Azure/SMB),
- whether you want per-flow config or a profile-wide default,
I can give you a concrete snippet and any credential setup tips for your environment.
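To illustrate the failure mode described under "What’s happening", here is a toy simulation using only the standard library. It does not use Prefect: the temporary directories stand in for each host's local storage folder and for a shared store, and write_result and read_result are hypothetical helpers, not Prefect functions.

```python
import tempfile
from pathlib import Path


def write_result(storage_dir, key, payload):
    # Persist a payload under a storage key, roughly as a result store would.
    (Path(storage_dir) / key).write_text(payload)


def read_result(storage_dir, key):
    # Look the key up in the given storage location only.
    path = Path(storage_dir) / key
    if not path.exists():
        raise ValueError(f"Path {path} does not exist.")
    return path.read_text()


with tempfile.TemporaryDirectory() as host_a, \
        tempfile.TemporaryDirectory() as host_b, \
        tempfile.TemporaryDirectory() as shared:
    key = "result-key"  # hypothetical storage key

    # Host A persists to its own local storage; Host B looks in its own.
    write_result(host_a, key, "42")
    try:
        read_result(host_b, key)
        local_outcome = "found"
    except ValueError:
        local_outcome = "missing"

    # With a store that both hosts can reach, the same key resolves.
    write_result(shared, key, "42")
    shared_outcome = read_result(shared, key)

print(local_outcome, shared_outcome)
```

The key is identical in both cases; only the location it is resolved against changes, which is why pointing every host at the same remote store resolves the error.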