Jelle Vegter
05/04/2022, 11:52 AMAnna Geller
from datetime import timedelta
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
PARENT_FLOW_NAME = "parent_flow_example"
PREFECT_PROJECT_NAME = "community"
with Flow(PARENT_FLOW_NAME) as parent_flow:
flow_a_run_id = create_flow_run(
flow_name="Flow_A",
project_name=PREFECT_PROJECT_NAME,
task_args=dict(name="Flow A"),
)
flow_a_flowrunview = wait_for_flow_run(
flow_a_run_id,
raise_final_state=True,
stream_logs=True,
task_args=dict(name="Wait for Flow A"),
)
flow_b_run_id = create_flow_run(
flow_name="Flow_B",
project_name=PREFECT_PROJECT_NAME,
upstream_tasks=[flow_a_flowrunview],
task_args=dict(name="Flow B"),
)
flow_b_flowrunview = wait_for_flow_run(
flow_b_run_id,
raise_final_state=True,
stream_logs=True,
task_args=dict(name="Wait for Flow B"),
)
flow_c_run_id = create_flow_run(
flow_name="Flow_C",
project_name=PREFECT_PROJECT_NAME,
upstream_tasks=[flow_b_flowrunview],
task_args=dict(name="Flow C"),
)
flow_c_flowrunview = wait_for_flow_run(
flow_c_run_id,
raise_final_state=True,
stream_logs=True,
task_args=dict(name="Wait for Flow C"),
)
if __name__ == "__main__":
parent_flow.visualize()
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by