Kevin Mullins
02/09/2022, 9:53 PM
`create_flow_run`, `wait_for_flow_run`, and `map` to fan out via sub-flows. Is there any way to get friendlier task names using these tasks so I can more easily identify which child task is still running? Right now I have to go into the parent flow, look under logs for the URL for the created child flow run and then navigate separately.
Ideally (maybe in Orion some day) it would be nice if the lineage between parent and child flows could be visualized; however, I realize this may be a difficult thing to do. I’m just trying to find ways to make it as user-friendly as possible for an engineer to go in and understand which sub-flows failed, are still running, etc.
Zanie
Kevin Mullins
02/09/2022, 9:56 PMcreate_flow_run.
Zanie
Kevin Mullins
02/09/2022, 9:58 PMZanie
Kevin Mullins
02/09/2022, 9:59 PM
`create_flow_run` and `wait_for_flow_run` tasks.
Zanie
create_flow_run = create_flow_run.copy(task_run_name="{flow_name}-{map_index}")
create_flow_run(…, task_args={"task_run_name": "…"})
Kevin Mullins
02/09/2022, 10:05 PMwith flow_builder.build_flow(
__name__, OUTER_FLOW_NAME, executor=flow_builder.EXECUTOR_LOCAL_DASK
) as discover_flow:
sub_flow_params = discover_databases()
flow_run_ids = create_flow_run.map(
parameters=sub_flow_params,
flow_name=unmapped(INNER_FLOW_NAME),
project_name=unmapped(PROJECT_NAME),
)
wait_for_flow_run.map(flow_run_ids, raise_final_state=unmapped(True))
Zanie
Kevin Mullins
02/09/2022, 10:09 PM
import os
import random
from typing import Dict, List, Any
import prefect
from faker import Faker
from prefect import Flow, task, unmapped
from prefect.backend import FlowRunView
from prefect.core import Parameter
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
@task
def say_hello(sub_flow_desc: str) -> None:
    """Log a greeting for this sub-flow run, then fail at random.

    Args:
        sub_flow_desc: Description identifying the sub-flow run; it is
            interpolated into the log message.

    Raises:
        ValueError: When ``random.randint(0, 10)`` draws a value greater
            than 5, so that some runs fail on purpose.
    """
    prefect.context.logger.info(f"hello from subflow: {sub_flow_desc}")
    # Deliberate random failure so the parent flow sees a mix of outcomes.
    if random.randint(0, 10) > 5:
        raise ValueError("fake error")
with Flow("example_sub_flow") as sub_flow:
    # The child flow takes a single parameter and hands it to its only task.
    sub_flow_desc = Parameter("sub_flow_desc")
    say_hello(sub_flow_desc=sub_flow_desc)
@task
def calculate_sub_flows() -> List[Dict[str, Any]]:
    """Build the parameter sets for ten sub-flow runs.

    Returns:
        A list of ten dicts, each with a single ``sub_flow_desc`` key whose
        value is a name generated by ``Faker``.
    """
    name_source = Faker()
    descriptions = [name_source.name() for _ in range(10)]
    return [{"sub_flow_desc": description} for description in descriptions]
@task
def create_sub_flow(**kwargs) -> Dict[str, str]:
    """Create a sub-flow run and pair its id with its description.

    Args:
        **kwargs: Forwarded unchanged to ``create_flow_run.run``. Must
            contain ``parameters`` with a ``sub_flow_desc`` entry.

    Returns:
        A dict with ``flow_run_id`` (the value returned by
        ``create_flow_run.run``) and ``sub_flow_desc``.

    Raises:
        KeyError: If ``parameters`` or ``sub_flow_desc`` is missing.
    """
    # Looked up first, so a missing key fails before any flow run is created.
    description = kwargs["parameters"]["sub_flow_desc"]
    return {
        "flow_run_id": create_flow_run.run(**kwargs),
        "sub_flow_desc": description,
    }
@task
def wait_for_sub_flow(sub_flow: Dict[str, str], **kwargs: Any) -> FlowRunView:
    """Wait for the flow run described by ``sub_flow``.

    Args:
        sub_flow: Dict holding the ``flow_run_id`` to wait on.
        **kwargs: Forwarded to ``wait_for_flow_run.run``, except for
            ``sub_flow_runs``, which is accepted but never forwarded.

    Returns:
        Whatever ``wait_for_flow_run.run`` returns for that flow run.
    """
    # `sub_flow_runs` is dropped here; wait_for_flow_run.run never receives it.
    forwarded = {
        key: value for key, value in kwargs.items() if key != "sub_flow_runs"
    }
    return wait_for_flow_run.run(flow_run_id=sub_flow["flow_run_id"], **forwarded)
with Flow("example_parent_flow") as parent_flow:
    # One parameter dict per sub-flow run to launch.
    sub_flow_params = calculate_sub_flows()

    # Fan out: one create_sub_flow task per parameter dict. The task run
    # name is derived from the mapped `parameters` input so each child is
    # identifiable by its description.
    sub_flow_runs = create_sub_flow.map(
        flow_name=unmapped("example_sub_flow"),
        project_name=unmapped("examples"),
        parameters=sub_flow_params,
        task_args={
            "task_run_name": lambda **kw: f"run subflow: {kw['parameters']['sub_flow_desc']}"
        },
    )

    # One wait task per created run. The same results are passed twice:
    # positionally as the task's `sub_flow` input, and as `sub_flow_runs`,
    # which the task-run-name lambda reads.
    wait_for_sub_flow.map(
        sub_flow_runs,
        raise_final_state=unmapped(True),
        sub_flow_runs=sub_flow_runs,
        task_args={
            "task_run_name": lambda **kw: f"wait for subflow: {kw['sub_flow_runs']['sub_flow_desc']}"
        },
    )