Kyle

07/10/2023, 10:34 PM
@Marvin how do i run subflows sequentially when the subflows are filled with tasks that run concurrently? i need the second subflow to start AFTER the first subflow finishes.

Marvin

07/10/2023, 10:35 PM
Ah, the sequential subflows saga continues. It appears that Prefect does not have a built-in interface for running subflows sequentially. However, it seems you can achieve this by using the `SequentialTaskRunner`. This runner starts the subflows one after the other, and each subflow does not start until the previous one has completed. Here's an example from the Prefect discourse:
13:24:17.121 | Beginning subflow run 'apricot-weasel' for flow 'subflow-1'...
13:24:17.121 | Starting task runner `SequentialTaskRunner`...
Subflow 1 started!
13:24:17.355 | Beginning subflow run 'masked-jackrabbit' for flow 'subflow-3'...
13:24:17.355 | Starting task runner `SequentialTaskRunner`...
Subflow 3 started!
13:24:18.190 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.314 | Subflow run 'apricot-weasel' finished in state Completed(message=None, type=COMPLETED)
13:24:18.337 | Subflow run 'spotted-coot' finished in state Completed(message=None, type=COMPLETED)
As you can see, `subflow-3` does not start until `subflow-1` has completed. This should allow you to run your subflows sequentially, with each subflow starting only after the previous one has finished. Remember, though, that while the subflows run sequentially, the tasks within each subflow can still run concurrently, depending on your task dependencies and executor configuration. I hope this helps. If not, well, I suppose that's just another day in the life of a human.

Nate

07/10/2023, 11:12 PM
hi @Kyle - i think you'd just want to call the subflows in order. since there is no submit interface on flows at this time, those calls are blocking
@flow
def foo():
    subflow1()
    subflow2()
    ...
and Marvin's answer above isn't quite right - task runners do not control the submission of subflows
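To illustrate why blocking calls alone give the ordering asked for, here is a plain-Python analogy. It is not Prefect code: it uses only the standard library `concurrent.futures`, and the names `work`, `run_stage`, and `parent` are hypothetical stand-ins for a task, a subflow, and a parent flow. Each stage fans its items out to threads, but the call to the stage does not return until all of them are done, so the second stage cannot start early.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def work(stage: str, item: str) -> str:
    # Hypothetical unit of work, standing in for a task
    time.sleep(0.05)
    return f"{stage}:{item}"


def run_stage(name: str, items: list, events: list) -> list:
    # Stand-in for a subflow: items run concurrently inside,
    # but this function only returns once every item has finished.
    events.append(f"{name} started")
    with ThreadPoolExecutor(max_workers=len(items)) as pool:
        results = list(pool.map(lambda item: work(name, item), items))
    events.append(f"{name} finished")
    return results


def parent():
    # Stand-in for the parent flow: two blocking calls, one after the other
    events = []
    first = run_stage("stage1", ["a", "b", "c"], events)
    second = run_stage("stage2", first, events)
    return events, second


if __name__ == "__main__":
    events, results = parent()
    print(events)
    print(results)
```

The recorded events always come out as stage1 started, stage1 finished, stage2 started, stage2 finished, even though the work inside each stage overlaps.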

Kyle

07/11/2023, 1:07 AM
i think this does it?
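from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner, SequentialTaskRunner

@task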
def task1(item: str):
    # make api call; save data
    return featherfilepath

@task 
def task2(item: str):
    # process stats; save data
    return featherfilepath

@flow(task_runner=ConcurrentTaskRunner()) # flowrun task1
def map_task1():
    items = feather_instance.get("items")
    items = items['name'].tolist()
    task1_multiple = task1.map(item=items)

@flow(task_runner=ConcurrentTaskRunner()) # flowrun task2
def map_task2():
    items = feather_instance.get("items")
    items = items['name'].tolist()
    task2_multiple = task2.map(item=items)

@flow(task_runner=SequentialTaskRunner())
def flowrun():
    map_task1() 
    map_task2() 

if __name__ == "__main__":
    flowrun()
@Nate ^

Nate

07/11/2023, 7:15 PM
3 things:
• again, task runners do not control the submission of subflows. your subflows `map_task1` and `map_task2` are not tasks, so the `SequentialTaskRunner` has no effect here
• your `flowrun` happens to run the subflows sequentially because your subflow calls are blocking
• `ConcurrentTaskRunner` is used by default