Chu
07/21/2022, 7:11 PM
Kevin Kho
create_flow_run.map()
and pass in the list of org ids there. Create flow run can take in a parameter, so you can map over parameter values.
Chu
07/21/2022, 9:22 PM
Chu
07/21/2022, 9:40 PM
@task
def dbt_run(param_org, param_start_date, param_end_date):
    …

with Flow(executor=DaskExecutor()) as flow:
    param_org = Parameter("org_id")
    param_start_date = Parameter("start_date")
    param_end_date = Parameter("end_date")
    dbt_run(param_org, param_start_date, param_end_date).map()

flow.run(
    parameters=dict(
        org_id=[org_1, org_2, org_3, … org_n],
        start_date="2022-07-21",
        end_date="2022-07-22",
    )
)
Kevin Kho
Kevin Kho
dbt_run.map(param_org, param_start_date, param_end_date)
yes it will be parallel if you use DaskExecutor or LocalDaskExecutor
Chu
07/21/2022, 10:00 PM
Chu
07/21/2022, 10:01 PM
Chu
07/21/2022, 10:04 PM
dbt_run.map(param_org=[org1, org2, org3], param_start_date=unmapped(a date), param_end_date=unmapped(a date))
like this?
Kevin Kho
Chu
07/22/2022, 3:02 PM
with Flow(name="xxx", executor=LocalDaskExecutor(scheduler="threads", num_workers=8)) as flow:
    org_id_param = Parameter("org_id")
    start_date_param = Parameter("start_date", default="2012-01-01 +0000")
    end_date_param = Parameter("end_date", default="2012-01-03 +0000")
    dbt_run.map(
        org_id=org_id_param,
        start_date=unmapped(start_date_param),
        end_date=unmapped(end_date_param),
    )

flow.run(
    parameters=dict(
        org_id=["org_1", "org_2", "org_3"],
        start_date="2022-01-17 +0000",
        end_date="2022-01-18 +0000",
    )
)
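Note: as a rough analogy for what the mapped call above does, the sketch below uses only the Python standard library. It is not Prefect's implementation; `map_over_orgs` and the stand-in `dbt_run` body are hypothetical names made up for illustration. It makes one call per org id, repeats the two dates unchanged for every call (the role `unmapped` plays in the flow), and runs the calls on a thread pool of 8 workers.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat


def dbt_run(org_id, start_date, end_date):
    # Hypothetical stand-in for the real task body: it only reports its arguments.
    return f"{org_id}: {start_date} -> {end_date}"


def map_over_orgs(org_ids, start_date, end_date, num_workers=8):
    # One call per org_id; repeat() supplies the same dates to every call,
    # and the finite org_ids list determines how many calls are made.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        return list(
            pool.map(dbt_run, org_ids, repeat(start_date), repeat(end_date))
        )


results = map_over_orgs(
    ["org_1", "org_2", "org_3"], "2022-01-17 +0000", "2022-01-18 +0000"
)
for line in results:
    print(line)
```

The results come back in the same order as the input list, even though the calls may finish in a different order.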
Chu
07/22/2022, 3:41 PM
Kevin Kho
Chu
07/22/2022, 3:50 PM
Kevin Kho
Kevin Kho
Chu
07/22/2022, 4:10 PM
executor=LocalDaskExecutor(scheduler="threads", num_workers=8)
for my flow, it will only run for one org_id, and I don't know why, even though the log says those two orgs have been run.
If I do not specify that, all orgs are run properly. Is there something I missed? How can I fix that? Thanks!!
Kevin Kho
Chu
07/22/2022, 4:15 PM
Kevin Kho
Chu
07/22/2022, 4:44 PM
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.

This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:

    if __name__ == '__main__':
        freeze_support()
        ...

The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
Kevin Kho
if __name__ == "__main__":
    flow.run()
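Note: the guard above matters whenever child processes are started by re-importing the main module (the "spawn" start method, which is the default on Windows and macOS). A minimal standard-library sketch of the same idiom follows. It uses `multiprocessing.Pool` rather than Prefect, and `double` and `main` are hypothetical names chosen for illustration.

```python
import multiprocessing


def double(x):
    # Defined at module level so worker processes can look it up by name.
    return 2 * x


def main():
    # Worker processes are created only when main() is called,
    # never as a side effect of importing this module.
    with multiprocessing.Pool(processes=2) as pool:
        return pool.map(double, [1, 2, 3])


if __name__ == "__main__":
    # Without this guard, a spawned child re-importing the module would try
    # to start its own pool and raise the bootstrapping error quoted above.
    print(main())
```

Anything that starts processes (here the pool, in the thread above `flow.run()`) belongs under the guard; plain function and flow definitions can stay at module level.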
Chu
07/22/2022, 4:53 PM
Kevin Kho
Chu
07/22/2022, 5:15 PM
@task(name="dbt run")
def dbt_run(org_id, start_date, end_date):
    command = (
        f"dbt run --vars '{{org_id: {org_id},"
        f" start_date: {start_date},"
        f" end_date: {end_date}}}'"
        f" --select path/to/my/model"
    )
    return subprocess.run(
        command,
        check=True,
        stderr=True,
        stdout=True,
        shell=True,
        cwd="myDbtProject/",
    )
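Note: one possible variation on the task above, sketched under assumptions rather than taken from the thread. It builds the command as an argument list, so no shell quoting is involved, and serializes the variables with `json.dumps` on the assumption that dbt's `--vars` (documented as a YAML dictionary) also accepts a JSON object. `build_dbt_command` and `run_command` are hypothetical helper names. The demo call runs the Python interpreter instead of dbt so that the sketch works without dbt installed.

```python
import json
import subprocess
import sys


def build_dbt_command(org_id, start_date, end_date):
    # Assumption: --vars accepts a JSON object, since JSON is valid YAML.
    dbt_vars = json.dumps(
        {"org_id": org_id, "start_date": start_date, "end_date": end_date}
    )
    # List form: each element is passed as one argument, with no shell involved.
    return ["dbt", "run", "--vars", dbt_vars, "--select", "path/to/my/model"]


def run_command(args, cwd=None):
    # capture_output=True collects stdout and stderr; text=True decodes them to str.
    # check=True raises CalledProcessError on a non-zero exit code.
    return subprocess.run(
        args, check=True, capture_output=True, text=True, cwd=cwd
    )


command = build_dbt_command("org_1", "2022-01-17 +0000", "2022-01-18 +0000")
print(command)

# Demo with the Python interpreter as the child process (dbt is not required).
demo = run_command([sys.executable, "-c", "print('ok')"])
print(demo.stdout.strip())
```

Capturing the output this way makes each org's dbt log available on the returned object, which can help when checking whether every mapped run actually executed.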
Chu
07/22/2022, 5:16 PM
Kevin Kho
Kevin Kho
Chu
07/22/2022, 5:34 PM
Kevin Kho
Chu
07/22/2022, 5:37 PM
Kevin Kho
Chu
07/22/2022, 5:40 PM
Chu
07/22/2022, 5:43 PM
Kevin Kho
Chu
07/22/2022, 5:46 PM
Chu
07/22/2022, 5:48 PM
Kevin Kho
Chu
07/22/2022, 5:53 PM
Chu
07/22/2022, 7:46 PM