Chu
07/21/2022, 7:11 PM
Kevin Kho
07/21/2022, 7:44 PM
create_flow_run.map()
and pass in the list of org IDs there. create_flow_run can take parameters, so you can map over parameter values.
Chu
07/21/2022, 9:22 PM
@task
def dbt_run(param_org, param_start_date, param_end_date):
    …

with Flow(executor=DaskExecutor()) as flow:
    param_org = Parameter("org_id")
    param_start_date = Parameter("start_date")
    param_end_date = Parameter("end_date")
    dbt_run(param_org, param_start_date, param_end_date).map()

flow.run(
    parameters=dict(
        org_id=[org_1, org_2, org_3, … org_n],
        start_date="2022-07-21",
        end_date="2022-07-22",
    )
)
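Kevin's create_flow_run.map() suggestion boils down to giving each child flow run its own parameter dict. The sketch below builds that list in plain Python, assuming the child flow takes the same org_id, start_date and end_date parameters as the code above; build_run_parameters is a hypothetical helper, not a Prefect API.

```python
from typing import Dict, List


def build_run_parameters(
    org_ids: List[str], start_date: str, end_date: str
) -> List[Dict[str, str]]:
    """Return one parameter dict per org ID (hypothetical helper).

    Every dict shares the same date window, so each child flow run
    processes a single org over the same period.
    """
    return [
        {"org_id": org_id, "start_date": start_date, "end_date": end_date}
        for org_id in org_ids
    ]


run_parameters = build_run_parameters(
    ["org_1", "org_2", "org_3"], "2022-07-21", "2022-07-22"
)
for params in run_parameters:
    print(params)
```

In a Prefect 1.x parent flow, a list like this would presumably be passed as parameters= to create_flow_run.map(...) (from prefect.tasks.prefect), with the flow name wrapped in unmapped(...) so every child run targets the same registered flow. That part needs a registered flow and a Prefect backend, so it is not shown as runnable here.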
Kevin Kho
07/21/2022, 9:59 PM
dbt_run.map(param_org, param_start_date, param_end_date)
Yes, it will run in parallel if you use DaskExecutor or LocalDaskExecutor.
Chu
07/21/2022, 10:00 PM
dbt_run.map(param_org=[org1, org2, org3], param_start_date=unmapped(a date), param_end_date=unmapped(a date))
Like this?
Kevin Kho
07/21/2022, 10:08 PM
Chu
07/22/2022, 3:02 PM
from prefect import Flow, Parameter, unmapped
from prefect.executors import LocalDaskExecutor

# dbt_run is the @task shown later in this thread
with Flow(name="xxx", executor=LocalDaskExecutor(scheduler="threads", num_workers=8)) as flow:
    org_id_param = Parameter("org_id")
    start_date_param = Parameter("start_date", default="2012-01-01 +0000")
    end_date_param = Parameter("end_date", default="2012-01-03 +0000")
    dbt_run.map(
        org_id=org_id_param,
        start_date=unmapped(start_date_param),
        end_date=unmapped(end_date_param),
    )

flow.run(
    parameters=dict(
        org_id=["org_1", "org_2", "org_3"],
        start_date="2022-01-17 +0000",
        end_date="2022-01-18 +0000",
    )
)
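To make the map/unmapped semantics concrete, here is a toy pure-Python model, not Prefect's actual implementation: mapped keyword arguments are paired up element by element, Unmapped values are passed whole to every call, and the calls run on a thread pool with 8 workers, loosely mirroring LocalDaskExecutor(scheduler="threads", num_workers=8). Unmapped, toy_map and fake_dbt_run are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List


class Unmapped:
    """Toy stand-in for prefect.unmapped: pass the value whole to every call."""

    def __init__(self, value: Any) -> None:
        self.value = value


def toy_map(fn: Callable[..., Any], max_workers: int = 8, **kwargs: Any) -> List[Any]:
    """Call fn once per element of the mapped kwargs, on a thread pool.

    All mapped kwargs must have the same length; Unmapped kwargs are
    broadcast to every call. Results come back in input order.
    """
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    shared = {k: v.value for k, v in kwargs.items() if isinstance(v, Unmapped)}
    lengths = {len(v) for v in mapped.values()}
    if len(lengths) != 1:
        raise ValueError("need at least one mapped argument, all of equal length")
    (n,) = lengths
    # Build the keyword arguments for each of the n calls.
    calls: List[Dict[str, Any]] = [
        {**{k: v[i] for k, v in mapped.items()}, **shared} for i in range(n)
    ]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(fn, **call) for call in calls]
        return [f.result() for f in futures]


def fake_dbt_run(org_id: str, start_date: str, end_date: str) -> str:
    # Stand-in for the real dbt_run task; it just reports what it received.
    return f"{org_id}: {start_date} -> {end_date}"


results = toy_map(
    fake_dbt_run,
    org_id=["org_1", "org_2", "org_3"],
    start_date=Unmapped("2022-01-17 +0000"),
    end_date=Unmapped("2022-01-18 +0000"),
)
print(results)
```

In this toy, passing a bare date string instead of Unmapped(...) makes the string count as a mapped sequence of characters, and the length check rejects it; that is the intuition for wrapping the shared dates in unmapped.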
Kevin Kho
07/22/2022, 3:46 PM
Chu
07/22/2022, 3:50 PM
Kevin Kho
07/22/2022, 4:06 PM
Chu
07/22/2022, 4:10 PM
executor=LocalDaskExecutor(scheduler="threads", num_workers=8)
With this executor on my flow, it only runs for one org_id and I don't know why, even though the log says those two orgs have run.
If I don't specify the executor, all orgs run properly. Is there something I'm missing? How can I fix that? Thanks!!
Kevin Kho
07/22/2022, 4:13 PM
Chu
07/22/2022, 4:15 PM
Kevin Kho
07/22/2022, 4:26 PM
Chu
07/22/2022, 4:44 PM
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.

This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:

    if __name__ == '__main__':
        freeze_support()
        ...

The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
Kevin Kho
07/22/2022, 4:49 PM
if __name__ == "__main__":
    flow.run()
Chu
07/22/2022, 4:53 PM
Kevin Kho
07/22/2022, 5:09 PM
Chu
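07/22/2022, 5:15 PM
import subprocess

from prefect import task


@task(name="dbt run")
def dbt_run(org_id, start_date, end_date):
    # dbt's --vars flag takes a YAML dictionary string
    command = (
        f"dbt run --vars '{{org_id: {org_id},"
        f" start_date: {start_date},"
        f" end_date: {end_date}}}'"
        " --select path/to/my/model"
    )
    return subprocess.run(
        command,
        check=True,  # raise CalledProcessError if dbt exits non-zero
        capture_output=True,  # collect dbt's stdout/stderr on the returned CompletedProcess
        text=True,  # decode the captured output as str
        shell=True,
        cwd="myDbtProject/",
    )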
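Building the command as one shell string means the single-quoted --vars block breaks if a value ever contains a quote character. A minimal sketch of an alternative, assuming dbt accepts --vars as a YAML dictionary string (JSON is valid YAML for simple values like these): build the command as an argument list so no shell parsing is involved. build_dbt_command is a hypothetical helper; the select path is the placeholder from the task above.

```python
import json
import shlex
from typing import List


def build_dbt_command(
    org_id: str, start_date: str, end_date: str, select: str = "path/to/my/model"
) -> List[str]:
    """Build `dbt run` as an argument list (hypothetical helper).

    json.dumps quotes every value, so dates such as "2022-01-17 +0000"
    reach dbt as single strings without any shell quoting.
    """
    dbt_vars = json.dumps(
        {"org_id": org_id, "start_date": start_date, "end_date": end_date}
    )
    return ["dbt", "run", "--vars", dbt_vars, "--select", select]


cmd = build_dbt_command("org_1", "2022-01-17 +0000", "2022-01-18 +0000")
print(shlex.join(cmd))  # the equivalent shell command, handy for logging
```

The resulting list can be passed to subprocess.run(cmd, check=True, capture_output=True, text=True, cwd="myDbtProject/") without shell=True.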
Kevin Kho
07/22/2022, 5:19 PM
Chu
07/22/2022, 5:34 PM
Kevin Kho
07/22/2022, 5:36 PM
Chu
07/22/2022, 5:37 PM
Kevin Kho
07/22/2022, 5:38 PM
Chu
07/22/2022, 5:40 PM
Kevin Kho
07/22/2022, 5:45 PM
Chu
07/22/2022, 5:46 PM
Kevin Kho
07/22/2022, 5:50 PM
Chu
07/22/2022, 5:53 PM