Surat Mukker
09/15/2022, 3:54 AM
Paul-Etienne Coisne
09/15/2022, 12:08 PM
Britt Evans
# 09/15/2022, 4:01 PM
async def is_already_running(flow_ctx: FlowRunContext) -> bool:
    """Return True if another run of the same deployment is in a RUNNING state.

    Args:
        flow_ctx: Run context of the current flow run. Its ``flow_run`` and
            ``client`` attributes are used.

    Returns:
        False when the current flow run has no deployment ID. Otherwise True
        if the API returns at least one flow run, other than the current one,
        that belongs to the same deployment and has state type RUNNING.
    """
    current_run = flow_ctx.flow_run
    if not current_run.deployment_id:
        # No deployment ID means there is nothing to match other runs against.
        return False

    other_running_runs = FlowRunFilter(
        # Exclude the current run, which is itself RUNNING while this executes.
        id=FlowRunFilterId(not_any_=[current_run.id]),
        deployment_id=FlowRunFilterDeploymentId(
            any_=[current_run.deployment_id]
        ),
        state=FlowRunFilterState(
            type=FlowRunFilterStateType(any_=[StateType.RUNNING])
        ),
    )
    # Only existence matters, so a single matching run is enough.
    runs = await flow_ctx.client.read_flow_runs(
        flow_run_filter=other_running_runs,
        limit=1,
    )
    return bool(runs)
Usage:
from prefect import flow, get_run_logger, context
@flow(name="My Flow")
async def my_flow():
    """Example flow that exits early when `is_already_running` reports a running run."""
    logger = get_run_logger()
    if await is_already_running(context.get_run_context()):
        # Log the skip and return without doing any work.
        logger.info("Skipping, already running")
        return
Surat Mukker
09/16/2022, 5:43 AM
Christopher Boyd
09/19/2022, 2:40 PM
Surat Mukker
09/19/2022, 7:28 PM