Thread
#prefect-community
    Michiel Verburg

    3 months ago
    I thought I got the gist of subflows, but as I read the docs (concepts specifically: https://orion-docs.prefect.io/concepts/flows/#subflows) I am starting to think I am wrong again. So the use case is the following:
    from prefect import flow, task

    @task
    def for_loop_B(category):
        for item_i in category.items:
            load_item(item_i)
            store_item(item_i)

    @flow
    def for_loop_A():
        categories = retrieve_all_categories()
        for category_i in categories:
            for_loop_B(category_i)
    Doing the above seemed wrong, because (by default at least) a task or flow fails as soon as one internal step fails. Additionally, the processing of the categories is fully independent. So I thought for_loop_B should also be a flow, and load_item and store_item should be tasks, for example. However, I got confused by what the docs mention: “Unlike tasks, subflows will block until completion with all task runners.” I want the processing of items within a category to happen sequentially, but multiple categories can be processed in parallel. How can I make that happen? Also, can tasks be nested within tasks for that matter?
    Anna Geller

    3 months ago
    Tasks cannot contain other tasks, but a task can call regular non-task functions inside it.
    I want the processing of items within a category to happen sequentially, but multiple categories can be processed in parallel, how can I make that happen?
    You could try these two options:
    1. The single-category processing can be a subflow with SequentialTaskRunner, and your main flow can then run the subflows concurrently.
    2. You could call your categories with a concurrent task runner, and each category can have no for loop, only calling the things that need to be processed sequentially one by one (only with a for loop would Prefect try to run things in parallel/concurrently).
    Michiel Verburg

    3 months ago
    So since in my case the subflows are dynamic, would I write a flow with a for-loop, maintain a parallel_subflows list variable, add the subflows to it, and then after the for-loop do await asyncio.gather(*parallel_subflows)? And currently I also still need to put async in front of the subflows, correct?
    Anna Geller

    3 months ago
    correct, for now running subflows in parallel only works with async
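A minimal sketch of the pattern discussed above, written with plain asyncio so it runs without Prefect installed. The function bodies, the `log` list, and the sample categories are hypothetical stand-ins. In Prefect, `process_category` and `process_all` would be `async def` functions decorated with `@flow`, and `load_item` / `store_item` would be tasks.

```python
import asyncio


# Hypothetical stand-ins for the thread's load_item / store_item steps.
async def load_item(item):
    await asyncio.sleep(0.01)  # simulate I/O
    return f"loaded:{item}"


async def store_item(loaded, log):
    await asyncio.sleep(0.01)  # simulate I/O
    log.append(loaded)


# Stand-in for one async subflow: items of a category run strictly in order.
async def process_category(name, items, log):
    for item in items:
        loaded = await load_item(item)
        await store_item(loaded, log)
    return name


# Stand-in for the parent flow: collect one coroutine per category,
# then await them together so the categories overlap in time.
async def process_all(categories):
    log = []
    parallel_subflows = [
        process_category(name, items, log) for name, items in categories.items()
    ]
    finished = await asyncio.gather(*parallel_subflows)
    return finished, log


CATEGORIES = {"a": ["a1", "a2", "a3"], "b": ["b1", "b2"]}
finished, log = asyncio.run(process_all(CATEGORIES))
```

`asyncio.gather` returns results in the order the coroutines were passed in, so `finished` lists the categories in their original order even though their items may interleave in `log`.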
    Michiel Verburg

    3 months ago
    Actually, I realize I am being a little silly here, but so now I want to then double check:
    from prefect import flow, task
    from prefect.task_runners import ConcurrentTaskRunner

    @task
    def for_loop_B(category):
        for item_i in category.items:
            load_item(item_i)
            store_item(item_i)

    @flow(task_runner=ConcurrentTaskRunner())
    def for_loop_A():
        categories = retrieve_all_categories()
        for category_i in categories:
            for_loop_B(category_i)
    Does the above not actually achieve what I want? (It is so easy if it does, which is why I am calling myself silly xD. Is that also what you meant with your second bullet point?) Because in the above case, the flow runs the tasks (i.e. categories) concurrently, but for each category the steps are executed sequentially, right? Or what is the default behavior for code inside a task? I could not find that anywhere. I guess I also confused myself with the TaskRunner and FlowRunner stuff, thinking TaskRunner would be specified on the task for some silly reason.
    Hi @Anna Geller, I was wondering whether what I said above is correct?
    Anna Geller

    2 months ago
    there is no parallelism within a task itself - the task runner interface can execute task runs concurrently but this is something that you can configure on a flow level for tasks called within that flow - does it answer your question @Michiel Verburg?
    • flow runner decides how to deploy/execute your flow
    • task runner decides how to execute your tasks
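As a rough analogy for the answer above, here is a standard-library sketch in which a thread pool plays the role of a concurrent task runner configured at the flow level. The pool, the lowercase function names, and the sample data are assumptions for illustration and are not Prefect APIs. Depending on the Prefect 2 version, a task may also need to be called through `.submit()` to be handed to the task runner, so check the docs for your release.

```python
from concurrent.futures import ThreadPoolExecutor


def for_loop_b(category, items):
    # Plain Python inside one "task": the steps run one after another.
    steps = []
    for item in items:
        steps.append(("load", item))
        steps.append(("store", item))
    return category, steps


def for_loop_a(categories):
    # Stand-in for the flow: concurrency is decided here, one submission
    # per category, not inside the task body.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [
            pool.submit(for_loop_b, name, items)
            for name, items in categories.items()
        ]
        return [future.result() for future in futures]


results = for_loop_a({"a": ["a1", "a2"], "b": ["b1"]})
```

Each entry in `results` pairs a category with its steps in load/store order, while separate categories may have run at the same time.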
    Michiel Verburg

    2 months ago
    Yeah that answers my question indeed, thanks!