Matt Alhonte
11/28/2023, 11:16 PM
on_crashed?
Marvin
11/28/2023, 11:16 PM
on_crashed, you would need to have access to the context of the parent flow. However, the on_crashed hook is called when your flow crashes, and it might not have direct access to the parent flow's context.
In Prefect, a subflow run is created when a flow function is called inside the execution of another flow. The primary flow is the "parent" flow. The flow created within the parent is the "child" flow or "subflow." Subflow runs behave like normal flow runs. There is a full representation of the flow run in the backend as if it had been called separately.
You might need to pass the flow run ID of the parent flow as an argument to the child flow, or store it somewhere the on_crashed function can access it. Please note that this is a general approach and might need to be adjusted based on your specific use case and Prefect configuration.
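The approach described above (hand the parent's flow run ID to the child so the hook can read it back) might look roughly like the sketch below. It assumes the Prefect 2 hook signature (flow, flow_run, state) and that flow_run.parameters holds the arguments the child was called with. The parameter name parent_flow_run_id and both helper functions are hypothetical. The Prefect wiring appears only in comments, so the hook logic itself is plain Python and can be exercised with a stand-in object.

```python
from types import SimpleNamespace
from typing import Any, Optional

PARENT_ID_PARAM = "parent_flow_run_id"  # hypothetical parameter name


def parent_id_from_flow_run(flow_run: Any) -> Optional[str]:
    """Return the parent flow run ID stored in the child's parameters, if any."""
    params = getattr(flow_run, "parameters", None) or {}
    value = params.get(PARENT_ID_PARAM)
    return str(value) if value is not None else None


def report_crash(flow: Any, flow_run: Any, state: Any) -> str:
    """Hook body: build (and print) a message that names the parent run."""
    parent_id = parent_id_from_flow_run(flow_run)
    message = f"Flow run {flow_run.id} crashed; parent flow run: {parent_id or 'unknown'}"
    print(message)
    return message


# Assumed Prefect 2 wiring (not executed here):
#
#   from prefect import flow
#   from prefect.context import get_run_context
#
#   @flow(on_crashed=[report_crash])
#   def child(parent_flow_run_id: str): ...
#
#   @flow
#   def parent():
#       child(parent_flow_run_id=str(get_run_context().flow_run.id))

# Stand-in object so the hook can be tried without a Prefect backend.
fake_run = SimpleNamespace(id="child-123", parameters={PARENT_ID_PARAM: "parent-456"})
crash_message = report_crash(None, fake_run, None)
```

Passing the ID as an ordinary parameter keeps the hook independent of whatever run context is (or is not) available at the moment the crash is handled.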
For more detailed information, you might want to check the Prefect documentation or community resources:
- Prefect Concepts: Flows
- Prefect Release Notes on GitHub
Matt Alhonte
11/28/2023, 11:18 PM
Marvin
11/28/2023, 11:18 PM
max_retries and retry_delay parameters when defining your task. This will automatically retry your task when an exception is raised.
However, if you want to retry only for specific exceptions (like an AWS throttling error), you can use a try/except block within your task function and raise a prefect.engine.signals.RETRY signal when you encounter the specific exception. Here's an example:
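import datetime

from prefect import task, Flow
from prefect.engine import signals  # part of the Prefect 1.x API

# AWSThrottlingException is a placeholder; substitute the exception your AWS client raises.
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=1))
def my_task():
    try:
        # Your task code here
        pass
    except AWSThrottlingException:
        # Ask Prefect to retry only for this specific error
        raise signals.RETRY("Hit an AWS throttling error, retrying...")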
In this example, AWSThrottlingException would be whatever exception is raised when you hit an AWS throttling error. The task will be retried up to 3 times, with a delay of 1 minute between retries.
For more advanced retry logic, you can also use a custom retry function. This function should accept the number of retries as an argument and return a list of retry delays. Prefect includes an exponential_backoff utility that will automatically generate a list of retry delays that correspond to an exponential backoff retry strategy.
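To make the shape of such a retry function concrete, here is a minimal sketch in plain Python. make_backoff is a hypothetical stand-in written for illustration, not Prefect's own utility: it returns a function that takes the number of retries and produces a list of delays that double each time. It is an assumption here that a Prefect 2 task accepts a callable of this shape through its retry_delay_seconds argument.

```python
from typing import Callable, List


def make_backoff(base_seconds: float) -> Callable[[int], List[float]]:
    """Return a function mapping a retry count to a list of doubling delays."""

    def delays(retries: int) -> List[float]:
        # Attempt 0 waits base_seconds, attempt 1 waits twice that, and so on.
        return [base_seconds * 2**attempt for attempt in range(retries)]

    return delays


retry_delays = make_backoff(10)(3)
print(retry_delays)  # [10, 20, 40]
```

With a base of 10 seconds and 3 retries, the waits are 10, 20, and 40 seconds, which gives a throttled service progressively more time to recover.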
For more information, check the Prefect documentation on tasks.
Nate
11/28/2023, 11:25 PM
Matt Alhonte
11/29/2023, 2:01 AM
import typing as t

from prefect.engine.runner import ENDRUN  # Prefect 1.x; logger is assumed to be defined elsewhere

def make_restart_state_handler(no_restart_condition: t.Callable[[str], bool]):
    def fn(task, old_state, new_state):
        if new_state.is_failed():
            logger.info("FAILED")
            failure_message = new_state.message
            logger.info("TESTING STATE HANDLER")
            logger.info(failure_message)
            if no_restart_condition(failure_message):
                # Non-restartable failure: stop the run in this state
                logger.info("END RUN!")
                raise ENDRUN(new_state)
            else:
                logger.info("RESTARTING!")
        return new_state
    return fn
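For illustration, a no_restart_condition for a handler like the one above could be a simple substring check on the failure message. This is only a sketch: make_no_restart_condition and the marker strings are hypothetical, and which messages should count as fatal depends on the errors your tasks actually produce.

```python
from typing import Callable, Iterable, Optional


def make_no_restart_condition(fatal_markers: Iterable[str]) -> Callable[[Optional[str]], bool]:
    """Build a predicate that is True when a failure message looks non-restartable."""
    markers = [marker.lower() for marker in fatal_markers]

    def no_restart_condition(failure_message: Optional[str]) -> bool:
        # Treat a missing message as restartable rather than fatal.
        text = (failure_message or "").lower()
        return any(marker in text for marker in markers)

    return no_restart_condition


# Hypothetical markers: errors that a retry is unlikely to fix.
is_fatal = make_no_restart_condition(["AccessDenied", "ValidationError"])
print(is_fatal("ThrottlingException: Rate exceeded"))  # False
print(is_fatal("An error occurred (AccessDenied) when calling PutObject"))  # True
```

The resulting is_fatal function matches the Callable[[str], bool] shape that make_restart_state_handler expects, so throttling failures would fall through to the restart branch while permission errors would end the run.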