<@ULVA73B9P> can you write snippet retry mechanism...
# ask-marvin
s
@Marvin can you write a snippet for a retry mechanism on a flow?
```python
#  utils/prefect/flows/example_flow.py

import random

from prefect import flow
from loguru import logger

from utils.prefect.prefect_helpers import flow_run_name, add_loguru_prefect_handler

@flow(name="Example Flow", flow_run_name=flow_run_name, retries=3, retry_delay_seconds=60)
def example_flow(retries: int):
    add_loguru_prefect_handler()
    logger.info("Starting flow 1")

    if random.choice([1, 2, 3]):  # Will randomly pick 1, 2, or 3
        logger.error(f"Got number {random.choice([1, 2, 3])}! Will retry...")
        raise Exception("Random failure to trigger retry mechanism")

    logger.info("Flow completed successfully!")
    return {"status": "completed", "flow": "flow1"}
```
is it right?
m
Your snippet is almost correct, but a couple of details need adjustment. In Prefect 3.x, `retries` and `retry_delay_seconds` are indeed accepted by the `@flow` decorator and control flow-level retries. Here's a revised version of your code:
```python
from prefect import flow
import random
from loguru import logger
from utils.prefect.prefect_helpers import flow_run_name, add_loguru_prefect_handler

@flow(name="Example Flow", flow_run_name=flow_run_name, retries=3, retry_delay_seconds=60)
def example_flow():
    add_loguru_prefect_handler()
    logger.info("Starting flow 1")

    if random.choice([1, 2, 3]) != 2:  # Ensure randomness raises an exception most of the time
        logger.error("Random failure to trigger retry mechanism")
        raise Exception("Random failure to trigger retry mechanism")

    logger.info("Flow completed successfully!")
    return {"status": "completed", "flow": "flow1"}
```
In this snippet:
- `retries=3` tells Prefect to retry the flow up to 3 times after the first failed attempt.
- `retry_delay_seconds=60` sets a delay of 60 seconds between retries.
- I removed the unused `retries: int` parameter from the function signature and changed the condition. The original `if random.choice([1, 2, 3]):` is always true, because 1, 2, and 3 are all truthy, so it raised on every run. The new condition raises for 1 and 3, which is about two thirds of the time.

You can customize and test it further as needed. For more detailed information, including examples and options for retry strategies, you can refer to the Prefect 3.x documentation on retries.
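To make the retry count concrete, here is a minimal plain-Python sketch of the semantics described above: one initial attempt plus up to `retries` re-runs, with a pause between them. The `with_retries` decorator is a hypothetical stand-in written for illustration; it is not Prefect's API or implementation, and the delay is set to 0 so the example finishes immediately.

```python
import time
from functools import wraps


def with_retries(retries: int, retry_delay_seconds: float = 0.0):
    """Hypothetical helper that mimics retry semantics (not Prefect code)."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            attempts = retries + 1  # the first attempt plus `retries` re-runs
            for attempt in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise  # retries exhausted: surface the last error
                    time.sleep(retry_delay_seconds)
        return wrapper
    return decorator


calls = {"count": 0}


@with_retries(retries=3, retry_delay_seconds=0)
def always_fails():
    # Count every attempt, then fail unconditionally
    calls["count"] += 1
    raise RuntimeError("simulated failure")


try:
    always_fails()
except RuntimeError:
    pass

print(calls["count"])  # 4: one initial attempt plus 3 retries
```

So with `retries=3`, a function that never succeeds is attempted four times in total before the error is surfaced.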
s
@Marvin it just crashed and did not retry
```
Completed submission of flow run '0d97b64b-7461-45c3-8154-f78b06c6fb87'
01:30:49 PM
prefect.flow_runs.worker
Process 1648 exited with status code: 1
01:30:53 PM
prefect.flow_runs.worker
Reported flow run '0d97b64b-7461-45c3-8154-f78b06c6fb87' as crashed: Flow run infrastructure exited with non-zero status code 1.
```
Maybe retries are only possible for tasks?
m
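Prefect supports retries on both flows and tasks, so `retries` on `@flow` is valid. Your logs show something different from a failed flow, though: the run was reported as crashed because the infrastructure process exited with status code 1. Flow retries are handled by the flow engine inside that process when the flow function raises an exception. If the process itself exits, nothing is left running to perform the retry. It's worth checking the flow run logs for the cause of the exit (an error while loading the flow code, such as a failed import, is one possibility). Separately, if only part of the work is flaky, you can put the retries on a task so that just that step is re-run. Here's a quick snippet illustrating how to add retries to a task within a flow: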
```python
from prefect import flow, task
import random
from loguru import logger
from utils.prefect.prefect_helpers import flow_run_name, add_loguru_prefect_handler

@task(retries=3, retry_delay_seconds=60)
def random_failure_task():
    logger.info("Executing task...")
    if random.choice([1, 2, 3]) != 2:  # An arbitrary condition for failure
        logger.error("Random failure to trigger retry mechanism")
        raise Exception("Random failure to trigger retry mechanism")
    logger.info("Task completed successfully!")

@flow(name="Example Flow", flow_run_name=flow_run_name)
def example_flow():
    add_loguru_prefect_handler()
    logger.info("Starting flow 1")
    random_failure_task()  # Make sure to call your task inside the flow
    logger.info("Flow completed successfully!")
    return {"status": "completed", "flow": "flow1"}
```
In this example, `random_failure_task` has the `retries` and `retry_delay_seconds` parameters set, so the task retries up to 3 times with a delay of 60 seconds between attempts. Retry mechanisms applied to tasks can handle intermittent failures without manually restarting entire flow runs.

For more details and options, you can check the Prefect 3.x task retry documentation. I hope this helps, but if there's anything else that might give your flow a more optimistic attitude toward retries, feel free to ask!
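A side note on the random failure condition, since it affects what you should expect when testing: the sketch below checks why a bare `random.choice([1, 2, 3])` always counts as true, and works out how often the `!= 2` version would still fail after all retries. It assumes each attempt is independent and that `retries=3` means four attempts in total; the variable names are illustrative only.

```python
from fractions import Fraction

# Every possible value of random.choice([1, 2, 3]) is a non-zero int, so a
# bare `if random.choice([1, 2, 3]):` takes the failure branch on every run.
always_truthy = all(bool(n) for n in [1, 2, 3])

# With `!= 2`, two of the three equally likely values (1 and 3) cause a failure.
p_fail_per_attempt = Fraction(2, 3)

retries = 3
attempts = retries + 1  # the first attempt plus three retries

# Probability that every attempt fails, assuming independent attempts
p_all_attempts_fail = p_fail_per_attempt ** attempts

print(always_truthy)               # True
print(p_all_attempts_fail)         # 16/81
print(float(p_all_attempts_fail))  # roughly 0.198
```

In other words, roughly one run in five would still end in failure after exhausting its retries, so a failed run during testing does not by itself mean the retries were skipped. For a repeatable test, a condition that fails a fixed number of times and then succeeds is easier to reason about than a random one.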