<@ULVA73B9P> Is it possible to retry a flow run th...
# ask-marvin
j
@Marvin Is it possible to retry a flow run that has crashed using the Prefect 3 SDK, CLI, or Cloud REST API? Ideally I would like to schedule it to run later.
m
Yes, you can retry a crashed flow run with the Prefect 3.x SDK, CLI, or Cloud REST API, provided the flow was run from a deployment. In each case you create a new flow run from that deployment and give it a future start time:
1. Prefect SDK: call `run_deployment` with a `scheduled_time`, or use the client method `create_flow_run_from_deployment` with a `Scheduled` state.
2. Prefect CLI: `prefect deployment run` accepts `--start-in` or `--start-at` to delay the run.
3. Prefect Cloud REST API: send a POST request to the deployment's `create_flow_run` endpoint with a scheduled state whose `scheduled_time` is the start time you want.
For more detailed instructions, refer to the Prefect documentation or the source code on GitHub. If you have specific configurations or need help with any of these methods, feel free to ask!
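For the REST API route, a minimal sketch of that POST request could look like the following. It assumes the `deployments/{id}/create_flow_run` endpoint, a `SCHEDULED` state carrying `state_details.scheduled_time`, and a bearer API key; the helper names `build_retry_request` and `send_retry_request` are made up for illustration, and `api_url` is assumed to be your workspace API URL (for Cloud, something like `https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>`). Check the API reference before relying on the exact body shape.

```python
import json
import urllib.request
from datetime import datetime, timedelta, timezone


def build_retry_request(api_url, deployment_id, parameters, delay, now=None):
    """Return (url, body) for a create-flow-run request delayed by `delay`.

    Hypothetical helper: the endpoint path and body shape are assumptions.
    """
    now = now or datetime.now(timezone.utc)
    start = now + delay
    url = f"{api_url.rstrip('/')}/deployments/{deployment_id}/create_flow_run"
    body = {
        "parameters": parameters,
        "state": {
            "type": "SCHEDULED",
            # Timezone-aware ISO 8601 timestamp for the delayed start
            "state_details": {"scheduled_time": start.isoformat()},
        },
    }
    return url, body


def send_retry_request(url, body, api_key):
    """POST the request; needs network access and a valid API key."""
    request = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

Building the request separately from sending it makes it easy to inspect the payload first, e.g. `build_retry_request(api_url, deployment_id, {}, timedelta(hours=1))`.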
j
Using the SDK, can you give an example of how to get a flow run that has crashed and retry it, scheduling it to run in an hour from now?
n
hi @James Gatter - you have to tag marvin, but the easiest way should be like:
@Marvin Using the SDK, can you give an example of how to get a flow run that has crashed and retry it, scheduling it to run in an hour from now? maybe call the client method `create_flow_run_from_deployment` in an `on_crashed` hook
m
To retry a crashed flow run using the Prefect 3 SDK and schedule it to run one hour later, you can use the `create_flow_run_from_deployment` method within an `on_crashed` hook. Here's a concise example:
```python
from datetime import datetime, timedelta, timezone

from prefect import flow, task
from prefect.client.orchestration import get_client
from prefect.states import Scheduled


@task
def example_task():
    # A raised exception marks the run Failed, not Crashed; a crash comes from
    # an interruption, such as the process or its infrastructure being killed.
    raise Exception("This task will fail.")


async def retry_flow_run(flow, flow_run, state):
    # State hooks are called with (flow, flow_run, state) and may be async.
    if flow_run.deployment_id is None:
        print("No deployment ID found for flow run.")
        return
    # Schedule the retry to run one hour from now
    start_time = datetime.now(timezone.utc) + timedelta(hours=1)
    async with get_client() as client:
        await client.create_flow_run_from_deployment(
            deployment_id=flow_run.deployment_id,
            parameters=flow_run.parameters,
            state=Scheduled(scheduled_time=start_time),
        )


@flow(on_crashed=[retry_flow_run])  # hooks are passed as a list
def example_flow():
    example_task()


if __name__ == "__main__":
    example_flow()
```
### Key Points:
- `on_crashed` hook: runs when the flow run enters a Crashed state.
- `create_flow_run_from_deployment`: creates a new flow run from an existing deployment, with the parameters and initial state you specify. A `Scheduled` state sets the start time.

The flow must have a deployment for `create_flow_run_from_deployment` to work. For more details, refer to the Prefect documentation.
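The hook above only covers runs that crash after it is in place. To pick up flow runs that have already crashed, something like the following sketch might work. It assumes the client's `read_flow_runs` method with a state-type filter and a `Scheduled` state passed to `create_flow_run_from_deployment`; `retry_time` and `reschedule_crashed_runs` are hypothetical names, and the Prefect imports are kept inside the function so the time helper can be used on its own.

```python
from datetime import datetime, timedelta, timezone


def retry_time(delay=timedelta(hours=1), now=None):
    """Return a timezone-aware UTC start time `delay` after `now`."""
    return (now or datetime.now(timezone.utc)) + delay


async def reschedule_crashed_runs(limit=10):
    """Create a delayed run for each crashed flow run that has a deployment."""
    # Imported here so retry_time() stays usable without Prefect installed.
    from prefect.client.orchestration import get_client
    from prefect.client.schemas.filters import (
        FlowRunFilter,
        FlowRunFilterState,
        FlowRunFilterStateType,
    )
    from prefect.client.schemas.objects import StateType
    from prefect.states import Scheduled

    # Match only flow runs whose current state type is CRASHED
    crashed = FlowRunFilter(
        state=FlowRunFilterState(
            type=FlowRunFilterStateType(any_=[StateType.CRASHED])
        )
    )
    async with get_client() as client:
        runs = await client.read_flow_runs(flow_run_filter=crashed, limit=limit)
        for run in runs:
            if run.deployment_id is None:
                continue  # nothing to re-run without a deployment
            await client.create_flow_run_from_deployment(
                run.deployment_id,
                parameters=run.parameters,
                state=Scheduled(scheduled_time=retry_time()),
            )
```

Run it with `asyncio.run(reschedule_crashed_runs())`. Note that this creates new flow runs and leaves the crashed ones in place.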
n
or actually run_deployment would be easier but same thing
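A rough sketch of the `run_deployment` variant, assuming its `scheduled_time`, `timeout`, and `as_subflow` keyword arguments and that it can be awaited from an async hook. `RETRY_DELAY`, `retry_kwargs`, and `retry_with_run_deployment` are hypothetical names.

```python
from datetime import datetime, timedelta, timezone

RETRY_DELAY = timedelta(hours=1)


def retry_kwargs(deployment_id, parameters, now=None):
    """Build the keyword arguments passed to run_deployment."""
    now = now or datetime.now(timezone.utc)
    return {
        "name": deployment_id,  # run_deployment accepts a name or a deployment ID
        "parameters": parameters,
        "scheduled_time": now + RETRY_DELAY,
        "timeout": 0,  # return immediately instead of waiting for the run
        "as_subflow": False,  # do not attach the new run to the crashed one
    }


async def retry_with_run_deployment(flow, flow_run, state):
    """State hook: schedule a fresh run of the same deployment."""
    # Imported here so retry_kwargs() stays usable without Prefect installed.
    from prefect.deployments import run_deployment

    if flow_run.deployment_id is None:
        return
    await run_deployment(**retry_kwargs(flow_run.deployment_id, flow_run.parameters))
```

It would be registered the same way, with `@flow(on_crashed=[retry_with_run_deployment])`.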
j
ah for some reason I assumed the thread reply would have worked my bad!
Thanks I'll take a look Nate!