Hi, A question around best practices when setting...
# prefect-community
t
Hi, A question around best practices when setting up flows... In one scenario I have a flow which will typically run several steps. Some of those steps are used by other processes. To keep the Flows 'DRY' I have created three separate flows so I can call them in isolation. I have then created a parent flow which can run all three child flows. This works fine with wait_for_flow_run/set_upstream and allows the parent flow to give a view of the child flows and their progress/logs. I wonder if this is the best approach as despite it functioning well I lose observability. I've enabled stream_logs so the parent Flow sees all of the child logging, but the names aren't very intuitive and the cloud doesn't easily allow you to navigate to the child flows. See attached screenshot. You end up with a long list of 'create_flow_run'/'wait_for_flow_run' instead of the friendly task names. I can rename the create_flow_run however the new name only shows on the child flow instance, not on the parent. Because of this it feels I am going against the grain with Prefect. So am I barking up the wrong tree by keeping Flows DRY - should I instead create multiple flows and only worry about keeping the Tasks DRY? (they already are so this would be easy to change)
a
One option might be to wrap sequences of tasks that are reused by different flows into Python functions (not Prefect tasks), with such a function accepting an upstream task to link upstream, and returning the final task to be linked downstream. Then your separate flows can call these functions to reuse these task sequences, and it will be as if you wrote these tasks into the flows directly.
upvote 2
a
Regarding flow and task run names in the UI, here is how you can influence this for more observability in the UI:
Copy code
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent_flow") as flow:
    child_flow_run_id = create_flow_run(
        flow_name="child_flow_name",
        run_name="your_custom_flow_run_name_for_the_child_flow_run",
        task_args={"name": "custom_task_name"},
    )
    child_flowrunview = wait_for_flow_run(
        child_flow_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args={"name": "custom_task_name"},
    )
You are definitely not doing anything wrong. We even have a name for this approach called “The Orchestrator Pattern” to handle parent-child-flow relationship in the orchestration and we fully support it.
t
Thank you both for the feedback, very helpful!
👍 1