Hi guys! I'd like to start a flow execution from a...
# prefect-community
m
Hi guys! I'd like to start a flow execution from another flow. Is it possible? I have flow A that is scheduled and runs every 5 minutes. When it finds something pending in our DB it should start flow B with some parameters. Can I do that?
k
Hey @Matias Godoy, this isn’t handled in a first-class way currently; however, you can use the API to kick off the next flow run using the flow run ID. This can be accomplished by adding a downstream task to a flow that schedules the next flow through the GraphQL API, which can be implemented by using `tasks.cloud.flowruntask`, which is documented here.
j
@Matias Godoy - if you are restricted to using core (as I am) then I can share a solution using threading with you. Be warned though, it is really hacky - and I've only tested using local dask.
Otherwise, it might be worth looking at an ifelse control flow to do it all in the same flow (see: https://docs.prefect.io/api/latest/tasks/control_flow.html#functions)
j
Alternatively, if you use the new Prefect Server, @Kyle Moon-Wright’s solution should still work (the task may need a minor modification to point at your local API endpoint instead of api.prefect.io, but the flow run APIs are identical)
m
Okay! thanks for your answers! I don't know why, but in my mind this was totally doable in an easy way. I'm using the Prefect Server, so I will have to make this modification. Is it documented?
a
@Jacques Can you share your hackish method? Briefly described in a few sentences :)
j
Yeah sure @alvin goh - but let me preface this by saying that this was driven by a need for performance, and I'll be dropping it as soon as depth-first execution is released. My use case is ingesting from APIs that page results by returning a random token for each next page, then transforming and loading results. Essentially what I do is run() the flow with initial conditions in a concurrent.futures ThreadPoolExecutor, as suggested by someone else in this channel. Then I do a while loop (while any thread futures are still running), and inside that I loop through each task in the flow.tasks list. For each task I look for a new attribute on the Task object. I use this attribute to pass back the page token value. So in the Task I do a `self.page_token = XXX`, and in the monitoring loop I do something like `try: task.page_token`, and if it doesn't raise I pass that value into updated parameters and submit a new flow.run() thread future.
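The monitoring loop described above could be sketched roughly as follows. This is only an illustration using the standard library: `PAGES`, `FetchPage`, `StandInFlow` and `crawl` are hypothetical stand-ins (not Prefect classes) that mimic a flow exposing `.tasks` and `.run(parameters=...)`, and the loop does one extra scan after the futures finish so a token set at the last moment is not missed.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical paged API: token -> (records on that page, next token or None).
PAGES = {
    "start": (["a", "b"], "tok-1"),
    "tok-1": (["c"], "tok-2"),
    "tok-2": (["d", "e"], None),
}


class FetchPage:
    """Stand-in for a Task: loads one page and publishes the next token."""

    def run(self, token, sink):
        records, next_token = PAGES[token]
        sink.extend(records)  # the "transform and load" step
        if next_token is not None:
            self.page_token = next_token  # attribute read by the monitor


class StandInFlow:
    """Stand-in for a flow: exposes .tasks and .run(parameters=...)."""

    def __init__(self, sink):
        self.sink = sink
        self.tasks = [FetchPage()]

    def run(self, parameters):
        for task in self.tasks:
            task.run(parameters["page_token"], self.sink)


def crawl(flow, first_token, poll_seconds=0.01):
    """Run the flow, resubmitting it whenever a task publishes a page token."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(flow.run, parameters={"page_token": first_token})]
        while True:
            # Check completion BEFORE scanning, so a token set just before a
            # run finished is still picked up by the scan below.
            all_done = all(f.done() for f in futures)
            resubmitted = False
            for task in flow.tasks:
                try:
                    token = task.page_token
                except AttributeError:
                    continue  # this task has not published a token
                del task.page_token  # consume it so it is submitted only once
                futures.append(
                    pool.submit(flow.run, parameters={"page_token": token})
                )
                resubmitted = True
            if all_done and not resubmitted:
                break
            time.sleep(poll_seconds)
        for f in futures:
            f.result()  # re-raise any error from a run
    return len(futures)
```

Calling `crawl(StandInFlow(sink), "start")` with an empty list `sink` walks the three hypothetical pages one run at a time. Because each page's token only appears once the previous page has been fetched, a single token chain never has more than one run in flight.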
a
wow! that's intense! but thanks for sharing 🙂 each flow stays on one thread now?
j
That's right yup. Yeah really looking forward to the depth-first stuff so I can get rid of this hack job.
m
Okay, for anyone interested I managed to make it work in two ways. The first one is the one mentioned by @Kyle Moon-Wright :
from prefect.tasks.cloud.flow_run import FlowRunTask

kickoff_task = FlowRunTask(project_name='my_project', flow_name='my_flow', parameters=params)
kickoff_task.run()
This works, but I also wanted to specify a custom name for my run, and I didn't find a way to do it with `FlowRunTask`. So I tried using a `Client`:
from prefect import Client

client = Client()
client.create_flow_run(flow_id='my-uuid', parameters=params, run_name='A nice name')
And this also works, with the advantage of letting me set a name.
Also this one does not rely on Cloud to work. I can use it with Prefect Server :)
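For the original use case (flow A finds pending rows and starts flow B once per row), the `Client` call could be wrapped roughly like this. It is a sketch under assumptions: `start_pending_runs`, `DryRunClient`, the `item_id` parameter and the row shape are all hypothetical, and the only Prefect call used is `create_flow_run` with the keyword arguments shown in the message above.

```python
def start_pending_runs(client, flow_id, pending_items):
    """Create one run of flow B per pending item and return what the client gave back.

    `client` is anything with a create_flow_run(flow_id=..., parameters=...,
    run_name=...) method, such as the Client used above.
    """
    results = []
    for item in pending_items:
        result = client.create_flow_run(
            flow_id=flow_id,
            parameters={"item_id": item["id"]},   # hypothetical parameter name
            run_name=f"flow-b-item-{item['id']}",  # custom run name per item
        )
        results.append(result)
    return results


class DryRunClient:
    """Hypothetical stand-in that records calls instead of contacting a server."""

    def __init__(self):
        self.calls = []

    def create_flow_run(self, flow_id, parameters, run_name):
        self.calls.append((flow_id, parameters, run_name))
        return f"dry-run-{len(self.calls)}"
```

Inside a task in flow A this would be called with a real `Client()` and flow B's ID, for example `start_pending_runs(Client(), 'my-uuid', rows)`. The `DryRunClient` is only there so the loop can be tried without a running server.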