Hi, I'm trying to create a flow that is dependent ...
# ask-community
k
Hi, I'm trying to create a flow that is dependent on 2 other flows. Is it possible (using the imperative API) to add tasks to the final flow?
k
Could you show me some fake code snippet so I can get an idea of what you mean?
k
Copy code
flow 1 = StartFlowRun(
    flow_name= flow 1,
    project_name=project_name, wait=True)
flow 2 = StartFlowRun(
    flow_name=Flow 2,
    project_name=project_name, wait=True)
flow 3 = StartFlowRun(
    flow_name=Flow 3,
    project_name=project_name, wait=True)

with Flow('flow '4) as flow:
    PARAMS HERE

    first flow = flow 1(
        upstream_tasks=[recipient_email, funds_list,
                        company_sector, market_cap_lower_boundary,
                        market_cap_upper_boundary],
        parameters={PARAMS})
    second flow = flow 2(
        upstream_tasks=[first flow],
        parameters={PARAMS})
    third flow = flow 3(
        upstream_tasks=[second flow],
        parameters={PARAMS})
my flow looks something like this, but my concern is that flow 3 doesn't actually work on its own, so I'd like to avoid making flow 4
I want flow 3 to be the "flow of flows" but add more tasks to it
k
Ah I see so you can’t add it in the runtime because that would deviate from the registered DAG and the runtime execution needs to match that
k
ah so should I just have a registered flow that's unused?
k
I would suggest you either make a parameterizable path in Flow 3 that can fork and execute those added tasks or you add the tasks in Flow 4 pretty much
A bit confused. Just because you use
StartFlowRun
, doesn’t mean you can’t add more tasks to the “main” flow right?
k
no, I'm just confused where I do it
just in the with Flow block?
k
yes exactly
Copy code
with Flow('flow '4) as flow:
    PARAMS HERE

    first flow = flow 1(
        upstream_tasks=[recipient_email, funds_list,
                        company_sector, market_cap_lower_boundary,
                        market_cap_upper_boundary],
        parameters={PARAMS})
    x = some_other_task_here()
k
ah thank you!!
@Kevin Kho Am I able to reference upstream tasks from other flows? For example let's say in second flow, my upstream tasks includes a task that was run in the first flow. How would I reference that? I tried just putting the task into
upstream_tasks
, but the second flow complains that the task was called without its parameter, but I'm not trying to run the task again, I just want its result
k
You can’t because that Python object doesn’t exist in the subflow definition but isn’t it the order implied though that the upstream will execute first before you create the Flow run? Anyway, what you want is the
get_task_run_result
task to pull the task result from another Flow. Docs
k
@Kevin Kho thank you! Does
flow_run_id
refer to the
version group id
? and how do I find the
task_slug
?
Yes the order implied is correct, but I need to use the result of that upstream task as a param in the later flow. Does your advice using
get_task_run_result
still apply?
k
Yes it does. So you have two options. First is pass it as a Parameter to the subflow during
create_flow_run
so it’s given by the main Flow. The second is retrieve it with
get_task_run_result
. The easiest why to get a task slug is by printing
flow.serialize()['tasks']
and that should contain the slugs of the task then choose the one you want. It’s honestly hard to predict the output of the
slugify
algorithm something so printing is more straightforward
Print that in the main flow after the Flow definition and you should see it, or you can query the GraphQL API for the slugs of the registered Flow.
Ohhh I understand what you are saying. It’s hard to get the Flow run id downstream…I guess you have two options.
You can have the logic in the main flow:
Copy code
with Flow() as flow:
    x = create_flow_run()
    y = get_task_run_result(x, task_slug)
Or you can just do it from the main flow:
Copy code
with Flow() as flow:
    some_output = some_task()
    x = create_flow_run(..., parameters={"out": some_output})
and then the child flow will get it EDIT: These are not mutually exclusive. you can have both
Ultimately I’m starting to think the way you separated your main flow and child might be a bit off lol. Because the logic between them seems coupled. I hope that made sense
k
yeah it's definitely a bit off but whatever haha
thanks 🙂
ah one more thing: I can't seem to import get_task_run_result or create_flow_run
Copy code
from prefect.tasks.prefect.flow_run import
I tried but it doesn't exist
k
I think it should be
from prefect.tasks.prefect import get_task_run_result
. It’s the same spot as
create_flow_run
k
I also cannot import
create_flow_run
. I use
from prefect.tasks.prefect import StartFlowRun
ohh is it in prefect 0.15? I'm on 0.14 and I can't change it. is there an alternative to get_task_run_result?