Chu
07/25/2022, 2:54 PM
params = dict(
    orgs=['org1', 'org2', 'org3'],  # should be mapped
    start_date="2022-03-19",  # unmapped
    end_date="2022-03-20",  # unmapped
)
with Flow() as flow:
    mapped_flow_ids = create_flow_run.map(
        flow_name=unmapped("map flow"),
        project_name=unmapped("myProject"),
        parameters=params,  # Am I doing it right?
    )
Or should it be like this? I'm wondering whether there is an option that lets me change only orgs for each run and keep the unmapped values fixed.
params = dict(
    orgs=['org1', 'org2', 'org3'],  # should be mapped
    start_date=["2022-03-19", "2022-03-19", "2022-03-19"],
    end_date=["2022-03-20", "2022-03-20", "2022-03-20"],
)
with Flow() as flow:
    mapped_flow_ids = create_flow_run.map(
        flow_name=unmapped("map flow"),
        project_name=unmapped("myProject"),
        parameters=params,
    )
Anna Geller
07/25/2022, 3:31 PM
params = [
    dict(
        orgs=['org1', 'org2', 'org3'],  # should be mapped
        start_date="2022-03-19",  # unmapped
        end_date="2022-03-20",  # unmapped
    ),
    dict(
        orgs=['org1', 'org2', 'org3'],  # should be mapped
        start_date="2022-03-19",  # unmapped
        end_date="2022-03-20",  # unmapped
    ),
    dict(
        orgs=['org1', 'org2', 'org3'],  # should be mapped
        start_date="2022-03-19",  # unmapped
        end_date="2022-03-20",  # unmapped
    ),
    ...]
with Flow() as flow:
    mapped_flow_ids = create_flow_run.map(
        flow_name=unmapped("map flow"),
        project_name=unmapped("myProject"),
        parameters=params,  # Am I doing it right?
    )
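A minimal sketch of building such a list of dicts in plain Python, assuming the goal is one child flow run per organization with the same dates every time. `build_params` is a hypothetical helper (not part of Prefect), and the key name `org` is only illustrative; it would have to match the parameter names of the child flow.

```python
def build_params(mapped_key, mapped_values, **fixed):
    """Return one parameter dict per mapped value, copying the fixed values into each."""
    return [{mapped_key: value, **fixed} for value in mapped_values]


# One dict per organization; the dates are repeated unchanged in every dict.
params = build_params(
    "org",
    ["org1", "org2", "org3"],
    start_date="2022-03-19",
    end_date="2022-03-20",
)
```

The resulting list could then be passed as the `parameters` argument of `create_flow_run.map`, so that each mapped child run receives one dict.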
Chu
07/25/2022, 3:38 PM
params = dict(
    orgs=['org1', 'org2', 'org3'],  # should be mapped
    start_date=["2022-03-19", "2022-03-19", "2022-03-19"],
    end_date=["2022-03-20", "2022-03-20", "2022-03-20"],
)
with Flow() as flow:
    mapped_flow_ids = create_flow_run.map(
        flow_name=unmapped("map flow"),
        project_name=unmapped("myProject"),
        parameters=params,
    )
Anna Geller
07/25/2022, 3:39 PM
Chu
07/25/2022, 3:43 PM
Anna Geller
07/25/2022, 3:53 PM
Chu
07/25/2022, 3:56 PM
@task(name="run dbt job")
def dbt_run(org_id, start_date, end_date):
    return DbtShellTask()

with Flow("map flow") as flow:
    org_id = Parameter("organization_id")
    start_date = Parameter("start_date")
    end_date = Parameter("end_date")
    dbt_run(org_id, start_date, end_date)
Anna Geller
07/25/2022, 4:04 PM
Chu
07/25/2022, 4:37 PM
# we define those params outside and overwrite here
organization_ids = ['id1', 'id2', 'id3']
data_processing_start_date = "2022-03-19 +0000"
data_processing_end_date = "2022-03-20 +0000"

# build one parameter dict per organization ID
def get_params():
    params = []
    for organization_id in organization_ids:
        org_dict = {}
        org_dict['organization_id'] = organization_id
        org_dict['data_processing_start_date'] = data_processing_start_date
        org_dict['data_processing_end_date'] = data_processing_end_date
        params.append(org_dict)
    return params

# retrieve mapping params
params = get_params()

# orchestrator
with Flow("orchestrator") as flow:
    mapped_BI_model_run_ids = create_flow_run.map(
        flow_name=unmapped("xxx"),
        project_name=unmapped("xxx"),
        parameters=params,
    )
    wait_for_BI_flow_run.map(
        flow_run_id=mapped_BI_model_run_ids,
        raise_final_state=unmapped(True),
        stream_logs=unmapped(True),
    )
Anna Geller
07/26/2022, 12:47 PM
"sometimes we may add more orgs to that list"
In that case, I would store it in some data store (DB, S3, Redis, even the Prefect KV Store if you use Cloud) and then retrieve it from there. This way, you don't have to reregister and redeploy flows when only some parameter values change.
Glad you solved it!
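A rough sketch of that idea, using a local JSON file as a stand-in for the data store. The helper names `load_org_ids` and `build_params` and the file layout (a plain JSON list of IDs) are assumptions made for illustration; with a database, S3, Redis or the Prefect KV Store, only the read step would differ.

```python
import json
from pathlib import Path


def load_org_ids(path):
    """Read the current list of organization IDs from a JSON file (hypothetical store)."""
    with Path(path).open() as f:
        org_ids = json.load(f)
    if not isinstance(org_ids, list):
        raise ValueError("expected a JSON list of organization IDs")
    return org_ids


def build_params(org_ids, start_date, end_date):
    """Build one parameter dict per organization, reusing the same dates for each."""
    return [
        {
            "organization_id": org_id,
            "data_processing_start_date": start_date,
            "data_processing_end_date": end_date,
        }
        for org_id in org_ids
    ]
```

Because the list is read when the code runs rather than being hard-coded, adding an organization only means updating the stored list.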
Chu
07/26/2022, 2:57 PM
params = get_params()
inside the Flow or outside the Flow, from a registration and best-practice perspective?
Thank you very much for answering my questions. Our team is really astonished by the functionality of Prefect and by how supportive the community is!
Anna Geller
07/26/2022, 3:24 PM
Chu
07/26/2022, 6:14 PM
mapped_BI_model_run_ids = create_flow_run.map(
    flow_name=unmapped("xxx"),
    project_name=unmapped("xxx"),
    upstream_tasks=unmapped([wait_for_another_mapped_flow]),
    task_args=unmapped(dict(trigger=all_successful)),
    parameters=params,
)
Thanks for the help!
Anna Geller
07/27/2022, 7:24 AM
Chu
07/27/2022, 11:53 AM
Anna Geller
07/27/2022, 12:08 PM
Chu
07/27/2022, 5:54 PM
@task()
def giant_task(params1, params2):
    create_flow_run_...
    wait_for_flow_run_...
    create_flow_run_...(trigger=all_finished, parameters=params1)
    wait_for_flow_run_...
    create_flow_run_...(trigger=all_successful, parameters=params2)
    wait_for_flow_run_...

with Flow(...) as flows:
    output = giant_task.map(params1, params2)

Could not infer an active Flow context while creating edge to <Task: wait_for_flow_run>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `wait_for_flow_run.run(...)`
Anna Geller
07/27/2022, 7:05 PM
Chu
07/27/2022, 7:14 PM
Mansour Zayer
07/27/2022, 7:45 PM
with Flow(...) as flow:
    create_flow_run_1.map(input_list)
    wait_for_flow_run_1.map(input_list)
    create_flow_run_2.map(input_list)
    wait_for_flow_run_2.map(input_list)
    create_flow_run_3.map(input_list)
    wait_for_flow_run_3.map(input_list)
With n workers, the DaskExecutor runs the first flow run for the first n elements of the list, then runs the second flow run for those n elements, and so on.
We want all of the flow runs to be executed for the first element first. That's why we're trying to wrap the create_flow_run calls in a task. How do you propose we achieve this goal? (If my description isn't clear, please let me know.)
Anna Geller
07/28/2022, 12:37 AM
from prefect import Flow, task, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor


@task
def generate_thousand_numbers(start, stop, step):
    nrs = range(start, stop, step)
    return list(nrs)


with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    flow_run_1 = generate_thousand_numbers(1, 1000, 1)
    flow_run_2 = generate_thousand_numbers(1000, 2000, 1)
    flow_run_3 = generate_thousand_numbers(2000, 3000, 1)
    flow_run_4 = generate_thousand_numbers(3000, 4000, 1)
    flow_run_5 = generate_thousand_numbers(4000, 5000, 1)
    # ... until 8

    parameters = [
        dict(list_of_numbers=flow_run_1),
        dict(list_of_numbers=flow_run_2),
        dict(list_of_numbers=flow_run_3),
        dict(list_of_numbers=flow_run_4),
        dict(list_of_numbers=flow_run_5),
        # ... until 8
    ]
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("community"),
    )
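The repeated `generate_thousand_numbers` calls above follow a regular pattern, so the start and stop values could be computed instead of written out by hand. A small sketch in plain Python, assuming "until 8" means eight chunks of the same size that continue the pattern shown; `chunk_ranges` is a hypothetical helper, not part of Prefect.

```python
def chunk_ranges(n_chunks, size=1000):
    """Return (start, stop) pairs matching the pattern (1, 1000), (1000, 2000), ..."""
    # The first chunk starts at 1 rather than 0, as in the example above.
    return [(max(1, i * size), (i + 1) * size) for i in range(n_chunks)]


ranges = chunk_ranges(8)
```

Inside the flow, each `(start, stop)` pair could then feed one `generate_thousand_numbers(start, stop, 1)` call in a loop, with the results collected into the `parameters` list.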