
Kevin Mullins

02/09/2022, 9:53 PM
I’m starting to use create_flow_run, wait_for_flow_run, and map to fan out via sub-flows. Is there any way to get friendlier task names from these tasks so I can more easily identify which child task is still running? Right now I have to go into the parent flow, look in the logs for the URL of the created child flow run, and then navigate to it separately. Ideally (maybe in Orion some day) it would be nice if the lineage between parent and child flows could be visualized; however, I realize this may be a difficult thing to do. I’m just trying to find ways to make it as user-friendly as possible for an engineer to go in and understand which sub-flows failed, are still running, etc.
In this example I have the parent flow discover dynamic databases to replicate, and then run the sub flow with params for each database.

Zanie

02/09/2022, 9:56 PM
Orion captures lineage in the UI already 🙂 let me see if I have a suggestion for these tasks though..

Kevin Mullins

02/09/2022, 9:56 PM
@Zanie - Are you saying it already can visualize the lineage between flows and sub-flows? In the current Prefect all you see is lineage to create_flow_run.

Zanie

02/09/2022, 9:58 PM
Yes there’s a whole “subflow runs” tab and clicking the task that creates the subflow run will bring you to its page.

Kevin Mullins

02/09/2022, 9:58 PM
That’s awesome, exactly what I was hoping for!

Zanie

02/09/2022, 9:58 PM
(clicking the task in the dag)

Kevin Mullins

02/09/2022, 9:59 PM
For current purposes, before Orion is ready, I was hoping there was a way to name mapped tasks based on a param or something. I’ve been able to do this before with completely custom tasks but didn’t know how it would interact with the existing create_flow_run and wait_for_flow_run tasks.

Zanie

02/09/2022, 10:01 PM
Maybe something like…
create_flow_run = create_flow_run.copy(task_run_name="{flow_name}-{map_index}")
Or
create_flow_run(…, task_args={"task_run_name": "…"})
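As a rough illustration of the template idea above, here is a minimal pure-Python sketch of how a name template such as "{flow_name}-{map_index}" could be filled in once per mapped child. It is a simplified stand-in, not Prefect's own code: the helper render_task_run_name and the sample flow names are hypothetical, and which keys Prefect actually makes available to the template is an assumption worth checking against the Prefect version in use.

```python
# Simplified stand-in for filling a task_run_name template.
# This is NOT Prefect's implementation; render_task_run_name is a hypothetical helper.
def render_task_run_name(template: str, **inputs) -> str:
    """Fill a name template from the inputs of a single mapped child."""
    return template.format(**inputs)


# Hypothetical sub-flow names, one per mapped child.
flow_names = ["replicate-sales", "replicate-hr"]

# Each mapped child gets its own index, so every rendered name is distinct.
names = [
    render_task_run_name("{flow_name}-{map_index}", flow_name=name, map_index=i)
    for i, name in enumerate(flow_names)
]
print(names)
```

The point of the sketch is only that the template is rendered separately for each mapped child, so the resulting names differ even though one task definition is reused.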

Kevin Mullins

02/09/2022, 10:05 PM
Interesting. I’ll dig in more tomorrow and see if I can figure out an approach. I think it makes more sense to me for simple tasks, but not sure how mapping plays into it. Here is my outer flow that is doing the mapping:
with flow_builder.build_flow(
    __name__, OUTER_FLOW_NAME, executor=flow_builder.EXECUTOR_LOCAL_DASK
) as discover_flow:
    sub_flow_params = discover_databases()
    flow_run_ids = create_flow_run.map(
        parameters=sub_flow_params,
        flow_name=unmapped(INNER_FLOW_NAME),
        project_name=unmapped(PROJECT_NAME),
    )

    wait_for_flow_run.map(flow_run_ids, raise_final_state=unmapped(True))

Zanie

02/09/2022, 10:06 PM
Should work fine with mapping.
👍 1

Kevin Mullins

02/09/2022, 10:09 PM
Thanks, I’ll report back tomorrow with what I find, if you want.
Not sure if this is the best way to go about it; however, this seems to accomplish what I was looking for: it gives context to both the task runs that create the sub-flow runs and the task runs that wait on them. The wait task matters most, since it is the one that fails if the sub-flow fails:
import os
import random
from typing import Dict, List, Any

import prefect
from faker import Faker
from prefect import Flow, task, unmapped
from prefect.backend import FlowRunView
from prefect.core import Parameter
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


@task
def say_hello(sub_flow_desc: str) -> None:
    prefect.context.logger.info(f"hello from subflow: {sub_flow_desc}")
    if random.randint(0, 10) > 5:
        raise ValueError("fake error")


with Flow("example_sub_flow") as sub_flow:
    sub_flow_desc = Parameter("sub_flow_desc")
    say_hello(sub_flow_desc)


@task
def calculate_sub_flows() -> List[Dict[str, Any]]:
    fake = Faker()
    return [{"sub_flow_desc": fake.name()} for _ in range(10)]


@task
def create_sub_flow(**kwargs) -> Dict[str, str]:
    desc = kwargs["parameters"]["sub_flow_desc"]
    result = create_flow_run.run(**kwargs)
    return {"flow_run_id": result, "sub_flow_desc": desc}


@task
def wait_for_sub_flow(sub_flow: Dict[str, str], **kwargs: Any) -> FlowRunView:
    kwargs.pop("sub_flow_runs", None)
    flow_run_id = sub_flow["flow_run_id"]
    return wait_for_flow_run.run(flow_run_id=flow_run_id, **kwargs)


with Flow("example_parent_flow") as parent_flow:
    sub_flow_params = calculate_sub_flows()

    sub_flow_runs = create_sub_flow.map(
        parameters=sub_flow_params,
        flow_name=unmapped("example_sub_flow"),
        project_name=unmapped("examples"),
        task_args={
            "task_run_name": lambda **kw: f"run subflow: {kw['parameters']['sub_flow_desc']}"
        },
    )

    wait_for_sub_flow.map(
        sub_flow_runs,
        sub_flow_runs=sub_flow_runs,
        raise_final_state=unmapped(True),
        task_args={
            "task_run_name": lambda **kw: f"wait for subflow: {kw['sub_flow_runs']['sub_flow_desc']}"
        },
    )
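To make the naming logic in the flow above easier to follow, here is a minimal pure-Python sketch of how a callable task_run_name could be resolved against the keyword inputs of each mapped child. It only models the idea and is not Prefect's implementation: resolve_name, name_mapped_children, the two naming functions, and the sample database names are all hypothetical, and the exact set of keyword arguments Prefect passes to the callable is an assumption.

```python
from typing import Any, Callable, Dict, List, Union

# A name spec is either a format string or a callable taking the task inputs.
NameSpec = Union[str, Callable[..., str]]


def resolve_name(spec: NameSpec, **inputs: Any) -> str:
    """Resolve one task run name from the inputs of a single mapped child."""
    if callable(spec):
        return spec(**inputs)
    return spec.format(**inputs)


def name_mapped_children(spec: NameSpec, mapped_inputs: List[Dict[str, Any]]) -> List[str]:
    """Resolve a name for every mapped child, one input dict per child."""
    return [resolve_name(spec, **inputs) for inputs in mapped_inputs]


# These mirror the two lambdas used in the flow above.
def create_name(**kw: Any) -> str:
    return f"run subflow: {kw['parameters']['sub_flow_desc']}"


def wait_name(**kw: Any) -> str:
    return f"wait for subflow: {kw['sub_flow_runs']['sub_flow_desc']}"


# Hypothetical inputs, shaped like what the create and wait tasks receive.
descs = ["db_a", "db_b"]
create_inputs = [
    {"parameters": {"sub_flow_desc": d}, "flow_name": "example_sub_flow"} for d in descs
]
wait_inputs = [
    {"sub_flow_runs": {"flow_run_id": f"id-{i}", "sub_flow_desc": d}}
    for i, d in enumerate(descs)
]

create_names = name_mapped_children(create_name, create_inputs)
wait_names = name_mapped_children(wait_name, wait_inputs)
print(create_names)
print(wait_names)
```

This also shows why the wait task is handed the dictionary from the create task: the description has to travel along with the flow run ID so the naming callable on the wait side has something readable to use.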