Vamsi Reddy
01/11/2022, 1:36 PM
[module_name1, module_name2,....]. We are using a for loop to iterate over the list, submit the steps, and check their status. However, Prefect does not submit these steps in the order of the list. How do I make sure that the order is maintained? Below is a screenshot of the code:
Anna Geller
Anna Geller
Vamsi Reddy
01/11/2022, 1:47 PM
Anna Geller
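from prefect import Flow, task
from prefect.storage import S3

# spin_emr, get_cluster_id, get_logfile_path, flow_status_to_slack and the
# env/version/no_of_workers/company_bucket settings are defined elsewhere
# in the original code (not shown here).

@task
def get_spe_module_list():
    return ["module_a", "module_b"]

@task
def process_spe_module(module):
    emr_cluster = spin_emr(env=env, version=version, no_workers=no_of_workers, source='SPE',
                           bucket_name=company_bucket)
    emr_cluster_id = get_cluster_id(emr_cluster)
    log_file_path = get_logfile_path(emr_cluster)

with Flow("SPE_flow", state_handlers=[flow_status_to_slack],
          storage=S3(bucket="humanyze-prefect-flows", key=f"{env}/{version}/spe_flow")) as flow:
    spe_module_list = get_spe_module_list()
    # creates one mapped task run per element of spe_module_list
    process_spe_module.map(spe_module_list)
Anna Geller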
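Mapping over `spe_module_list` creates one task run per module. When every run also needs the same value, such as the EMR cluster ID, Prefect 1.x's `unmapped` wrapper passes that value unchanged to each run. What the mapped call computes can be modeled in plain Python, with no Prefect involved. In this sketch `process_spe_module` is a hypothetical stand-in that only formats a string, and `"j-EXAMPLE"` is a placeholder cluster ID. It models the results only, not how or in what order an executor schedules the runs.

```python
from typing import List


def process_spe_module(module: str, emr_cluster_id: str) -> str:
    # Hypothetical stand-in for the real task: it only records which
    # module would run on which cluster.
    return f"{module}@{emr_cluster_id}"


def map_with_unmapped(modules: List[str], emr_cluster_id: str) -> List[str]:
    # Models process_spe_module.map(modules, unmapped(emr_cluster_id)):
    # one call per list element, with the unmapped argument passed
    # unchanged to every call. Results line up with the input list.
    return [process_spe_module(module, emr_cluster_id) for module in modules]


results = map_with_unmapped(["module_a", "module_b"], "j-EXAMPLE")
print(results)
```

Each result corresponds to its module, in the order of the input list; the shared cluster ID is the same in every call.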
from prefect import unmapped
...
process_spe_module.map(spe_module_list, unmapped(emr_cluster_id))
Vamsi Reddy
01/11/2022, 1:57 PM
Anna Geller
Vamsi Reddy
01/11/2022, 1:59 PM
Vamsi Reddy
01/11/2022, 6:20 PM
Vamsi Reddy
01/11/2022, 6:34 PM
Vamsi Reddy
01/14/2022, 4:55 PM
Anna Geller
from prefect import Flow, task

@task
def add_one(x):
    res = x + 1
    print(res)
    return res

@task
def add_two(x):
    res = x + 2
    print(res)
    return res

with Flow("forloop") as flow:
    inputs = [1, 2, 3, 4, 5]
    tasks = []
    for _in in inputs:
        a = add_one(_in)
        b = add_two(a)
        tasks.append(a)
        tasks.append(b)

    # set dependencies: each task waits for the task created before it,
    # so the tasks run in the order of the list
    for i in range(1, len(tasks)):
        tasks[i].set_upstream(tasks[i - 1])

flow.run()
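Why the `set_upstream` loop fixes the order can be seen by modeling the flow as a plain dependency graph. This sketch uses Python's standard-library `graphlib` (3.9+) instead of Prefect, and the string labels stand in for the task objects. It only shows which tasks are free to run at the same time; it is not how Prefect's executors actually schedule work.

```python
from graphlib import TopologicalSorter


def execution_batches(graph):
    """Return nodes in "waves": every node in a wave has all of its upstream
    nodes finished, so a scheduler may run that wave in any order (or in
    parallel). `graph` maps each node to the set of its upstream nodes."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        batches.append(ready)
        sorter.done(*ready)
    return batches


# Labels mirroring the tasks created by the for loop in the flow above.
tasks = []
for x in [1, 2, 3, 4, 5]:
    tasks.append(f"add_one({x})")
    tasks.append(f"add_two({x})")

# Without set_upstream: add_two(x) only waits for its own add_one(x).
data_only = {tasks[i + 1]: {tasks[i]} for i in range(0, len(tasks), 2)}

# With the set_upstream loop: every task also waits for the one before it.
chained = {tasks[i]: {tasks[i - 1]} for i in range(1, len(tasks))}

print(execution_batches(data_only))  # two waves: all add_one, then all add_two
print(execution_batches(chained))    # ten single-task waves, in list order
```

With only the data dependencies, all five `add_one` tasks are ready at once, so nothing pins their relative order. After chaining, exactly one task is ready at each step, and that sequence is the order of the `tasks` list.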
Mapped tasks run sequentially, in list order, if you use the default LocalExecutor. They run in parallel only if you assign one of the Dask executors (LocalDaskExecutor or DaskExecutor).
Vamsi Reddy
01/14/2022, 5:16 PM
Vamsi Reddy
01/14/2022, 5:17 PM
Anna Geller
Vamsi Reddy
01/14/2022, 5:21 PM
Anna Geller
LocalExecutor.
Anna Geller
Vamsi Reddy
01/14/2022, 5:25 PM
Vamsi Reddy
01/14/2022, 5:26 PM
Kevin Kho
Kevin Kho
Vamsi Reddy
01/14/2022, 5:35 PM
Kevin Kho
Anna Geller
Vamsi Reddy
01/14/2022, 8:45 PM