Igor Kaluder
05/20/2024, 8:40 AMMarvin
05/20/2024, 8:41 AMpython
import httpx
from prefect import flow
@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
url = f"<https://api.github.com/repos/{repo_name}>"
response = httpx.get(url)
response.raise_for_status()
repo = response.json()
print(f"{repo}")
In the example above, the retries=3
parameter indicates that the flow should be retried up to 3 times if an error occurs during execution, and retry_delay_seconds=5
specifies a delay of 5 seconds between each retry.
If you need to rerun a flow run that has already failed, you can do so through the Prefect UI or by using the Prefect Client in Python to programmatically create a new flow run. Here's a function from the Prefect documentation that you might use to reschedule failed flow runs:
python
from prefect import get_client
from prefect.client import FlowRunFilter, DeploymentFilter
from prefect.orion.schemas.filters import FlowRunFilter, DeploymentFilter
from prefect.orion.schemas.sorting import FlowRunSort
from datetime import datetime, timezone, timedelta
from prefect.engine.state import Scheduled
async def reschedule_late_flow_runs(deployment_name: str, delay: timedelta, most_recent_n: int, delete_remaining: bool = True, states: Optional[list[str]] = None) -> list[FlowRun]:
if not states:
states = ["Late"]
async with get_client() as client:
flow_runs = await client.read_flow_runs(
# Filtering criteria here to identify the failed flow runs
)
# The rest of the function logic for rescheduling flows goes here
In this script, you would identify the failed flow runs using appropriate filter criteria and then programmatically reschedule them as needed.
For any further details or handling specific cases, you can refer to the Prefect documentation or reach out to the Prefect community via their official channels.
Let me know if you need help assembling a more specific script or if you have any other questions concerning retries or handling flow run failures.