<@ULVA73B9P> How do I get the Flow Run ID from the...
# ask-marvin
m
@Marvin How do I get the Flow Run ID from the parent flow from within a function passed to on_crashed?
m
To get the Flow Run ID from the parent flow within a function passed to on_crashed, you would need access to the context of the parent flow. However, the on_crashed hook is called when your flow crashes, and it might not have direct access to the parent flow's context.

In Prefect, a subflow run is created when a flow function is called inside the execution of another flow. The primary flow is the "parent" flow. The flow created within the parent is the "child" flow or "subflow." Subflow runs behave like normal flow runs: there is a full representation of the flow run in the backend, as if it had been called separately.

You might need to pass the Flow Run ID of the parent flow as an argument to the child flow, or store it in a place where it can be accessed by the on_crashed function. Please note that this is a general approach and might need to be adjusted based on your specific use case and Prefect configuration. For more detailed information, you might want to check the Prefect documentation or community resources:
- Prefect Concepts: Flows
- Prefect Release Notes on GitHub
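As a rough sketch of the "store it where the hook can reach it" idea above, a closure can carry the parent's flow run ID into the hook. This assumes Prefect 2.x calls state hooks as (flow, flow_run, state). The names make_on_crashed and crash_reports, and the stand-in objects, are hypothetical and only there so the snippet runs without Prefect installed.

```python
from types import SimpleNamespace
from typing import Callable, List

# Collected here only so the example has an observable result.
crash_reports: List[str] = []


def make_on_crashed(parent_flow_run_id: str) -> Callable:
    """Build a hook that remembers the parent's flow run ID via a closure."""

    def on_crashed(flow, flow_run, state) -> None:
        # Assumed hook signature: (flow, flow_run, state).
        crash_reports.append(
            f"child run {flow_run.id} crashed; parent run {parent_flow_run_id}"
        )

    return on_crashed


# Stand-ins for the objects Prefect would pass to the hook (hypothetical values).
fake_flow = SimpleNamespace(name="child-flow")
fake_flow_run = SimpleNamespace(id="child-123")
fake_state = SimpleNamespace(name="Crashed")

hook = make_on_crashed(parent_flow_run_id="parent-456")
hook(fake_flow, fake_flow_run, fake_state)
print(crash_reports[0])
```

Inside a real parent flow, the hook would probably be attached when calling the child, for example through the child's on_crashed option, with the parent's ID read from the parent's run context. Check the exact names against your Prefect version.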
m
@Marvin I wanna make a conditional Retry that restarts only when I hit an AWS Throttling error.
m
In Prefect 1.x, you can handle retries in a few ways. One way is to use the max_retries and retry_delay parameters when defining your task. This will automatically retry your task when an exception is raised. However, if you want to retry only for specific exceptions (like an AWS Throttling error), you can use a try/except block within your task function and raise a prefect.engine.signals.RETRY signal when you encounter the specific exception. Here's an example:
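import datetime

from prefect import task
from prefect.engine import signals  # Prefect 1.x API


class AWSThrottlingException(Exception):
    """Placeholder: substitute the exception your AWS client raises when throttled."""


@task(max_retries=3, retry_delay=datetime.timedelta(minutes=1))
def my_task():
    try:
        # Your task code here
        pass
    except AWSThrottlingException:
        # Signal Prefect to put this task run into a retrying state
        raise signals.RETRY("Hit an AWS throttling error, retrying...")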
In this example, AWSThrottlingException would be whatever exception is raised when you hit an AWS throttling error. The task will be retried up to 3 times with a delay of 1 minute between each retry. For more advanced retry logic, you can also use a custom retry delay function. This function should accept the number of retries as an argument and return a list of retry delays. Prefect 2.x includes an exponential_backoff utility that will automatically generate a list of retry delays that correspond to an exponential backoff retry strategy. For more information, check the Prefect documentation on tasks.
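To make the "retry only on throttling, with growing delays" idea concrete, here is a plain-Python sketch that does not use Prefect's API at all. It assumes the error looks like a botocore ClientError, which carries a response dict with an Error/Code entry. The code list and the helper names (is_throttling_error, backoff_delays, call_with_conditional_retry) are assumptions for illustration.

```python
import time
from typing import Callable, List

# Assumed error codes; adjust to the codes your AWS service actually returns.
THROTTLING_CODES = {"Throttling", "ThrottlingException", "TooManyRequestsException"}


def is_throttling_error(exc: Exception) -> bool:
    """True if exc looks like a botocore ClientError carrying a throttling code."""
    response = getattr(exc, "response", None)
    if not isinstance(response, dict):
        return False
    return response.get("Error", {}).get("Code") in THROTTLING_CODES


def backoff_delays(retries: int, base_seconds: float = 1.0) -> List[float]:
    """Exponential delays: base, 2*base, 4*base, ..."""
    return [base_seconds * 2**i for i in range(retries)]


def call_with_conditional_retry(
    fn: Callable,
    should_retry: Callable[[Exception], bool],
    delays: List[float],
    sleep: Callable[[float], None] = time.sleep,
):
    """Call fn, retrying once per delay, but only when should_retry(exc) is true."""
    for delay in delays:
        try:
            return fn()
        except Exception as exc:
            if not should_retry(exc):
                raise  # not a throttling error: fail immediately
            sleep(delay)
    return fn()  # final attempt; any exception propagates


class FakeClientError(Exception):
    """Stand-in with the same shape as botocore's ClientError (hypothetical)."""

    def __init__(self, code: str):
        super().__init__(code)
        self.response = {"Error": {"Code": code}}


attempts = []


def flaky():
    # Fails with a throttling error twice, then succeeds.
    attempts.append(1)
    if len(attempts) < 3:
        raise FakeClientError("ThrottlingException")
    return "ok"


slept = []  # record delays instead of actually sleeping
result = call_with_conditional_retry(
    flaky, is_throttling_error, backoff_delays(3), sleep=slept.append
)
print(result, slept)
```

The same predicate could be reused inside whatever retry mechanism your Prefect version offers. Keeping the check separate from the retry loop makes it easy to test on its own.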
n
will this be helpful when it comes through?
m
Perfect! This was my original Prefect 1 state handler, looks like that'll have like the exact same interface lol
def make_restart_state_handler(no_restart_condition: t.Callable[[str], bool]):
    def fn(task, old_state, new_state):
        if new_state.is_failed():
            logger.info("FAILED")
            failure_message = new_state.message
            logger.info("TESTING STATE HANDLER")
            logger.info(failure_message)
            if no_restart_condition(failure_message):
                logger.info("END RUN!")
                raise ENDRUN(new_state)
            else:
                logger.info("RESTARTING!")
        return new_state

    return fn