Constantino Schillebeeckx
05/25/2022, 1:58 PMsuccess
though. why wouldn't this flow's overall state be set to failed
?Constantino Schillebeeckx
05/25/2022, 2:00 PMPREFECT__CLOUD__HEARTBEAT_MODE
to "thread" in order to (hopefully) prevent these heartbeat issues but they remain - this flow has been running stably for a long time (without heartbeat) issues. how can we be sure heartbeats don't just kill this flow?Anna Geller
Anna Geller
Anna Geller
how can we be sure heartbeats don't just kill this flow?could you elaborate?
Constantino Schillebeeckx
05/25/2022, 2:02 PMConstantino Schillebeeckx
05/25/2022, 2:02 PMPREFECT__CLOUD__HEARTBEAT_MODE
to "thread" was meant to help avoid issues like this.Anna Geller
Anna Geller
Constantino Schillebeeckx
05/25/2022, 2:05 PMwith DHFlow(FLOW_NAME) as flow:
anchor_date, do_extract, do_load, do_transform = get_elt_context(
do_extract=Parameter(name="extract", default=True if IS_PRD else False),
do_load=Parameter(name="load", default=True),
do_transform=Parameter(name="transform", default=True),
anchor_date=Parameter(name="anchor_date", default=None),
)
layouts = get_layouts(f"{VENDOR}/layouts", anchor_date=anchor_date)
e = extract.map(
layout=layouts,
anchor=unmapped(anchor_date),
execute=unmapped(do_extract),
)
glue_db_name = manage_glue_catalog(
base_db_name=f"base-{VENDOR}",
description=get_l1_glue_description(vendor=VENDOR),
schema_name=f"base_{VENDOR}",
execute=unmapped(do_load),
)
ld = load.map(
layout=layouts,
anchor=unmapped(anchor_date),
glue_catalog=unmapped(glue_db_name),
upstream_tasks=[e],
execute=unmapped(do_load),
)
dbt_run = dbt(
command=f"run --models models/l2_groom/{VENDOR}", upstream_tasks=[ld], execute=unmapped(do_transform)
)
dbt_run.name = "transform"
Constantino Schillebeeckx
05/25/2022, 2:06 PMload
tasks fails, why would my dbt
task run if load
is an upstream task? is it because i'm not doing upstream_tasks=[unmapped(ld)]
?Constantino Schillebeeckx
05/25/2022, 2:46 PMConstantino Schillebeeckx
05/25/2022, 3:00 PMAnna Geller
Anna Geller
upstream_tasks=unmapped([ld])