I’m starting to use `create_flow_run`, `wait_for_f...
# prefect-community
k
I’m starting to use `create_flow_run`, `wait_for_flow_run`, and `map` to fan out via sub-flows. Is there any way to get friendlier task names with these tasks so I can more easily identify which child task is still running? Right now I have to go into the parent flow, look in the logs for the URL of the created child flow run, and then navigate to it separately. Ideally (maybe in Orion some day) it would be nice if the lineage between parent and child flows could be visualized; however, I realize this may be a difficult thing to do. I’m just trying to make it as easy as possible for an engineer to go in and see which sub-flows failed, are still running, etc.
In this example I have the parent flow discover dynamic databases to replicate, and then run the sub flow with params for each database.
z
Orion captures lineage in the UI already 🙂 let me see if I have a suggestion for these tasks though..
k
@Zanie - Are you saying it can already visualize the lineage between flows and sub-flows? In the current Prefect, all you see is lineage to `create_flow_run`.
z
Yes there’s a whole “subflow runs” tab and clicking the task that creates the subflow run will bring you to its page.
k
That’s awesome, exactly what I was hoping for!
z
(clicking the task in the dag)
k
For current purposes, before Orion is ready, I was hoping there was a way to name mapped tasks based on a param or something. I’ve been able to do this before with completely custom tasks, but I didn’t know how it would interact with the existing `create_flow_run` and `wait_for_flow_run` tasks.
z
Maybe something like…
create_flow_run = create_flow_run.copy(task_run_name="{flow_name}-{map_index}")
Or
create_flow_run(…, task_args={"task_run_name": "…"})
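To make the two suggestions above concrete, here is a minimal plain-Python sketch of how a `task_run_name` value could be turned into a name for each mapped run. It does not import Prefect: `resolve_task_run_name` is a hypothetical helper written for illustration, and it assumes that a string value is formatted with the task's inputs and context values (such as `map_index`) while a callable is called with those same values as keyword arguments. The `database` parameter key is also an assumption.

```python
from typing import Any, Callable, Dict, Union

NameSpec = Union[str, Callable[..., str]]


def resolve_task_run_name(spec: NameSpec, inputs: Dict[str, Any]) -> str:
    """Hypothetical stand-in for the runtime naming step (not Prefect code)."""
    if callable(spec):
        # A callable receives every input as a keyword argument.
        return spec(**inputs)
    # A string is treated as a format template over the same inputs.
    return spec.format(**inputs)


# Assumed inputs for one mapped child of create_flow_run.
inputs = {
    "flow_name": "replicate_db",
    "map_index": 3,
    "parameters": {"database": "sales"},
}

template_name = resolve_task_run_name("{flow_name}-{map_index}", inputs)
callable_name = resolve_task_run_name(
    lambda **kw: f"replicate: {kw['parameters']['database']}", inputs
)

print(template_name)
print(callable_name)
```

A callable is the more flexible option when the useful label is nested inside a dict input, since a plain template can only reach it through format-string indexing.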
k
Interesting. I’ll dig in more tomorrow and see if I can figure out an approach. I think it makes more sense to me for simple tasks, but not sure how mapping plays into it. Here is my outer flow that is doing the mapping:
with flow_builder.build_flow(
    __name__, OUTER_FLOW_NAME, executor=flow_builder.EXECUTOR_LOCAL_DASK
) as discover_flow:
    sub_flow_params = discover_databases()
    flow_run_ids = create_flow_run.map(
        parameters=sub_flow_params,
        flow_name=unmapped(INNER_FLOW_NAME),
        project_name=unmapped(PROJECT_NAME),
    )

    wait_for_flow_run.map(flow_run_ids, raise_final_state=unmapped(True))
z
Should work fine with mapping.
k
Thanks. If you want, I’ll report back tomorrow with what I find out.
Not sure if this is the best way to go about it; however, it seems to accomplish what I was looking for: it provides context for both the create-flow-run tasks and the wait-for-flow-run tasks. The wait task is the most important one, as it is the one that fails if the sub-flow fails:
import os
import random
from typing import Dict, List, Any

import prefect
from faker import Faker
from prefect import Flow, task, unmapped
from prefect.backend import FlowRunView
from prefect.core import Parameter
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


@task
def say_hello(sub_flow_desc: str) -> None:
    prefect.context.logger.info(f"hello from subflow: {sub_flow_desc}")
    if random.randint(0, 10) > 5:
        raise ValueError("fake error")


with Flow("example_sub_flow") as sub_flow:
    sub_flow_desc = Parameter("sub_flow_desc")
    say_hello(sub_flow_desc)


@task
def calculate_sub_flows() -> List[Dict[str, Any]]:
    fake = Faker()
    return [{"sub_flow_desc": fake.name()} for _ in range(10)]


@task
def create_sub_flow(**kwargs) -> Dict[str, str]:
    desc = kwargs["parameters"]["sub_flow_desc"]
    result = create_flow_run.run(**kwargs)
    return {"flow_run_id": result, "sub_flow_desc": desc}


@task
def wait_for_sub_flow(sub_flow: Dict[str, str], **kwargs: Any) -> FlowRunView:
    kwargs.pop("sub_flow_runs", None)
    flow_run_id = sub_flow["flow_run_id"]
    return wait_for_flow_run.run(flow_run_id=flow_run_id, **kwargs)


with Flow("example_parent_flow") as parent_flow:
    sub_flow_params = calculate_sub_flows()

    sub_flow_runs = create_sub_flow.map(
        parameters=sub_flow_params,
        flow_name=unmapped("example_sub_flow"),
        project_name=unmapped("examples"),
        task_args={
            "task_run_name": lambda **kw: f"run subflow: {kw['parameters']['sub_flow_desc']}"
        },
    )

    wait_for_sub_flow.map(
        sub_flow_runs,
        sub_flow_runs=sub_flow_runs,
        raise_final_state=unmapped(True),
        task_args={
            "task_run_name": lambda **kw: f"wait for subflow: {kw['sub_flow_runs']['sub_flow_desc']}"
        },
    )
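The naming part of the example above can be checked without Prefect. The sketch below is a plain-Python illustration under stated assumptions: `wait_task_name` plays the role of the `task_run_name` lambda, the run IDs and descriptions are made up, and each mapped call is assumed to hand the lambda its inputs as keyword arguments. It shows why the same dict is passed a second time under the keyword `sub_flow_runs`: the lambda looks the description up by that keyword name.

```python
from typing import Any, Dict, List


def wait_task_name(**kw: Any) -> str:
    # Mirrors the lambda in the example: read the description from the
    # dict passed under the keyword name "sub_flow_runs".
    return f"wait for subflow: {kw['sub_flow_runs']['sub_flow_desc']}"


# Made-up outputs of the create step: one dict per mapped child.
created_runs: List[Dict[str, str]] = [
    {"flow_run_id": "run-0", "sub_flow_desc": "database_a"},
    {"flow_run_id": "run-1", "sub_flow_desc": "database_b"},
]

# One name per mapped wait task, built from that child's own dict.
wait_names = [
    wait_task_name(sub_flow=run, sub_flow_runs=run, raise_final_state=True)
    for run in created_runs
]

print(wait_names)
```

Returning the description together with the flow run ID from the create step is what lets the wait step label itself, since a bare flow run ID carries no human-readable context.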