Sque
10/30/2020, 11:04 AM
yield other tasks as dependents.
I was looking for similar functionality and the closest I found is LOOP, but the problem is that it permits only one direct dependency while I need to create multiple.
I also tried to represent the problem at the flow level, but unfortunately that does not work in my case. The task dependencies depend on the values of parameters, so they are not known until a flow is executed.
Any ideas on what the Prefect way to tackle this is?
Dylan
Sque
10/30/2020, 5:04 PM
@task
def process_date(date: datetime):
    # .. do an action on this date

with Flow('dynamic') as flow:
    date = DateTimeParameter('date')
    sub_tasks = [process_date(sub_date)
                 for sub_date in get_childs_for_this_date(date)]
    main_task = process_date(date, upstream_tasks=sub_tasks)
So the problem with the above is that get_childs_for_this_date returns a different number of dates depending on the value. I could unfold the whole problem inside process_date and process all sub_dates and the main date under one task. But I lose all the parallelism here and this process takes a lot of time.
Ideally, I would like this flow to be scheduled every X and, given the current time, pass this as a parameter.
Dylan
DateTimeParameter is passed/evaluated when the Flow Run is scheduled
sub_dates come from in your example?
from prefect import task, Flow, DateTimeParameter
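from datetime import datetime

@task
def process_date(date: datetime):
    # .. do an action on this date
    ...

with Flow('dynamic') as flow:
    date = DateTimeParameter('date')
    # get_childs_for_this_date needs to be a task itself so that it receives the date value at run time
    list_of_dates = get_childs_for_this_date(date)
    # map over the list returned above, one process_date run per sub-date
    result = process_date.map(list_of_dates)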
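To make the dependency shape discussed in this thread concrete without relying on any Prefect API, here is a minimal plain-Python sketch. The number of sub-dates is only known once the date value is available, the sub-dates are processed in parallel, and the main date is processed only after all of them have finished. The rule inside get_childs_for_this_date (one child per earlier day of the same month), the string returned by process_date, and the run helper are all made up for illustration; concurrent.futures merely stands in here for the parallelism that mapping is meant to provide.

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta


def get_childs_for_this_date(date: datetime) -> list:
    # Hypothetical rule: one child per earlier day of the same month,
    # so the number of children depends on the date value.
    return [date - timedelta(days=n) for n in range(1, date.day)]


def process_date(date: datetime) -> str:
    # Placeholder for the real per-date action.
    return f"processed {date:%Y-%m-%d}"


def run(date: datetime) -> list:
    sub_dates = get_childs_for_this_date(date)
    # Fan out: process every sub-date in parallel (order of results is kept).
    with ThreadPoolExecutor() as pool:
        sub_results = list(pool.map(process_date, sub_dates))
    # Fan in: the main date is processed only after all sub-dates are done.
    return sub_results + [process_date(date)]


if __name__ == "__main__":
    for line in run(datetime(2020, 10, 3)):
        print(line)
```

Calling run with a date on the first of the month produces no sub-dates at all, which is the "different number of dates depending on the value" case that a fixed list of tasks at flow-definition time cannot express.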