Constantino Schillebeeckx
05/25/2022, 1:58 PMsuccess
though. why wouldn't this flow's overall state be set to failed
?PREFECT__CLOUD__HEARTBEAT_MODE
to "thread" in order to (hopefully) prevent these heartbeat issues but they remain - this flow has been running stably for a long time (without heartbeat) issues. how can we be sure heartbeats don't just kill this flow?Anna Geller
05/25/2022, 2:00 PMhow can we be sure heartbeats don't just kill this flow?could you elaborate?
Constantino Schillebeeckx
05/25/2022, 2:02 PMPREFECT__CLOUD__HEARTBEAT_MODE
to "thread" was meant to help avoid issues like this.Anna Geller
05/25/2022, 2:03 PMConstantino Schillebeeckx
05/25/2022, 2:05 PMwith DHFlow(FLOW_NAME) as flow:
anchor_date, do_extract, do_load, do_transform = get_elt_context(
do_extract=Parameter(name="extract", default=True if IS_PRD else False),
do_load=Parameter(name="load", default=True),
do_transform=Parameter(name="transform", default=True),
anchor_date=Parameter(name="anchor_date", default=None),
)
layouts = get_layouts(f"{VENDOR}/layouts", anchor_date=anchor_date)
e = extract.map(
layout=layouts,
anchor=unmapped(anchor_date),
execute=unmapped(do_extract),
)
glue_db_name = manage_glue_catalog(
base_db_name=f"base-{VENDOR}",
description=get_l1_glue_description(vendor=VENDOR),
schema_name=f"base_{VENDOR}",
execute=unmapped(do_load),
)
ld = load.map(
layout=layouts,
anchor=unmapped(anchor_date),
glue_catalog=unmapped(glue_db_name),
upstream_tasks=[e],
execute=unmapped(do_load),
)
dbt_run = dbt(
command=f"run --models models/l2_groom/{VENDOR}", upstream_tasks=[ld], execute=unmapped(do_transform)
)
dbt_run.name = "transform"
load
tasks fails, why would my dbt
task run if load
is an upstream task? is it because i'm not doing upstream_tasks=[unmapped(ld)]
?Anna Geller
05/30/2022, 9:50 AMupstream_tasks=unmapped([ld])