Jon
12/09/2022, 7:37 PM
Anna Geller
12/09/2022, 8:54 PM
"""
prefect deployment build -n dev -q dev -a flows/11_parent_child/parent_child.py:extract
prefect deployment build -n dev -q dev -a flows/11_parent_child/parent_child.py:transform_load
prefect deployment build -n dev -q dev -a flows/11_parent_child/parent_child.py:cleanup
"""
from datetime import date
from typing import Optional

from prefect import flow
@flow(log_prints=True)
def extract(
    start_date: date = date(2022, 12, 1), end_date: Optional[date] = None
) -> int:
    """Print the requested ingestion date range and return a placeholder value.

    Args:
        start_date: Start of the date range shown in the printed message.
        end_date: End of the date range shown in the printed message.
            ``None`` (the default) means "today", resolved each time the
            flow runs.

    Returns:
        The constant ``42``.
    """
    # A ``date.today()`` default in the signature is evaluated only once, when
    # the function is defined, so later runs would silently reuse a stale
    # date. Resolve the default at call time instead.
    if end_date is None:
        end_date = date.today()
    print(f"Running ingestion from {start_date} to {end_date} ✅")
    return 42
@flow(log_prints=True)
def transform_load(x: int = 42) -> None:
    """Print the received value ``x``; stands in for a transform/load step."""
    message = f"Got data: {x} 📊"
    print(message)
@flow(log_prints=True)
def cleanup() -> None:
    """Print a marker line standing in for the cleanup step."""
    marker = "Cleanup process 🪣"
    print(marker)
# parametrized for backfills
@flow(log_prints=True)
def parent(
    start_date: date = date(2022, 12, 1), end_date: Optional[date] = None
) -> None:
    """Run ``extract``, ``transform_load`` and ``cleanup`` one after another.

    Args:
        start_date: Start of the date range passed to ``extract``.
        end_date: End of the date range passed to ``extract``. ``None``
            (the default) means "today", resolved each time the flow runs.
    """
    # A ``date.today()`` default in the signature is evaluated only once, when
    # the function is defined, so later runs would silently reuse a stale
    # date. Resolve the default at call time instead.
    if end_date is None:
        end_date = date.today()
    df = extract(start_date, end_date)
    transform_load(df)
    cleanup()
if __name__ == "__main__":
    # Script entry point: run the parent flow with its default parameters.
    parent()
Jon
12/09/2022, 9:12 PM
Anna Geller
12/09/2022, 11:08 PM
"""
return_state=True: this way, even if this child flow fails,
the parent flow will continue executing downstream tasks and flows
(e.g., to run some cleanup steps or important final processes).
"""
from datetime import date
from prefect import flow
import pandas as pd
import random
@flow(log_prints=True)
def extract(start_date: date, end_date: date) -> pd.DataFrame:
    """Print the requested date range and return a small sample DataFrame.

    Args:
        start_date: Start of the date range shown in the printed message.
        end_date: End of the date range shown in the printed message.

    Returns:
        A DataFrame with a single ``Users`` column holding two sample names.
    """
    print(f"Running ingestion from {start_date} to {end_date}")
    sample_columns = {"Users": ["Marvin", "LiveEO"]}
    return pd.DataFrame(data=sample_columns)
@flow(log_prints=True)
def transform_load(df):
    """Print the received data, then fail at random roughly half of the time.

    Args:
        df: The data to print.

    Raises:
        ValueError: When the random draw is greater than 0.5.
    """
    received = f"Got data: {df}"
    print(received)
    print("This step may fail...")
    # Simulated flakiness: a single draw from [0.0, 1.0) decides the outcome.
    draw = random.random()
    should_fail = draw > 0.5
    if should_fail:
        raise ValueError("Non-deterministic error has occured.")
@flow(log_prints=True)
def cleanup():
    """Print a marker line standing in for the cleanup step."""
    marker = "Cleanup process"
    print(marker)
# parametrized for backfills
@flow(log_prints=True)
def parent(
    start_date: date = date(2022, 12, 1), end_date: Optional[date] = None
):
    """Run ``extract``, ``transform_load`` and ``cleanup`` one after another.

    ``cleanup`` is still reached when ``transform_load`` fails.

    Args:
        start_date: Start of the date range passed to ``extract``.
        end_date: End of the date range passed to ``extract``. ``None``
            (the default) means "today", resolved each time the flow runs.
    """
    # A ``date.today()`` default in the signature is evaluated only once, when
    # the function is defined, so later runs would silently reuse a stale
    # date. Resolve the default at call time instead.
    if end_date is None:
        end_date = date.today()
    df = extract(start_date, end_date)
    # return_state=True makes the child flow call return its final state
    # instead of raising on failure, so the cleanup subflow below still runs.
    # A plain ``transform_load(df)`` call would raise on failure and cleanup
    # would never run.
    transform_load(df, return_state=True)
    cleanup()
if __name__ == "__main__":
    # Script entry point: run the parent flow with its default parameters.
    parent()
Jon
12/29/2022, 2:07 PM
Anna Geller
12/29/2022, 4:41 PM