Thread
#prefect-community
    Luis Gallegos

    1 year ago
    Hi all, can you help me? I need sequential execution of the map method (not parallel, but serial), or something similar to achieve sequential looping over a list of params. Here's my code:
    from prefect import task, Flow, Parameter
    from prefect.executors import LocalDaskExecutor, DaskExecutor
    from prefect.tasks.prefect import StartFlowRun
    import prefect
    
    executor = LocalDaskExecutor(num_workers=1)
    
    
    flow1 = StartFlowRun("flow1", project_name='test', wait=True)
    flow2 = StartFlowRun("flow2", project_name='test', wait=True)
    
    with Flow("example", executor=executor) as flow:

        table_dict_param_list = []
        # build one parameter dict per line of the input file
        with open('parameters.txt', 'r') as f:
            lines = f.readlines()
            for cnt, line in enumerate(lines):
                dict_param = {}
                dict_param['param1'] = cnt
                dict_param['param2'] = line
                table_dict_param_list.append(dict_param)

        flow1 = flow1()

        ## I need this execution to be sequential, like in a "for loop"
        flow2.map(parameters=table_dict_param_list)

    flow.register(project_name="test")
    Michael Adkins

    1 year ago
    Basically, you'll want to define a helper task that calls flow2.run in a loop.
    Luis Gallegos

    1 year ago
    OK, thanks. Now the loop works, but only for the first iteration, because flow2 results in a SUCCESS signal. My code is as follows:
    from prefect import task, Flow, Parameter
    from prefect.executors import LocalDaskExecutor, DaskExecutor
    from prefect.tasks.prefect import StartFlowRun
    import prefect
    from prefect.engine.signals import LOOP
    
    executor = LocalDaskExecutor(num_workers=1)
    
    
    flow1 = StartFlowRun("flow1", project_name='test', wait=True)
    
    @task
    def flow2_loop(dict_param_list):

        # we extract the accumulated task loop result from context
        loop_payload = prefect.context.get("task_loop_result", {})

        i = loop_payload.get("i", 0)

        dict_param = dict_param_list[i]

        flow2 = StartFlowRun("flow2", project_name='test', wait=True)
        ## this execution results in a SUCCESS signal and the loop ends after the first iteration
        flow2.run(parameters=dict_param)

        if i >= len(dict_param_list) - 1:
            return i  # a return statement ends the loop

        raise LOOP(result=dict(i=i + 1))
    
    
    with Flow("example", executor=executor) as flow:

        dict_param_list = []
        # build one parameter dict per line of the input file
        with open('parameters.txt', 'r') as f:
            lines = f.readlines()
            for cnt, line in enumerate(lines):
                dict_param = {}
                dict_param['param1'] = cnt
                dict_param['param2'] = line
                dict_param_list.append(dict_param)

        flow1 = flow1()

        ## I need this execution to be sequential, like in a "for loop"
        flow2_loop = flow2_loop(dict_param_list)

    flow.register(project_name="test")
    Michael Adkins

    1 year ago
    Ah, that's a tricky case, haha, since that's a task that raises a state. Since those states are raised as Python exceptions, you might be able to get away with a
    try:
        flow2.run(...)
    except prefect.engine.signals.SUCCESS:
        pass
    This is a good time to consider how you want to handle sub-flows that do something other than succeed as well 🙂
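    For example, inside the flow2_loop task body the call could be wrapped roughly like this (a sketch; re-raising FAIL is just one way to handle the non-success case):
    try:
        flow2.run(parameters=dict_param)
    except prefect.engine.signals.SUCCESS:
        pass  # the child flow run finished successfully; continue looping
    except prefect.engine.signals.FAIL:
        raise  # surface child-flow failures instead of looping past them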
    Luis Gallegos

    1 year ago
    Thanks @Michael Adkins, it's working, but only with an idempotency_key; otherwise only the first iteration triggered a sub-flow creation/execution:
    flow2.run(parameters=dict_param, idempotency_key=str(i))
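    For reference, the combined loop task now looks roughly like this (a sketch putting together the LOOP payload, the SUCCESS catch, and the explicit idempotency_key, with the same imports as the snippet above):
    @task
    def flow2_loop(dict_param_list):
        # pull the loop counter accumulated by previous LOOP iterations
        loop_payload = prefect.context.get("task_loop_result", {})
        i = loop_payload.get("i", 0)

        flow2 = StartFlowRun("flow2", project_name='test', wait=True)
        try:
            # an explicit idempotency_key per iteration forces a new child flow run each time
            flow2.run(parameters=dict_param_list[i], idempotency_key=str(i))
        except prefect.engine.signals.SUCCESS:
            pass  # the child flow run succeeded; keep looping

        if i >= len(dict_param_list) - 1:
            return i  # a return statement ends the loop

        raise LOOP(result=dict(i=i + 1))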
    Michael Adkins

    1 year ago
    Ah yeah, StartFlowRun tasks automatically create an idempotency key using the task run ID in the context, to prevent you from doing this by accident. From the docstring:
    a unique idempotency key for scheduling the flow run. Duplicate flow runs with the same idempotency key will only create a single flow run. This is useful for ensuring that only one run is created if this task is retried. If not provided, defaults to the active `task_run_id`.