# ask-marvin
@Marvin Using a Prefect 3 self hosted server, is it possible to create an automation that triggers a flow when another flow is completed? The triggered flow needs also to access the result returned by the completed flow
thought for 248.4 seconds
Short answer: yes. In Prefect 3 (self-hosted or Cloud), you can create an Automation that triggers when a flow run completes and starts another deployment. However, the Automation cannot directly "pipe" the upstream flow's return value into the downstream parameters. The usual pattern is to pass the upstream flow run's ID (or a pointer to the data) via the Automation and have the downstream flow fetch the result itself.

Two common ways to share the result:

1) Pass the upstream flow run ID to the downstream flow, then fetch the result

- In your Automation (a code-based equivalent using a deployment trigger is sketched after the downstream example below):
  - Trigger: when a flow run of "Upstream" enters the Completed state
  - Action: Run Deployment "Downstream"
  - Parameters: add a templated parameter like the one below (note: `{{ flow_run.id }}` resolves to the bare run ID, whereas `{{ event.resource.id }}` would include a `prefect.flow-run.` prefix):
upstream_flow_run_id: "{{ flow_run.id }}"
- In your downstream flow, fetch the upstream result:
```python
from prefect import flow
from prefect.client.orchestration import get_client


@flow
async def downstream(upstream_flow_run_id: str):
    async with get_client() as client:
        # Look up the upstream flow run and its final state
        upstream_run = await client.read_flow_run(upstream_flow_run_id)
        # Materialize the result (works if the upstream result is persisted/serializable)
        result = await upstream_run.state.result()
        # Do something with `result`
        print("Upstream result:", result)
```
Notes:
- You must ensure the upstream flow's return value can be persisted (e.g., it is JSON-serializable or result persistence is configured, such as `persist_result=True` on the upstream `@flow`) so it can be materialized later.
- If the result is large or complex, consider storing it outside Prefect (e.g., S3 or a database) and passing a pointer instead.

2) Store the upstream result under a unique key (Variables or an external store), then pass the key

- In the upstream flow, create a unique key from the flow run ID and store the result:
```python
from prefect import flow
from prefect.runtime import flow_run
from prefect.variables import Variable


@flow
def upstream():
    # Normalize the run ID: to stay within Prefect's variable-name rules, stick
    # to lowercase letters, digits, and underscores (no hyphens).
    rid = flow_run.id.replace("-", "_")
    result = {"foo": "bar"}  # JSON-serializable
    Variable.set(f"result_{rid}", result)
    return result
```
- In the Automation, pass the key to the downstream deployment as a templated parameter (Jinja's `replace` filter mirrors the normalization done in the upstream flow):
upstream_key: "result_{{ flow_run.id | replace('-', '_') }}"
- In the downstream flow, retrieve it:
```python
from prefect import flow
from prefect.variables import Variable


@flow
def downstream(upstream_key: str):
    # Read back the value the upstream flow stored under this key
    result = Variable.get(upstream_key)
    print("Upstream result:", result)
```
This avoids relying on state result persistence and works well for small to medium JSON payloads. For large data, store it externally and pass a pointer via the same pattern.

Helpful docs and references:
- Automations (event triggers and actions): Event triggers (Automations)
- Creating a "Run deployment" trigger with templates: Create Automation to run a deployment
- Templating/parameters in automation actions: Use templated parameters in Automation actions
- Variables: Prefect Variables
- Reading flow run state/result via API: Read flow run state (API)

If you expect to move data between flows frequently, another robust option is to implement an "orchestrator" flow that calls downstream deployments and awaits their results programmatically instead of relying entirely on Automations (a minimal sketch follows at the end).

Quick question to tailor the recommendation: roughly how big is the result you need to pass (small JSON vs. large data), and are you okay persisting it via Variables or an external store (e.g., S3/DB)?
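For completeness, here is a minimal sketch of that orchestrator pattern. The deployment name "downstream/downstream" and the `upstream_result` parameter are placeholders; this variant assumes a downstream flow that accepts the data directly as a parameter:

```python
from prefect import flow
from prefect.deployments import run_deployment


@flow
def upstream() -> dict:
    # Stand-in for your real upstream work
    return {"foo": "bar"}


@flow
def orchestrator():
    # Run the upstream work as a subflow, so its return value is directly in hand
    data = upstream()
    # Trigger the downstream deployment with the data as a parameter and wait
    # for it to finish (run_deployment blocks until the run reaches a terminal
    # state by default).
    downstream_run = run_deployment(
        name="downstream/downstream",          # "<flow name>/<deployment name>"
        parameters={"upstream_result": data},  # hypothetical downstream parameter
    )
    return downstream_run
```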