Victor Longui
08/21/2025, 5:46 PMMarvin
08/21/2025, 5:46 PMMarvin
08/21/2025, 5:50 PM@flow(retries=..., retry_delay_seconds=...)
will not automatically retry CRASHED runs because the flow engine never got a chance to handle the exception. Use Automations for crash handling; use flow/task retries for application errors.
- Example for failures (not crashes):
from prefect import flow
@flow(retries=3, retry_delay_seconds=60)
def my_flow():
...
Manual restart from the CLI
- You can always kick off a new run of the same deployment:
prefect deployment run "MyFlow/MyDeployment" --watch
This is what the Automation action does automatically.
Questions to tailor this for you
- Are you on Prefect Cloud or open-source Server?
- Which worker/infrastructure are you using (process, Docker, Kubernetes, ECS, etc.)?
- Do you want a limited number of automatic restarts (e.g., only retry twice), or keep trying with a cooldown?
With that info, I can suggest exact automation filters and any infra settings to make it resilient without creating infinite loops.Victor Longui
08/21/2025, 8:20 PMMarvin
08/21/2025, 8:20 PMMarvin
08/21/2025, 8:21 PMVictor Longui
08/21/2025, 8:42 PMVictor Longui
08/21/2025, 8:42 PMMarvin
08/21/2025, 8:42 PMMarvin
08/21/2025, 8:43 PM/automations
(ex.: https://app.prefect.cloud/seu-account/seu-workspace/automations).
Docs (2.x):
- Automations no Prefect 2.x
Se você estiver usando a UI local (OSS), é por isso que não encontrou a aba — ela não existe no 2.19. Se estiver no Cloud e ainda não aparecer, me diga seu tipo de acesso (permissões) ou mande um print da barra lateral que eu te ajudo a localizar.
Observação: se puder, considere migrar para o Prefect 3.x — há melhorias significativas e a navegação está mais consistente.Victor Longui
08/21/2025, 9:01 PMMarvin
08/21/2025, 9:01 PMMarvin
08/21/2025, 9:05 PMprefect automation
serve apenas para listar, inspecionar e habilitar/pausar/excluir automations que já existem no Prefect Cloud.
Alguns comandos úteis:
# listar
prefect automation ls
# inspecionar (útil para ver a configuração em JSON/YAML)
prefect automation inspect "nome-da-automation" --json
prefect automation inspect "nome-da-automation" --yaml
# pausar / retomar
prefect automation pause "nome-da-automation"
prefect automation resume "nome-da-automation"
# excluir
prefect automation delete "nome-da-automation"
Como criar então?
- Prefect 2.19: criação é feita pelo Prefect Cloud (UI). Depois você pode usar inspect
para exportar a configuração e versionar.
- Programaticamente: é possível via API do Prefect Cloud, mas não há subcomando CLI para “create”. Se você precisar, eu te ajudo a montar a chamada HTTP correta para o seu workspace.
Docs (2.x):
- Automations no Prefect 2.x
Quer que eu te ajude a criar via API? Se sim, me diga:
- Se você já tem uma automation de exemplo (posso usar o inspect --json
como base)
- Seu workspace (account/workspace) no Cloud para eu montar o endpoint certinho.Victor Longui
08/23/2025, 9:48 PMVictor Longui
08/23/2025, 9:48 PMMarvin
08/23/2025, 9:48 PMMarvin
08/23/2025, 9:52 PMflow runs
de todo o workspace (sem filtro) e, se o estado for `Failed`/`Crashed`, dispara SEMPRE o mesmo deployment fixo (my_flow/my_deployment
) via CLI.
- Isso pode recriar runs no deployment errado (se as falhas vierem de outros deployments).
- Além disso, subprocess.run
bloqueia o loop assíncrono.
Recomendações (assumindo Prefect 3.x):
- Em 3.x não há “retry” direto de um run já falhado; a abordagem recomendada é criar um NOVO flow run a partir do deployment original. A CLI prefect deployment run
é para isso e continua válida.
- Use o deployment_id
do próprio flow run falhado para disparar o novo run — a CLI aceita --id
, evitando precisar montar o nome flow/deployment
.
- Filtre os flow runs
já na consulta (por estado e, se quiser, por flow/deployment), em vez de checar o estado no loop.
Versão mínima corrigida (mantendo a CLI e usando o deployment certo por run):
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterStateType
from prefect.client.schemas.objects import StateType
import asyncio
import subprocess
async def restart_failed_flows():
async with get_client() as client:
failed = FlowRunFilterStateType(any_=[StateType.FAILED, StateType.CRASHED])
flow_runs = await client.read_flow_runs(
limit=10,
flow_runs=FlowRunFilter(state_type=failed),
)
for run in flow_runs:
if not run.deployment_id:
print(f"Pulado {run.name}: sem deployment associado")
continue
print(f"Recriando {run.name} ({run.state.name}) do deployment {run.deployment_id}")
subprocess.run(
["prefect", "deployment", "run", "--id", str(run.deployment_id)]
)
if __name__ == "__main__":
asyncio.run(restart_failed_flows())
Versão 100% via API (sem subprocess) e preservando parâmetros do run original:
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
FlowRunFilter, FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
import asyncio
async def restart_failed_flows():
async with get_client() as client:
failed = FlowRunFilterStateType(any_=[StateType.FAILED, StateType.CRASHED])
flow_runs = await client.read_flow_runs(
limit=10,
flow_runs=FlowRunFilter(state_type=failed),
)
for run in flow_runs:
if not run.deployment_id:
print(f"Pulado {run.name}: sem deployment associado")
continue
print(f"Recriando {run.name} ({run.state.name}) do deployment {run.deployment_id}")
# Cria um novo flow run a partir do deployment original
await client.create_flow_run_from_deployment(
deployment_id=run.deployment_id,
parameters=getattr(run, "parameters", None),
)
if __name__ == "__main__":
asyncio.run(restart_failed_flows())
Dicas adicionais:
- Se você quer somente falhas de um deployment/flow específico, adicione filtros por FlowFilterName
e DeploymentFilterName
à chamada de read_flow_runs
.
- Tenha em mente que isso criará novos runs para cada falha encontrada; se você precisa evitar duplicidade (ex.: já re-disparou antes), adicione alguma lógica de deduplicação.
Referências:
- “Criar deployments” e execução de runs a partir de deployments: Docs: criar deployments
- Não há “retry” direto de runs falhados; “resume” é apenas para runs pausados: API: resume de flow run (pausado)Marvin
08/23/2025, 9:52 PMVictor Longui
08/23/2025, 10:23 PMVictor Longui
08/23/2025, 10:23 PMMarvin
08/23/2025, 10:23 PMMarvin
08/23/2025, 10:24 PMFLOW_NAME/DEPLOYMENT_NAME
- busca o deployment_id
- lê as últimas 10 runs daquele deployment filtrando só as que crasharam
- cria novos flow runs no mesmo deployment reutilizando os parâmetros do run original
import asyncio
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
FlowRunFilter,
FlowRunFilterStateType,
DeploymentFilter,
DeploymentFilterId,
)
from prefect.client.schemas.objects import StateType
DEPLOYMENT = "my_flow/my_deployment" # formato: FLOW_NAME/DEPLOYMENT_NAME
async def restart_crashed_runs_for_deployment(deployment_qual_name: str):
flow_name, deployment_name = deployment_qual_name.split("/", 1)
async with get_client() as client:
# 1) Resolve o deployment
deployment = await client.read_deployment_by_name(flow_name, deployment_name)
# 2) Busca as últimas 10 runs desse deployment que CRASHARAM
crashed = FlowRunFilterStateType(any_=[StateType.CRASHED])
flow_runs = await client.read_flow_runs(
limit=10,
flow_runs=FlowRunFilter(state_type=crashed),
deployments=DeploymentFilter(id=DeploymentFilterId(any_=[deployment.id])),
)
if not flow_runs:
print("Nenhuma run 'Crashed' encontrada para este deployment nas últimas 10.")
return
# 3) Recria as runs com os MESMOS parâmetros
for run in flow_runs:
params = getattr(run, "parameters", None) or {}
new_run = await client.create_flow_run_from_deployment(
deployment_id=deployment.id,
parameters=params,
)
print(f"Recriada: original={run.id} -> novo run={new_run.id}")
if __name__ == "__main__":
asyncio.run(restart_crashed_runs_for_deployment(DEPLOYMENT))
Observações:
- Mantive apenas Crashed
. Se quiser incluir Failed
, troque o filtro para any_=[StateType.CRASHED, StateType.FAILED]
.
- Esse script usa só a API (sem subprocess
), o que evita bloquear o event loop e garante que os parâmetros sejam passados certinho.
- Se você quiser também replicar tags ou outro metadata, dá para passar tags=run.tags
em create_flow_run_from_deployment
.
Quer que eu ajuste para incluir Failed
também ou limitar por janela de tempo específica em vez de “últimas 10”?