# prefect-community

Constantino Schillebeeckx

05/25/2022, 1:58 PM
I have a flow with a section of mapped tasks; one of them failed due to a missing heartbeat, but the final flow state is `success`. Why wouldn't this flow's overall state be set to `failed`?
In addition to this, I recently set `PREFECT__CLOUD__HEARTBEAT_MODE` to "thread" in order to (hopefully) prevent these heartbeat issues, but they remain. This flow has been running stably for a long time (without heartbeat issues). How can we be sure heartbeats don't just kill this flow?

Anna Geller

05/25/2022, 2:00 PM
It depends on which task is set as a reference task.
If your task is, say, `create_flow_run`, and this is the last task in your flow, then your flow run will be considered successful as soon as the child flow run has been triggered, not when the child flow run finishes successfully.
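For illustration, a minimal sketch of how reference tasks determine the final flow run state in Prefect 1.x (hypothetical task names):

from prefect import Flow, task


@task
def trigger_child_flow():
    return "child-flow-run-id"


@task
def wait_for_child(flow_run_id):
    print(f"waiting for {flow_run_id}")


with Flow("reference-task-example") as flow:
    run_id = trigger_child_flow()
    done = wait_for_child(run_id)

# By default all terminal tasks are reference tasks; setting them explicitly
# means the flow run's final state follows only the tasks listed here.
flow.set_reference_tasks([done])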
> how can we be sure heartbeats don't just kill this flow?
Could you elaborate?

Constantino Schillebeeckx

05/25/2022, 2:02 PM
This is what the flow looks like (one of the `load` tasks failed).
Well, the missing heartbeat is a false positive: the task actually ran successfully. I thought that setting `PREFECT__CLOUD__HEARTBEAT_MODE` to "thread" was meant to help avoid issues like this.

Anna Geller

05/25/2022, 2:03 PM
Heartbeat handling for external tasks is a challenging problem, and we're aware that the way it's currently implemented isn't a perfect UX, so we'll try to address it in a better way in Prefect 2.0.
It's meant to help mitigate the problem, but switching to threads alone doesn't fully solve the issue.
👍 1

Constantino Schillebeeckx

05/25/2022, 2:05 PM
my flow code looks like:
from prefect import Parameter, unmapped  # DHFlow and the tasks below are assumed to come from project-specific modules

with DHFlow(FLOW_NAME) as flow:

    anchor_date, do_extract, do_load, do_transform = get_elt_context(
        do_extract=Parameter(name="extract", default=True if IS_PRD else False),
        do_load=Parameter(name="load", default=True),
        do_transform=Parameter(name="transform", default=True),
        anchor_date=Parameter(name="anchor_date", default=None),
    )

    layouts = get_layouts(f"{VENDOR}/layouts", anchor_date=anchor_date)

    e = extract.map(
        layout=layouts,
        anchor=unmapped(anchor_date),
        execute=unmapped(do_extract),
    )
    glue_db_name = manage_glue_catalog(
        base_db_name=f"base-{VENDOR}",
        description=get_l1_glue_description(vendor=VENDOR),
        schema_name=f"base_{VENDOR}",
        execute=unmapped(do_load),
    )
    ld = load.map(
        layout=layouts,
        anchor=unmapped(anchor_date),
        glue_catalog=unmapped(glue_db_name),
        upstream_tasks=[e],
        execute=unmapped(do_load),
    )
    dbt_run = dbt(
        command=f"run --models models/l2_groom/{VENDOR}", upstream_tasks=[ld], execute=unmapped(do_transform)
    )
    dbt_run.name = "transform"
I'm still confused about how these mapped tasks play together. If one of the `load` tasks fails, why would my `dbt` task run, given that `load` is an upstream task? Is it because I'm not doing `upstream_tasks=[unmapped(ld)]`?
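For illustration, a minimal sketch of the default trigger behaviour in Prefect 1.x: with the default all_successful trigger, a downstream task that depends on a mapped task is expected to end up TriggerFailed if any of the mapped children fail (toy task names, not the flow above):

from prefect import Flow, task


@task
def might_fail(x):
    if x == 2:
        raise ValueError("child failure")
    return x


@task
def summarize(note):
    print(note)


with Flow("trigger-example") as flow:
    children = might_fail.map(x=[1, 2, 3])
    # Default trigger is all_successful, so this should not run if any child fails.
    summarize("all loads done", upstream_tasks=[children])

state = flow.run()  # local run for testing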
Totally forgot I also asked about this a month ago; looks like the core issue wasn't really resolved for me.
This discussion might also be relevant.

Anna Geller

05/30/2022, 9:50 AM
Hi @Constantino Schillebeeckx, checking in after the holidays: LMK if this is still an open issue.
I think this should be:
upstream_tasks=unmapped([ld])