
Sque

10/30/2020, 11:04 AM
Hey folks, I have just started diving into Prefect and I am trying to migrate some code from Luigi. One of the problems I am facing is that we used Luigi's dynamic dependency system, so a task could `yield` other tasks as dependencies. I was looking for similar functionality and the closest I found is `LOOP`, but the problem is that it permits only one direct dependency, while I need to create multiple. I also tried to represent the problem at the flow level, but unfortunately that does not work in my case: the task dependencies depend on the value of parameters, so they are not known until the flow is executed. Any ideas on what the Prefect way to tackle this is?
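For context, the pattern being migrated is a task that discovers its dependencies while it runs. A toy illustration of that idea with plain Python generators looks like this (not Luigi's or Prefect's API; `run`, `make_child` and `parent` are hypothetical names): the parent yields a batch of child tasks, the runner executes them first, then resumes the parent with their results.

```python
import inspect
from typing import Any, Callable, List


def run(task: Callable[[], Any], log: List[str]) -> Any:
    """Toy runner: if `task` is a generator function, run every batch of
    dependencies it yields first, then resume it with their results."""
    if not inspect.isgeneratorfunction(task):
        result = task()
        log.append(task.__name__)
        return result
    gen = task()
    try:
        deps = next(gen)  # first batch of dynamically discovered dependencies
        while True:
            results = [run(dep, log) for dep in deps]
            deps = gen.send(results)  # resume the parent with the dep results
    except StopIteration as stop:
        log.append(task.__name__)
        return stop.value


def make_child(n: int) -> Callable[[], int]:
    # Hypothetical child task: squares its input.
    def child() -> int:
        return n * n
    child.__name__ = f"child_{n}"
    return child


def parent():
    # The number of children is decided only when the parent runs.
    count = 3
    squares = yield [make_child(i) for i in range(count)]
    return sum(squares)


log: List[str] = []
total = run(parent, log)
print(total, log)
```

The children run (here sequentially) before the parent finishes, which is exactly the "dependencies known only at run time" shape the question is about.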

Dylan

10/30/2020, 2:23 PM
Hi @Sque! Welcome to the Prefect Community 😄 You’re looking for Mapping: https://docs.prefect.io/core/concepts/mapping.html

Sque

10/30/2020, 5:04 PM
Hi @Dylan! Thanks, glad I found this amazing community :) I checked out mapping but don't understand exactly how it would help me here. Put as code, it would look something like this if the flow were dynamic:
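```python
from datetime import datetime

from prefect import Flow, task
from prefect.core.parameter import DateTimeParameter

@task
def process_date(date: datetime):
    # .. do an action on this date
    ...

# Desired shape (does not work as written): `date` is a Parameter whose value
# is unknown when the flow is built, so get_childs_for_this_date can't iterate it.
with Flow('dynamic') as flow:
    date = DateTimeParameter('date')
    sub_tasks = [process_date(sub_date)
                 for sub_date in get_childs_for_this_date(date)]
    main_task = process_date(date, upstream_tasks=sub_tasks)
```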
So the problem with the above is that `get_childs_for_this_date` returns a different number of dates depending on the value. I could unfold the whole problem inside `process_date` and process all the sub-dates and the main date in a single task, but then I lose all the parallelism, and this process takes a lot of time. Ideally, I would like this flow to be scheduled every X and to pass the current time in as the parameter.
So, I am not sure how mapping could work here 😕
I think I am looking for something more like `LOOP`, but supporting multiple dependencies.
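The shape described above is a fan-out (one unit of work per child date, count known only at run time) followed by a fan-in (the main date after all children). In Prefect terms this roughly corresponds to mapping `process_date` over the child dates and then running the main task downstream of the mapped task. The sketch below is only a plain-Python analogy using `concurrent.futures`, not Prefect's API; `get_childs_for_this_date` is a hypothetical stand-in whose child count varies with the input.

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import date, timedelta
from typing import Dict, List, Union


def get_childs_for_this_date(d: date) -> List[date]:
    # Hypothetical stand-in: the number of children depends on the input
    # value, so it is only known at run time (1 to 3 previous days here).
    return [d - timedelta(days=i) for i in range(1, d.day % 3 + 2)]


def process_date(d: date) -> str:
    # Placeholder for the real per-date work.
    return d.isoformat()


def run_flow(d: date) -> Dict[str, Union[str, List[str]]]:
    children = get_childs_for_this_date(d)  # decided at run time
    with ThreadPoolExecutor() as pool:
        # Fan out: one call per child date, executed in parallel.
        child_results = list(pool.map(process_date, children))
    # Fan in: every child has finished before this line runs,
    # so the main date is processed only after all sub-dates.
    main_result = process_date(d)
    return {"children": child_results, "main": main_result}


print(run_flow(date(2020, 10, 31)))
```

Because the list of children is computed inside `run_flow`, the width of the fan-out follows the data rather than being fixed when the code is written.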

Dylan

10/30/2020, 5:16 PM
hmmm
I’ll get back to you in just a moment
Hey hey
So in this example, `DateTimeParameter` is passed/evaluated when the flow run is scheduled,
as either a default or an input for that specific flow run.
So, to generate a list of sub-dates...
Actually, where do `sub_dates` come from in your example?
On a more high-level note: what’s your objective from a 10,000-foot view?
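```python
from datetime import datetime

from prefect import Flow, task
from prefect.core.parameter import DateTimeParameter

@task
def get_childs_for_this_date(date: datetime) -> list:
    # Your helper, as a task, so it runs once the parameter value is known
    # .. return the list of child dates
    ...

@task
def process_date(date: datetime):
    # .. do an action on this date
    ...


with Flow('dynamic') as flow:
    date = DateTimeParameter('date')
    list_of_dates = get_childs_for_this_date(date)
    # One mapped child of process_date per element, created at run time
    result = process_date.map(list_of_dates)
```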
That’s my first pass at what I think your flow could look like
But I’ll make two notes:
1. Prefect doesn’t rely strictly on time/schedules. Flows can run ad hoc or be scheduled for arbitrary times.
2. Tasks within a flow run as soon as their dependencies allow.
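To make "tasks run as soon as their dependencies allow" concrete, here is a small plain-Python executor (Python 3.9+ `graphlib`) that submits each task the moment all of its upstream tasks have finished. It is an analogy for dependency-driven scheduling, not how Prefect's executors are implemented; the graph and task names are made up for illustration.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter
from typing import Callable, Dict, List, Set


def run_graph(graph: Dict[str, Set[str]],
              fns: Dict[str, Callable[[], object]]) -> List[str]:
    """Run each task as soon as all of its upstream tasks have finished.
    `graph` maps a task name to the set of names it depends on."""
    ts = TopologicalSorter(graph)
    ts.prepare()
    finished: List[str] = []
    running: Dict[Future, str] = {}
    with ThreadPoolExecutor() as pool:
        while ts.is_active():
            for name in ts.get_ready():  # tasks whose upstreams are all done
                running[pool.submit(fns[name])] = name
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for fut in done:
                name = running.pop(fut)
                fut.result()  # re-raise any error from the task
                finished.append(name)
                ts.done(name)  # may unlock downstream tasks
    return finished


# Hypothetical graph: "main" waits for "a" and "b", which can run in parallel.
graph = {"main": {"a", "b"}, "a": set(), "b": set()}
order = run_graph(graph, {name: (lambda: None) for name in graph})
print(order)
```

Note there is no clock involved: ordering comes only from the dependency edges, which is why anything tied to a specific time is better handled by scheduling a separate flow.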
So if, within a flow, you need a particular thing to happen at a particular time, you might want to look at flows that schedule other flows,
i.e. one flow that processes a particular date and schedules another flow to do things at a particular time.
That might look something like