Vamsi Reddy
01/11/2022, 1:36 PM
[module_name1, module_name2, ...]. We are using a for loop to iterate over the list, submit the steps, and check their status. However, Prefect does not submit these steps in the order of the list. How do I make sure that the order is maintained? Below is a screenshot of the code:
Anna Geller
Anna Geller
Vamsi Reddy
01/11/2022, 1:47 PM
Anna Geller
from prefect import Flow, task
from prefect.storage import S3

@task
def get_spe_module_list():
    return ["module_a", "module_b"]

@task
def process_spe_module(module):
    # spin_emr, get_cluster_id, get_logfile_path and the env/version settings
    # come from the asker's own code and are not defined in this snippet
    emr_cluster = spin_emr(env=env, version=version, no_workers=no_of_workers, source='SPE',
                           bucket_name=company_bucket)
    emr_cluster_id = get_cluster_id(emr_cluster)
    log_file_path = get_logfile_path(emr_cluster)

with Flow("SPE_flow", state_handlers=[flow_status_to_slack],
          storage=S3(bucket="humanyze-prefect-flows", key=f"{env}/{version}/spe_flow")) as flow:
    spe_module_list = get_spe_module_list()
    # one mapped task run per module in the list
    process_spe_module.map(spe_module_list)
Anna Geller
from prefect import unmapped
...
process_spe_module.map(spe_module_list, unmapped(emr_cluster_id))
Vamsi Reddy
01/11/2022, 1:57 PM
Anna Geller
Vamsi Reddy
01/11/2022, 1:59 PM
Vamsi Reddy
01/11/2022, 6:20 PM
Vamsi Reddy
01/11/2022, 6:34 PM
Vamsi Reddy
01/14/2022, 4:55 PM
Anna Geller
from prefect import Flow, task

@task
def add_one(x):
    res = x + 1
    print(res)
    return res

@task
def add_two(x):
    res = x + 2
    print(res)
    return res

with Flow("forloop") as flow:
    inputs = [1, 2, 3, 4, 5]
    tasks = []
    for _in in inputs:
        a = add_one(_in)
        b = add_two(a)
        tasks.append(a)
        tasks.append(b)

    # set dependencies
    for i in range(1, len(tasks)):
        tasks[i].set_upstream(tasks[i - 1])

flow.run()
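The dependency loop in the snippet above chains every task to the one before it, which leaves only one valid execution order. Here is a rough sketch of that idea in plain Python, using the standard library's `graphlib` instead of Prefect. The string labels are hypothetical stand-ins for the task objects, so this only illustrates the ordering logic, not how Prefect schedules work internally.

```python
from graphlib import TopologicalSorter

inputs = [1, 2, 3, 4, 5]

# Hypothetical labels standing in for the Prefect task objects.
tasks = []
for _in in inputs:
    tasks.append(f"add_one[{_in}]")
    tasks.append(f"add_two[{_in}]")

# Mirror tasks[i].set_upstream(tasks[i - 1]):
# each task depends on the one appended just before it.
graph = {tasks[i]: {tasks[i - 1]} for i in range(1, len(tasks))}

# A linear chain has exactly one topological order: the list order itself.
order = list(TopologicalSorter(graph).static_order())
print(order)
```

Because each node has a single predecessor, the sorter has no freedom to reorder anything, which is the same reason the chained tasks in the flow run in list order.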
Mapped tasks run sequentially, in list order, when you use the default LocalExecutor. They run in parallel only if you assign one of the Dask executors.
Vamsi Reddy
01/14/2022, 5:16 PM
Vamsi Reddy
01/14/2022, 5:17 PM
Anna Geller
Vamsi Reddy
01/14/2022, 5:21 PM
Anna Geller
LocalExecutor.
Anna Geller
Vamsi Reddy
01/14/2022, 5:25 PM
Vamsi Reddy
01/14/2022, 5:26 PM
Kevin Kho
Kevin Kho
Vamsi Reddy
01/14/2022, 5:35 PM
Kevin Kho
from prefect import Flow, task

@task
def add_one(x):
    res = x + 1
    print(res)
    return res

@task
def add_two(x):
    res = x + 2
    print(res)
    return res

with Flow("forloop") as flow:
    inputs = [1, 2, 3, 4, 5]
    tasks = []
    for _in in inputs:
        a = add_one(_in)
        b = add_two(a)
        tasks.append(a)
        tasks.append(b)

    # set dependencies
    for i in range(1, len(tasks)):
        tasks[i].set_upstream(tasks[i - 1])

flow.run()
Anna Geller
Vamsi Reddy
01/14/2022, 8:45 PM