John-Craig Borman

    9 months ago
    Hi all, for testing purposes, is there any way to configure mapped tasks (.map) to run sequentially instead of in parallel?
    Anna Geller

    9 months ago
    Yes, you can. Attach a LocalExecutor to your flow and the mapped child tasks will run sequentially, one after the other. Here is an example:
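    from prefect import task, Flow
    from prefect.executors import LocalExecutor
    import time
    
    
    @task
    def generate_random_numbers():
        # Despite the name, this returns the fixed list [1, 2, 3, 4]
        return list(range(1, 5))
    
    
    @task
    def add_one(x):
        # Sleep so that sequential execution is visible in the log timestamps
        time.sleep(3)
        return x + 1
    
    
    @task(log_stdout=True)
    def print_results(res):
        print(res)
    
    
    # LocalExecutor runs every task, including mapped children, one at a time
    with Flow("mapping", executor=LocalExecutor()) as flow:
        numbers = generate_random_numbers()
        result = add_one.map(numbers)  # one child task per element of numbers
        print_results(result)
    
    if __name__ == "__main__":
        flow.run()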
    John-Craig Borman

    9 months ago
    Hmm, I am executing the flow with a LocalExecutor()
    Anna Geller

    9 months ago
    Can you test the example above and confirm that the mapped tasks run one after the other? For me they do. I believe you can use the same approach.
    Output:
    [2021-12-21 17:12:23+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'mapping'
    [2021-12-21 17:12:23+0100] INFO - prefect.TaskRunner | Task 'generate_random_numbers': Starting task run...
    [2021-12-21 17:12:23+0100] INFO - prefect.TaskRunner | Task 'generate_random_numbers': Finished task run for task with final state: 'Success'
    [2021-12-21 17:12:23+0100] INFO - prefect.TaskRunner | Task 'add_one': Starting task run...
    [2021-12-21 17:12:23+0100] INFO - prefect.TaskRunner | Task 'add_one': Finished task run for task with final state: 'Mapped'
    [2021-12-21 17:12:23+0100] INFO - prefect.TaskRunner | Task 'add_one[0]': Starting task run...
    [2021-12-21 17:12:26+0100] INFO - prefect.TaskRunner | Task 'add_one[0]': Finished task run for task with final state: 'Success'
    [2021-12-21 17:12:26+0100] INFO - prefect.TaskRunner | Task 'add_one[1]': Starting task run...
    [2021-12-21 17:12:29+0100] INFO - prefect.TaskRunner | Task 'add_one[1]': Finished task run for task with final state: 'Success'
    [2021-12-21 17:12:29+0100] INFO - prefect.TaskRunner | Task 'add_one[2]': Starting task run...
    [2021-12-21 17:12:32+0100] INFO - prefect.TaskRunner | Task 'add_one[2]': Finished task run for task with final state: 'Success'
    [2021-12-21 17:12:32+0100] INFO - prefect.TaskRunner | Task 'add_one[3]': Starting task run...
    [2021-12-21 17:12:35+0100] INFO - prefect.TaskRunner | Task 'add_one[3]': Finished task run for task with final state: 'Success'
    [2021-12-21 17:12:35+0100] INFO - prefect.TaskRunner | Task 'print_results': Starting task run...
    [2021-12-21 17:12:35+0100] INFO - prefect.TaskRunner | [2, 3, 4, 5]
    [2021-12-21 17:12:35+0100] INFO - prefect.TaskRunner | Task 'print_results': Finished task run for task with final state: 'Success'
    [2021-12-21 17:12:35+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
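    If you would rather not eyeball the timestamps, the check can be scripted. This is only a rough sketch: the helper names (child_intervals, ran_sequentially) are my own and not part of Prefect, and the regex assumes the log format shown above. It collects the start and finish time of each mapped child and verifies that no child starts before the previous one has finished. The logs only have one-second resolution, so an overlap shorter than that would not show up.

```python
import re
from datetime import datetime

# Log lines for the mapped children, copied from the output above.
LOG = """\
[2021-12-21 17:12:23+0100] INFO - prefect.TaskRunner | Task 'add_one[0]': Starting task run...
[2021-12-21 17:12:26+0100] INFO - prefect.TaskRunner | Task 'add_one[0]': Finished task run for task with final state: 'Success'
[2021-12-21 17:12:26+0100] INFO - prefect.TaskRunner | Task 'add_one[1]': Starting task run...
[2021-12-21 17:12:29+0100] INFO - prefect.TaskRunner | Task 'add_one[1]': Finished task run for task with final state: 'Success'
[2021-12-21 17:12:29+0100] INFO - prefect.TaskRunner | Task 'add_one[2]': Starting task run...
[2021-12-21 17:12:32+0100] INFO - prefect.TaskRunner | Task 'add_one[2]': Finished task run for task with final state: 'Success'
[2021-12-21 17:12:32+0100] INFO - prefect.TaskRunner | Task 'add_one[3]': Starting task run...
[2021-12-21 17:12:35+0100] INFO - prefect.TaskRunner | Task 'add_one[3]': Finished task run for task with final state: 'Success'
"""

# Matches only mapped children such as 'add_one[2]' (assumed log format).
LINE_RE = re.compile(
    r"\[(?P<ts>[^\]]+)\] INFO - prefect\.TaskRunner \| "
    r"Task '(?P<name>[^']+\[\d+\])': (?P<event>Starting|Finished) task run"
)


def child_intervals(log_text):
    """Return {child_name: (start, end)} for every mapped child in log_text."""
    starts, intervals = {}, {}
    for line in log_text.splitlines():
        match = LINE_RE.search(line)
        if not match:
            continue
        ts = datetime.strptime(match["ts"], "%Y-%m-%d %H:%M:%S%z")
        if match["event"] == "Starting":
            starts[match["name"]] = ts
        else:
            intervals[match["name"]] = (starts[match["name"]], ts)
    return intervals


def ran_sequentially(intervals):
    """True if each child started no earlier than the previous child finished."""
    ordered = sorted(intervals.values())
    return all(
        prev_end <= next_start
        for (_, prev_end), (next_start, _) in zip(ordered, ordered[1:])
    )


if __name__ == "__main__":
    print(ran_sequentially(child_intervals(LOG)))  # True
```

    For the run above, the four children occupy back-to-back 3-second windows (12 seconds in total), which is what you would expect from four sequential calls that each sleep for 3 seconds.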
    John-Craig Borman

    9 months ago
    There must be an issue somewhere else in my flow, then. I am getting the same output as you here.
    Thank you for the quick reply!