Sergei
12/14/2024, 4:25 PM# utils/prefect/flows/example_flow.py
import random
from prefect import flow
from loguru import logger
from utils.prefect.prefect_helpers import flow_run_name, add_loguru_prefect_handler
@flow(name="Example Flow", flow_run_name=flow_run_name, retries=3, retry_delay_seconds=60)
def example_flow(retries: int):
add_loguru_prefect_handler()
<http://logger.info|logger.info>("Starting flow 1")
if random.choice([1, 2, 3]): # Will randomly pick 1, 2, or 3
logger.error(f"Got number {random.choice([1, 2, 3])}! Will retry...")
raise Exception("Random failure to trigger retry mechanism")
<http://logger.info|logger.info>("Flow completed successfully!")
return {"status": "completed", "flow": "flow1"}
is it right?Marvin
12/14/2024, 4:25 PMretries
and retry_delay_seconds
are indeed part of the flow function decorators and can be used to handle retry logic.
Here's a revised version of your code that ensures the retry mechanism is properly implemented:
python
from prefect import flow
import random
from loguru import logger
from utils.prefect.prefect_helpers import flow_run_name, add_loguru_prefect_handler
@flow(name="Example Flow", flow_run_name=flow_run_name, retries=3, retry_delay_seconds=60)
def example_flow():
add_loguru_prefect_handler()
<http://logger.info|logger.info>("Starting flow 1")
if random.choice([1, 2, 3]) != 2: # Ensure randomness raises an exception most of the time
logger.error("Random failure to trigger retry mechanism")
raise Exception("Random failure to trigger retry mechanism")
<http://logger.info|logger.info>("Flow completed successfully!")
return {"status": "completed", "flow": "flow1"}
In this snippet:
- The retries=3
decorator ensures the flow will retry up to 3 times.
- The retry_delay_seconds=60
sets a delay of 60 seconds between retries.
- I fixed the random choice logic to ensure that it raises an exception in most cases to trigger retries.
You can customize and test it further as needed. For more detailed information, including examples and options for retry strategies, you can refer to the Prefect 3.x documentation on retries.Sergei
12/14/2024, 4:32 PMCompleted submission of flow run '0d97b64b-7461-45c3-8154-f78b06c6fb87'
01:30:49 PM
prefect.flow_runs.worker
Process 1648 exited with status code: 1
01:30:53 PM
prefect.flow_runs.worker
Reported flow run '0d97b64b-7461-45c3-8154-f78b06c6fb87' as crashed: Flow run infrastructure exited with non-zero status code 1.
maybe retry is possible only for tasks?Marvin
12/14/2024, 4:32 PMpython
from prefect import flow, task
import random
from loguru import logger
from utils.prefect.prefect_helpers import flow_run_name, add_loguru_prefect_handler
@task(retries=3, retry_delay_seconds=60)
def random_failure_task():
<http://logger.info|logger.info>("Executing task...")
if random.choice([1, 2, 3]) != 2: # An arbitrary condition for failure
logger.error("Random failure to trigger retry mechanism")
raise Exception("Random failure to trigger retry mechanism")
<http://logger.info|logger.info>("Task completed successfully!")
@flow(name="Example Flow", flow_run_name=flow_run_name)
def example_flow():
add_loguru_prefect_handler()
<http://logger.info|logger.info>("Starting flow 1")
random_failure_task() # Make sure to call your task inside the flow
<http://logger.info|logger.info>("Flow completed successfully!")
return {"status": "completed", "flow": "flow1"}
In this example:
- The random_failure_task
has the retries
and retry_delay_seconds
parameters set, which will ensure the task retries up to 3 times with a delay of 60 seconds between attempts.
Retry mechanisms applied to tasks can handle intermittent failures without manually restarting entire flow runs. For more details and options, you can check the Prefect 3.x task retry documentation.
I hope this helps, but if there's anything else that might give your flow a more optimistic attitude toward retries, feel free to ask!