https://prefect.io logo
#prefect-community
Title
# prefect-community
o

Oliver Mannion

05/12/2022, 11:33 AM
Hiya when using Prefect Cloud we've experienced more than once now a
No heartbeat detected
reported by the Zombie Killer, but we can't see the task state set to failed. Has anyone else experienced this?
a

Anna Geller

05/12/2022, 11:50 AM
Zombie Killer tries to kill any task run which doesn't emit heartbeats (which may happen with long-running jobs, and/or processes external to Prefect). This topic discusses the issue in much more detail
10min is for Server, 2 min is for Cloud: "The
Zombie Killer
service is responsible for handling zombies, which Prefect defines as tasks that claim to be running but haven't updated their heartbeat in the past 2 minutes (Prefect Cloud) or 10 minutes (Prefect Server)."
o

Oliver Mannion

05/12/2022, 1:42 PM
Thanks Anna I see the “No heartbeat” log line but my task did not move into the failed state. Is this expected?
a

Anna Geller

05/12/2022, 1:47 PM
it might be, depending on the use case, I don't have enough information to tell. Can you send your flow run ID and share the code for your flow? (looking more for Flow() structure not your tasks)
o

Oliver Mannion

05/13/2022, 1:02 AM
Thanks Anna the flow run id is
37227baa-a361-4e80-9333-ca1f15fb7104
This is the schematic if it helps
Copy code
def process_table(tablename: str,
                  maxgrab: Parameter, runids: Parameter,
                  upstream_task: Optional[Task] = None) -> Tuple[Task, Task]:
    retrieve_runid_task = retrieve_runid(tablename, maxgrab, runids)
    run_model_task = run_model(tablename, retrieve_runid_task)

    if upstream_task:
        retrieve_runid_task.set_upstream(upstream_task)
    return retrieve_runid_task, run_model_task


    with Flow("flow") as flow:
        maxgrab_param = Parameter("maxgrab", default=maxgrab)
        runids_param = Parameter("runids", default=runids)

        runids_views = create_runids_views()

        runids_organisation, run_model_organisation = process_table("organisation", maxgrab_param, runids_param)
        runids_organisation.set_upstream(runids_views)

        sharded_tablename = Constant(["invoice", "lineitem", "account", "statementline"],
                                     task_run_name="sharded_tablename", checkpoint=True, result=PrefectResult())

        apply_map(
            process_table,
            tablename=sharded_tablename,
            maxgrab=unmapped(maxgrab_param),
            runids=unmapped(runids_param),
            upstream_task=unmapped(run_model_organisation))
a

Anna Geller

05/13/2022, 1:57 AM
Thanks for sharing the ID - the logs tell us that this run failed due to Kubernetes Error:
pods ['prefect-job-505449d8-rgl9r'] failed for this job
- then one person from your team restarted the flow run and it then succeeded. I don't see a Flow object in your flow code definition - how do you define it? Usually, it's defined using:
Copy code
with Flow("yourflow") as flow:
    your_task = your_decorated_task_fn()
    ...
but in the end, everything seems successful 🤔 is the issue solved now? were you able to detect which task caused the heartbeat issues?
o

Oliver Mannion

05/13/2022, 2:45 AM
Thanks Anna.. yes we manually restarted it. But we're trying to understand why the task didn't go into the Failed state, and then retry automatically without us intervening... Any ideas? The flow is above at bottom of the snippet (sorry the indentation isn't quite right).
We think task run
748e39b5-495e-4629-bd03-c7420cdfb581
(run_model) is the task that had heartbeat issues (and rightly so because the Kubernetes pod died!)
👍 1
a

Anna Geller

05/13/2022, 11:33 AM
why the task didn't go into the Failed state, and then retry automatically without us intervening... Any ideas?
do you know which task it was? hard to say without knowing what this task does, it may happen with some unclosed DB connections, long-running jobs, and generally, especially when running processes external to Prefect
o

Oliver Mannion

05/21/2022, 12:56 AM
It's the
run_model
task which runs dbt:
Copy code
@task(log_stdout=True, task_run_name="run_model_{tablename}", max_retries=3, retry_delay=timedelta(seconds=10))
def run_model(tablename: str, runid: str) -> None:
    logger = prefect.context.get("logger")

    save_keyfile_secret()

    if runid:
        dbt_config = DbtShellTask(
            profiles_dir='./',
            return_all=True,
            log_stderr=True,
            stream_output=True
        )
        command = f"""dbt run --models tag:{tablename} --vars '{{"runids":"{runid}"}}'"""
        <http://logger.info|logger.info>(command)
        dbt_config.run(command=command)
    else:
        <http://logger.info|logger.info>(f"No runid for {tablename} dbt won't be launched")
But unfortunately we have now lost all the logs and task state because we deleted the project 😞 I'll keep an eye out for it happening again...
a

Anna Geller

05/21/2022, 11:45 AM
It could be related to the fact that you run this shell task from another task - you may try moving it out into the Flow scope as shown e.g. here keep us posted!
8 Views