Luis Gallegos
04/01/2021, 4:58 PMfrom prefect import task, Flow, Parameter
from prefect.executors import LocalDaskExecutor, DaskExecutor
from prefect.tasks.prefect import StartFlowRun
import prefect
executor = LocalDaskExecutor(num_workers=1)
flow1 = StartFlowRun("flow1", project_name='test', wait=True)
flow2 = StartFlowRun("flow2", project_name='test', wait=True)
with Flow("example", executor=executor) as flow:
table_dict_param_list = []
with open('parameters.txt', 'r') as f:
lines = f.readlines()
for cnt, line in enumerate(lines):
dict_param = {}
dict_param['param1'] = cnt
dict_param['param2'] = line
table_dict_param_list.append(table_dict_param)
flow1 = flow1()
## i need this execution to be sequential like in a "for loop"
flow2.map(parameters=table_dict_param_list)
flow.register(project_name="test")
Zanie
Zanie
flow2.run
on a loop.Luis Gallegos
04/01/2021, 5:32 PMfrom prefect import task, Flow, Parameter
from prefect.executors import LocalDaskExecutor, DaskExecutor
from prefect.tasks.prefect import StartFlowRun
import prefect
from prefect.engine.signals import LOOP
executor = LocalDaskExecutor(num_workers=1)
flow1 = StartFlowRun("flow1", project_name='test', wait=True)
@task
def flow2_loop(dict_param_list):
# we extract the accumulated task loop result from context
loop_payload = prefect.context.get("task_loop_result", {})
i = loop_payload.get("i", 0)
dict_param = dict_param_list[i]
flow2 = StartFlowRun("flow2", project_name='test', wait=True)
## this execution result in "success" and the loop ends with the first iteration
flow2.run(parameters=dict_param)
if i > len(dict_param_list):
return i # return statements end the loop
raise LOOP(result=dict(i=i + 1))
with Flow("example", executor=executor) as flow:
dict_param_list = []
with open('parameters.txt', 'r') as f:
lines = f.readlines()
for cnt, line in enumerate(lines):
dict_param = {}
dict_param['param1'] = cnt
dict_param['param2'] = line
dict_param_list.append(dict_param)
flow1 = flow1()
## i need this execution to be sequential like in a "for loop"
flow2_loop = flow2_loop(parameters=dict_param_list)
flow.register(project_name="test")
Zanie
try:
flow2.run(...)
except prefect.engine.signals.SUCCESS:
pass
Zanie
Luis Gallegos
04/01/2021, 6:03 PMidempotency_key
, otherwise only the first iteration triggered a sub-flow creation/execution:
flow2.run(parameters=dict_param, idempotency_key=str(i))
Zanie
StartFlowRun
tasks automatically create an idempotency key using the task run id in the context to prevent you from doing this on accident. From the docstring:
a unique idempotency key for scheduling the flow run. Duplicate flow runs with the same idempotency key will only create a single flow run. This is useful for ensuring that only one run is created if this task is retried. If not provided, defaults to the active `task_run_id`.