Chu
07/24/2022, 7:39 PM
A -> B: both A and B take the same list of org_ids as a parameter (I need to use the map function so they run in parallel).
When I use a parent flow to schedule flows A and B, can I use the map function to pass parameters this way? Or is there a better way to do that?
Code in the thread.
Anna Geller
07/24/2022, 8:38 PM
Chu
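07/24/2022, 8:42 PM
# Prefect 1.x imports
from prefect import Flow, Parameter, unmapped
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

# Parent flow: run A, wait for it to finish, then run B
with Flow(name="parent flow", schedule=CronSchedule("0 0 * * *"), executor=LocalDaskExecutor()) as flow:
    A_flow_id = create_flow_run(
        flow_name="A",
        project_name="sample project",
        task_args=...
    )
    wait_for_A_flow = wait_for_flow_run(
        flow_run_id=A_flow_id
    )
    B_flow_id = create_flow_run(
        flow_name="B",
        project_name="sample project",
        upstream_tasks=[wait_for_A_flow],
        task_args=...
    )
    wait_for_B_flow = wait_for_flow_run(
        flow_run_id=B_flow_id
    )

flow.run_config = KubernetesRun()

# Flow A: run dbt_run once per org, with the same date range for every org
with Flow(name="A") as flow:
    a_list_of_orgs_params = Parameter("a_list_of_orgs")
    start_date_param = Parameter("start_date", default="2012-01-01 +0000")
    end_date_param = Parameter("end_date", default="2012-01-03 +0000")
    dbt_run.map(
        org_id=a_list_of_orgs_params,
        start_date=unmapped(start_date_param),
        end_date=unmapped(end_date_param),
        # wrap each upstream task in unmapped, not the list itself
        upstream_tasks=[unmapped(some_task)],
    )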
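For reference, here is a library-free sketch of what the mapped call in flow A fans out to: one call per element of the mapped argument, with the unmapped arguments passed unchanged to every call. It uses only the Python standard library, not Prefect. The dbt_run function here is a hypothetical stand-in that just reports its inputs, and run_mapped is an illustrative helper name, not part of any library.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def dbt_run(org_id, start_date, end_date):
    # Hypothetical stand-in for the real dbt_run task: it only reports its inputs.
    return f"org={org_id} from {start_date} to {end_date}"


def run_mapped(org_ids, start_date, end_date):
    # Bind the constant ("unmapped") arguments once, then fan out over org_ids.
    run_one = partial(dbt_run, start_date=start_date, end_date=end_date)
    with ThreadPoolExecutor() as pool:
        # pool.map makes one call per org_id and returns results in input order.
        return list(pool.map(run_one, org_ids))


results = run_mapped(["org_1", "org_2"], "2012-01-01 +0000", "2012-01-03 +0000")
```

The list of orgs is the only argument that is iterated over; the two dates play the role of the unmapped values and are shared by every call.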
Anna Geller
07/25/2022, 3:29 PM