Zhang David
06/24/2023, 8:02 PM
@flow
def run_A():
    pass

@flow
def run_B():
    pass

@flow
def run_C():
    a = run_A()
    b = run_B(wait_for=[a])
My question is: how can I do this in Prefect 2, so that when flow run_C executes, it first checks whether run_A has indeed finished? If not, it would wait until run_A finishes, even though run_C might be scheduled to run now.
Zhang David
06/25/2023, 12:26 PM
Dominic Tarro
06/26/2023, 1:04 PM
run_A() inside of run_C makes it a subflow). Instead, you want to start C and check that an independently scheduled run_A flow run is completed?
Zhang David
06/26/2023, 1:06 PM
Zhang David
06/26/2023, 1:07 PM
Zhang David
06/26/2023, 1:09 PM
Dominic Tarro
06/26/2023, 1:10 PM
run_B and run_A. Using the on_completion flow argument, create a function that sets Prefect variable(s) to True. Treat these as boolean flags.
On a shorter interval, have run_C check that the run_A and run_B flags are True. If both are True, continue with what you're doing and, on successful completion, set the variables to False.
**Only useful if you have multiple inter-related deployments. One deployment is simpler.
Dominic Tarro
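The flag idea described above could be sketched roughly as follows. This is a minimal, library-free illustration only: a plain dict stands in for Prefect variables, and the names FLAGS, mark_done, and maybe_run_c are hypothetical. In a real setup, mark_done would be called from each upstream flow's on_completion hook and the flags would be stored as Prefect variables.

```python
# In-memory stand-in for Prefect variables (assumption: real flags would live in Prefect).
FLAGS = {"run_A_done": False, "run_B_done": False}


def mark_done(flag_name: str) -> None:
    """What an on_completion hook for run_A / run_B would do: raise its flag."""
    FLAGS[flag_name] = True


def maybe_run_c(work) -> bool:
    """Polled on a short interval: run `work` only if both upstream flags are set.

    Returns True if the work ran, False if run_C should try again later.
    """
    if not (FLAGS["run_A_done"] and FLAGS["run_B_done"]):
        return False
    work()
    # Reset the flags only after the work succeeded, so the next cycle starts clean.
    for name in FLAGS:
        FLAGS[name] = False
    return True
```

If work raises, the flags stay set, so the next poll retries instead of waiting for the upstream flows to run again.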
06/26/2023, 1:10 PM
Dominic Tarro
06/26/2023, 1:10 PM
Dominic Tarro
06/26/2023, 1:10 PM
Zhang David
06/26/2023, 1:11 PM
When Flow A completes successfully, trigger Flow B's deployment
oh nice! exactly what I want
Prefect Automations
is there a way to do it programmatically in Python?
Dominic Tarro
06/26/2023, 1:12 PM
Dominic Tarro
06/26/2023, 1:15 PM
from prefect import Flow, get_client
from prefect.cli.deployment import get_deployment  # assumed import path for the helper used below
from prefect.engine import FlowRun
from prefect.server.schemas.responses import DeploymentResponse
from prefect.states import Scheduled, State

async def trigger_flow_deployment(flow: Flow, flow_run: FlowRun, state: State):
    """Schedules a flow to run immediately. To be used with a Flow's `on_completion`."""
    # Fetch the result of the completed run (e.g. to build downstream parameters).
    result = await state.result(fetch=True)
    async with get_client() as client:
        # Look up the downstream deployment by "flow name/deployment name".
        deployment: DeploymentResponse = await get_deployment(
            client, name="My Flow/Deployment Name", deployment_id=None
        )
        await client.create_flow_run_from_deployment(
            deployment.id,
            parameters=...,
            state=Scheduled(),
        )
Dominic Tarro
06/26/2023, 1:15 PM
run_A flow's on_completion
Zhang David
06/26/2023, 1:20 PM
• trigger_flow_deployment: this is an on_completion callback impl. for the upstream/parent (i.e. for A in this case)
• name="My Flow/Deployment Name" <- this should be B in my example above, or whatever the child/downstream flow is
• the result is that B gets triggered upon completion of A?
btw this is Prefect 2 syntax, right?
Dominic Tarro
06/26/2023, 1:20 PM
Zhang David
06/26/2023, 1:26 PM
Dominic Tarro
06/26/2023, 1:27 PM
Scheduled class. I haven't done that, but I'm sure you can set the exact time to run.
Zhang David
06/26/2023, 1:29 PM
Nate
06/26/2023, 10:37 PM
run_deployment utility you can import with
from prefect.deployments import run_deployment
which is capable of scheduling (now or later) a flow run from a deployment, instead of using the client directly and calling create_flow_run_from_deployment
Zhang David
06/26/2023, 10:59 PM
Nate
06/26/2023, 11:04 PM
from prefect.deployments import run_deployment
flow_run_object = run_deployment(name="healthcheck/healthcheck-demo")  # flow-name/deployment-name
which will block until the flow run completes, or you can set timeout=0 to fire and forget
Zhang David
06/26/2023, 11:07 PM
Zhang David
06/26/2023, 11:07 PM
Nate
06/26/2023, 11:07 PM
Zhang David
06/26/2023, 11:13 PM
process_pokemon_batch and since, when you deployed, you didn't specify -n, the name is auto-registered as process-pokemon-batch (i.e. from the function name, similar to CLI name parsing)?
Nate
06/26/2023, 11:16 PM
process_pokemon_batch flow, the flow's name got assigned as process-pokemon-batch and then worker is the name of the deployment I gave it
Nate
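To spell out the naming discussed here: when a flow has no explicit name, Prefect 2 derives it from the function name by replacing underscores with hyphens, and run_deployment takes an identifier of the form flow-name/deployment-name. The helper below is hypothetical (it is not part of Prefect) and only mimics that convention for illustration.

```python
def deployment_identifier(flow_fn_name: str, deployment_name: str) -> str:
    """Build the "flow-name/deployment-name" string (hypothetical helper)."""
    # Mirrors the default flow naming: underscores become hyphens.
    flow_name = flow_fn_name.replace("_", "-")
    # The deployment name is used exactly as given at deploy time.
    return f"{flow_name}/{deployment_name}"


print(deployment_identifier("process_pokemon_batch", "worker"))
# process-pokemon-batch/worker
```

Note that only the flow part is rewritten; a deployment named upstream_deployment keeps its underscore.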
06/26/2023, 11:17 PM
Zhang David
06/26/2023, 11:25 PM
# definitions
@flow()
async def run_parent():
    print('running parent')
    sleep(60)

@flow()
async def run_child():
    print('running child')
    await run_deployment(name='run-parent',)
Zhang David
06/26/2023, 11:29 PM
Nate
06/26/2023, 11:30 PM
run_deployment is fundamentally async but will act like a sync function if called from a sync context
Zhang David
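The "async, but acts sync from a sync context" behaviour can be illustrated with a toy decorator. This is not Prefect's actual implementation; sync_or_async and fake_run_deployment are hypothetical names, and the sketch only shows the general idea of checking for a running event loop.

```python
import asyncio
import functools


def sync_or_async(async_fn):
    """Toy decorator: return a coroutine inside an event loop, block outside one."""

    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        coro = async_fn(*args, **kwargs)
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: we are in a sync context, so run to completion.
            return asyncio.run(coro)
        # A loop is running: hand the coroutine back for the caller to await.
        return coro

    return wrapper


@sync_or_async
async def fake_run_deployment(name: str) -> str:
    await asyncio.sleep(0)  # stand-in for talking to the API
    return f"flow run for {name}"


def sync_caller() -> str:
    return fake_run_deployment("my-flow/my-deployment")  # no await needed


async def async_caller() -> str:
    return await fake_run_deployment("my-flow/my-deployment")  # must await
```

In practice this means: inside an async flow, await run_deployment(...); inside a regular flow, call it without await.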
06/26/2023, 11:38 PM
Nate
06/26/2023, 11:39 PM
Marvin
06/26/2023, 11:40 PM
Zhang David
06/26/2023, 11:45 PM
Zhang David
06/27/2023, 12:43 AM
import pendulum
from prefect import flow
from prefect.deployments import Deployment, run_deployment
from prefect.server.schemas.schedules import CronSchedule

# refs:
# https://github.com/PrefectHQ/prefect-recipes/blob/main/flows-advanced/parent-orchestrator/pokemon_weight.py#L57-L65
# https://github.com/PrefectHQ/prefect/blob/6065be2434871ca1164d85ca00d9cb5bce509445/src/prefect/deployments/deployments.py#L48

# deploy this flow with:
# prefect deployment build test_deploy_dep_flows.py:run_upstream -n upstream_deployment -a
@flow(log_prints=True,)
async def run_upstream():
    from time import sleep
    print(f"running upstream @ {pendulum.now(tz='America/New_York')}")
    sleep(2*60)
    print(f"...upstream flow completed @ {pendulum.now(tz='America/New_York')}")

# deploy this flow with:
# prefect deployment build test_deploy_dep_flows.py:run_downstream -n downstream_deployment -a
@flow(log_prints=True)
async def run_downstream():
    await run_deployment(name='run-upstream/upstream_deployment')
    print(f"running downstream @ {pendulum.now(tz='America/New_York')}...")

if __name__ == '__main__':
    deployment = Deployment.build_from_flow(
        flow=run_upstream,
        name="upstream_deployment",
        version=1,
        work_queue_name="default",
        work_pool_name="default-agent-pool",
        schedule=CronSchedule(cron="37 20 * * 1", timezone="America/New_York"),
    )
    deployment.apply()
    deployment = Deployment.build_from_flow(
        flow=run_downstream,
        name="downstream_deployment",
        version=1,
        work_queue_name="default",
        work_pool_name="default-agent-pool",
        schedule=CronSchedule(cron="38 20 * * 1", timezone="America/New_York"),
    )
    deployment.apply()
...and was able to see that flow run_downstream respects the completion of run_upstream, even though I scheduled it to run 1 minute after upstream and the upstream takes 2 minutes to finish.
upstream log/downstream log
Zhang David
06/27/2023, 12:44 AM
Zhang David
06/27/2023, 12:45 AM
Nate
06/27/2023, 12:49 AM
prefect.yaml and prefect deploy now to define / deploy these flows, since this method of deployment management is what we're moving toward in the future
docs
example repo
Zhang David
06/27/2023, 12:52 AM
Zhang David
06/27/2023, 12:54 AM
Nate
06/27/2023, 3:49 AM
Zhang David
06/27/2023, 12:03 PM
Zhang David
08/10/2023, 4:13 AM