Michiel Verburg
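06/17/2022, 12:53 PM
```python
from prefect import flow, task

# load_item, store_item and retrieve_all_categories are the user's own helpers (not shown).
@task
def for_loop_B(category):
    for item_i in category.items:
        load_item(item_i)
        store_item(item_i)

@flow
def for_loop_A():
    categories = retrieve_all_categories()
    for category_i in categories:
        for_loop_B(category_i)
```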
Doing the above seemed wrong because (at least by default) a task or flow would fail just because one internal step failed. Additionally, the processing of the categories is fully independent. So I thought `for_loop_B` should also be a flow, and `load_item` and `store_item` should be tasks, for example. However, I got confused by what the docs say: “Unlike tasks, subflows will block until completion with all task runners.”
I want the items within a category to be processed sequentially, but multiple categories can be processed in parallel. How can I make that happen? Also, can tasks be nested within tasks, for that matter?
Anna Geller
06/17/2022, 1:21 PM
> I want the processing of items within a category to happen sequentially, but multiple categories can be processed in parallel, how can I make that happen?

You could try these two options:
1. Make the processing of a single category a subflow with `SequentialTaskRunner`, and then have your main flow run those subflows concurrently.
2. Run your categories with the concurrent task runner; within each category, don't use a for loop, just call the things that need to be processed sequentially one by one (it is only the for loop that would make Prefect try to run things in parallel/concurrently).
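Option 1 relies on running per-category units concurrently while each unit stays sequential inside. That scheduling idea can be sketched with plain `asyncio`, as an analogy rather than Prefect's API: each category is a coroutine that awaits its items one after another (the role of a subflow with `SequentialTaskRunner`), and `asyncio.gather` runs the category coroutines concurrently (the role of the main flow). `load_item` and `store_item` here are hypothetical stand-ins that only record calls, with `asyncio.sleep(0)` standing in for real I/O.

```python
import asyncio

# Hypothetical stand-ins for load_item/store_item: they only record calls.
async def load_item(item, log):
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    log.append(("load", item))

async def store_item(item, log):
    await asyncio.sleep(0)
    log.append(("store", item))

async def process_category(name, items, log):
    # Role of a per-category subflow: items are awaited strictly in order.
    for item in items:
        await load_item(item, log)
        await store_item(item, log)
    return name

async def process_all(categories, log):
    # Role of the main flow: one coroutine per category, run concurrently.
    return await asyncio.gather(
        *(process_category(name, items, log) for name, items in categories.items())
    )

categories = {"a": ["a1", "a2"], "b": ["b1", "b2", "b3"]}
log = []
results = asyncio.run(process_all(categories, log))
print(results)  # ['a', 'b']
```

Within each category the log keeps the load/store order, while entries from different categories interleave. Passing `return_exceptions=True` to `asyncio.gather` returns a failing category's exception as its result instead of raising it, which suits categories that are fully independent.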
Michiel Verburg
06/17/2022, 1:57 PM
`await asyncio.gather(*parallel_subflows)`? And currently I also still need to put `async` in front of the subflows then, correct?
Anna Geller
06/17/2022, 7:21 PM
Michiel Verburg
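06/21/2022, 2:17 PM
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

# load_item, store_item and retrieve_all_categories are the user's own helpers (not shown).
@task
def for_loop_B(category):
    for item_i in category.items:
        load_item(item_i)
        store_item(item_i)

@flow(task_runner=ConcurrentTaskRunner())
def for_loop_A():
    categories = retrieve_all_categories()
    for category_i in categories:
        for_loop_B(category_i)
```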
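What a flow with a concurrent task runner and one task call per category does can be approximated with the standard library. In this sketch `concurrent.futures.ThreadPoolExecutor` stands in for `ConcurrentTaskRunner` (an analogy, not how Prefect implements it), each category is submitted as one unit of work, and the body of that unit is plain Python, so its loop runs item by item. `load_item` and `store_item` are hypothetical stand-ins that only record calls; the other function names differ from the flow so the two are not confused.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for load_item/store_item: plain functions that log calls.
def load_item(item, log, lock):
    with lock:
        log.append(("load", item))

def store_item(item, log, lock):
    with lock:
        log.append(("store", item))

def process_category(items, log, lock):
    # Role of the for_loop_B task: an ordinary Python loop,
    # so items in this category are handled strictly one after another.
    for item in items:
        load_item(item, log, lock)
        store_item(item, log, lock)
    return len(items)

def process_all(categories, max_workers=4):
    # Role of the flow: one submission per category, so different
    # categories may run at the same time in the pool's worker threads.
    log, lock = [], threading.Lock()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(process_category, items, log, lock)
                   for items in categories.values()]
        counts = [f.result() for f in futures]
    return counts, log

categories = {"a": ["a1", "a2"], "b": ["b1", "b2", "b3"]}
counts, log = process_all(categories)
print(counts)  # [2, 3]
```

Whether categories actually overlap depends on thread scheduling; what the sketch guarantees is the load/store order within each category.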
Does the flow above not actually achieve what I want? (It is so easy if it does, which is why I am calling myself silly xD. Is that also what you meant with your second bullet point?)
In that flow, the tasks (i.e. the categories) run concurrently, but within each category the steps are executed sequentially, right? Or what is the default behavior for code inside a task? I could not find that anywhere. I guess I also confused myself with the TaskRunner and FlowRunner concepts, thinking for some silly reason that the TaskRunner would be specified on the task.
Anna Geller
07/05/2022, 12:45 PM
Michiel Verburg
07/05/2022, 1:20 PM