Britt Evans
07/07/2022, 4:35 PM
Kevin Kho
Britt Evans
07/07/2022, 4:48 PM
Kevin Kho
context
with this, which should contain some scheduled start time?
from prefect import flow, task
from prefect.context import get_run_context
@flow
def test():
    """Print the expected start time of the current flow run.

    Fetches the active run context with ``get_run_context()`` and prints
    the ``expected_start_time`` attribute of its ``flow_run``.
    """
    ctx = get_run_context()
    print(ctx.flow_run.expected_start_time)
redsquare
07/07/2022, 5:05 PM
Britt Evans
07/07/2022, 5:51 PM
import time
from prefect import flow, get_run_logger, context, task
from prefect.client import OrionClient
from prefect.orion.schemas.filters import (
FlowRunFilter,
FlowRunFilterDeploymentId,
FlowRunFilterState,
FlowRunFilterStateType,
FlowRunFilterId,
)
from prefect.orion.schemas.states import StateType
from uuid import UUID
@task
async def is_already_running(
    client: OrionClient, my_deployment_id: UUID, my_flow_run_id: UUID
) -> bool:
    """Return true if the given deployment has other existing runs in RUNNING or PENDING states.

    Args:
        client: Client used to query flow runs via ``read_flow_runs``.
        my_deployment_id: Deployment whose flow runs are inspected.
        my_flow_run_id: ID of the calling flow run; it is excluded from the
            query so that the caller does not count itself.

    Returns:
        True when at least one other matching flow run was returned.
    """
    logger = get_run_logger()
    runs = await client.read_flow_runs(
        flow_run_filter=FlowRunFilter(
            # Exclude the current run, otherwise it would always match.
            id=FlowRunFilterId(not_any_=[my_flow_run_id]),
            deployment_id=FlowRunFilterDeploymentId(any_=[my_deployment_id]),
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=[StateType.RUNNING, StateType.PENDING])
            ),
        )
    )
    n_flows = len(runs)
    logger.info(f"{n_flows} already running")
    return n_flows > 0
@flow
def slow_flow():
    """Sleep for ten minutes unless another run of this deployment is active.

    Reads the current run context, asks ``is_already_running`` whether the
    same deployment has other RUNNING or PENDING flow runs, and returns
    early when it does. Otherwise logs, sleeps for 600 seconds, and logs
    again.
    """
    logger = get_run_logger()
    ctx = context.get_run_context()
    # NOTE(review): ``.result()`` is called on whatever the task call
    # returns; confirm this yields the task's boolean for the installed
    # Prefect version.
    if is_already_running(
        client=ctx.client,
        my_deployment_id=ctx.flow_run.deployment_id,
        my_flow_run_id=ctx.flow_run.id,
    ).result():
        logger.info("Skipping, already running")
        return
    logger.info("Starting now, about to sleep for 10 minutes")
    time.sleep(600)
    logger.info("Waking up, all done")