<@ULVA73B9P> how can i trigger a flow when another...
# ask-community
t
@Marvin how can i trigger a flow when another flow enters completed - both flows will have the same parameters
m
To trigger a flow when another flow completes, and both flows share the same parameters, you can use events and custom triggers in Prefect. 1. For Prefect 2.x: You can utilize event triggers to achieve this. Here's a brief overview: - Use an "event" trigger that listens for a
prefect.flow-run.Completed
event from the first flow. - Configure a "sequence" trigger if you need a series of triggers, where the completion of the first flow triggers the start of watching for subsequent events. More detailed guidance can be found in the Prefect 2.x documentation on automations. 2. For Prefect 3.x: Prefect 3.x provides enhanced support for custom triggers. You can: - Leverage a "compound" or "sequence" trigger that reacts to the completion of the first flow. - Specify parameter values for new flow runs, ensuring they match the first flow's parameters, when triggering the next flow. You can explore more about this setup in the Prefect 3.x documentation on custom triggers and writing flows. By configuring such triggers, you can achieve automated flow chaining based on the completion events, ensuring parameters align across both flows. It’s like a relay race where your baton (parameters) safely gets passed along from one smart runner to another, ensuring a seamless handoff!
b
I have been able to connect the run of a flow to the completion of another with some deployment code below. FWIW, I am using Cloud on a free tier and deploy my flows via a separate python script, shown below:
Copy code
from prefect import flow
from prefect.events import DeploymentEventTrigger

if __name__ == "__main__":
    flow.from_source(
        source="my-github-project.git",
        entrypoint="ml/pipeline/flows/fit-model.py:training_flow",
    ).deploy(
        name="mlops-train-model",
        work_pool_name="my-pool1",
        job_variables={"env": {"ENV1": "loves-to-code"},
                       "pip_packages": ["pandas", "requests"]},
        tags=["prod"],
        description="Pipeline to train a model and log metrics and parameters for a training job",
        version="1.0.0",
        triggers=[
            DeploymentEventTrigger(
                expect={"prefect.flow-run.Completed"},
                match_related={"prefect.resource.name": "ml-datasets"}
            )
        ]
    )
The key part above is the
DeploymentEventTrigger
which looks for the completed state on a previous flow, called
ml-datasets
above
t
@Brock thanks for this 🙂 Unless I'm being silly I don't think this solution will work for me. My Problem is this (in some sudo code) I have two flows:
def flow_a(id:int)
def flow_b(id:int)
when
flow_a
enters state
Completed
i want to trigger
flow_b
using the same
id:int
so the triggering of
flow_b
based on
flow_a
completing is fairly easy to do - but dynamically passing the arguments in the automation is tricky
b
You are right, I misunderstood your need and above won't work. My trigger is simple; when one completes, use that event to fire another run. I am the farthest thing from expert as I am still learning, but I constantly realize that this framework exposes all sorts of goodies for us, so I have to believe its possible. How is the is id being input for flow a? Depending on the source, I wonder if you can have that in your higher level logic as a variable that can be passed to either? Also, if both flows use the same input variable, stepping back is it not possible to have the logic from both flows under one "main" flow? Either a single flow, or use a flow to invoke other flows? This might allow you to have memory of the original input. I have done this outside of Prefect using AWS Step Functions.
upvote 1
126 Views