Jacob Blanco
06/30/2020, 5:51 AM
import datetime

from prefect import Flow, Parameter, task

@task
def generate_dates(start_date, n_days):
    return [start_date - datetime.timedelta(days=x) for x in range(n_days)]

@task
def do_something(updated_at):
    ## DO SOME STUFF WITH THE DATE
    pass

with Flow("My Flow") as flow:
    start_date = Parameter('start_date')
    n_days = Parameter('n_days')
    dates = generate_dates(start_date, n_days)
    for run_date in dates:
        do_something(run_date)

flow.run(dict(start_date=, n_days=10))
In this case I don't want to use map for some technical reasons. I could just implement the loop inside the task, but I like having all the timing tracked by Prefect Cloud.
emre
06/30/2020, 6:47 AM
Your for loop tries to create a do_something task for each element at build time. However, the task generate_dates hasn't run yet, and if you do a type check, you will see that dates is not a list (also, you are not returning anything from generate_dates, just a heads up).
In Prefect, I like to think of two stages in a flow's lifetime: the build time and the run time.
The build time happens within the with Flow(...) statement. It initializes (but does not run) the tasks that make up the flow and connects them based on their input/output dependencies.
The run time happens at flow.run(), which actually runs the tasks that were initialized at build time.
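To make the two stages concrete, here is a minimal library-free sketch. It is a toy model and not how Prefect is implemented: ToyFlow and Deferred are made-up names, and the start date is an arbitrary example value. It only shows why a value created at build time is a placeholder rather than a list.

```python
import datetime


class Deferred:
    """Placeholder handed out at build time; it holds no data yet."""

    def __init__(self, fn, args):
        self.fn = fn
        self.args = args


class ToyFlow:
    """Hypothetical stand-in for a flow: records calls now, runs them later."""

    def __init__(self):
        self.nodes = []

    def add(self, fn, *args):
        # Build time: register the call and return a placeholder, not a result.
        node = Deferred(fn, args)
        self.nodes.append(node)
        return node

    def run(self):
        # Run time: execute nodes in registration order, swapping each
        # placeholder argument for the result computed earlier.
        results = {}
        for node in self.nodes:
            resolved = [
                results[arg] if isinstance(arg, Deferred) else arg
                for arg in node.args
            ]
            results[node] = node.fn(*resolved)
        return results


def generate_dates(start_date, n_days):
    return [start_date - datetime.timedelta(days=x) for x in range(n_days)]


flow = ToyFlow()
dates = flow.add(generate_dates, datetime.date(2020, 6, 30), 3)

# Build time: `dates` is only a placeholder, so there is nothing to loop over.
is_list_at_build_time = isinstance(dates, list)

# Run time: the list exists only after the flow has actually run.
results = flow.run()
dates_at_run_time = results[dates]
```

In this toy model a for loop over dates placed right after flow.add(...) would fail for the same reason: the list does not exist until run() is called.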
Therefore, your code is trying to create multiple tasks at build time using data that will only be available at run time, which isn't possible. If dates were known beforehand (calculated outside of the flow), you could do this, but I don't think you want that either. I don't see an option other than mapping.
Jacob Blanco
06/30/2020, 7:48 AM
emre
06/30/2020, 8:53 AM
Jacob Blanco
07/01/2020, 4:01 AM
emre
07/01/2020, 7:06 AM
Jacob Blanco
07/02/2020, 9:46 AM