Victor Longui
08/24/2025, 10:57 PMMarvin
08/24/2025, 10:57 PMMarvin
08/24/2025, 10:59 PMimport asyncio
from datetime import datetime, timedelta, timezone
from prefect.client.orion import get_client
from prefect.orion.schemas.filters import (
FlowRunFilter,
FlowRunFilterStartTime,
StateFilter,
StateTypeFilter,
)
from prefect.orion.schemas.sorting import FlowRunSort
async def retry_failed_flow_runs(last_minutes: int = 5):
async with get_client() as client:
now = datetime.now(timezone.utc)
window_start = now - timedelta(minutes=last_minutes)
# Busca flow runs com state type FAILED ou CRASHED nos últimos X minutos
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
start_time=FlowRunFilterStartTime(after_=window_start),
state=StateFilter(
type=StateTypeFilter(any_=["FAILED", "CRASHED"])
),
),
sort=FlowRunSort.START_TIME_DESC,
)
print(f"Encontrados {len(flow_runs)} flow runs com falha no período.")
retried = 0
for fr in flow_runs:
# Evita subflows/flows ad-hoc sem deployment
if not fr.deployment_id:
# Sem deployment, não há como recriar um run automaticamente
continue
# Recria o run a partir do deployment, copiando parâmetros e tags
new_run = await client.create_flow_run_from_deployment(
deployment_id=fr.deployment_id,
parameters=getattr(fr, "parameters", None),
tags=list(getattr(fr, "tags", [])) or None,
)
print(f"Reagendado run {fr.id} -> novo run {new_run.id}")
retried += 1
print(f"Total reexecutado: {retried}")
if __name__ == "__main__":
asyncio.run(retry_failed_flow_runs(last_minutes=5))
Notas e boas práticas:
- Isso só funciona diretamente para runs que vieram de um deployment (deployment_id
presente). Para runs ad-hoc ou subflows, não há deployment para recriar; na prática, você precisaria iniciar manualmente uma nova execução do flow principal.
- Se quiser evitar reexecutar subflows, o filtro acima já ajuda porque geralmente subflows não têm deployment_id
. Ainda assim, se necessário, você pode adicionar lógica extra para checar e pular subflows.
- Para rodar periodicamente, você pode:
- Agendar esse script com um cron externo, ou
- Transformá-lo em um flow do Prefect e criar um deployment com um schedule a cada 5 minutos.
Referências (Prefect 2.x):
- Usando o Orion Client (consultas e criação de runs)
- API de filtros (FlowRunFilter, StateFilter, StateTypeFilter)
- create_flow_run_from_deployment
Se quiser, posso adaptar o exemplo para:
- copiar também o name
ou outras labels,
- aplicar filtros adicionais (tags específicas, apenas certos deployments),
- ou transformar tudo em um flow do Prefect com deployment e schedule.