#prefect-community
    Jacob Blanco
    2 years ago
    I would like to run a given task with an array of dates:
    import datetime

    from prefect import Flow, Parameter, task

    @task
    def generate_dates(start_date, n_days):
        return [start_date - datetime.timedelta(days=x) for x in range(n_days)]

    @task
    def do_something(updated_at):
        ## DO SOME STUFF WITH THE DATE
        pass

    with Flow("My Flow") as flow:
        start_date = Parameter('start_date')
        n_days = Parameter('n_days')

        dates = generate_dates(start_date, n_days)

        for run_date in dates:
            do_something(run_date)

    flow.run(parameters=dict(start_date=datetime.date.today(), n_days=10))
    In this case I don't want to use map for some technical reasons. I could just implement the loop inside of the task but I like having all the timing tracked by Prefect Cloud.
    emre
    2 years ago
    Your code currently tries to generate a do_something task for each element at build time. However, the generate_dates task hasn't run yet, and if you do a type check you will see that dates is not a list. In Prefect, I like to think of two stages in a flow's lifetime: build time and run time. Build time happens within the with Flow()... statement; it initializes (does not run) the tasks that make up the flow and connects them based on their input/output dependencies. Run time happens at flow.run(), which actually runs the tasks that were initialized at build time. So your code is trying to generate multiple tasks at build time using data that will only be available at run time, which isn't possible. If dates were known beforehand (computed outside the flow), you could do this, but I don't think you want that either. I don't see an option other than mapping.
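    The mapping suggested above can be sketched with Prefect 1.x's functional API; do_something's body here is a stand-in (it just stringifies the date), and the parameter values are illustrative:
    ```python
    import datetime

    from prefect import Flow, Parameter, task

    @task
    def generate_dates(start_date, n_days):
        # Runs at run time and produces the actual list to fan out over.
        return [start_date - datetime.timedelta(days=x) for x in range(n_days)]

    @task
    def do_something(updated_at):
        # Placeholder for the real per-date work.
        return str(updated_at)

    with Flow("My Flow") as flow:
        start_date = Parameter("start_date")
        n_days = Parameter("n_days")
        dates = generate_dates(start_date, n_days)
        # map() defers the fan-out to run time, when `dates` is a real list,
        # instead of iterating over an unrun task at build time.
        results = do_something.map(dates)

    state = flow.run(parameters=dict(start_date=datetime.date(2021, 1, 10), n_days=3))
    ```
    Each mapped child shows up as its own task run, so the per-date timing is still tracked.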
    Jacob Blanco
    2 years ago
    Thanks @emre, the code was meant more for illustrative purposes, but what you say makes sense. They do have the concept of task looping, but it's more of a recursion, which I guess I could set up, though it seems really strange (get the list of dates, run with one, remove it from the list, pass the rest to the next execution).
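    The recursion-style looping described above can be sketched with Prefect 1.x's LOOP signal; the task body is a placeholder, and passing the dates in as a list-of-strings parameter is an assumption for illustration:
    ```python
    import prefect
    from prefect import Flow, Parameter, task
    from prefect.engine.signals import LOOP

    @task
    def process_dates(dates):
        # Loop state travels through prefect.context between iterations;
        # on the first iteration there is no previous result yet.
        payload = prefect.context.get("task_loop_result", {"remaining": dates, "done": []})
        remaining, done = payload["remaining"], payload["done"]
        current = remaining[0]
        # ... the real per-date work would go here ...
        done = done + [current]
        if len(remaining) > 1:
            # Raising LOOP re-runs this task with the leftover dates.
            raise LOOP(message=f"Processed {current}", result={"remaining": remaining[1:], "done": done})
        return done

    with Flow("Looped Flow") as flow:
        dates = Parameter("dates")
        result = process_dates(dates)

    state = flow.run(parameters=dict(dates=["2021-01-10", "2021-01-09", "2021-01-08"]))
    ```
    This matches the "run with one, remove it from the list, pass the rest on" shape: each iteration consumes the head of the list and hands the tail to the next.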
    emre
    2 years ago
    No worries! The expected way is to use mapping; looping would sacrifice parallelism between tasks in the same loop. You might find more help if you elaborate on your concerns with mapping.
    Jacob Blanco
    2 years ago
    I want to update a single table in PostgreSQL, and I am trying to do that for a subset of the records at a time to avoid locking up the entire table for a long time, since it's a live environment. I could write the loop inside the task, but it would be nice to have the statistics for each subset separately.
    emre
    2 years ago
    I understand, that's actually a nice use case. I checked the source code a little in hopes of finding a concurrency limit between tasks of the same map, but couldn't find one. It would be a nice addition if it doesn't exist. Meanwhile, you could try some of these:
    • Use just enough concurrency on the Dask cluster: say, if 5 concurrent Postgres writes lock the table, use nthreads=4.
    • Use map sort of like a worker pool. Say you have a list of 400 data batches, i.e. 400 mapped tasks to run, and 4 concurrent writes work nicely. Split your list into 4 lists, each having 100 batches. Map over the list of lists, with each task receiving 100 batches, and use task looping for each batch.
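    The batch-splitting step of the worker-pool idea is plain Python; a minimal sketch (the chunk helper is hypothetical, not a Prefect API):
    ```python
    def chunk(items, n_chunks):
        """Split `items` into `n_chunks` contiguous lists of near-equal size."""
        size, extra = divmod(len(items), n_chunks)
        chunks, start = [], 0
        for i in range(n_chunks):
            # The first `extra` chunks absorb one leftover item each.
            end = start + size + (1 if i < extra else 0)
            chunks.append(items[start:end])
            start = end
        return chunks

    # e.g. chunk(list_of_400_batches, 4) yields four lists of 100 batches,
    # which you would then map over, looping within each mapped task.
    ```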
    Also found this: https://docs.prefect.io/orchestration/concepts/task-concurrency-limiting.html In case you are using Prefect Cloud, they've got you covered.
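    The limits in the linked docs attach to task tags; a minimal sketch of the code-side half (the tag name "postgres" is an assumption, and the numeric limit itself is configured in Prefect Cloud, not in code):
    ```python
    from prefect import task

    # "postgres" is a hypothetical tag; in Prefect Cloud you would set a
    # task-run concurrency limit (e.g. 4 slots) for this tag, and Cloud then
    # caps how many tagged task runs execute at once, across flows.
    @task(tags=["postgres"])
    def write_batch(batch):
        # Placeholder for the real batched UPDATE against the live table.
        return len(batch)
    ```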
    Jacob Blanco
    2 years ago
    Thanks that's a really good idea!