    Matias Godoy

    2 years ago
    Hi guys! I'd like to start a flow execution from another flow. Is it possible? I have flow A that is scheduled and runs every 5 minutes. When it finds something pending in our DB it should start flow B with some parameters. Can I do that?
    Kyle Moon-Wright

    2 years ago
    Hey @Matias Godoy, this isn't handled in a first-class way currently; however, you can use the API to kick off the next flow run using the flow run ID. This can be accomplished by adding a downstream task to a flow that schedules the next flow through the GraphQL API, which can be implemented by using tasks.cloud.flowruntask, which is documented here.
    Jacques

    2 years ago
    @Matias Godoy - if you are restricted to using core (as I am) then I can share a solution using threading with you. Be warned though, it is really hacky - and I've only tested using local dask.
    Otherwise, it might be worth looking at an ifelse control flow to do it all in the same flow (see: https://docs.prefect.io/api/latest/tasks/control_flow.html#functions)
    Jeremiah

    2 years ago
    Alternatively, if you use the new Prefect Server, @Kyle Moon-Wright's solution should still work (the task may need a minor modification to point at your local API endpoint instead of api.prefect.io, but the flow run APIs are identical).
    Matias Godoy

    2 years ago
    Okay! thanks for your answers! I don't know why, but in my mind this was totally doable in an easy way. I'm using the Prefect Server, so I will have to make this modification. Is it documented?
    alvin goh

    2 years ago
    @Jacques Can you share your hackish method? Briefly described in a few sentences 😃
    Jacques

    2 years ago
    Yeah sure @alvin goh - but let me preface this by saying that this was driven by a need for performance, and I'll be dropping it as soon as the depth-first execution is released. My use case is ingesting from APIs that page results by returning a random token for each next page, then transforming and loading results. Essentially what I do is run() the flow with initial conditions in a concurrent.futures ThreadPoolExecutor, as suggested by someone else in this channel. Then I do a while loop (while any thread futures are still running), and inside that I loop through each task in the flow.tasks list. For each task I look for a new attribute on the Task object. I use this attribute to pass back the page token value. So in the Task I do a self.page_token = XXX, and in the monitoring loop I do something like try: task.page_token, and if that doesn't raise an exception I pass that value into updated parameters and submit a new flow.run() thread future.
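A rough sketch of the monitoring loop described above, for anyone who wants to see the shape of it. It uses toy stand-ins (ToyFlow, FetchPage, NEXT_TOKEN, and run_all_pages are all hypothetical names, not Prefect classes) so it runs without Prefect installed. It also checks whether the futures are done before scanning the tasks, which is an assumption added here so that a token set right before a run finishes is not missed.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Toy paged API: maps the current page token to the next one (None means last page).
NEXT_TOKEN = {None: "tok-1", "tok-1": "tok-2", "tok-2": None}


class FetchPage:
    """Stand-in for a Prefect Task that fetches one page (hypothetical)."""

    def run(self, page_token=None):
        next_token = NEXT_TOKEN[page_token]
        if next_token is not None:
            # Leave the next page token on the task object for the monitor loop.
            self.page_token = next_token


class ToyFlow:
    """Stand-in for a Prefect Flow: holds tasks and runs them with parameters."""

    def __init__(self):
        self.tasks = [FetchPage()]

    def run(self, parameters):
        for task in self.tasks:
            task.run(**parameters)


def run_all_pages(flow, poll_interval=0.01):
    """Run the flow once per page, resubmitting whenever a task exposes a token."""
    tokens_seen = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(flow.run, parameters={"page_token": None})]
        while True:
            # Snapshot completion first so a token set just before a run
            # finishes is still picked up by the scan below.
            all_done = all(f.done() for f in futures)
            found = False
            for task in flow.tasks:
                try:
                    token = task.page_token
                except AttributeError:
                    continue  # this task has not produced a token
                del task.page_token  # consume it so it is submitted only once
                tokens_seen.append(token)
                futures.append(pool.submit(flow.run, parameters={"page_token": token}))
                found = True
            if all_done and not found:
                break
            time.sleep(poll_interval)
    return tokens_seen
```

Calling run_all_pages(ToyFlow()) walks the three toy pages one flow run at a time. With a real flow the task objects are shared between threads, so this only stays safe while there is a single chain of pages in flight.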
    alvin goh

    2 years ago
    wow! that's intense! but thanks for sharing 🙂 each flow stays on one thread now?
    Jacques

    2 years ago
    That's right yup. Yeah, really looking forward to the depth-first stuff so I can get rid of this hack job.
    Matias Godoy

    2 years ago
    Okay, for anyone interested, I managed to make it work in two ways. The first one is the one mentioned by @Kyle Moon-Wright:
    from prefect.tasks.cloud.flow_run import FlowRunTask
    
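    # params is a dict mapping the target flow's parameter names to values
    kickoff_task = FlowRunTask(project_name='my_project', flow_name='my_flow', parameters=params)
    kickoff_task.run()  # calling run() directly creates the flow run through the API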
    This works, but I also wanted to give my run a custom name, and I couldn't find a way to do that with FlowRunTask. So I tried using a Client:
    from prefect import Client
    
    client = Client()
    client.create_flow_run(flow_id='my-uuid', parameters=params, run_name='A nice name')
    and this also works, with the advantage of letting me set a name.
    Also, this one does not rely on Cloud to work. I can use it with Prefect Server 😃
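To tie this back to the original question (flow A finds pending rows and starts flow B for each), here is a minimal sketch built around the create_flow_run call shown above. The helper start_pending_runs, the item_id parameter, the run-name pattern, and DryRunClient are all hypothetical names made up for illustration. The client is passed in as an argument so the logic can be tried without a running API.

```python
from typing import Any, Dict, List


def start_pending_runs(client: Any, flow_id: str, pending: List[Dict[str, Any]]) -> List[str]:
    """Create one run of flow B per pending item and return the new flow run IDs."""
    run_ids = []
    for item in pending:
        run_id = client.create_flow_run(
            flow_id=flow_id,
            parameters={"item_id": item["id"]},  # hypothetical parameter name
            run_name=f"flow-b-item-{item['id']}",  # hypothetical naming scheme
        )
        run_ids.append(run_id)
    return run_ids


class DryRunClient:
    """Stand-in that records calls instead of contacting an API (local testing only)."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    def create_flow_run(self, flow_id: str, parameters: Dict[str, Any], run_name: str) -> str:
        self.calls.append(
            {"flow_id": flow_id, "parameters": parameters, "run_name": run_name}
        )
        return f"dry-run-{len(self.calls)}"
```

Inside flow A this would presumably be called from a task with client=Client() from prefect, as in the snippet above, and with the real ID of flow B in place of 'my-uuid'.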