
Paulo Benatto

01/29/2022, 11:21 AM
Good morning 🙂 I'm quite new to Prefect and would like some advice on a problem I have. In the example below, every number in the list has to be processed before any number is saved (save_number()). I would like to know how to process one number at a time: run the whole flow for the first number in the list, then the whole flow for the second number, and so on. This is a simple example; I'm thinking more about the case where I have to process a lot of data. Many thanks
from prefect import Flow, task
import prefect

import time

from prefect.executors import LocalDaskExecutor


@task
def add_ten(x):
    time.sleep(1)
    return x + 10


@task
def save_number(y):
    # Get the logger inside the task, where Prefect provides the run-time task logger
    logger = prefect.context.get("logger")
    logger.info(f"Saving number: {y}")


with Flow("reduce", executor=LocalDaskExecutor()) as flow:
    mapped_result = add_ten.map([1, 2, 3, 4, 5, 6, 7, 8, 9, 9])
    mapped_result_2 = add_ten.map(mapped_result)
    reduced_result = save_number(mapped_result_2)
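In the flow above, save_number receives the whole mapped list, so it can only run after every number has gone through both add_ten steps. A minimal plain-Python sketch of that order next to the per-number order being asked about (no Prefect involved; the log parameter and the two driver functions are hypothetical, and the add_ten calls run breadth-first here for simplicity):

```python
# Plain-Python sketch, not Prefect: two execution orders for the same steps.
# add_ten and save_number mirror the tasks above; the event log is illustrative.


def add_ten(x, log):
    log.append(("add_ten", x))
    return x + 10


def save_number(y, log):
    log.append(("save", y))


def stage_by_stage(numbers):
    """Breadth-first: each step runs for every item, then save_number gets
    the whole list at once, like the reduce in the flow above."""
    log = []
    first = [add_ten(n, log) for n in numbers]
    second = [add_ten(n, log) for n in first]
    save_number(second, log)
    return log


def item_by_item(numbers):
    """Each item goes through the whole pipeline, save included, before
    the next item starts."""
    log = []
    for n in numbers:
        save_number(add_ten(add_ten(n, log), log), log)
    return log
```

With [1, 2], stage_by_stage logs both first additions, then both second additions, then a single save of [21, 22]; item_by_item logs add 1, add 11, save 21, then add 2, add 12, save 22.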

Anna Geller

01/29/2022, 11:52 AM
So, in general, mapping is designed to run things in parallel, but if you use the default LocalExecutor instead of LocalDaskExecutor, everything will run sequentially:
from prefect.executors import LocalExecutor
with Flow("reduce", executor=LocalExecutor()) as flow:
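As a standard-library analogy (not Prefect's executors themselves), the difference is one call after another versus calls overlapping in a pool of workers. The thread pool here only stands in for a parallel executor such as LocalDaskExecutor; map_sequential and map_parallel are hypothetical helpers:

```python
# Standard-library analogy only, not Prefect: the same mapped step run one
# call at a time (roughly LocalExecutor) and concurrently in a thread pool
# (roughly a parallel executor such as LocalDaskExecutor).
import time
from concurrent.futures import ThreadPoolExecutor


def add_ten(x):
    time.sleep(0.1)  # stand-in for real work
    return x + 10


def map_sequential(fn, items):
    # One call after another, in input order.
    return [fn(x) for x in items]


def map_parallel(fn, items, max_workers=4):
    # Calls may overlap in time; Executor.map still yields results in input order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fn, items))
```

Both return [11, 12, 13] for [1, 2, 3]; only the wall-clock time differs. Switching executors changes concurrency, not dependencies: a reduce step such as save_number above still waits for every mapped result.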
If I understood the problem correctly, you want the save_number step to be executed individually for each mapped input (here: each number). In that case, you can simply move this save step (in a data-processing pipeline, probably the load step) into the last mapped function. Even if things run in parallel, the save/load step will then be executed once per mapped input.
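The effect of moving the save into the last per-item function can be sketched in plain Python, with a thread pool standing in for parallel mapping. This is an illustration under that assumption, not Prefect code; process_and_save and the in-memory saved list are hypothetical stand-ins for the last mapped task and a real data store:

```python
# Plain-Python sketch, not Prefect: the save happens inside the last per-item
# function, so each number is saved as soon as its own chain finishes.
# process_and_save and the in-memory "saved" list are hypothetical stand-ins.
import threading
from concurrent.futures import ThreadPoolExecutor


def add_ten(x):
    return x + 10


def run(numbers, max_workers=4):
    saved = []  # stand-in for a database or file
    lock = threading.Lock()

    def process_and_save(x):
        y = add_ten(add_ten(x))  # both add_ten steps for this one item
        with lock:  # guard the shared sink when items run in parallel
            saved.append(y)  # the "save", once per input
        return y

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(process_and_save, numbers))
    return results, saved
```

The returned results keep input order, while saved holds the same values in completion order, which may differ when items run in parallel.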
Alternatively, you can use iterated mapping for the save/load step as well (no reduce step required):
@task
def save_number(y):
    logger = prefect.context.get("logger")
    logger.info(f"Saving number: {y}")


with Flow("reduce", executor=LocalDaskExecutor()) as flow:
    mapped_result = add_ten.map([1, 2, 3, 4, 5, 6, 7, 8, 9, 9])
    mapped_result_2 = add_ten.map(mapped_result)
    save_number.map(mapped_result_2)

Paulo Benatto

01/29/2022, 12:10 PM
@Anna Geller Hmm, many thanks for your answer. I tested the example here and it worked 🙇 Probably I also have a bug in my project :ladybug: