Marvin (10/15/2025, 11:17 PM):
1) Pass the upstream flow run ID, then fetch its result in the downstream flow
- In the Automation's "Run deployment" action, template the parameter:
upstream_flow_run_id: "{{ event.resource.id }}"
- In your downstream flow, fetch the upstream result:
from prefect import flow
from prefect.client.orchestration import get_client

@flow
async def downstream(upstream_flow_run_id: str):
    # Flow run events carry the prefixed resource ID "prefect.flow-run.<uuid>";
    # strip the prefix to get the bare run ID
    run_id = upstream_flow_run_id.removeprefix("prefect.flow-run.")
    async with get_client() as client:
        # Read the upstream flow run to get its final state
        upstream_run = await client.read_flow_run(run_id)
    # Materialize the result (works if the upstream result is persisted/serializable)
    result = await upstream_run.state.result()
    # Do something with `result`
    print("Upstream result:", result)
Notes:
- Make sure the upstream flow's return value can be persisted (e.g., it is JSON-serializable and result persistence is configured) so it can be materialized later; see the sketch just below.
- If the result is large or complex, consider storing it outside Prefect (e.g., S3/DB) and passing a pointer instead.
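A minimal sketch of opting the upstream flow into result persistence via the persist_result flag, assuming the default result storage (the flow body is illustrative):

from prefect import flow

# Persist the return value so a later flow run can materialize it via state.result()
@flow(persist_result=True)
def upstream() -> dict:
    return {"foo": "bar"}  # illustrative payload; must be serializable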
2) Store the upstream result under a unique key (Variables or an external store), then pass the key
- In the upstream flow, create a unique key using the flow run ID and store the result:
from prefect import flow
from prefect.runtime import flow_run
from prefect.variables import Variable

@flow
def upstream():
    # Variable names only allow lowercase letters, numbers, and underscores,
    # so replace the hyphens in the flow run UUID
    rid = flow_run.id.replace("-", "_")
    result = {"foo": "bar"}  # JSON-serializable
    Variable.set(f"result_{rid}", result)
    return result
- In the Automation, pass the triggering flow run's resource ID to the downstream deployment, and let the downstream flow rebuild the key:
- upstream_key: "{{ event.resource.id }}"
- In the downstream flow, retrieve it:
from prefect import flow
from prefect.variables import Variable

@flow
def downstream(upstream_key: str):
    # Rebuild the key used upstream: drop the event-resource prefix
    # ("prefect.flow-run.<uuid>") and swap hyphens for underscores
    rid = upstream_key.removeprefix("prefect.flow-run.").replace("-", "_")
    result = Variable.get(f"result_{rid}")
    print("Upstream result:", result)
This avoids relying on state result persistence and works well for small/medium JSON payloads. For large data, store externally and pass a pointer via the same pattern.
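For illustration, a hedged sketch of the external-store variant, assuming boto3 and an S3 bucket; the bucket name, key layout, and payload are all assumptions:

import json

import boto3
from prefect import flow
from prefect.runtime import flow_run

S3 = boto3.client("s3")
BUCKET = "my-results"  # assumed bucket name

@flow
def upstream_large():
    # Illustrative large payload
    result = {"rows": list(range(100_000))}
    key = f"results/{flow_run.id}.json"
    S3.put_object(Bucket=BUCKET, Key=key, Body=json.dumps(result))
    return key  # small pointer, cheap to pass as a parameter

@flow
def downstream_large(result_key: str):
    body = S3.get_object(Bucket=BUCKET, Key=result_key)["Body"]
    result = json.loads(body.read())
    print("Row count:", len(result["rows"]))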
Helpful docs and references:
- Automations (event triggers and actions): Event triggers (Automations)
- Creating a “Run deployment” trigger with templates: Create Automation to run a deployment
- Templating/parameters in automation actions: Use templated parameters in Automation actions
- Variables: Prefect Variables
- Reading flow run state/result via API: Read flow run state (API)
If you expect to move data frequently between flows, another robust option is to implement an “orchestrator” flow that calls downstream deployments and awaits their results programmatically instead of relying entirely on Automations.
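A sketch of that orchestrator pattern using run_deployment; the deployment names and the downstream parameter name are assumptions, and the upstream flow must persist its result for state.result() to materialize it:

from prefect import flow
from prefect.deployments import run_deployment

@flow
async def orchestrator():
    # run_deployment waits for the triggered run to reach a terminal state by default
    upstream_run = await run_deployment(name="upstream/prod")
    # Materialize its result (requires result persistence on the upstream flow)
    result = await upstream_run.state.result()
    # Hand the materialized result to the downstream deployment
    await run_deployment(
        name="downstream/prod",
        parameters={"upstream_result": result},
    )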
Quick question to tailor the recommendation: roughly how big is the result you need to pass (a small JSON vs large data), and are you okay persisting it via Variables or an external store (e.g., S3/DB)?