[v1.2.1] Hi I would like to run multiple sub flows from task (e.g for each ref I would like to run e...
o

Octopus

almost 3 years ago
[v1.2.1] Hi I would like to run multiple sub flows from task (e.g for each ref I would like to run each action "read", "write","delete") . I have the parent flow who'll have the basic data and the child flow who'll execute an action on a ref. With my code I can only trigger 3 subflows / 9 . I think its because I use the StartFlowRun (I get the same behavior with create_flow_run ) because if I call a task instead of startflowrun I get my 9 subflow executed.
from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.executors import LocalDaskExecutor

from prefect.tasks.prefect import StartFlowRun

import time
from datetime import timedelta

# @task
# def wait_and_succeed(ref, action_id):
#     time.sleep(10)

#     print(f"children task success for ref {ref} and action {action_id}")

#     if action_id == "write":
#         print(f"[SUCCESS] {ref} Second level reached !!!")
#     if action_id == "delete":
#         print(f"[SUCCESS] {ref} Third level reached !!!")

@task
def call_children_flow(ref):
    print(f"{ref} ref")

    actions_id = ["read","write","delete"]

    for action_id in actions_id:
        start_flow_run = StartFlowRun(flow_name="Generic Children flow")
        print(f"start_flow_run {start_flow_run}")

        child_id = start_flow_run.run(parameters={
                "reference": ref,
                "action_id": action_id,
        }, project_name="Playground")

        wait_for_flow_run.run(child_id)


@task
def run_action(action_id, ref):
    start_flow_run = StartFlowRun(flow_name="Generic Children flow")

    print(f"start_flow_run {start_flow_run}")

    child_id = start_flow_run.run(parameters={
            "reference": ref,
            "action_id": action_id,
    }, project_name="Playground")

    return child_id


with Flow("Generic Parent flow") as parent_flow:
    fake_refs = ["ref1", "ref2", "ref3"]
    call_children_flow.map(fake_refs)

if __name__ == "__main__":
    parent_flow.register(
        project_name="Playground"
    )

    parent_flow.executor = LocalDaskExecutor(num_workers=20)

    parent_flow.run()