Jon

12/09/2022, 7:37 PM
hey all, what are some best practices for debugging a flow of flows in the prefect 1 UI? tracing a failure from parent to child flows is arduous. also - does this get better with the prefect 2 UI?

Anna Geller

12/09/2022, 8:54 PM
Yes it does! it's very easy to track that in v2
example
"""
prefect deployment build -n dev -q dev -a flows/11_parent_child/parent_child.py:extract
prefect deployment build -n dev -q dev -a flows/11_parent_child/parent_child.py:transform_load
prefect deployment build -n dev -q dev -a flows/11_parent_child/parent_child.py:cleanup
"""
from datetime import date
from prefect import flow


@flow(log_prints=True)
def extract(start_date: date = date(2022, 12, 1), end_date: date = date.today()) -> int:
    print(f"Running ingestion from {start_date} to {end_date} ✅")
    return 42


@flow(log_prints=True)
def transform_load(x: int = 42) -> None:
    print(f"Got data: {x} 📊")


@flow(log_prints=True)
def cleanup() -> None:
    print("Cleanup process 🪣")


# parametrized for backfills
@flow(log_prints=True)
def parent(start_date: date = date(2022, 12, 1), end_date: date = date.today()) -> None:
    df = extract(start_date, end_date)
    transform_load(df)
    cleanup()


if __name__ == "__main__":
    parent()

Jon

12/09/2022, 9:12 PM
@Anna Geller, but say that a child flow of a child flow fails, what's the path in the UI that we take from parent flow > child flow > child flow, and how do we know when we hit the end of the stack trace?

Anna Geller

12/09/2022, 11:08 PM
if you add return_state=True, the subflow run failure will be non-blocking: subsequent subflows/tasks can still run, and you can react to the returned subflow run state. Try this to see for yourself how it works:
"""
With return_state=True, even if this child flow fails,
the parent flow will continue executing downstream tasks and flows
(e.g., to run some cleanup steps or important final processes).
"""
from datetime import date
from prefect import flow
import pandas as pd
import random


@flow(log_prints=True)
def extract(start_date: date, end_date: date) -> pd.DataFrame:
    print(f"Running ingestion from {start_date} to {end_date}")
    return pd.DataFrame(data={"Users": ["Marvin", "LiveEO"]})


@flow(log_prints=True)
def transform_load(df):
    print(f"Got data: {df}")
    print("This step may fail...")
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occurred.")


@flow(log_prints=True)
def cleanup():
    print("Cleanup process")


# parametrized for backfills
@flow(log_prints=True)
def parent(start_date: date = date(2022, 12, 1), end_date: date = date.today()):
    df = extract(start_date, end_date)
    transform_load(df, return_state=True)  # cleanup subflow runs even if this fails
    # transform_load(df)  # without return_state, a failure here would skip cleanup
    cleanup()


if __name__ == "__main__":
    parent()
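
For intuition only, here is a plain-Python sketch of the idea behind return_state=True: the child's failure is captured as a value that the parent can inspect, instead of an exception that stops the parent. It does not use Prefect at all. `RunState` and `run_with_state` are made-up names for illustration, not Prefect APIs, and Prefect's real state objects carry far more information than this.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class RunState:
    """Hypothetical stand-in for a run state; NOT Prefect's State class."""

    name: str
    result: Any = None
    error: Optional[BaseException] = None

    def is_failed(self) -> bool:
        return self.error is not None


def run_with_state(fn: Callable[..., Any], *args: Any) -> RunState:
    """Call fn and capture any exception instead of letting it propagate."""
    try:
        return RunState(name=fn.__name__, result=fn(*args))
    except Exception as exc:
        return RunState(name=fn.__name__, error=exc)


def transform_load(rows: list) -> None:
    # Always fails here so the behavior is deterministic.
    raise ValueError("simulated failure")


def cleanup() -> str:
    return "cleaned up"


def parent() -> tuple:
    # The failure is returned as a value, so execution continues.
    state = run_with_state(transform_load, ["Marvin", "LiveEO"])
    if state.is_failed():
        print(f"{state.name} failed: {state.error!r}")
    # cleanup still runs because nothing was raised in parent.
    return state, cleanup()


if __name__ == "__main__":
    parent()
```

In the Prefect example above, the state returned by transform_load(df, return_state=True) plays the role of `state` here, so the parent can branch on it before calling cleanup().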

Jon

12/29/2022, 2:07 PM
@Anna Geller what's the prefect 1 equivalent to this?

Anna Geller

12/29/2022, 4:41 PM
there is none, because v1 doesn't have subflows; it only supports creating runs from deployments (aka ad-hoc runs of registered flows triggered via an API call)