https://prefect.io logo
Title
a

Alex Furrier

07/20/2021, 10:06 PM
I'm trying to run a "flow of flows" following the documentation on dependent flows here. I'm trying to parametrize this flow and pass those down to the dependent flows. I can pass params to the first flow but when it tries to run the dependent flow with params they all come back as null and the sub flow fails.
flow_a = StartFlowRun(flow_name="flow a", project_name="my project", wait=True)
flow_b = StartFlowRun(flow_name="flow b", project_name="my project", wait=True)

with Flow('flow-of-flows') as flow_of_flows:
    param_a = Parameter('param_a', 'a')
    param_b = Parameter('param_b', 'b')

    flow_a(parameters={"param_a": param_a})
    flow_b(parameters={"param_b": param_b}, upstream_tasks=[flow_a])
In the UI if I attempt to run this flow with params like
{'param_a': 'parametrized_1', 'param_b':'parametrized_b'}
it will create a task running
flow_a
and fail. Viewing the initiated flow for
flow_a
it shows the parameters being
{'param_a': null}
.
flow_a
fails and
flow_b
fails before it even runs since it's dependent on
flow_a
and says a reference task failed. Is there a proper way to pass parameters between parent and child flows?
k

Kevin Kho

07/20/2021, 10:10 PM
Hey @Alex Furrier, this looks right to me. Can I see how you declared it in
flow a
in the flow block?
a

Alex Furrier

07/20/2021, 10:17 PM
Do you mean the code for where flow_a is defined? Like
@task
def task_a(a):
    return a
    
with Flow("flow a") as flow:
    param_a = Parameter("param_a", "a")

    a = task_a(param_a)
k

Kevin Kho

07/20/2021, 10:20 PM
Yep exactly. This all looks good to me. Maybe you can try
StartFlowRun(flow_name="flow a", project_name="my project", wait=True).run(parameters={"param_a": "param_a_value"})
without the Flow and see if the parameters gets injected? Are you running these with the default or setting them through the UI?
a

Alex Furrier

07/20/2021, 10:47 PM
The parameters seem to be passing down correctly. Not sure why they weren't before. I was getting errors tho b/c I specified upstream tasks wrong. Instead of
with Flow('flow-of-flows') as flow_of_flows:
    param_a = Parameter('param_a', 'a')
    param_b = Parameter('param_b', 'b')

    flow_a(parameters={"param_a": param_a})
    flow_b(parameters={"param_b": param_b}, upstream_tasks=[flow_a])
it should have been
with Flow('flow-of-flows') as flow_of_flows:
    param_a = Parameter('param_a', 'a')
    param_b = Parameter('param_b', 'b')

    result_a = flow_a(parameters={"param_a": param_a})
    flow_b(parameters={"param_b": param_b}, upstream_tasks=[result_a])
k

Kevin Kho

07/20/2021, 10:49 PM
Oh I see. Glad you got it working