<@ULVA73B9P> Is it possible to restart my deployme...
# ask-marvin
v
@Marvin Is it possible to restart my deployment? For example, if the flow rum crashes, it restarts the flow. I’m running Prefect 2.19
m
thought for 249.0 seconds
Short answer: yes—on Prefect 2.19 you can auto “restart” a deployment when a flow run crashes by using an Automation that triggers on CRASHED (or FAILED) runs and runs the same deployment again. Flow-level retries won’t fire on CRASHED runs; those retries are for exceptions handled inside the flow engine. Two common approaches 1) Automations (recommended) - What it does: watches for a flow run to enter CRASHED or FAILED, then triggers the same deployment (or a different one) to run again. - How to set it up (UI): 1. In the Prefect UI, go to Orchestration → Automations → + Create automation 2. Trigger: Flow run state entered - State: CRASHED (add FAILED if you want both) - Filter: Deployment name = your deployment 3. Action: Run deployment - Select the same deployment - (Optional) Pass any parameters you need 4. Safety: - Add a cooldown (e.g., 10 minutes) to avoid rapid re-triggers - Consider scoping to only a specific work queue or tag if needed - Docs: - Automations (concepts) - Automations (how-to) 2) Infrastructure-level restarts (if using Kubernetes/Docker) - If you’re running on Kubernetes, you can use a Job with a non-zero backoffLimit so the cluster retries the pod when it exits unexpectedly. This restarts the same flow run attempt at the infrastructure level. - For Docker or other runtimes, you can use platform-native restart policies to restart the container if it dies. Note: this re-runs the same attempt; it’s not a new flow run. - This complements an Automation; you can do both. Notes on flow retries -
@flow(retries=..., retry_delay_seconds=...)
will not automatically retry CRASHED runs because the flow engine never got a chance to handle the exception. Use Automations for crash handling; use flow/task retries for application errors. - Example for failures (not crashes):
Copy code
from prefect import flow

  @flow(retries=3, retry_delay_seconds=60)
  def my_flow():
      ...
