Hi experts, I am working on using startFlowRun to ...
# ask-community
l
Hi experts, I am working on using StartFlowRun to set up parent/child flows. I need to pass the result from one child flow to another, as in the following example:
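from prefect import Flow, Parameter, task
from prefect.tasks.prefect.flow_run import StartFlowRun

@task
def get_parameters(date_to_process):
    ...  # body elided in the original post
    return {}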
	
flow_a = StartFlowRun(flow_name="flow_a", project_name="sample", wait=True)
flow_b = StartFlowRun(flow_name="flow_b", project_name="sample", wait=True) 



with Flow("parent-flow") as flow:
    date_to_process = Parameter("date_to_process", default=None)
    vp = get_parameters(date_to_process)   

    
    flow_a = flow_a(upstream_tasks=[vp], parameters=vp)
    flow_b = flow_b(upstream_tasks=[flow_a])
I was able to do this successfully, but I am not sure how to use the parameters in the dependent flows. In this case, I have a get_parameters() task that composes the parameters I need to pass down to the dependent flow_a. How do I receive/use the parameters I passed in from
flow_a = flow_a(upstream_tasks=[vp], parameters=vp)
? Let's just say I want to print out the parameters I pass into flow_a. What do I need to do in flow_a.py to reference the passed-in parameters?
k
Is something like this what you’re looking for?
from prefect import Flow, task
from prefect.tasks.prefect.flow_run import StartFlowRun

@task
def task_a():
  return "hello"

task_b = StartFlowRun(flow_name="flow_b", project_name="examples", wait=True)
task_c = StartFlowRun(flow_name="flow_c", project_name="examples", wait=True)

with Flow("flow_a") as flow:
  result_a = task_a()
  result_b = task_b(upstream_tasks=[result_a], parameters={'result_a': result_a})
  result_c = task_c(upstream_tasks=[result_b], parameters={'result_b': result_b.result})
Or are you trying to use the same parameter in
flow_a
and
flow_b
?
Also, just a friendly reminder: next time, could you post your code block in the thread so it doesn't cover the other messages?
👍 1
l
I am trying to use the same parameter for the child flows.
My key question/confusion is how a child flow receives the parameters passed down to it.
k
Ah OK, I misunderstood at first. Let me work on an example.
import prefect
from prefect import Flow, task
from prefect import Parameter
from prefect.tasks.prefect.flow_run import StartFlowRun

def get_parameters():
    return {'x': 1, 'y': 2}

@task()
def task_2(n):
    logger = prefect.context.get("logger")
    logger.info(f"hello {n}")
    return f"hello {n}"

with Flow("flow_b") as flow_b:
    x = Parameter("x", default="test")
    task_2(x)

with Flow("flow_c") as flow_c:
    y = Parameter("y", default="test")
    task_2(y)

# Register subflows
flow_b.register(project_name="subflow")
flow_c.register(project_name="subflow")

# StartFlowRun calls
task_b = StartFlowRun(flow_name="flow_b", project_name="subflow", wait = True)
task_c = StartFlowRun(flow_name="flow_c", project_name="subflow", wait = True)

with Flow("flow_a") as flow:
    params = get_parameters() # {"x": 1, "y": 2}
    result_b = task_b(upstream_tasks=[params], parameters=params)
    result_c = task_c(upstream_tasks=[result_b], parameters=params)

flow.run()
In this example, the parameters are used in flow_b and flow_c.
l
Hi @Kevin Kho, thanks for the example. Can you explain how, by creating Parameter("x", default='test'), the subflow becomes aware of the x value you were passing into it? When you use
parameters=params
, it only passes in a dictionary or a list.
k
Think about a single Flow first (no subflows yet): we declare the Parameter with that syntax and then use it by passing it to Tasks inside the Flow. The Parameter's value can be set in the
flow.run
call. This can be seen here: https://docs.prefect.io/core/concepts/parameters.html
The parameter can also be set through the UI. Now in the case of the
StartFlowRun
task, it basically calls
flow.run
. We can pass the parameters to the
StartFlowRun
task, which hands it over to
flow.run
.
flow_b above can be run with
flow_b.run(parameters={'x':1})
Does that clarify it?
By putting the
Parameter("x", default='test')
, that subflow can now handle that parameter being passed in at runtime. (This behavior applies to all flows, not just subflows.)
Parameter
is a special Task
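To make the lookup described above concrete, here is a toy model in plain Python. It is only a sketch and not Prefect's implementation: `ToyParameter`, `ToyFlow`, and `say_hello` are hypothetical names made up for illustration. It shows the idea that a declared parameter is resolved by name from the dict given to `run(parameters=...)`, falling back to its default when the caller supplies nothing.

```python
# Toy model only: ToyParameter and ToyFlow are hypothetical stand-ins,
# not Prefect classes. They mimic how a flow could resolve declared
# parameters from the dict given to run(parameters=...).

_MISSING = object()  # sentinel meaning "no default was declared"


class ToyParameter:
    """A named input that a flow looks up at run time."""

    def __init__(self, name, default=_MISSING):
        self.name = name
        self.default = default

    def resolve(self, supplied):
        # Prefer the value supplied for this run, if there is one.
        if self.name in supplied:
            return supplied[self.name]
        # Otherwise fall back to the declared default.
        if self.default is not _MISSING:
            return self.default
        raise ValueError(f"Missing required parameter: {self.name!r}")


class ToyFlow:
    """Holds declared parameters and one function that consumes them."""

    def __init__(self, name, parameters, body):
        self.name = name
        self.parameters = parameters
        self.body = body

    def run(self, parameters=None):
        supplied = parameters or {}
        # Resolve each declared parameter by name, then call the body.
        values = {p.name: p.resolve(supplied) for p in self.parameters}
        return self.body(**values)


def say_hello(x):
    return f"hello {x}"


# The child declares "x", much like Parameter("x", default="test") in flow_b.
toy_flow_b = ToyFlow("flow_b", [ToyParameter("x", default="test")], say_hello)

print(toy_flow_b.run(parameters={"x": 1}))  # value supplied by the caller
print(toy_flow_b.run())                     # falls back to the default
```

In this sketch the child never sees the whole dict as one object; each declared name pulls out its own value. The toy silently ignores keys that were not declared, which is a simplification, and Prefect's own handling of unexpected keys may differ.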
l
🙌 perfect 🙌
I think flow.run(parameters) was the missing link I was not aware of before.
k
Happy to help 🙂
🙏 1