Hi, all, can you help me, i need a sequential exec...
# ask-community
l
Hi, all, can you help me, i need a sequential execution on map method (not parallel, but serial), or something similar to achieve sequential looping over a list of params. Here's my code:
Copy code
from prefect import task, Flow, Parameter
from prefect.executors import LocalDaskExecutor, DaskExecutor
from prefect.tasks.prefect import StartFlowRun
import prefect

executor = LocalDaskExecutor(num_workers=1)


flow1 = StartFlowRun("flow1", project_name='test', wait=True)
flow2 = StartFlowRun("flow2", project_name='test', wait=True)

with Flow("example", executor=executor) as flow:
		
	table_dict_param_list = []
	with open('parameters.txt', 'r') as f:
		lines = f.readlines()
		for cnt, line in enumerate(lines):
				dict_param = {}
				dict_param['param1'] = cnt
				dict_param['param2'] = line
				table_dict_param_list.append(table_dict_param)
	
	flow1 = flow1()
	
	## i need this execution to be sequential like in a "for loop"	
	flow2.map(parameters=table_dict_param_list)

flow.register(project_name="test")
z
Basically, you'll want to define a helper task that calls
flow2.run
on a loop.
l
ok, thanks, now the loop works but only with the first iteration, because the flow2 result in SUCCESS Signal, my code as follow:
Copy code
from prefect import task, Flow, Parameter
from prefect.executors import LocalDaskExecutor, DaskExecutor
from prefect.tasks.prefect import StartFlowRun
import prefect
from prefect.engine.signals import LOOP

executor = LocalDaskExecutor(num_workers=1)


flow1 = StartFlowRun("flow1", project_name='test', wait=True)

@task
def flow2_loop(dict_param_list):

	# we extract the accumulated task loop result from context
	loop_payload = prefect.context.get("task_loop_result", {})

	i = loop_payload.get("i", 0)

	dict_param = dict_param_list[i]

	flow2 = StartFlowRun("flow2", project_name='test', wait=True)
	## this execution result in "success" and the loop ends with the first iteration
	flow2.run(parameters=dict_param)

	if i > len(dict_param_list):
		return i  # return statements end the loop

	raise LOOP(result=dict(i=i + 1))


with Flow("example", executor=executor) as flow:
		
	dict_param_list = []
	with open('parameters.txt', 'r') as f:
		lines = f.readlines()
		for cnt, line in enumerate(lines):
				dict_param = {}
				dict_param['param1'] = cnt
				dict_param['param2'] = line
				dict_param_list.append(dict_param)
	
	flow1 = flow1()
	
	## i need this execution to be sequential like in a "for loop"	
	flow2_loop = flow2_loop(parameters=dict_param_list)

flow.register(project_name="test")
z
Ah that's a tricky case haha since that's a task that raises a state. Since states in that case are raised as Python exceptions you might be able to get away with a
Copy code
try:
    flow2.run(...)
except prefect.engine.signals.SUCCESS:
    pass
This is a good time to consider how you want to handle sub-flows that do something other than succeed as well 🙂
l
Thanks @Zanie it's working but with an
idempotency_key
, otherwise only the first iteration triggered a sub-flow creation/execution:
Copy code
flow2.run(parameters=dict_param, idempotency_key=str(i))
z
Ah yeah,
StartFlowRun
tasks automatically create an idempotency key using the task run id in the context to prevent you from doing this on accident. From the docstring:
Copy code
a unique idempotency key for scheduling the flow run. Duplicate flow runs with the same idempotency key will only create a single flow run. This is useful for ensuring that only one run is created if this task is retried. If not provided, defaults to the active `task_run_id`.
👍🏼 1