Pooyan
08/05/2024, 10:30 PM
How do I use `on_crashed` in Prefect?

Marvin
08/05/2024, 10:30 PM
You can pass a state change hook to `on_crashed` in Prefect. Here is an example from the documentation:
```python
import os

from prefect import flow, get_client
from prefect.blocks.system import String


async def delete_cloud_run_job(flow, flow_run, state):
    """
    Flow run state change hook that deletes a Cloud Run Job if the flow run crashes.
    """
    # Load the String block that stores the Cloud Run job name
    cloud_run_job_name = await String.load(name="crashing-flow-cloud-run-job")

    # Delete the Cloud Run job; `yes |` answers the confirmation prompt
    delete_job_command = f"yes | gcloud beta run jobs delete {cloud_run_job_name.value} --region us-central1"
    os.system(delete_job_command)

    async with get_client() as client:
        # `_block_document_id` is the ID of the saved block document backing this block
        block_document = await client.read_block_document(cloud_run_job_name._block_document_id)
        # additional clean up actions


@flow(on_crashed=[delete_cloud_run_job])
def my_flow():
    raise Exception("Crash me!")
```
For more information, visit the Prefect documentation on state-change hooks: https://docs.prefect.io/latest/guides/state-change-hooks/
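
If it helps, here is a rough sketch of the same cleanup hook without the `yes |` shell pipe. It assumes gcloud's global `--quiet` flag is enough to skip the confirmation prompt. The names `build_delete_command`, `delete_job_on_crash`, and `JOB_NAME` are made up for illustration and are not part of Prefect or the documentation example.

```python
import subprocess

# Hypothetical job name; in the example above it is loaded from a String block.
JOB_NAME = "my-cloud-run-job"


def build_delete_command(job_name: str, region: str = "us-central1") -> list[str]:
    """Return the gcloud argument list that deletes a Cloud Run job."""
    # --quiet disables interactive prompts, so no `yes |` pipe is needed.
    return [
        "gcloud", "beta", "run", "jobs", "delete", job_name,
        "--region", region, "--quiet",
    ]


def delete_job_on_crash(flow, flow_run, state) -> None:
    """State change hook: takes the (flow, flow_run, state) arguments."""
    command = build_delete_command(JOB_NAME)
    try:
        # An argument list (no shell) avoids quoting problems in the job name.
        result = subprocess.run(command, capture_output=True, text=True, check=False)
    except FileNotFoundError:
        print("gcloud is not installed; skipping Cloud Run job cleanup")
        return
    if result.returncode != 0:
        print(f"Cloud Run job cleanup failed: {result.stderr.strip()}")
```

You would register it the same way as the hook in the example, with `@flow(on_crashed=[delete_job_on_crash])`. Keeping the command construction in its own function makes it possible to check the arguments without calling gcloud.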