Manual restart from the CLI - You can always kick off a new run of the same deployment:
Copy code
prefect deployment run "MyFlow/MyDeployment" --watch
This is what the Automation action does automatically. Questions to tailor this for you - Are you on Prefect Cloud or open-source Server? - Which worker/infrastructure are you using (process, Docker, Kubernetes, ECS, etc.)? - Do you want a limited number of automatic restarts (e.g., only retry twice), or keep trying with a cooldown? With that info, I can suggest exact automation filters and any infra settings to make it resilient without creating infinite loops.
v
@Marvin não encontrei essa aba Orchestration > Automation
m
thought for 71.3 seconds
Entendi! No Prefect 3.x, a navegação mudou um pouco — a seção aparece como “Automations” diretamente no menu lateral, não necessariamente como “Orchestration > Automation”. Onde encontrar: - Prefect Cloud: no menu lateral, clique em “Automations”. Link direto (substitua pelos seus slugs): https://app.prefect.cloud/sua-conta/seu-workspace/automations - UI local (OSS): acesse http://127.0.0.1:4200/automations Docs úteis: - API de Automations (Prefect 3) Pode me dizer: - Você está usando o Prefect Cloud ou a UI local? - Qual versão do Prefect (2.x ou 3.x)? Se for 2.x, recomendo atualizar para 3.x; caso fique em 2.x, a seção ainda se chama “Automations”, mas a navegação pode diferir um pouco.
v
Mas o meu é o 2.19
@Marvin o meu é o 2.19
m
thought for 50.1 seconds
Perfeito — no Prefect 2.19: - Automations existem apenas no Prefect Cloud (não aparecem na UI local/Orion). - No Cloud, elas ficam no menu lateral como “Automations” (item próprio, não “Orchestration > ...”). Ao entrar no seu workspace, clique em “Automations”. A URL do workspace termina com
/automations
(ex.: https://app.prefect.cloud/seu-account/seu-workspace/automations). Docs (2.x): - Automations no Prefect 2.x Se você estiver usando a UI local (OSS), é por isso que não encontrou a aba — ela não existe no 2.19. Se estiver no Cloud e ainda não aparecer, me diga seu tipo de acesso (permissões) ou mande um print da barra lateral que eu te ajudo a localizar. Observação: se puder, considere migrar para o Prefect 3.x — há melhorias significativas e a navegação está mais consistente.
v
@Marvin Tem como criar via CLI? eu vi que tem um comando chamado prefect automation
m
thought for 226.3 seconds
Ótima pergunta. No Prefect 2.19 não dá para CRIAR automations via CLI. O comando
prefect automation
serve apenas para listar, inspecionar e habilitar/pausar/excluir automations que já existem no Prefect Cloud. Alguns comandos úteis:
Copy code
# listar
prefect automation ls

# inspecionar (útil para ver a configuração em JSON/YAML)
prefect automation inspect "nome-da-automation" --json
prefect automation inspect "nome-da-automation" --yaml

# pausar / retomar
prefect automation pause "nome-da-automation"
prefect automation resume "nome-da-automation"

# excluir
prefect automation delete "nome-da-automation"
Como criar então? - Prefect 2.19: criação é feita pelo Prefect Cloud (UI). Depois você pode usar
inspect
para exportar a configuração e versionar. - Programaticamente: é possível via API do Prefect Cloud, mas não há subcomando CLI para “create”. Se você precisar, eu te ajudo a montar a chamada HTTP correta para o seu workspace. Docs (2.x): - Automations no Prefect 2.x Quer que eu te ajude a criar via API? Se sim, me diga: - Se você já tem uma automation de exemplo (posso usar o
inspect --json
como base) - Seu workspace (account/workspace) no Cloud para eu montar o endpoint certinho.
v
Isso aqui funcionaria? from prefect.client.orchestration import get_client import asyncio import subprocess DEPLOYMENT_NAME = "my_flow/my_deployment" async def restart_failed_flows(): async with get_client() as client: # pega últimas 10 runs flow_runs = await client.read_flow_runs(limit=10) for run in flow_runs: if run.state.name in {"Failed", "Crashed"}: print(f"Reiniciando {run.name} ({run.state.name})") subprocess.run( ["prefect", "deployment", "run", DEPLOYMENT_NAME] ) if name == "__main__": asyncio.run(restart_failed_flows())
@Marvin Isso aqui funcionaria? from prefect.client.orchestration import get_client import asyncio import subprocess DEPLOYMENT_NAME = "my_flow/my_deployment" async def restart_failed_flows(): async with get_client() as client: # pega últimas 10 runs flow_runs = await client.read_flow_runs(limit=10) for run in flow_runs: if run.state.name in {"Failed", "Crashed"}: print(f"Reiniciando {run.name} ({run.state.name})") subprocess.run( ["prefect", "deployment", "run", DEPLOYMENT_NAME] ) if name == "__main__": asyncio.run(restart_failed_flows())
m
thought for 235.9 seconds
Quase — mas há alguns pontos importantes. O que seu script faz hoje: - Ele pega as últimas 10
flow runs
de todo o workspace (sem filtro) e, se o estado for `Failed`/`Crashed`, dispara SEMPRE o mesmo deployment fixo (
my_flow/my_deployment
) via CLI. - Isso pode recriar runs no deployment errado (se as falhas vierem de outros deployments). - Além disso,
subprocess.run
bloqueia o loop assíncrono. Recomendações (assumindo Prefect 3.x): - Em 3.x não há “retry” direto de um run já falhado; a abordagem recomendada é criar um NOVO flow run a partir do deployment original. A CLI
prefect deployment run
é para isso e continua válida. - Use o
deployment_id
do próprio flow run falhado para disparar o novo run — a CLI aceita
--id
, evitando precisar montar o nome
flow/deployment
. - Filtre os
flow runs
já na consulta (por estado e, se quiser, por flow/deployment), em vez de checar o estado no loop. Versão mínima corrigida (mantendo a CLI e usando o deployment certo por run):
Copy code
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterStateType
from prefect.client.schemas.objects import StateType
import asyncio
import subprocess

async def restart_failed_flows():
    async with get_client() as client:
        failed = FlowRunFilterStateType(any_=[StateType.FAILED, StateType.CRASHED])

        flow_runs = await client.read_flow_runs(
            limit=10,
            flow_runs=FlowRunFilter(state_type=failed),
        )

        for run in flow_runs:
            if not run.deployment_id:
                print(f"Pulado {run.name}: sem deployment associado")
                continue

            print(f"Recriando {run.name} ({run.state.name}) do deployment {run.deployment_id}")
            subprocess.run(
                ["prefect", "deployment", "run", "--id", str(run.deployment_id)]
            )

if __name__ == "__main__":
    asyncio.run(restart_failed_flows())
Versão 100% via API (sem subprocess) e preservando parâmetros do run original:
Copy code
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter, FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
import asyncio

async def restart_failed_flows():
    async with get_client() as client:
        failed = FlowRunFilterStateType(any_=[StateType.FAILED, StateType.CRASHED])

        flow_runs = await client.read_flow_runs(
            limit=10,
            flow_runs=FlowRunFilter(state_type=failed),
        )

        for run in flow_runs:
            if not run.deployment_id:
                print(f"Pulado {run.name}: sem deployment associado")
                continue

            print(f"Recriando {run.name} ({run.state.name}) do deployment {run.deployment_id}")
            # Cria um novo flow run a partir do deployment original
            await client.create_flow_run_from_deployment(
                deployment_id=run.deployment_id,
                parameters=getattr(run, "parameters", None),
            )

if __name__ == "__main__":
    asyncio.run(restart_failed_flows())
Dicas adicionais: - Se você quer somente falhas de um deployment/flow específico, adicione filtros por
FlowFilterName
e
DeploymentFilterName
à chamada de
read_flow_runs
. - Tenha em mente que isso criará novos runs para cada falha encontrada; se você precisa evitar duplicidade (ex.: já re-disparou antes), adicione alguma lógica de deduplicação. Referências: - “Criar deployments” e execução de runs a partir de deployments: Docs: criar deployments - Não há “retry” direto de runs falhados; “resume” é apenas para runs pausados: API: resume de flow run (pausado)
Pergunta: você quer reexecutar sempre o mesmo deployment fixo, ou reexecutar o deployment específico que gerou cada run falhada (preservando parâmetros)? Posso ajustar o exemplo conforme sua necessidade.
v
Eu uso o prefect 2.x. Eu quero passar o deployment, ele pega as ultimas 10 flows desse deployment, pegue as que crasharam e rode elas novamente com os mesmos parametros da que crashou.
@Marvin Eu uso o prefect 2.x. Eu quero passar o deployment, ele pega as ultimas 10 flows desse deployment, pegue as que crasharam e rode elas novamente com os mesmos parametros da que crashou.
m
thought for 60.1 seconds
Perfeito — dá para fazer isso em 2.x de forma direta via API, mantendo os mesmos parâmetros. Abaixo um exemplo completo que: - recebe
FLOW_NAME/DEPLOYMENT_NAME
- busca o
deployment_id
- lê as últimas 10 runs daquele deployment filtrando só as que crasharam - cria novos flow runs no mesmo deployment reutilizando os parâmetros do run original
Copy code
import asyncio
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterStateType,
    DeploymentFilter,
    DeploymentFilterId,
)
from prefect.client.schemas.objects import StateType

