#prefect-community

Octopus

07/11/2022, 2:14 PM
[v1.2.1] Hi, I would like to run multiple subflows from a task (e.g. for each ref, run each action: "read", "write", "delete"). I have a parent flow that holds the basic data and a child flow that executes one action on one ref. With my code, only 3 of the 9 subflows get triggered. I think it's because I use StartFlowRun (I get the same behavior with create_flow_run), because if I call a regular task instead of StartFlowRun, all 9 get executed.
from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.executors import LocalDaskExecutor

from prefect.tasks.prefect import StartFlowRun

import time
from datetime import timedelta

# @task
# def wait_and_succeed(ref, action_id):
#     time.sleep(10)

#     print(f"children task success for ref {ref} and action {action_id}")

#     if action_id == "write":
#         print(f"[SUCCESS] {ref} Second level reached !!!")
#     if action_id == "delete":
#         print(f"[SUCCESS] {ref} Third level reached !!!")

@task
def call_children_flow(ref):
    print(f"{ref} ref")

    actions_id = ["read","write","delete"]

    for action_id in actions_id:
        start_flow_run = StartFlowRun(flow_name="Generic Children flow")
        print(f"start_flow_run {start_flow_run}")

        child_id = start_flow_run.run(parameters={
                "reference": ref,
                "action_id": action_id,
        }, project_name="Playground")

        wait_for_flow_run.run(child_id)


@task
def run_action(action_id, ref):
    start_flow_run = StartFlowRun(flow_name="Generic Children flow")

    print(f"start_flow_run {start_flow_run}")

    child_id = start_flow_run.run(parameters={
            "reference": ref,
            "action_id": action_id,
    }, project_name="Playground")

    return child_id


with Flow("Generic Parent flow") as parent_flow:
    fake_refs = ["ref1", "ref2", "ref3"]
    call_children_flow.map(fake_refs)

if __name__ == "__main__":
    parent_flow.register(
        project_name="Playground"
    )

    parent_flow.executor = LocalDaskExecutor(num_workers=20)

    parent_flow.run()

Kevin Kho

07/11/2022, 2:26 PM
Hey @Octopus, you can try increasing the number of threads?
flow.executor = LocalDaskExecutor(num_workers=9)
Also, could you move the code to the thread when you get a chance to make the main channel cleaner?

Octopus

07/11/2022, 2:32 PM
I've increased the number of threads but I get the same behavior. Thx for helping btw

Kevin Kho

07/11/2022, 2:44 PM
That should be the only way to increase parallelization. This looks right though.
You're saying your real parent has 9 mapped values, right?

Octopus

07/11/2022, 2:51 PM
The parent will have 3 mapped values (ref1, ref2, ref3) and each mapped value should trigger 3 subflows (ref1 read | ref1 write | ref1 delete | ref2 read ...), so I should have 9 subflows triggered.

Kevin Kho

07/11/2022, 2:53 PM
I see what you mean. But you have a for loop and you have a wait_for_flow_run inside the for loop, so won’t it be limited to running one at a time inside the subflow?

Octopus

07/11/2022, 2:56 PM
The for action_id in actions_id: loop is there because I can't call task.run.map inside a task, and the wait_for_flow_run is there to wait until the subflow completes. Yes, it may be a bad implementation, but I want the parent flow to wait for its subflows to complete.

Kevin Kho

07/11/2022, 3:02 PM
Yes but if it has to wait, it can’t fire off a new subflow, so aren’t you limited to 3 concurrent runs?
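To illustrate the point about waiting, here is a minimal standard-library sketch, not Prefect code. It assumes a thread pool stands in for the LocalDaskExecutor and a `time.sleep` stands in for "start a child flow run and wait for it"; `PeakCounter`, `fake_child_run`, `nested` and `flat` are hypothetical names made up for this example. It compares mapping over 3 refs with a blocking loop inside each worker against mapping over the 9 flattened (ref, action) pairs.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from itertools import product

REFS = ["ref1", "ref2", "ref3"]
ACTIONS = ["read", "write", "delete"]


class PeakCounter:
    """Tracks how many fake child runs are in flight at the same time."""

    def __init__(self):
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def __enter__(self):
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        return self

    def __exit__(self, *exc):
        with self._lock:
            self.active -= 1
        return False


def fake_child_run(counter, ref, action, seconds=0.2):
    # Stand-in for "start a child flow run and block until it finishes".
    with counter:
        time.sleep(seconds)
    return (ref, action)


def nested(workers=20):
    """Map over refs; each worker loops over the actions one at a time."""
    counter = PeakCounter()

    def per_ref(ref):
        # Mirrors the for loop plus wait inside the mapped task.
        return [fake_child_run(counter, ref, action) for action in ACTIONS]

    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(per_ref, REFS))
    return counter.peak, results


def flat(workers=20):
    """Map directly over every (ref, action) pair."""
    counter = PeakCounter()
    pairs = list(product(REFS, ACTIONS))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(lambda p: fake_child_run(counter, *p), pairs))
    return counter.peak, results


if __name__ == "__main__":
    print("nested peak concurrency:", nested()[0])
    print("flat peak concurrency:", flat()[0])
```

In the nested version only three workers ever receive work, so adding more workers cannot raise the number of child runs in flight beyond three, however large the pool is.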

Octopus

07/11/2022, 5:50 PM
No, I'm not limited to 3 concurrent runs... now I (guess) I see: my script only runs 3 threads, one each for ref1, ref2 and ref3, and never creates new threads for subflows like ref1 write or ref3 delete.
Is there any way to see in the logs when a thread is opened? It may help me.

Kevin Kho

07/11/2022, 6:05 PM
Your understanding is right: the parent map is the one that gets distributed to the threads. I don’t know of a way to log that event.

Octopus

07/11/2022, 6:37 PM
The thing I don't get is that it works if I call a task like wait_and_succeed in the parent flow instead of StartFlowRun(subflow). The first is a classic task and the second is a task that starts a flow, so calling a flow inside a task must have some specific behavior.

Kevin Kho

07/11/2022, 7:15 PM
That’s because map is the one that parallelizes but the for loop does not. You are trying a two-stage mapping, which is not practical because the first-level map should already occupy all of the resources. Dask actually blocks two-stage mappings.
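One way to avoid the two-stage mapping might be to flatten the (ref, action) combinations into a single list and map once over it. The sketch below assumes Prefect 1.x, where `create_flow_run` and `wait_for_flow_run` are tasks that can be mapped; `child_parameters` and `build_parent_flow` are hypothetical helper names, and the flow and project names are taken from the code earlier in this thread. It was not run against the poster's setup.

```python
from itertools import product


def child_parameters(refs, actions):
    """Return one child-flow parameters dict per (ref, action) pair."""
    return [
        {"reference": ref, "action_id": action}
        for ref, action in product(refs, actions)
    ]


def build_parent_flow():
    # Prefect 1.x imports live inside the function so the helper above
    # can be used (and tested) without Prefect installed.
    from prefect import Flow, unmapped
    from prefect.executors import LocalDaskExecutor
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    params = child_parameters(
        ["ref1", "ref2", "ref3"], ["read", "write", "delete"]
    )

    with Flow("Generic Parent flow") as flow:
        # A single first-level map with 9 children, one per parameters dict.
        child_ids = create_flow_run.map(
            parameters=params,
            flow_name=unmapped("Generic Children flow"),
            project_name=unmapped("Playground"),
        )
        # The parent still waits for every child run to finish.
        wait_for_flow_run.map(child_ids)

    # Assumption: 9 workers, so each mapped child can get its own thread.
    flow.executor = LocalDaskExecutor(num_workers=9)
    return flow
```

With this shape the executor sees nine mapped children instead of three, so the waiting happens in the mapped `wait_for_flow_run` tasks rather than in a loop inside one task.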