
Kelly Huang

02/17/2022, 1:37 AM
Hi, I'm trying to create a flow that is dependent on 2 other flows. Is it possible (using the imperative API) to add tasks to the final flow?

Kevin Kho

02/17/2022, 1:39 AM
Could you show me some fake code snippet so I can get an idea of what you mean?

Kelly Huang

02/17/2022, 1:43 AM
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

flow_1 = StartFlowRun(
    flow_name="flow 1",
    project_name=project_name, wait=True)
flow_2 = StartFlowRun(
    flow_name="Flow 2",
    project_name=project_name, wait=True)
flow_3 = StartFlowRun(
    flow_name="Flow 3",
    project_name=project_name, wait=True)

with Flow("flow 4") as flow:
    # PARAMS HERE

    first_flow = flow_1(
        upstream_tasks=[recipient_email, funds_list,
                        company_sector, market_cap_lower_boundary,
                        market_cap_upper_boundary],
        parameters={PARAMS})
    second_flow = flow_2(
        upstream_tasks=[first_flow],
        parameters={PARAMS})
    third_flow = flow_3(
        upstream_tasks=[second_flow],
        parameters={PARAMS})
my flow looks something like this, but my concern is that flow 3 doesn't actually work on its own, so I'd like to avoid making flow 4
I want flow 3 to be the "flow of flows" but add more tasks to it

Kevin Kho

02/17/2022, 1:45 AM
Ah, I see. You can’t add tasks at runtime because that would deviate from the registered DAG, and the runtime execution needs to match it.

Kelly Huang

02/17/2022, 1:46 AM
ah so should I just have a registered flow that's unused?

Kevin Kho

02/17/2022, 1:47 AM
I would suggest you either make a parameterizable path in Flow 3 that can fork and execute those added tasks, or add the tasks in Flow 4.
A bit confused, though. Just because you use StartFlowRun doesn’t mean you can’t add more tasks to the “main” flow, right?

Kelly Huang

02/17/2022, 1:49 AM
no, I'm just confused where I do it
just in the with Flow block?

Kevin Kho

02/17/2022, 1:53 AM
yes exactly
with Flow("flow 4") as flow:
    # PARAMS HERE

    first_flow = flow_1(
        upstream_tasks=[recipient_email, funds_list,
                        company_sector, market_cap_lower_boundary,
                        market_cap_upper_boundary],
        parameters={PARAMS})
    x = some_other_task_here()

Kelly Huang

02/17/2022, 4:02 AM
ah thank you!!
@Kevin Kho Am I able to reference upstream tasks from other flows? For example, say the second flow's upstream tasks include a task that was run in the first flow. How would I reference that? I tried just putting the task into upstream_tasks, but the second flow complains that the task was called without its parameter. I'm not trying to run the task again, I just want its result.

Kevin Kho

02/22/2022, 8:14 PM
You can’t, because that Python object doesn’t exist in the subflow definition. But isn’t the order already implied, since the upstream will execute before you create the Flow run? Anyway, what you want is the get_task_run_result task to pull the task result from another Flow. Docs

Kelly Huang

02/22/2022, 8:22 PM
@Kevin Kho thank you! Does flow_run_id refer to the version group id? And how do I find the task_slug?
Yes, the implied order is correct, but I need to use the result of that upstream task as a param in the later flow. Does your advice about using get_task_run_result still apply?

Kevin Kho

02/22/2022, 8:27 PM
Yes it does. So you have two options. The first is to pass it as a Parameter to the subflow during create_flow_run so it’s given by the main Flow. The second is to retrieve it with get_task_run_result. The easiest way to get a task slug is to print flow.serialize()['tasks'], which should contain the slugs of the tasks, and then choose the one you want. It’s honestly hard to predict the output of the slugify algorithm, so printing is more straightforward.
Print that in the main flow after the Flow definition and you should see it, or you can query the GraphQL API for the slugs of the registered Flow.
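If the printed list is long, a small helper can pick out the slugs. The following is only a sketch: it assumes each entry in flow.serialize()['tasks'] is a dict with "name" and "slug" keys, and the helper name find_task_slugs and the sample data are hypothetical stand-ins, not Prefect output.

```python
from typing import Any, Dict, List


def find_task_slugs(serialized_flow: Dict[str, Any], name_contains: str = "") -> List[str]:
    """Return the slugs of serialized tasks whose name contains `name_contains`."""
    slugs = []
    for task_info in serialized_flow.get("tasks", []):
        # Assumption: every task entry is a dict carrying "name" and "slug".
        name = task_info.get("name") or ""
        slug = task_info.get("slug")
        if slug is not None and name_contains in name:
            slugs.append(slug)
    return slugs


# Hypothetical stand-in for flow.serialize(); the real dict has many more keys,
# and real slugs may look different from these made-up values.
example = {
    "tasks": [
        {"name": "load_funds", "slug": "load_funds-1"},
        {"name": "send_email", "slug": "send_email-1"},
    ]
}

print(find_task_slugs(example))
print(find_task_slugs(example, "email"))
```

In the main flow this would be called as find_task_slugs(flow.serialize(), "part_of_task_name") after the Flow definition, and the chosen slug passed to get_task_run_result.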
Ohhh I understand what you are saying. It’s hard to get the Flow run id downstream…I guess you have two options.
You can have the logic in the main flow:
with Flow() as flow:
    x = create_flow_run()
    y = get_task_run_result(x, task_slug)
Or you can pass it down from the main flow as a parameter:
with Flow() as flow:
    some_output = some_task()
    x = create_flow_run(..., parameters={"out": some_output})
and then the child flow will get it. EDIT: These are not mutually exclusive. You can have both.
Ultimately I’m starting to think the way you separated your main flow and child might be a bit off lol. Because the logic between them seems coupled. I hope that made sense

Kelly Huang

02/22/2022, 8:52 PM
yeah it's definitely a bit off but whatever haha
thanks 🙂
ah one more thing: I can't seem to import get_task_run_result or create_flow_run
from prefect.tasks.prefect.flow_run import
I tried but it doesn't exist

Kevin Kho

02/22/2022, 9:00 PM
I think it should be from prefect.tasks.prefect import get_task_run_result. It’s the same spot as create_flow_run.

Kelly Huang

02/22/2022, 9:11 PM
I also cannot import create_flow_run. I use from prefect.tasks.prefect import StartFlowRun
Ohh, is it in Prefect 0.15? I'm on 0.14 and I can't change it. Is there an alternative to get_task_run_result?