Constantino Schillebeeckx
05/25/2022, 1:58 PMsuccess
though. why wouldn't this flow's overall state be set to failed
?PREFECT__CLOUD__HEARTBEAT_MODE
to "thread" in order to (hopefully) prevent these heartbeat issues but they remain - this flow has been running stably for a long time (without heartbeat) issues. how can we be sure heartbeats don't just kill this flow?Anna Geller
how can we be sure heartbeats don't just kill this flow?could you elaborate?
Constantino Schillebeeckx
05/25/2022, 2:02 PMPREFECT__CLOUD__HEARTBEAT_MODE
to "thread" was meant to help avoid issues like this.Anna Geller
Constantino Schillebeeckx
05/25/2022, 2:05 PMwith DHFlow(FLOW_NAME) as flow:
anchor_date, do_extract, do_load, do_transform = get_elt_context(
do_extract=Parameter(name="extract", default=True if IS_PRD else False),
do_load=Parameter(name="load", default=True),
do_transform=Parameter(name="transform", default=True),
anchor_date=Parameter(name="anchor_date", default=None),
)
layouts = get_layouts(f"{VENDOR}/layouts", anchor_date=anchor_date)
e = extract.map(
layout=layouts,
anchor=unmapped(anchor_date),
execute=unmapped(do_extract),
)
glue_db_name = manage_glue_catalog(
base_db_name=f"base-{VENDOR}",
description=get_l1_glue_description(vendor=VENDOR),
schema_name=f"base_{VENDOR}",
execute=unmapped(do_load),
)
ld = load.map(
layout=layouts,
anchor=unmapped(anchor_date),
glue_catalog=unmapped(glue_db_name),
upstream_tasks=[e],
execute=unmapped(do_load),
)
dbt_run = dbt(
command=f"run --models models/l2_groom/{VENDOR}", upstream_tasks=[ld], execute=unmapped(do_transform)
)
dbt_run.name = "transform"
load
tasks fails, why would my dbt
task run if load
is an upstream task? is it because i'm not doing upstream_tasks=[unmapped(ld)]
?Anna Geller
upstream_tasks=unmapped([ld])