
Matt Kizaric

05/04/2023, 3:32 PM
What's the best way to handle graceful shutdown of a flow? I am using the parent-child deployed flow pattern but noticed that some of my child flows are still running if the parent is canceled or fails. I want to catch any termination signals and ensure that those child flows also get canceled. Any advice?

Bianca Hoch

05/04/2023, 4:47 PM
Hey Matt, first thing that comes to mind is creating an automation to cancel the child flow runs in response to the parent entering a state of Failed or Cancelled.

Matt Kizaric

05/04/2023, 4:49 PM
hmm unfortunately we are self-hosted and don't have access to automations currently

Bianca Hoch

05/04/2023, 5:30 PM
Hmm... you may be able to leverage something like a flow-level state handler then. Something like this:
from prefect import flow
from prefect.blocks.notifications import SlackWebhook
# StateType lives in prefect.server.schemas.states
# (the package was named prefect.orion in older Prefect 2 releases)
from prefect.server.schemas.states import StateType
import time

# Load your desired block; this example uses a Slack notification block.
def notify_function():
    slack_webhook_block = SlackWebhook.load("my-test-notification")
    # This is the message you want to send to Slack
    slack_webhook_block.notify("Hello from Prefect!")

#A simple flow
@flow
def my_flow():
    time.sleep(10)
    #pass
    #raise Exception

if __name__ == "__main__":
    # Store the state of your flow
    state = my_flow(return_state=True)
    # This if statement checks for a specific flow state.
    # If the condition is satisfied, the block is invoked.
    # State type can be set to any available state type:
    # https://docs.prefect.io/concepts/states/#state-types
    if state.type == StateType.COMPLETED:
        notify_function()
But instead of calling a notification block like I did, you can cancel the child flow runs using set_flow_run_state. Here's an example of a function that uses set_flow_run_state:
import asyncio
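from prefect.client import get_client
# prefect.states provides the state constructors (Cancelled, Failed, ...) used with the client
from prefect.states import Cancelled


async def main():
    async with get_client() as client:
        # Example ID; replace with the ID of the child flow run to cancel
        flow_run_id = "2331e33e-13aa-4415-a6e5-e6ea86c426a9"
        response = await client.set_flow_run_state(flow_run_id=flow_run_id, state=Cancelled())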
        print(response)


if __name__ == "__main__":
    asyncio.run(main())
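
For more than one child, the same call can be looped over a list of flow run IDs. This is only a rough sketch: `cancel_children`, `cancel_children_with_prefect`, and `cancel_children_sync` are made-up helper names, and it assumes you have already collected the child flow run IDs yourself (for example, when the parent starts each child). The Prefect imports sit inside the function so the loop itself can be tried without a server.

```python
import asyncio


async def cancel_children(client, child_run_ids, make_cancelled_state):
    """Ask the API to cancel each child flow run and collect the responses."""
    responses = {}
    for run_id in child_run_ids:
        # Build a fresh state object for every run
        responses[run_id] = await client.set_flow_run_state(
            flow_run_id=run_id, state=make_cancelled_state()
        )
    return responses


async def cancel_children_with_prefect(child_run_ids):
    """Open a Prefect client and cancel the given runs (hypothetical helper)."""
    # Imported lazily so cancel_children stays usable without Prefect installed
    from prefect.client import get_client
    from prefect.states import Cancelled

    async with get_client() as client:
        return await cancel_children(client, child_run_ids, Cancelled)


def cancel_children_sync(child_run_ids):
    """Convenience wrapper for calling from synchronous code."""
    return asyncio.run(cancel_children_with_prefect(child_run_ids))
```

With the flow-level state check from the first snippet, the parent script could call `cancel_children_sync(child_ids)` when `state.type` is `StateType.FAILED` or `StateType.CANCELLED`.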

Serina

05/04/2023, 8:59 PM
We also support an on_failure (and on_crashed) state change hook that allows you to execute arbitrary Python code in response to the above terminal states. Example usage can be found here: https://github.com/PrefectHQ/prefect/blob/main/RELEASE-NOTES.md#on_completion-and-on_failure-hooks-for-flows-and-tasks
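
A hook for this use case might look roughly like the sketch below. It assumes hooks are called with the flow, the flow run, and the new state, as in the linked release notes. Everything else is hypothetical: `make_cancel_children_hook`, the `child_run_ids` list that the parent would have to fill in as it starts children, and `cancel`, which stands for any function that cancels one flow run (for instance one wrapping the set_flow_run_state call shown earlier in the thread).

```python
def make_cancel_children_hook(child_run_ids, cancel):
    """Build a state change hook that cancels the given child flow runs.

    child_run_ids: list the parent flow appends to as it starts children (hypothetical)
    cancel: callable taking one flow run ID and cancelling that run (hypothetical)
    """

    def cancel_children(flow, flow_run, state):
        # Copy the list so IDs appended during the loop are left for the next call
        for run_id in list(child_run_ids):
            cancel(run_id)

    return cancel_children


# Wiring sketch, left as comments so the block does not need a Prefect install:
#
#   from prefect import flow
#
#   CHILD_RUN_IDS = []
#   hook = make_cancel_children_hook(CHILD_RUN_IDS, cancel_one_run)  # cancel_one_run is hypothetical
#
#   @flow(on_failure=[hook], on_crashed=[hook])
#   def parent_flow():
#       ...  # start children and append their flow run IDs to CHILD_RUN_IDS
```

Passing the cancel function in keeps the hook easy to try out with a stand-in before pointing it at a real API.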
We plan to support an on_cancelled flow run state change hook soon, and its progress can be tracked here: https://github.com/PrefectHQ/prefect/issues/8446