<@ULVA73B9P> Estou usando o Prefect 2.x. Eu quero...
# ask-marvin
v
@Marvin Estou usando o Prefect 2.x. Eu quero usar a api do Prefect para pegar as flows que rodaram nos ultimos 5 minutos, filtrar somente as que estao com status de crashed ou failed e dar retry nessas flows novamente. Tem como?
m
thought for 120.8 seconds
Sim, dá para fazer isso com a API do Prefect 2.x usando o cliente async. A ideia é: - Consultar os flow runs dos últimos 5 minutos - Filtrar pelos que estão com state type FAILED ou CRASHED - Criar um novo run a partir do mesmo deployment (copiando parâmetros/tags se quiser) Observação importante no Prefect 2.x: “dar retry” em um flow run já finalizado (FAILED/CRASHED) alterando o estado diretamente não é o caminho indicado. O padrão é criar um novo flow run a partir do deployment original. Isso garante que toda a infraestrutura/worker seja acionada corretamente. Exemplo completo (script async) — filtra e reexecuta apenas runs que vieram de um deployment:
Copy code
import asyncio
from datetime import datetime, timedelta, timezone

from prefect.client.orion import get_client
from prefect.orion.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterStartTime,
    StateFilter,
    StateTypeFilter,
)
from prefect.orion.schemas.sorting import FlowRunSort

async def retry_failed_flow_runs(last_minutes: int = 5):
    async with get_client() as client:
        now = datetime.now(timezone.utc)
        window_start = now - timedelta(minutes=last_minutes)

        # Busca flow runs com state type FAILED ou CRASHED nos últimos X minutos
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                start_time=FlowRunFilterStartTime(after_=window_start),
                state=StateFilter(
                    type=StateTypeFilter(any_=["FAILED", "CRASHED"])
                ),
            ),
            sort=FlowRunSort.START_TIME_DESC,
        )

        print(f"Encontrados {len(flow_runs)} flow runs com falha no período.")

        retried = 0
        for fr in flow_runs:
            # Evita subflows/flows ad-hoc sem deployment
            if not fr.deployment_id:
                # Sem deployment, não há como recriar um run automaticamente
                continue

            # Recria o run a partir do deployment, copiando parâmetros e tags
            new_run = await client.create_flow_run_from_deployment(
                deployment_id=fr.deployment_id,
                parameters=getattr(fr, "parameters", None),
                tags=list(getattr(fr, "tags", [])) or None,
            )
            print(f"Reagendado run {fr.id} -> novo run {new_run.id}")
            retried += 1

        print(f"Total reexecutado: {retried}")

if __name__ == "__main__":
    asyncio.run(retry_failed_flow_runs(last_minutes=5))
Notas e boas práticas: - Isso só funciona diretamente para runs que vieram de um deployment (
deployment_id
presente). Para runs ad-hoc ou subflows, não há deployment para recriar; na prática, você precisaria iniciar manualmente uma nova execução do flow principal. - Se quiser evitar reexecutar subflows, o filtro acima já ajuda porque geralmente subflows não têm
deployment_id
. Ainda assim, se necessário, você pode adicionar lógica extra para checar e pular subflows. - Para rodar periodicamente, você pode: - Agendar esse script com um cron externo, ou - Transformá-lo em um flow do Prefect e criar um deployment com um schedule a cada 5 minutos. Referências (Prefect 2.x): - Usando o Orion Client (consultas e criação de runs) - API de filtros (FlowRunFilter, StateFilter, StateTypeFilter) - create_flow_run_from_deployment Se quiser, posso adaptar o exemplo para: - copiar também o
name
ou outras labels, - aplicar filtros adicionais (tags específicas, apenas certos deployments), - ou transformar tudo em um flow do Prefect com deployment e schedule.