# prefect-community
m
I thought I got the gist of subflows, but as I read the docs (concepts specifically: https://orion-docs.prefect.io/concepts/flows/#subflows) I am starting to think I am wrong again. So the use case is the following:
```python
from prefect import flow, task

# retrieve_all_categories, load_item and store_item are defined elsewhere

@task
def for_loop_B(category):
    for item_i in category.items:
        load_item(item_i)
        store_item(item_i)

@flow
def for_loop_A():
    categories = retrieve_all_categories()
    for category_i in categories:
        for_loop_B(category_i)
```
Doing the above seemed wrong, partly because (by default, at least) a task or flow fails as soon as one of its internal steps fails. Additionally, the processing of the categories is fully independent. So I thought `for_loop_B` should also be a flow, and `load_item` and `store_item` should be tasks, for example. However, I got confused by what the docs mention: “Unlike tasks, subflows will block until completion with all task runners.” I want the items within a category to be processed sequentially, while multiple categories are processed in parallel. How can I make that happen? Also, can tasks be nested within tasks, for that matter?
a
Tasks cannot contain other tasks, but a task can call ordinary (non-task) functions inside it.
> I want the processing of items within a category to happen sequentially, but multiple categories can be processed in parallel, how can I make that happen?
You could try one of these two options:
1. Make the processing of a single category a subflow with `SequentialTaskRunner`, and then have your main flow run those subflows concurrently.
2. Process your categories with the concurrent task runner, where each category has no for loop over tasks and only calls, one by one, the things that need to be processed sequentially (with a for loop, Prefect would try to run those calls in parallel/concurrently).
m
So, since in my case the subflows are dynamic, would I write a flow with a for loop, keep a `parallel_subflows` list, append each subflow call to it, and then, after the for loop, run `await asyncio.gather(*parallel_subflows)`? And, for now, I still need to define the subflows with `async`, correct?
a
correct, for now running subflows in parallel only works with async
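The pattern discussed here, collecting subflow calls in a `parallel_subflows` list inside a for loop and awaiting them together with `asyncio.gather`, can be illustrated in plain asyncio without Prefect. This is a minimal sketch under stated assumptions: `process_category` is a hypothetical coroutine standing in for an async subflow, and `asyncio.sleep(0)` stands in for real I/O. Items inside one category are awaited one after another, while the categories interleave.

```python
import asyncio

async def process_category(name, items, log):
    # Hypothetical stand-in for an async subflow: items in one category run in order.
    for item in items:
        log.append((name, item, "load"))
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        log.append((name, item, "store"))
    return name

async def main(categories):
    log = []
    parallel_subflows = []
    for name, items in categories.items():
        # Calling the coroutine function only creates a coroutine; nothing runs yet.
        parallel_subflows.append(process_category(name, items, log))
    # gather runs all categories concurrently and returns results in input order.
    results = await asyncio.gather(*parallel_subflows)
    return results, log

results, log = asyncio.run(main({"A": [1, 2], "B": [3, 4]}))
print(results)  # ['A', 'B']
```

Inspecting `log` shows the steps of categories A and B interleaved, while the load/store order within each category stays sequential.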
m
Actually, I realize I am being a little silly here, so I want to double-check:
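```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

# retrieve_all_categories, load_item and store_item are defined elsewhere

@task
def for_loop_B(category):
    for item_i in category.items:
        load_item(item_i)
        store_item(item_i)

@flow(task_runner=ConcurrentTaskRunner())
def for_loop_A():
    categories = retrieve_all_categories()
    for category_i in categories:
        # From Prefect 2.0 GA on, a direct call waits for the result;
        # for_loop_B.submit(category_i) hands the run to the task runner.
        for_loop_B(category_i)
```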
Doesn't the above actually achieve what I want? (It would be so easy if it does, which is why I am calling myself silly xD. Is that also what you meant with your second bullet point?) In the above case, the flow runs the tasks (i.e. one per category) concurrently, but within each category the steps are executed sequentially, right? Or what is the default behavior for code inside a task? I could not find that anywhere. I guess I also confused myself with the TaskRunner and FlowRunner concepts, thinking for some silly reason that the TaskRunner would be specified on the task.
Hi @Anna Geller, I was wondering whether what I said above is correct?
a
There is no parallelism within a task itself. The task runner interface can execute task runs concurrently, but that is configured at the flow level, for the tasks called within that flow. Does that answer your question, @Michiel Verburg?
• flow runner decides how to deploy/execute your flow
• task runner decides how to execute your tasks
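The split described above (sequential code inside a task, concurrency decided by whatever runs the tasks) can be sketched with `concurrent.futures` as a loose analogy; it is not Prefect's implementation. The hypothetical `process_category` plays the role of a task: each call runs its loop sequentially. The thread pool, chosen by the caller rather than by the function, plays roughly the role of a concurrent task runner configured on the flow. All names and data here are illustrative.

```python
from concurrent.futures import ThreadPoolExecutor

def process_category(name, items):
    # One "task run": ordinary sequential Python, no parallelism inside.
    steps = []
    for item in items:
        steps.append(f"load {item}")
        steps.append(f"store {item}")
    return name, steps

def run_all(categories, max_workers=4):
    # The executor, picked by the caller, decides how the calls are scheduled,
    # similar in spirit to a task runner chosen at the flow level.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(process_category, name, items)
                   for name, items in categories.items()]
        # Collect results in submission order, regardless of completion order.
        return [f.result() for f in futures]

results = run_all({"A": [1, 2], "B": [3]})
for name, steps in results:
    print(name, steps)
```

Swapping in `max_workers=1` would run the categories one after another without touching `process_category`, which mirrors how the same task code can run under a sequential or a concurrent task runner.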
m
Yeah that answers my question indeed, thanks!