#prefect-community

Fina Silva-Santisteban

04/18/2022, 8:54 PM
Hi everyone! I’ve created a flow-of-flows which runs another flow in a loop, something like this:
with Flow('Parent Flow') as flow:
    
    dates = ['2021-01-04', '2021-01-05', '2021-01-06']  # a list of dates

    for i in range(len(dates)):
        ...  # run child flow with i as parameter
When I check the Prefect UI it seems like the child flows are running in parallel (I’m using threads) which is in general great but in this case I’d like the child flows to be run sequentially. Is there a way to force the parent flow to run them that way?? 🤔

Kevin Kho

04/18/2022, 8:55 PM
You can do so with a LocalExecutor, or you can do something like this:
with Flow('Parent Flow') as flow:
    
    dates = ['2021-01-04', '2021-01-05', '2021-01-06']  # a list of dates
    child_flows = []
    for i in range(len(dates)):
        x = create_flow_run(...)
        child_flows.append(x)
    # chain the runs so each one waits for the previous one
    for i in range(1, len(dates)):
        child_flows[i].set_upstream(child_flows[i-1])
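The chaining step above can be tried without Prefect. Below is a minimal sketch in plain Python, where `FakeTask` is a hypothetical stand-in (not a Prefect class) that only records which task it was told to wait for. It pairs each item with its predecessor using `zip`, which has the same effect as the index loop in the snippet.

```python
class FakeTask:
    """Hypothetical stand-in for a task object; NOT a Prefect class."""

    def __init__(self, name):
        self.name = name
        self.upstream = None  # this sketch tracks at most one upstream task

    def set_upstream(self, other):
        # Record that this task should only start after `other`.
        self.upstream = other


dates = ["2021-01-04", "2021-01-05", "2021-01-06"]
child_flows = [FakeTask(d) for d in dates]

# Pair every task with the one before it; equivalent to
# `for i in range(1, len(dates)): child_flows[i].set_upstream(child_flows[i-1])`.
for previous, current in zip(child_flows, child_flows[1:]):
    current.set_upstream(previous)

# Collect (task, upstream) pairs to inspect the resulting chain.
chain = [(t.name, t.upstream.name if t.upstream else None) for t in child_flows]
```

The first entry has no upstream and every later entry depends on the one directly before it, which is what forces the runs into a single ordered chain.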

Fina Silva-Santisteban

04/18/2022, 10:11 PM
@Kevin Kho set_upstream() is what I needed! Thanks so much!
@Kevin Kho Your example snippet above worked out perfectly fine, thank you for that! I’m kinda stuck with the next thing though: I don’t want to use a hard coded list of dates as shown in the snippet, but a task that creates the list using the given flow parameter, something like this:
with Flow('Parent Flow') as flow:
    year = Parameter('year')
    dates = create_dates_list(year)  # still a list of dates, but this is now a task

    child_flows = []
    for i in range(len(dates)):
        x = create_flow_run(...)
        child_flows.append(x)
    for i in range(1, len(dates)):
        child_flows[i].set_upstream(child_flows[i-1])
When I try that it throws the error len(dates): FunctionTask doesn't have the attribute len. Is there a way to get the value of the task instead of the reference to the task instance? Something else I’ve tried: making the task create_dates_list a regular Python function, so just removing the task decorator. But then it throws an error because the function’s argument value looks like <Parameter year>, so it’s the parameter instance and not the value of the Parameter. Is there a way to get the value of a Parameter inside a function that is not a task? I hope this makes sense! 🙏

Kevin Kho

04/19/2022, 9:26 PM
You can’t loop over a dynamic number of tasks with a Python for loop, because the loop runs while the DAG is being constructed, and the DAG can’t be constructed if the number of items isn’t known ahead of time. If you have a dynamic set of dates, you can’t use for and have to use mapping instead. If you use mapping, though, you lose the sequential, date-ordered creation of the flow runs, so you need task looping to run them one after another. As for your other attempt: if create_dates_list is a plain function, it executes at flow registration time, but the Parameter value doesn’t exist until flow run time, which is why you get that odd-looking <Parameter year>. Tasks are the mechanism for deferring execution to flow run time.
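The build-time versus run-time distinction can be illustrated with a toy model in plain Python. This is only a sketch and not how Prefect is implemented: `Placeholder` is a hypothetical class standing in for a Parameter whose value is not known yet, and the dates produced by `create_dates_list` here are made up for illustration.

```python
class Placeholder:
    """Hypothetical stand-in for a value that only exists at run time."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return f"<Parameter {self.name}>"


def create_dates_list(year):
    # A plain function runs immediately with whatever object it is handed.
    return [f"{year}-01-04", f"{year}-01-05"]


year = Placeholder("year")

# "Build time": the function receives the placeholder itself, not a value,
# so the placeholder's repr leaks into the result.
built_too_early = create_dates_list(year)

# The placeholder has no length, so a build-time for loop cannot be sized.
try:
    len(year)
    sized = True
except TypeError:
    sized = False

# "Run time": the real value is available and the function behaves as intended.
at_run_time = create_dates_list(2021)
```

In this model, deferring the call until a real value exists is the role a task plays; calling the function while the graph is still being built can only ever see the placeholder.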

Fina Silva-Santisteban

04/19/2022, 9:27 PM
@Kevin Kho ohh interesting!! I’ll read up on that! Thanks so much!