# prefect-cloud
z
Hi guys, I have a flow dependency question that I really need help with in Prefect 2. Say I want to deploy 3 flows like the ones below, each with a different schedule:
```python
from prefect import flow


@flow
def run_A():
    pass


@flow
def run_B():
    pass


@flow
def run_C():
    a = run_A()
    b = run_B(wait_for=[a])
```
My question is: how can I do this in Prefect 2 so that when flow `run_C` executes, it first checks whether `run_A` has finished and, if not, waits until it finishes, even though `run_C` might be scheduled to run now?
d
If I understand correctly, you don't want these to run as subflows (calling `run_A()` inside of `run_C` makes it a subflow). Instead, you want to start C and check that an independently scheduled `run_A` flow run has completed?
z
Yes, exactly.
Because you could schedule A at, say, 10:00:00 and B at 11:00:00. You'd expect A to take less than an hour, but in production anything can happen, and if A takes more than an hour, there's a problem.
What I want to achieve is:
1. Two (or more) flows as separate deployments (with different schedules).
2. In case of a dependency (e.g., flow B depends on the completion of flow A), B waits for A to complete, even if B is scheduled to run now.
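Requirement 2 is, at its core, a poll-until-done loop. Below is a minimal, Prefect-free sketch of that idea, assuming a hypothetical `is_upstream_complete` callable; in a real setup it would ask the Prefect API whether the upstream flow run is in a completed state. The clock and sleep functions are injectable so the logic can be exercised without actually waiting.

```python
import time
from typing import Callable


def wait_for_upstream(
    is_upstream_complete: Callable[[], bool],
    poll_interval: float = 30.0,
    timeout: float = 3600.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Block until the upstream run reports completion or the timeout expires.

    Returns True if the upstream completed, False if we gave up waiting.
    `is_upstream_complete` is a hypothetical check (e.g. "is the latest run
    of deployment A completed?"); it is not a Prefect API.
    """
    deadline = clock() + timeout
    while True:
        if is_upstream_complete():
            return True
        if clock() >= deadline:
            return False
        sleep(poll_interval)
```

Giving up after a timeout (rather than waiting forever) lets the downstream flow fail loudly when the upstream is stuck, instead of silently piling up waiting runs.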
d
If this is true, I would try the following approach. Schedule `run_B` and `run_A`. Using the `on_completion` flow argument, create a function that sets Prefect variable(s) to True. Treat these as boolean flags. On a shorter interval, have `run_C` check that the `run_A` and `run_B` flags are True. If both are True, continue with what you're doing and, on successful completion, set the variables back to False. **This is only useful if you have multiple interrelated deployments; a single deployment is simpler.**
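A minimal sketch of the flag logic described above, assuming a plain dict as a stand-in for Prefect Variables (the real calls for reading and writing Variables depend on your Prefect version, so they are left out). The function names are hypothetical: `mark_complete` is what an `on_completion` hook on `run_A` or `run_B` would do, and `try_run_downstream` is what each short-interval run of `run_C` would do.

```python
from typing import Callable, Iterable, MutableMapping


def mark_complete(flags: MutableMapping[str, bool], flow_name: str) -> None:
    """Body of a hypothetical on_completion hook: raise this flow's flag."""
    flags[flow_name] = True


def try_run_downstream(
    flags: MutableMapping[str, bool],
    required: Iterable[str],
    work: Callable[[], None],
) -> bool:
    """Run `work` only if every required flag is set; reset flags on success.

    Returns True if the downstream work ran, False if it should retry later.
    """
    required = list(required)
    if not all(flags.get(name, False) for name in required):
        return False  # an upstream flow hasn't finished yet; try next interval
    work()  # if this raises, the flags stay set so the next attempt can retry
    for name in required:
        flags[name] = False  # reset for the next cycle
    return True
```

Resetting only after `work()` succeeds mirrors the "on successful completion, set the variables to False" step, so a failed downstream run is retried on the next interval.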
Ah, I see. Use Prefect Automations: when Flow A completes successfully, trigger Flow B's deployment.
z
> When Flow A completes successfully, trigger Flow B's deployment
Oh nice! Exactly what I want.
Prefect Automations: is there a way to set them up programmatically in Python?
d
Yeah, let me find my code
šŸ™ 1
```python
from prefect import Flow, get_client
from prefect.client.schemas.objects import FlowRun
from prefect.states import Scheduled, State


async def trigger_flow_deployment(flow: Flow, flow_run: FlowRun, state: State):
    """Schedules a flow to run immediately. To be used with a Flow's `on_completion`."""
    # The upstream flow's return value, if you want to feed it to the downstream run
    result = await state.result(fetch=True)
    async with get_client() as client:
        # Look up the downstream deployment by "<flow-name>/<deployment-name>"
        # (client.read_deployment_by_name replaces an undefined get_deployment helper)
        deployment = await client.read_deployment_by_name("My Flow/Deployment Name")

        await client.create_flow_run_from_deployment(
            deployment.id,
            parameters={},  # downstream flow parameters go here (e.g. built from `result`)
            state=Scheduled(),
        )
```
Use it as the `run_A` flow's `on_completion` hook.
z
Truly appreciate it!! Just making sure I understand correctly:
• `trigger_flow_deployment` is an `on_completion` callback implementation for the upstream/parent flow (i.e., A in this case).
• `name="My Flow/Deployment Name"` should be B in my example above, or whatever the child/downstream flow is.
• The result is that B gets triggered upon completion of A?
BTW, this is Prefect 2 syntax, right?
d
Yes to all
šŸ‘ 1
z
Thanks!! Last question, with the same setup as above:
• A runs at 10:00:00
• B runs at 11:00:00
Now say A took 30 minutes to finish and it's 10:30:00. With the callback above, B starts right away at 10:30:00. Is there a way to respect B's original schedule of 11:00:00, or can we modify your script above to achieve that?
d
Yes, you will have to play around with the `Scheduled` class. I haven't done that, but I'm sure you can set the exact time to run.
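One way to respect B's original slot is to start the downstream run at whichever is later: the moment A finished, or B's originally scheduled time. The sketch below shows only that calculation, using the standard library. The assumption (not tested in this thread) is that the result would be passed as `scheduled_time` to `Scheduled(...)` in the callback, or to `run_deployment`; how you obtain B's original slot (e.g., from its cron schedule) is left open, and `downstream_start_time` is a hypothetical helper name.

```python
from datetime import datetime, timezone


def downstream_start_time(upstream_finished_at: datetime, original_slot: datetime) -> datetime:
    """Return when the downstream run should start.

    If the upstream finished early, keep the downstream's original slot;
    if the upstream overran, start the downstream as soon as it finished.
    """
    if upstream_finished_at.tzinfo is None or original_slot.tzinfo is None:
        raise ValueError("use timezone-aware datetimes so the comparison is meaningful")
    return max(upstream_finished_at, original_slot)


if __name__ == "__main__":
    slot = datetime(2023, 6, 5, 11, 0, tzinfo=timezone.utc)      # B's original slot
    finished = datetime(2023, 6, 5, 10, 30, tzinfo=timezone.utc)  # A finished early
    print(downstream_start_time(finished, slot))  # keeps the 11:00 slot
```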
šŸ‘ 1
z
Got it, really appreciate the great help!! Let me play around with your script above and come back if I have more questions or if I figure out the exact scheduling.
n
Hey, just a heads-up about the `run_deployment` utility, which you can import with

```python
from prefect.deployments import run_deployment
```

It can schedule a flow run from a deployment (now or later), instead of using the client directly and calling `create_flow_run_from_deployment`.
z
@Nate Oh nice, thank you! Are there any examples using it?
n
I use it here, but it's a sync-compatible function, so you don't have to await it in a sync context. Just like:

```python
from prefect.deployments import run_deployment

# name format: "<flow-name>/<deployment-name>"
flow_run_object = run_deployment(name="healthcheck/healthcheck-demo")
```

This blocks until the flow run completes, or you can set `timeout=0` to fire and forget.
z
that's awesome!! let me read and learn more
thanks!
n
sure thing!
z
Sorry, if you don't mind me asking: is the reason you could do this https://github.com/PrefectHQ/prefect-recipes/blob/00033945a305165dea3e88a94902aa7030260967/flows-advanced/parent-orchestrator/pokemon_weight.py#[…]4 because it's calling the function `process_pokemon_batch`, and since you didn't specify `-n` when you deployed, the name was auto-registered as `process-pokemon-batch` (i.e., derived from the function name, similar to CLI name parsing)?
n
Yeah, so our API wants hyphen-separated names. When I deployed the `process_pokemon_batch` flow, the flow's name got assigned as `process-pokemon-batch`, and `worker` is the name I gave the deployment.
šŸ‘ 1
So yeah, I think you were exactly right.
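To make the naming rule concrete: when no explicit name is given, Prefect 2 derives a flow's name from the function name with underscores replaced by hyphens, while the deployment name is used as given, and `run_deployment` takes `"<flow-name>/<deployment-name>"`. The helpers below are hypothetical and only reproduce that string-building so you can sanity-check names; they don't call Prefect.

```python
from typing import Optional


def default_flow_name(function_name: str) -> str:
    """Mimic Prefect 2's default flow name: underscores become hyphens."""
    return function_name.replace("_", "-")


def deployment_ref(function_name: str, deployment_name: str, flow_name: Optional[str] = None) -> str:
    """Build the "<flow-name>/<deployment-name>" string that run_deployment expects.

    Pass `flow_name` if the flow was given an explicit name via @flow(name=...).
    Deployment names are used as-is (no underscore conversion).
    """
    return f"{flow_name or default_flow_name(function_name)}/{deployment_name}"
```

For example, `deployment_ref("process_pokemon_batch", "worker")` gives `"process-pokemon-batch/worker"`, matching the name used in the recipe.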
z
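Thanks, this makes so much sense! Based on your great examples, for my use case above I guess I could do something as simple as:

```python
import asyncio

from prefect import flow
from prefect.deployments import run_deployment


@flow()
async def run_parent():
    print("running parent")
    await asyncio.sleep(60)  # non-blocking stand-in for real work


@flow()
async def run_child():
    print("running child")
    # run_deployment expects "<flow-name>/<deployment-name>";
    # "parent-deployment" is a hypothetical deployment name
    await run_deployment(name="run-parent/parent-deployment")
```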
So to run flows asynchronously, I guess I have to define them as async functions, right?
n
Hmm, you should be able to use either, since `run_deployment` is fundamentally async but acts like a sync function when called from a sync context.
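The "async, but usable from sync code" behavior can be illustrated with a simplified decorator: if no event loop is running, it drives the coroutine to completion with `asyncio.run`; inside a running loop, it returns the coroutine for the caller to `await`. This is only a sketch of the idea and assumes nothing about Prefect internals; Prefect's own `sync_compatible` handles more cases than this. `fetch_status` is a hypothetical stand-in for an async call such as `run_deployment`.

```python
import asyncio
import functools
from typing import Any, Callable, Coroutine


def sync_compatible(async_fn: Callable[..., Coroutine[Any, Any, Any]]) -> Callable[..., Any]:
    """Let an async function be called from sync code without `await`."""

    @functools.wraps(async_fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: we're in sync code, so run to completion here.
            return asyncio.run(async_fn(*args, **kwargs))
        # Inside a running loop: hand back the coroutine so the caller can await it.
        return async_fn(*args, **kwargs)

    return wrapper


@sync_compatible
async def fetch_status(name: str) -> str:
    """Hypothetical stand-in for an async API call."""
    await asyncio.sleep(0)
    return f"{name}: COMPLETED"
```

Called from a plain script, `fetch_status("a/b")` returns the string directly; inside an `async def` it must be awaited, which is why both sync and async flows can call the same function.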
z
thanks! this is perfect! and I realized what I asked before was invalid.
n
cool - sure thing! marvin
@Marvin write a celebratory ditty about triggering flow runs from a deployment. It's for science, I promise.
m
Oh, the joys of triggering flow runs, a tale as old as time. Here's a little ditty for your scientific endeavors:

(Verse 1)
In the land of Prefect, where flows do run,
A user sought to automate, just for fun.
With a deployment in hand, and a flow by their side,
They set out to trigger, with ambition and pride.

(Chorus)
Oh, trigger the flow, let it run far and wide,
From the depths of deployment, let no task hide.
For science we do this, a promise we keep,
As the flow runs are triggered, in data we'll leap.

(Verse 2)
The user, quite clever, knew just what to do,
To call upon Prefect, and make their dreams come true.
With a touch of some code, and a sprinkle of grace,
They set up their triggers, and the flow runs did race.

(Chorus)
Oh, trigger the flow, let it run far and wide,
From the depths of deployment, let no task hide.
For science we do this, a promise we keep,
As the flow runs are triggered, in data we'll leap.

(Bridge)
And as the flow runs did dance, through the night and the day,
The user rejoiced, for their work was at play.
With a smile on their face, and a heart full of glee,
They knew that with Prefect, their work would be free.

(Chorus)
Oh, trigger the flow, let it run far and wide,
From the depths of deployment, let no task hide.
For science we do this, a promise we keep,
As the flow runs are triggered, in data we'll leap.

Now that we've had our fun, let's get back to the serious business of triggering flow runs from a deployment. If you need help with that, just let me know. Ah, humans and their science.
z
LOL - nice!
@Nate I tried this toy example:
```python
import pendulum
from prefect import flow
from prefect.deployments import Deployment, run_deployment
from prefect.server.schemas.schedules import CronSchedule


# refs:
#   https://github.com/PrefectHQ/prefect-recipes/blob/main/flows-advanced/parent-orchestrator/pokemon_weight.py#L57-L65
#   https://github.com/PrefectHQ/prefect/blob/6065be2434871ca1164d85ca00d9cb5bce509445/src/prefect/deployments/deployments.py#L48

# deploy this flow with:
# prefect deployment build test_deploy_dep_flows.py:run_upstream -n upstream_deployment -a
@flow(log_prints=True)
async def run_upstream():
    from time import sleep
    print(f"running upstream @ {pendulum.now(tz='America/New_York')}")
    sleep(2*60)
    print(f"...upstream flow completed @ {pendulum.now(tz='America/New_York')}")


# deploy this flow with:
# prefect deployment build test_deploy_dep_flows.py:run_downstream -n downstream_deployment -a
@flow(log_prints=True)
async def run_downstream():
    await run_deployment(name='run-upstream/upstream_deployment')
    print(f"running downstream @ {pendulum.now(tz='America/New_York')}...")


if __name__ == '__main__':
    deployment = Deployment.build_from_flow(
        flow=run_upstream,
        name="upstream_deployment",
        version=1,
        work_queue_name="default",
        work_pool_name="default-agent-pool",
        schedule=CronSchedule(cron="37 20 * * 1", timezone="America/New_York"),
    )
    deployment.apply()

    deployment = Deployment.build_from_flow(
        flow=run_downstream,
        name="downstream_deployment",
        version=1,
        work_queue_name="default",
        work_pool_name="default-agent-pool",
        schedule=CronSchedule(cron="38 20 * * 1", timezone="America/New_York"),
    )
    deployment.apply()
```
...and was able to see that flow `run_downstream` respects the completion of `run_upstream`, even though I scheduled it to run 1 minute after upstream and the upstream takes 2 minutes to finish.
[Screenshots: upstream log, downstream log]
this seems to work! thanks again!
n
Great! I'll make a ticket to update this in the pokemon example I linked. Just to note, I'd recommend (if it's all the same to you) using a `prefect.yaml` and `prefect deploy` to define and deploy these flows now, since this method of deployment management is what we're moving toward in the future docs example repo.
z
I see, so the CLI is preferred over Python?
This definitely looks neater. Are there any Python utilities that help create this (https://github.com/zzstoatzz/prefect-monorepo/blob/main/prefect.yaml), or is it a manual setup for now?
n
We don't have a Python interface (besides `build_from_flow`) for the new deployment management yet! But there will likely be one coming. Out of curiosity, do you prefer defining your deployments in Python? If so, why?
z
I'm more of a Python person... Well, the reason I chose Prefect was that, compared to Airflow, it seems much more Pythonic, and it was also recommended by the Ray team, since all my jobs would be running on a Ray-managed cluster.
@Nate Hi, sorry to bother you. I'm deploying Prefect for my production workflow and wonder whether this monorepo (prefect-monorepo/prefect.yaml at main · zzstoatzz/prefect-monorepo) is still the "best" example I could reference while building my own production workflow? Thanks a lot!