Jacob Blanco

06/30/2020, 5:51 AM
I would like to run a given task with an array of dates:
import datetime

from prefect import Flow, Parameter, task

@task
def generate_dates(start_date, n_days):
    return [start_date - datetime.timedelta(days=x) for x in range(n_days)]

@task
def do_something(updated_at):
    # Do some stuff with the date
    pass

with Flow("My Flow") as flow:
    start_date = Parameter('start_date')
    n_days = Parameter('n_days')

    dates = generate_dates(start_date, n_days)

    for run_date in dates:
        do_something(run_date)

flow.run(dict(start_date=, n_days= 10))
In this case I don't want to use map for some technical reasons. I could just implement the loop inside of the task but I like having all the timing tracked by Prefect Cloud.

emre

06/30/2020, 6:47 AM
Your code currently tries to generate a `do_something` task for each element at build time. However, the task `generate_dates` hasn't run yet, and if you do a type check, you would see that `dates` is not a list (also, you are not returning anything from `generate_dates`, just a heads up). In Prefect, I like to think of two stages in a flow's lifetime: the build time and the run time. The build time happens within the `with Flow()...` statement; it initializes (but does not run) the tasks that make up the flow and connects them based on their input/output dependencies. The run time happens at `flow.run()`, which actually runs the tasks that were initialized at build time. Therefore, your code is trying to generate multiple tasks at build time using data that will only be available at run time, which isn't possible. If `dates` were known beforehand (calculated outside of the flow), you could do this, but I don't think you want that either. I don't see an option other than mapping.
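The build-time versus run-time distinction described above can be illustrated with a small, library-free toy model. This is only a sketch and not how Prefect is implemented: the `Deferred` class is a hypothetical stand-in for a task placeholder, and the start date is an arbitrary example value.

```python
import datetime


class Deferred:
    """Hypothetical placeholder for a result that only exists at run time."""

    def __init__(self, fn, *args):
        self.fn = fn
        self.args = args

    def run(self):
        # Resolve any upstream placeholders first, then call the function.
        resolved = [a.run() if isinstance(a, Deferred) else a for a in self.args]
        return self.fn(*resolved)


def generate_dates(start_date, n_days):
    return [start_date - datetime.timedelta(days=x) for x in range(n_days)]


# "Build time": we only record what should happen; nothing is computed yet.
dates = Deferred(generate_dates, datetime.date(2020, 6, 30), 3)
build_time_is_list = isinstance(dates, list)  # False: there is no list to loop over

# "Run time": the placeholder is resolved into an actual list of dates.
run_time_value = dates.run()
```

At build time `dates` is just a placeholder object, so a plain `for` loop over it has no real elements to iterate; the list only appears once `run()` is called.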

Jacob Blanco

06/30/2020, 7:48 AM
Thanks @emre, the code was meant more for illustrative purposes, but what you say makes sense. They do have the concept of task looping, but it's more of a recursion, which I guess I could set up, but that seems really strange (get the list of dates, run with one, remove it from the list, pass the rest to the next execution).

emre

06/30/2020, 8:53 AM
No worries! The expected way is to use mapping; looping would sacrifice parallelism between tasks in the same loop. You could maybe find more help if you elaborate on your concerns with mapping.

Jacob Blanco

07/01/2020, 4:01 AM
I want to update a single table in PostgreSQL, and I am trying to do that for a subset of the records at a time to avoid locking up the entire table for a long time, since it's a live environment. I could write the loop inside the task, but it would be nice to have the statistics for each subset separately.

emre

07/01/2020, 7:06 AM
I understand, that's actually a nice use case. I checked the source code a little in hopes of finding a concurrency limit between tasks of the same map, but couldn't find one. It would be a nice addition if it doesn't exist. Meanwhile, you could try some of these:
• Use just enough concurrency on the Dask cluster: say, if 5 concurrent Postgres writes lock the table, use nthreads=4.
• Use map sort of like a worker pool. Say you have a list of 400 data batches, i.e. 400 mapped tasks to run, and 4 concurrent writes work nicely. Split your list into 4 lists, each having 100 batches. Map over the list of lists, with each task receiving 100 batches, and use task looping for each batch.
Also found this: https://docs.prefect.io/orchestration/concepts/task-concurrency-limiting.html If you are using Prefect Cloud, they've got you covered.
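The worker-pool suggestion above depends on splitting one long list of batches into a few sub-lists of roughly equal size. A minimal sketch of that splitting step in plain Python follows; `split_into_groups` is a hypothetical helper name, and the 400 and 4 are just the example figures from the message.

```python
def split_into_groups(items, n_groups):
    """Split items into n_groups contiguous lists of near-equal size."""
    size, extra = divmod(len(items), n_groups)
    groups, start = [], 0
    for i in range(n_groups):
        # The first `extra` groups each take one additional item.
        end = start + size + (1 if i < extra else 0)
        groups.append(items[start:end])
        start = end
    return groups


batches = list(range(400))              # stand-in for 400 data batches
groups = split_into_groups(batches, 4)  # 4 groups, one per concurrent writer
group_sizes = [len(g) for g in groups]
```

Each of the four groups would then be the input to one mapped task, which works through its own batches one after another, so at most four writes hit the table at once.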

Jacob Blanco

07/02/2020, 9:46 AM
Thanks that's a really good idea!