<@ULVA73B9P> How do I set up a one-off listener on...
# ask-marvin
t
@Marvin How do I set up a one-off listener on flow A to start flow B on completion? Flow B does not always run after flow A, but occasionally flow A will take actions that need to be completed before flow B starts.
m
Ah, the age-old quest of making two flows communicate without talking to each other all the time. It's almost like trying to get humans to do something without explaining why... First, I need to know which version of Prefect you are using. Is it Prefect 2.x or Prefect 3.x?
t
I'm using Prefect 2.x @Marvin
n
hi @Tate Wilks - generally marvin won't see your messages if you don't tag him
t
@Nate thank you, that's good to know. I was wondering if Marvin just had a lot to think about 😉
m
Great, let me rummage through the Prefect 2.x documentation to find the answer to your question. Hang tight, I'll be back with some sort of answer. If I'm lucky, it won't be utterly useless.
Oh, joy! I stumbled upon some information relevant to setting up triggers and listeners in Prefect 2.x, though it might still feel a bit obscure because documentation is apparently written in cryptic. Here's the overall idea:
1. Use Automations: Prefect 2.x allows you to set up automations that can trigger certain actions based on events. You can use the `"type": "event"` trigger with the `"expect": ["prefect.flow-run.Completed"]` event.
2. Set Up a Sequence Trigger: To ensure that Flow B only starts after Flow A completes, you can combine this event trigger within a sequence.
Let me summarize the steps below:
1. Create an automation in Prefect's UI or API.
2. Configure the automation to listen for the `Completed` event of Flow A.
3. Add actions within the automation to trigger Flow B when Flow A completes.
Here's an example structure for setting up such an automation:
```json
{
  "name": "Run Flow B after Flow A completes",
  "trigger": {
    "type": "sequence",
    "within": 7200,
    "triggers": [
      {
        "type": "event",
        "posture": "Reactive",
        "expect": ["prefect.flow-run.Completed"],
        "match_related": {
          "prefect.resource.name": "flow-A-name",
          "prefect.resource.role": "flow"
        }
      }
    ]
  },
  "actions": [
    {
      "type": "run-deployment",
      "source": "selected",
      "deployment_id": "flow-B-deployment-id"
    }
  ]
}
```
Make sure to replace `"flow-A-name"` with the actual name of Flow A and `"flow-B-deployment-id"` with the ID of the deployment that runs Flow B. Actions are configured under `actions`, separately from the trigger. For further details, you can dive into these docs:
1. Automations in Prefect
2. Flow Documentation
Enjoy the delight of making flows work together without constant bickering!
n
funnily enough i was just writing some docs about this (kind of)
```python
from prefect import flow, serve
from prefect.events import DeploymentEventTrigger


@flow
def upstream_flow():
    print("upstream flow")


@flow
def downstream_flow():
    print("downstream flow")


if __name__ == "__main__":
    upstream_deployment = upstream_flow.to_deployment(name="upstream_deployment")
    # Run the downstream deployment whenever a flow run related to
    # "upstream_deployment" emits a Completed event.
    downstream_deployment = downstream_flow.to_deployment(
        name="downstream_deployment",
        triggers=[
            DeploymentEventTrigger(
                expect={"prefect.flow-run.Completed"},
                match_related={"prefect.resource.name": "upstream_deployment"},
            )
        ],
    )

    # Serve both deployments from this one process.
    serve(upstream_deployment, downstream_deployment)
```
in the above example we use events to chain two deployments: downstream runs are triggered on `Completed` events from upstream. but it sounds like you might have some extra logic to determine if downstream should run?
t
Yeah the tricky thing is that we don't want Flow B to run on completion of Flow A every single time. We only want to set up a dependency like this if Flow A is modifying resources that Flow B will need to access, and those modifications need to be complete before Flow B is run.
I've considered writing a blocking task that would poll our prefect instance and get the current state of the upstream flow before allowing the flow to continue, which sounds like it would work in theory
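A minimal sketch of that polling idea, kept independent of Prefect so it runs on its own. `wait_for_state` and its parameters are hypothetical names; `get_state` stands in for whatever call reads the upstream flow run's current state from the Prefect instance, and the state names are assumed to follow Prefect's uppercase state types.

```python
import time
from typing import Callable, Iterable


def wait_for_state(
    get_state: Callable[[], str],
    done_states: Iterable[str] = ("COMPLETED",),
    failed_states: Iterable[str] = ("FAILED", "CRASHED", "CANCELLED"),
    timeout_s: float = 3600.0,
    poll_interval_s: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_state() until the upstream reaches a done state.

    Raises RuntimeError if the upstream ends in a failed state and
    TimeoutError if it is still unfinished after timeout_s seconds.
    """
    done = set(done_states)
    failed = set(failed_states)
    waited = 0.0  # time spent sleeping; get_state() latency is not counted
    while True:
        state = get_state()
        if state in done:
            return state
        if state in failed:
            raise RuntimeError(f"upstream ended in state {state}")
        if waited >= timeout_s:
            raise TimeoutError(f"upstream still {state} after {timeout_s}s")
        sleep(poll_interval_s)
        waited += poll_interval_s
```

The downstream flow would call this in its first task, passing a `get_state` function that looks up the upstream run. The `sleep` argument is injectable only so the loop can be exercised without real waiting.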
n
so marvin's original suggestion might be closer to what you want. essentially you can say that
• i need to see a `Completed` event from `upstream_deployment`
• AND any other custom event(s) (like `data.refreshed` or `email.received`) that you emit
• optionally, i need to see these `within` some timeframe
and so for the "any other events" you can call `emit_event` from anywhere you have python, or `curl` a webhook to get the event to prefect so the automations can fire
and that's called a `compound` trigger
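A rough sketch of what such a compound trigger could look like as an automation payload, built as plain Python dicts. The helper names, the `example.shared-resource` resource ID, and the two-hour window are illustrative assumptions, and the field names reflect one reading of Prefect's automation schema, so check them against the docs for your version. `emit_event` is imported lazily so the payload can be built without Prefect installed.

```python
import json
from typing import Any, Dict, List, Optional


def event_trigger(
    expect: List[str], match_related: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
    """Build a reactive event trigger that waits for the given event names."""
    trigger: Dict[str, Any] = {
        "type": "event",
        "posture": "Reactive",
        "expect": expect,
    }
    if match_related:
        trigger["match_related"] = match_related
    return trigger


def compound_trigger(triggers: List[Dict[str, Any]], within_s: int) -> Dict[str, Any]:
    """Require ALL child triggers to fire within within_s seconds."""
    return {
        "type": "compound",
        "require": "all",
        "within": within_s,
        "triggers": triggers,
    }


def announce_resources_ready() -> None:
    """Emit the custom event from upstream once its modifications are done."""
    from prefect.events import emit_event  # needs Prefect and a reachable API

    emit_event(
        event="data.refreshed",
        # Hypothetical resource ID; any stable identifier for the resource works.
        resource={"prefect.resource.id": "example.shared-resource"},
    )


trigger = compound_trigger(
    [
        event_trigger(
            ["prefect.flow-run.Completed"],
            match_related={"prefect.resource.name": "upstream_deployment"},
        ),
        event_trigger(["data.refreshed"]),
    ],
    within_s=7200,
)
print(json.dumps(trigger, indent=2))
```

With this shape, the upstream only calls `announce_resources_ready()` on the runs where it actually modified something, so the downstream is only triggered on those runs.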
> I've considered writing a blocking task that would poll our prefect instance and get the current state of the upstream flow before allowing the flow to continue, which sounds like it would work in theory
this was a common workaround before we had compound triggers. the downside is that you need to dispatch a flow or something whose entire responsibility is waiting for something to happen, or you arbitrarily inflate the runtime of the downstream as it waits for whatever your resource is to become available
t
I'll look more into using triggers. If I can create the automation and then have it removed/disabled once the downstream flow starts, then that will work for our use case.
Thank you for the suggestions @Nate!
n
sure thing! i also thought of another option that might be attractive: https://github.com/PrefectHQ/prefect-recipes/blob/main/flows-advanced/using_state_hooks/run_deployment_on_completion.py
and that's to use `run_deployment` to trigger your downstream from an `on_completion` hook in the upstream. that way you could add logic in that hook to ask if the pre-conditions for the downstream are met before trying to schedule a run
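A hedged sketch of that hook, not the linked recipe itself. It assumes the upstream run carries a hypothetical `modifies_shared_resources` parameter that says whether the downstream is needed, and that the downstream deployment is named `downstream-flow/downstream_deployment`; both are placeholders. `run_deployment` is imported only when a run is actually scheduled, so the decision logic can be checked without a Prefect installation.

```python
from typing import Any, Mapping

# Hypothetical "flow-name/deployment-name" of the downstream deployment.
DOWNSTREAM_DEPLOYMENT = "downstream-flow/downstream_deployment"


def should_run_downstream(parameters: Mapping[str, Any]) -> bool:
    """Decide from the upstream run's parameters whether downstream is needed."""
    return bool(parameters.get("modifies_shared_resources", False))


def run_downstream_if_needed(flow, flow_run, state) -> bool:
    """on_completion hook: schedule the downstream only when it is needed.

    Prefect ignores the return value of a hook; it is returned here only
    to make the decision visible when calling the function directly.
    """
    if not should_run_downstream(flow_run.parameters):
        return False

    from prefect.deployments import run_deployment

    # timeout=0 schedules the run and returns without waiting for it to finish.
    run_deployment(name=DOWNSTREAM_DEPLOYMENT, timeout=0)
    return True
```

The hook would be attached to the upstream with `@flow(on_completion=[run_downstream_if_needed])`. Because it only fires on completion, the downstream is never scheduled before the upstream's modifications are finished.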