<@ULVA73B9P> is it possible to pause a flow run an...
# marvin-ai
g
@Marvin is it possible to pause a flow run and wait for a custom event to resume it?
m
thought for 72.4 seconds
Yes, it's definitely possible to pause a flow run and wait for a custom event! Here's how you can do it in Prefect 3.x:
1. Basic Pausing:
```python
from prefect import flow, pause_flow_run

@flow
def my_flow():
    # Do some work
    pause_flow_run()  # Simple pause
    # Continue work after resume
```
2. Pausing with Custom Input:
```python
from prefect import flow, pause_flow_run
from typing import Dict

@flow
def my_flow():
    # Do some work
    result = pause_flow_run(
        wait_for_input=Dict,  # Specify the type of input you expect
        timeout=3600,  # Maximum time to wait (in seconds)
        key="my_custom_event"  # Optional key to identify this pause point
    )
    print(f"Received input: {result}")
    # Continue work with result
```
To resume the flow, you'll need the flow run ID and can use the `resume_flow_run` function:
```python
from prefect.flow_runs import resume_flow_run

# Resume with custom input
resume_flow_run(
    flow_run_id="your-flow-run-id",
    run_input={"your": "data"}
)
```
Key features:
- `timeout`: Specifies how long to wait before timing out (default: 1 hour)
- `poll_interval`: How often to check for resume signals (default: 10 seconds)
- `key`: Optional identifier for the pause point
- `wait_for_input`: The type of input to expect when resuming

You can also resume flows through the Prefect UI by navigating to the paused flow run and providing the required input.

This functionality is particularly useful for:
- Human-in-the-loop workflows
- Waiting for external events or approvals
- Implementing checkpoints in long-running flows
- Debugging or inspection of flow state during execution

The flow will remain in a "Paused" state until either:
1. It's explicitly resumed via `resume_flow_run`
2. The timeout period is reached
3. The flow run is cancelled

Remember that while a flow is paused, it maintains its execution context, so variables and state are preserved when it resumes.
n
Nate [5:45 PM]
this is an interesting question and it's certainly possible, but it's not immediately obvious what the "right" way is to make sure you have the `flow_run_id` on hand for the run you want to resume
but you could definitely have an event subscriber and call `resume_flow_run` from there when you see your event of interest
[5:46 PM]
actually
there is a built-in resume flow run action
i forgot about that
so it would be a pretty run of the mill automation that you'd make
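A minimal sketch of the event-subscriber idea, assuming the custom event carries the paused flow run as a resource whose `prefect.resource.id` looks like `prefect.flow-run.<uuid>`. The helper names (`flow_run_id_from_event`, `resume_from_event`) and the plain-dict event shape are hypothetical; only `resume_flow_run` comes from the thread. How events reach this handler (a webhook receiver, an event subscription, and so on) is left out.

```python
from typing import Any, Optional
from uuid import UUID

FLOW_RUN_PREFIX = "prefect.flow-run."


def flow_run_id_from_event(event: dict[str, Any]) -> Optional[str]:
    """Return the first flow run ID found on the event's resources, if any."""
    # Look at the primary resource first, then at the related resources.
    resources = [event.get("resource", {}), *event.get("related", [])]
    for resource in resources:
        resource_id = resource.get("prefect.resource.id", "")
        if resource_id.startswith(FLOW_RUN_PREFIX):
            return resource_id[len(FLOW_RUN_PREFIX):]
    return None


def resume_from_event(event: dict[str, Any]) -> bool:
    """Resume the flow run named by the event; return False if none is named."""
    flow_run_id = flow_run_id_from_event(event)
    if flow_run_id is None:
        return False
    # Imported lazily so the parsing helper above works without Prefect installed.
    from prefect.flow_runs import resume_flow_run

    # Passing the event payload as run_input is an assumption; it only makes
    # sense if the flow paused with a matching wait_for_input type.
    resume_flow_run(UUID(flow_run_id), run_input=event.get("payload") or None)
    return True
```

Keeping the ID lookup separate from the resume call makes it easy to check which run an event would target before anything is resumed.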
@Marvin how would it look to construct an automation with the `Automation` object?
m
thought for 52.1 seconds
Based on the research and available components, here's how you can create an automation to resume a flow run based on a custom event:
```python
from prefect.automations import (
    Automation,
    EventTrigger,  # For triggering on specific events
    ResumeFlowRun,  # Built-in action to resume flow runs
)

# Create an automation that resumes a flow run when a specific event occurs
automation = Automation(
    name="Resume Flow on Custom Event",
    description="Automatically resume a paused flow run when a specific event occurs",
    triggers=[
        EventTrigger(
            # Match flows in a paused state
            match={
                "flow_run.state.type.name": "PAUSED",
                # Add any additional matching criteria for your custom event
                "your.custom.event.field": "expected_value"
            }
        )
    ],
    actions=[
        ResumeFlowRun(  # Built-in action to resume the flow run
            # Optionally provide input data that will be passed to the resumed flow
            run_input={"your": "data"}
        )
    ]
)

# Save the automation
automation.save()
```
Key components of this setup:
1. Trigger: The `EventTrigger` defines when the automation should fire
   - You can match on flow run state (`PAUSED`)
   - Add additional matching criteria for your custom event
   - Can use multiple triggers if needed
2. Action: The `ResumeFlowRun` action is built-in and will handle resuming the flow
   - Can provide optional input data that will be passed to the flow when it resumes
   - The action automatically knows which flow run to resume based on the event context
3. Automation: Combines triggers and actions into a single unit
   - Gives it a name and description for identification
   - Can be enabled/disabled as needed
   - Can be saved and managed through the Prefect UI

This approach has several advantages:
- No need to manually track `flow_run_id`s
- Built-in handling of resume logic
- Can be managed through the Prefect UI
- Can be combined with other triggers and actions for more complex scenarios

You can also manage automations through the Prefect UI, where you can:
- Monitor their status
- Enable/disable them
- View their execution history
- Modify their configuration

This provides a clean, declarative way to handle flow resumption based on custom events without needing to maintain separate event listeners or manually track flow run IDs.
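For comparison, a hedged sketch of the same automation in the shape used by Prefect's documentation examples, where `Automation` takes a single `trigger` argument (not a `triggers` list), an `EventTrigger` selects events by name with `expect` and by resource labels with `match` / `match_related`, and the automation is registered with `create()`. The event name `external.job.completed` and the helper names are hypothetical. It assumes the custom event lists the paused flow run as a related resource, and it passes no `run_input` to `ResumeFlowRun` because it is unclear whether that action accepts one.

```python
from typing import Any

# Hypothetical event name; use whatever your external system emits.
CUSTOM_EVENT = "external.job.completed"


def trigger_kwargs(event_name: str = CUSTOM_EVENT) -> dict[str, Any]:
    """Keyword arguments for an EventTrigger that reacts to the custom event."""
    return {
        # Fire on this event name only.
        "expect": {event_name},
        # Require a flow run among the event's related resources.
        "match_related": {
            "prefect.resource.id": "prefect.flow-run.*",
            "prefect.resource.role": "flow-run",
        },
        "posture": "Reactive",
        "threshold": 1,
    }


def build_resume_automation():
    """Create the automation on the configured Prefect API and return it."""
    # Imported lazily so trigger_kwargs() can be inspected without Prefect.
    from prefect.automations import Automation
    from prefect.events.actions import ResumeFlowRun
    from prefect.events.schemas.automations import EventTrigger

    return Automation(
        name="Resume Flow on Custom Event",
        description="Resume a paused flow run when the custom event arrives",
        trigger=EventTrigger(**trigger_kwargs()),
        actions=[ResumeFlowRun()],
    ).create()
```

Matching on the event name rather than on a `PAUSED` state label keeps the trigger tied to the external signal; whether the action resolves the right run from the related resource is still an assumption to verify.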
g
I've been testing with automations and events but couldn't crack it yet. The use case would be to create a job in a different platform (runpod). That workload would `emit_event` or post a webhook to report when it's done. Then that event would resume the flow. I've managed to emit the custom event and parse the payload into an email notification, which means I guess I should be able to pass data from the event back to the flow. But I'm not sure how to route things to resume the right flow run ID.
How does it infer the flow run ID? Could it be with related resources?
by the way @Marvin wouldn't it be `Automation().create()` rather than `.save()`?
m
thought for 28.5 seconds
Yes, you're absolutely right! I apologize for the error in my previous response. The correct method is indeed `.create()`, not `.save()`. Here's the corrected version of that part of the code:
```python
# Create the automation
automation = Automation(
    name="Resume Flow on Custom Event",
    description="Automatically resume a paused flow run when a specific event occurs",
    triggers=[...],
    actions=[...]
).create()  # Use create() instead of save()
```
Thank you for the correction! This is more consistent with Prefect 3.x's API patterns.
n
let me take a look at this later when i'm not afk. my prior is that if the triggering event has the flow run as a related resource, it could infer the ID. otherwise you might need to somehow pass it through the event, which sounds not ideal if you're calling `emit_event` from somewhere 3rd party
g
That should be fine, we own the business logic in the 3rd party so we could always pass it in. Thoughts would be welcome!
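One way this could look on the third-party side, as a sketch under Nate's assumption that the resume action can infer the run from a related resource: the flow hands its own run ID to the external job (inside a flow it is available as `prefect.runtime.flow_run.id`), and the job lists that run as a related resource when it emits the event. The event name, the `runpod.job.<id>` resource ID, and the helper names are hypothetical.

```python
from typing import Any


def completion_event(
    job_id: str, flow_run_id: str, output: dict[str, Any]
) -> dict[str, Any]:
    """Build the keyword arguments for emit_event describing a finished job."""
    return {
        "event": "runpod.job.completed",  # hypothetical event name
        # The primary resource is the external job itself.
        "resource": {"prefect.resource.id": f"runpod.job.{job_id}"},
        # Listing the paused flow run here is what would let an automation
        # (or a subscriber) work out which run to resume.
        "related": [
            {
                "prefect.resource.id": f"prefect.flow-run.{flow_run_id}",
                "prefect.resource.role": "flow-run",
            }
        ],
        "payload": output,
    }


def report_done(job_id: str, flow_run_id: str, output: dict[str, Any]):
    """Emit the completion event; needs Prefect and API settings in the job."""
    # Imported lazily so completion_event() can be tested without Prefect.
    from prefect.events import emit_event

    return emit_event(**completion_event(job_id, flow_run_id, output))
```

If the job posts to a webhook instead of calling `emit_event`, the webhook's event template would presumably need to produce the same related resource from the posted body.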