Kyle
07/10/2023, 10:34 PM
Marvin
07/10/2023, 10:35 PM
SequentialTaskRunner. This runner starts the subflows one after the other, and each subflow does not start until the previous one has completed.
Here's an example from the Prefect discourse:
13:24:17.121 | Beginning subflow run 'apricot-weasel' for flow 'subflow-1'...
13:24:17.121 | Starting task runner `SequentialTaskRunner`...
Subflow 1 started!
13:24:17.355 | Beginning subflow run 'masked-jackrabbit' for flow 'subflow-3'...
13:24:17.355 | Starting task runner `SequentialTaskRunner`...
Subflow 3 started!
13:24:18.190 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.314 | Subflow run 'apricot-weasel' finished in state Completed(message=None, type=COMPLETED)
13:24:18.337 | Subflow run 'spotted-coot' finished in state Completed(message=None, type=COMPLETED)
As you can see, subflow-3 does not start until subflow-1 has completed. This should allow you to run your subflows sequentially, with each subflow starting only after the previous one has finished.
Remember, though, that while the subflows run sequentially, the tasks within each subflow can still run concurrently, depending on your task dependencies and executor configuration.
I hope this helps. If not, well, I suppose that's just another day in the life of a human.
Nate
07/10/2023, 11:12 PM
@flow
def foo():
    subflow1()
    subflow2()
    ...
Kyle
07/11/2023, 1:07 AM
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner, SequentialTaskRunner

@task
def task1(item: str):
    # make api call; save data
    return featherfilepath

@task
def task2(item: str):
    # process stats; save data
    return featherfilepath

@flow(task_runner=ConcurrentTaskRunner())  # flowrun task1
def map_task1():
    items = feather_instance.get("items")
    items = items['name'].tolist()
    task1_multiple = task1.map(item=items)

@flow(task_runner=ConcurrentTaskRunner())  # flowrun task2
def map_task2():
    items = feather_instance.get("items")
    items = items['name'].tolist()
    task2_multiple = task2.map(item=items)

@flow(task_runner=SequentialTaskRunner())
def flowrun():
    map_task1()
    map_task2()

if __name__ == "__main__":
    flowrun()
Nate
07/11/2023, 7:15 PM
• map_task1 and map_task2 are not tasks so the SequentialTaskRunner has no effect here
• your flowrun happens to run the subflows sequentially because your subflow calls are blocking
• ConcurrentTaskRunner is used by default
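
The points above can be illustrated with a minimal sketch that uses only Python's standard library. It is an analogy, not Prefect's API: stage is a hypothetical stand-in for a subflow, work for a task, and the thread pool for a concurrent task runner. Each stage(...) call blocks until its pool has drained, so the stages run in order without any sequential setting on the parent, while the items inside one stage are processed concurrently.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def work(stage_name: str, item: str) -> str:
    """Hypothetical stand-in for a task: simulate some I/O, return a label."""
    time.sleep(0.05)
    return f"{stage_name}:{item}"


def stage(stage_name: str, items: List[str], log: List[str]) -> List[str]:
    """Hypothetical stand-in for a subflow.

    The work items are fanned out to a thread pool (concurrent inside the
    stage), but the function itself only returns once all of them are done.
    """
    log.append(f"{stage_name} started")
    with ThreadPoolExecutor(max_workers=max(1, len(items))) as pool:
        # pool.map yields results in input order, whatever the finish order.
        results = list(pool.map(lambda item: work(stage_name, item), items))
    log.append(f"{stage_name} finished")
    return results


def parent(items: List[str]) -> Tuple[List[str], List[str]]:
    """Hypothetical stand-in for the parent flow.

    Nothing here requests sequential execution: stage2 starts after stage1
    simply because the first call blocks until it returns.
    """
    log: List[str] = []
    stage("stage1", items, log)
    stage2_results = stage("stage2", items, log)
    return log, stage2_results


if __name__ == "__main__":
    log, results = parent(["a", "b", "c"])
    print(log)
    print(results)
```

In this sketch the ordering of the two stages comes entirely from the blocking calls in parent, which mirrors why setting a sequential runner on the outer flow changes nothing when it only calls subflows.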