Sang Young Noh
05/12/2022, 3:50 PM
Zanie
05/12/2022, 3:53 PM
get_run_context()
Sang Young Noh
05/12/2022, 3:55 PM
Anna Geller
05/12/2022, 7:26 PM
Sang Young Noh
05/13/2022, 6:45 AM
Anna Geller
05/13/2022, 11:19 AM
Sang Young Noh
05/13/2022, 11:19 AM
Anna Geller
05/13/2022, 11:20 AM
Sang Young Noh
05/13/2022, 3:27 PM
Zanie
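05/13/2022, 3:42 PM
from datetime import timedelta

import pendulum
from prefect import flow, task
from prefect.context import FlowRunContext, get_run_context
from prefect.states import Failed

# Maximum tolerated delay between the scheduled and the actual start.
MAX_SECONDS_LATE = 10


@task
def my_task():
    pass


@flow
def my_flow():
    ctx: FlowRunContext = get_run_context()

    # diff(..., abs=False) is negative when the expected start time is in the
    # past, so multiply by -1 to get a positive "seconds late" value.
    seconds_late = (
        pendulum.now().diff(ctx.flow_run.expected_start_time, abs=False).in_seconds()
        * -1
    )

    # Fail the run early instead of executing tasks after a late start.
    if seconds_late > MAX_SECONDS_LATE:
        return Failed(message=f"Flow started {seconds_late} seconds late.")

    my_task()


from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner

# Schedule the flow every 20 seconds, run as a local subprocess.
DeploymentSpec(
    flow=my_flow,
    flow_runner=SubprocessFlowRunner(),
    schedule={"interval": timedelta(seconds=20)},
)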
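For anyone who wants to check the lateness arithmetic without a running Prefect instance, here is a minimal sketch of the same check using only the standard library. The helper names `seconds_late` and `is_too_late` are hypothetical and not part of Prefect; the sketch assumes both timestamps are timezone-aware and that "late" means the actual start falls after the expected start.

```python
from datetime import datetime, timedelta, timezone

MAX_SECONDS_LATE = 10  # same threshold as in the flow snippet


def seconds_late(expected_start: datetime, now: datetime) -> float:
    """Seconds by which `now` falls after `expected_start` (negative if early)."""
    return (now - expected_start).total_seconds()


def is_too_late(
    expected_start: datetime,
    now: datetime,
    max_seconds_late: float = MAX_SECONDS_LATE,
) -> bool:
    """True when the start is more than `max_seconds_late` seconds late."""
    return seconds_late(expected_start, now) > max_seconds_late


# Illustrative timestamps only; they are not taken from a real flow run.
expected = datetime(2022, 5, 13, 15, 42, 0, tzinfo=timezone.utc)

print(is_too_late(expected, expected + timedelta(seconds=4)))   # False
print(is_too_late(expected, expected + timedelta(seconds=25)))  # True
print(seconds_late(expected, expected - timedelta(seconds=3)))  # -3.0
```

A run that starts exactly at the threshold is still allowed here, because the comparison is strict, which matches the `>` in the flow.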
Sang Young Noh
05/13/2022, 3:45 PM
Anna Geller
05/17/2022, 11:12 AM
Task runners such as DaskTaskRunner define how to run tasks. On the deployment spec, you specify flow runners. The work queue is essentially a filter that determines which flow runs get picked up, e.g. pick up only deployments tagged "ubuntu" on an Ubuntu machine, or pick up only deployments with DockerFlowRunner on a VM that has Docker configured.
Sang Young Noh
05/17/2022, 11:14 AM
Anna Geller
05/17/2022, 11:15 AM
Sang Young Noh
05/17/2022, 11:19 AM
Anna Geller
05/17/2022, 11:19 AM
Sang Young Noh
05/17/2022, 11:20 AM
UniversalFlowRunner, SubprocessFlowRunner, DockerFlowRunner, KubernetesFlowRunner, in addition to any tags that deployment has, if my understanding is correct
Anna Geller
05/17/2022, 11:20 AM
Sang Young Noh
05/17/2022, 11:21 AM
Anna Geller
05/17/2022, 11:23 AM
Sang Young Noh
05/17/2022, 11:41 AM
Anna Geller
05/17/2022, 11:46 AM
Sang Young Noh
05/17/2022, 11:47 AM
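As a recap of the work queue explanation earlier in the thread (a queue acts as a filter over deployments, by tag or by flow runner type), here is a rough conceptual model in plain Python. It is not Prefect's API: `Deployment`, `WorkQueueFilter`, and `matches` are hypothetical names, and the sketch assumes a deployment matches a tag filter when it shares at least one tag with the queue.

```python
from dataclasses import dataclass
from typing import FrozenSet, Optional


@dataclass(frozen=True)
class Deployment:
    """Hypothetical stand-in for a deployment: a name, a runner type, tags."""
    name: str
    flow_runner_type: str
    tags: FrozenSet[str] = frozenset()


@dataclass(frozen=True)
class WorkQueueFilter:
    """Hypothetical work queue: None for a criterion means "accept anything"."""
    tags: Optional[FrozenSet[str]] = None
    flow_runner_types: Optional[FrozenSet[str]] = None

    def matches(self, deployment: Deployment) -> bool:
        # Assumption: sharing at least one tag is enough to match.
        if self.tags is not None and not (self.tags & deployment.tags):
            return False
        if (
            self.flow_runner_types is not None
            and deployment.flow_runner_type not in self.flow_runner_types
        ):
            return False
        return True


# Illustrative deployments only.
deployments = [
    Deployment("etl", "subprocess", frozenset({"ubuntu"})),
    Deployment("ml", "docker", frozenset({"gpu"})),
    Deployment("report", "docker", frozenset({"ubuntu"})),
]

ubuntu_queue = WorkQueueFilter(tags=frozenset({"ubuntu"}))
docker_queue = WorkQueueFilter(flow_runner_types=frozenset({"docker"}))

print([d.name for d in deployments if ubuntu_queue.matches(d)])  # ['etl', 'report']
print([d.name for d in deployments if docker_queue.matches(d)])  # ['ml', 'report']
```

An agent on an Ubuntu machine would poll something like `ubuntu_queue`, while a VM with Docker configured would poll something like `docker_queue`; a queue with both criteria set would require both to match.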