Octopus
07/11/2022, 2:14 PM
from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import StartFlowRun
import time
from datetime import timedelta
# @task
# def wait_and_succeed(ref, action_id):
# time.sleep(10)
# print(f"children task success for ref {ref} and action {action_id}")
# if action_id == "write":
# print(f"[SUCCESS] {ref} Second level reached !!!")
# if action_id == "delete":
# print(f"[SUCCESS] {ref} Third level reached !!!")
@task
def call_children_flow(ref):
    """Run the child flow for ``ref`` once per action, one run after another.

    The actions "read", "write" and "delete" are processed in that order.
    For each one, a run of the flow named "Generic Children flow" in the
    "Playground" project is started with ``reference`` and ``action_id``
    parameters, and the returned run id is handed to ``wait_for_flow_run``
    before the next action is started.

    Args:
        ref: Value forwarded as the child flow's ``reference`` parameter.
    """
    print(f"{ref} ref")
    for action in ("read", "write", "delete"):
        launcher = StartFlowRun(flow_name="Generic Children flow")
        print(f"start_flow_run {launcher}")
        child_params = {"reference": ref, "action_id": action}
        # NOTE(review): no idempotency_key is passed; Prefect's default key
        # may collapse repeated calls made from the same task run into a
        # single child run when executed against a backend -- confirm.
        run_id = launcher.run(parameters=child_params, project_name="Playground")
        # The wait sits inside the loop, so the child runs for one ref are
        # serialised; only the mapped refs can overlap.
        wait_for_flow_run.run(run_id)
@task
def run_action(action_id, ref):
    """Start one run of the child flow for a single (action, ref) pair.

    Args:
        action_id: Value forwarded as the child flow's ``action_id`` parameter.
        ref: Value forwarded as the child flow's ``reference`` parameter.

    Returns:
        Whatever ``StartFlowRun.run`` returns for the started run
        (presumably the child flow run id -- confirm against Prefect docs).
        This task does not wait for the child run.
    """
    launcher = StartFlowRun(flow_name="Generic Children flow")
    print(f"start_flow_run {launcher}")
    child_params = {"reference": ref, "action_id": action_id}
    return launcher.run(parameters=child_params, project_name="Playground")
# Parent flow: fan out one mapped `call_children_flow` task per reference.
with Flow("Generic Parent flow") as parent_flow:
    # Placeholder references: "ref1", "ref2", "ref3".
    fake_refs = [f"ref{index}" for index in range(1, 4)]
    call_children_flow.map(fake_refs)
if __name__ == "__main__":
    # Attach the executor BEFORE registering. The executor lives on the flow
    # object itself, so assigning it after `register()` has already built the
    # flow's storage leaves the registered copy without the Dask executor.
    # NOTE(review): with script-based storage the assignment would have to
    # live at module level (outside this guard) to be picked up -- confirm
    # which storage this flow uses.
    parent_flow.executor = LocalDaskExecutor(num_workers=20)

    # Register the flow with the backend under the "Playground" project.
    parent_flow.register(project_name="Playground")

    # Also run the flow locally in this process, using the executor set above.
    parent_flow.run()
Kevin Kho
07/11/2022, 2:26 PM
flow.executor = LocalDaskExecutor(num_workers=9)
Octopus
07/11/2022, 2:32 PM
Kevin Kho
07/11/2022, 2:44 PM
Octopus
07/11/2022, 2:51 PM
Kevin Kho
07/11/2022, 2:53 PM
`wait_for_flow_run` inside the for loop, so won’t it be limited to running one at a time inside the subflow?
Octopus
07/11/2022, 2:56 PM
Kevin Kho
07/11/2022, 3:02 PM
Octopus
07/11/2022, 5:50 PM
Kevin Kho
07/11/2022, 6:05 PM
Octopus
07/11/2022, 6:37 PM
Kevin Kho
07/11/2022, 7:15 PM