DEPLOYMENT = "my_flow/my_deployment"  # formato: FLOW_NAME/DEPLOYMENT_NAME

async def restart_crashed_runs_for_deployment(deployment_qual_name: str):
    flow_name, deployment_name = deployment_qual_name.split("/", 1)

    async with get_client() as client:
        # 1) Resolve o deployment
        deployment = await client.read_deployment_by_name(flow_name, deployment_name)

        # 2) Busca as últimas 10 runs desse deployment que CRASHARAM
        crashed = FlowRunFilterStateType(any_=[StateType.CRASHED])

        flow_runs = await client.read_flow_runs(
            limit=10,
            flow_runs=FlowRunFilter(state_type=crashed),
            deployments=DeploymentFilter(id=DeploymentFilterId(any_=[deployment.id])),
        )

        if not flow_runs:
            print("Nenhuma run 'Crashed' encontrada para este deployment nas últimas 10.")
            return

        # 3) Recria as runs com os MESMOS parâmetros
        for run in flow_runs:
            params = getattr(run, "parameters", None) or {}
            new_run = await client.create_flow_run_from_deployment(
                deployment_id=deployment.id,
                parameters=params,
            )
            print(f"Recriada: original={run.id} -> novo run={new_run.id}")

if __name__ == "__main__":
    asyncio.run(restart_crashed_runs_for_deployment(DEPLOYMENT))
Observações: - Mantive apenas
Crashed
. Se quiser incluir
Failed
, troque o filtro para
any_=[StateType.CRASHED, StateType.FAILED]
. - Esse script usa só a API (sem
subprocess
), o que evita bloquear o event loop e garante que os parâmetros sejam passados certinho. - Se você quiser também replicar tags ou outro metadata, dá para passar
tags=run.tags
em
create_flow_run_from_deployment
. Quer que eu ajuste para incluir
Failed
também ou limitar por janela de tempo específica em vez de “últimas 10”?