liren zhang
03/31/2021, 3:27 PM
from prefect import Flow, Parameter, task
from prefect.tasks.prefect.flow_run import StartFlowRun

@task
def get_parameters(date_to_process):
    # .....
    return {}

flow_a = StartFlowRun(flow_name="flow_a", project_name="sample", wait=True)
flow_b = StartFlowRun(flow_name="flow_b", project_name="sample", wait=True)

with Flow("parent-flow") as flow:
    date_to_process = Parameter("date_to_process", default=None)
    vp = get_parameters(date_to_process)
    flow_a = flow_a(upstream_tasks=[vp], parameters=vp)
    flow_b = flow_b(upstream_tasks=[flow_a])
I was able to do this successfully, but I am NOT sure how to use the parameters in the dependent flows. In this case, I have a get_parameters() task that composes the parameters I need to pass down to the dependent flow_a. How do I receive and use the parameters I passed in via flow_a = flow_a(upstream_tasks=[vp], parameters=vp)?
Let's say I want to print the parameters I pass into flow_a. What do I need to do in flow_a.py to reference the passed-in parameters?
Kevin Kho
from prefect import Flow, task
from prefect.tasks.prefect.flow_run import StartFlowRun

@task
def task_a():
    return "hello"

task_b = StartFlowRun(flow_name="flow_b", project_name="examples", wait=True)
task_c = StartFlowRun(flow_name="flow_c", project_name="examples", wait=True)

with Flow("flow_a") as flow:
    result_a = task_a()
    result_b = task_b(upstream_tasks=[result_a], parameters={'result_a': result_a})
    result_c = task_c(upstream_tasks=[result_b], parameters={'result_b': result_b.result})
Kevin Kho
flow_a and flow_b?
Kevin Kho
liren zhang
03/31/2021, 5:58 PM
liren zhang
03/31/2021, 6:00 PM
Kevin Kho
Kevin Kho
import prefect
from prefect import Flow, task
from prefect import Parameter
from prefect.tasks.prefect.flow_run import StartFlowRun

def get_parameters():
    return {'x': 1, 'y': 2}

@task()
def task_2(n):
    logger = prefect.context.get("logger")
    logger.info(f"hello {n}")
    return f"hello {n}"

with Flow("flow_b") as flow_b:
    x = Parameter("x", default="test")
    task_2(x)

with Flow("flow_c") as flow_c:
    y = Parameter("y", default="test")
    task_2(y)

# Register subflows
flow_b.register(project_name="subflow")
flow_c.register(project_name="subflow")

# StartFlowRun calls
task_b = StartFlowRun(flow_name="flow_b", project_name="subflow", wait=True)
task_c = StartFlowRun(flow_name="flow_c", project_name="subflow", wait=True)

with Flow("flow_a") as flow:
    params = get_parameters()  # {"x": 1, "y": 2}
    result_b = task_b(upstream_tasks=[params], parameters=params)
    result_c = task_c(upstream_tasks=[result_b], parameters=params)

flow.run()
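In the snippet above, both subflows receive the whole dictionary, although flow_b only declares x and flow_c only declares y. Whether a subflow accepts keys it does not declare is not established in this thread, so one cautious option is to hand each subflow only the keys it declares. A minimal plain-Python sketch, where select_parameters is a hypothetical helper and not part of Prefect:

```python
def select_parameters(params, declared_names):
    """Return only the entries of params whose keys a subflow declares.

    Hypothetical helper for illustration; not a Prefect API.
    """
    return {name: params[name] for name in declared_names if name in params}


params = {"x": 1, "y": 2}

# flow_b declares Parameter("x"); flow_c declares Parameter("y").
params_b = select_parameters(params, ["x"])
params_c = select_parameters(params, ["y"])

print(params_b)  # {'x': 1}
print(params_c)  # {'y': 2}
```

The resulting dictionaries could then be passed as parameters=params_b and parameters=params_c in the two StartFlowRun calls.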
Kevin Kho
liren zhang
03/31/2021, 8:21 PM
parameters=params, it only passes in a dictionary or a list.
Kevin Kho
flow.run call. This can be seen here: https://docs.prefect.io/core/concepts/parameters.html
Kevin Kho
StartFlowRun task, it basically calls flow.run. We can pass the parameters to the StartFlowRun task, which hands them over to flow.run.
Kevin Kho
flow_b.run(parameters={'x':1})
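To make the call above concrete: flow_b declares Parameter("x", default="test"), so a run without parameters would see "test", while parameters={'x': 1} supplies 1 instead. The following is only a plain-Python model of that default-versus-override behaviour; resolve_parameters is a hypothetical name and this is not how Prefect implements it internally.

```python
def resolve_parameters(declared_defaults, passed=None):
    """Model of parameter resolution: run-time values override declared defaults.

    declared_defaults: parameter name -> default value declared in the flow
    passed: parameter name -> value supplied when the run is started
    Hypothetical helper for illustration; not a Prefect API.
    """
    resolved = dict(declared_defaults)
    resolved.update(passed or {})
    return resolved


# Mirrors Parameter("x", default="test") in flow_b.
flow_b_defaults = {"x": "test"}

print(resolve_parameters(flow_b_defaults))            # {'x': 'test'}
print(resolve_parameters(flow_b_defaults, {"x": 1}))  # {'x': 1}
```

Inside the subflow, the resolved value is what a downstream task such as task_2(x) receives.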
Kevin Kho
Kevin Kho
Parameter("x", default='test'), that subflow can now handle that parameter being passed in at runtime. (This behavior applies to all flows, not just subflows.)
Kevin Kho
Parameter is a special Task
liren zhang
03/31/2021, 8:40 PM
liren zhang
03/31/2021, 8:41 PM
Kevin Kho