    Oliver Mannion

    4 months ago
    Hiya, when using Prefect Cloud we've experienced more than once now a "No heartbeat detected" error reported by the Zombie Killer, but we can't see the task state set to Failed. Has anyone else experienced this?
    Anna Geller

    4 months ago
    The Zombie Killer tries to kill any task run which doesn't emit heartbeats (which may happen with long-running jobs and/or processes external to Prefect). This topic discusses the issue in much more detail.
    10 min is for Server, 2 min is for Cloud: "The Zombie Killer service is responsible for handling zombies, which Prefect defines as tasks that claim to be running but haven't updated their heartbeat in the past 2 minutes (Prefect Cloud) or 10 minutes (Prefect Server)."
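    As a side note, a minimal sketch (assuming Prefect 1.x and a Kubernetes agent) of switching the heartbeat from the default subprocess to a thread via the flow's run config; this sometimes helps when a long-running external process starves the heartbeat. The env var name comes from Prefect 1.x's cloud config, so verify it against your version:
    # Minimal sketch, assuming Prefect 1.x with a Kubernetes agent: run the
    # heartbeat in a thread instead of the default subprocess ("process"),
    # which can help long-running external jobs keep emitting heartbeats.
    from prefect import Flow
    from prefect.run_configs import KubernetesRun

    with Flow("flow") as flow:
        ...  # your tasks as usual

    flow.run_config = KubernetesRun(
        env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"}  # default is "process"
    )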
    Oliver Mannion

    4 months ago
    Thanks Anna, I see the “No heartbeat” log line but my task did not move into the Failed state. Is this expected?
    Anna Geller

    4 months ago
    It might be, depending on the use case; I don't have enough information to tell. Can you send your flow run ID and share the code for your flow? (I'm looking more for the Flow() structure than your tasks.)
    Oliver Mannion

    4 months ago
    Thanks Anna, the flow run ID is
    37227baa-a361-4e80-9333-ca1f15fb7104
    This is the schematic, if it helps:
    from typing import Optional, Tuple

    from prefect import Flow, Parameter, Task, apply_map, unmapped
    from prefect.engine.results import PrefectResult
    from prefect.tasks.core.constants import Constant


    def process_table(tablename: str,
                      maxgrab: Parameter, runids: Parameter,
                      upstream_task: Optional[Task] = None) -> Tuple[Task, Task]:
        retrieve_runid_task = retrieve_runid(tablename, maxgrab, runids)
        run_model_task = run_model(tablename, retrieve_runid_task)

        if upstream_task:
            retrieve_runid_task.set_upstream(upstream_task)
        return retrieve_runid_task, run_model_task


    with Flow("flow") as flow:
        maxgrab_param = Parameter("maxgrab", default=maxgrab)
        runids_param = Parameter("runids", default=runids)

        runids_views = create_runids_views()

        runids_organisation, run_model_organisation = process_table("organisation", maxgrab_param, runids_param)
        runids_organisation.set_upstream(runids_views)

        sharded_tablename = Constant(["invoice", "lineitem", "account", "statementline"],
                                     task_run_name="sharded_tablename", checkpoint=True, result=PrefectResult())

        apply_map(
            process_table,
            tablename=sharded_tablename,
            maxgrab=unmapped(maxgrab_param),
            runids=unmapped(runids_param),
            upstream_task=unmapped(run_model_organisation))
    Anna Geller

    4 months ago
    Thanks for sharing the ID. The logs tell us that this run failed due to a Kubernetes error:
    pods ['prefect-job-505449d8-rgl9r'] failed for this job
    Then someone from your team restarted the flow run and it succeeded. I don't see a Flow object in your flow code definition; how do you define it? Usually, it's defined using:
    with Flow("yourflow") as flow:
        your_task = your_decorated_task_fn()
        ...
    but in the end, everything seems successful 🤔 Is the issue solved now? Were you able to detect which task caused the heartbeat issues?
    Oliver Mannion

    4 months ago
    Thanks Anna, yes, we manually restarted it. But we're trying to understand why the task didn't go into the Failed state and then retry automatically, without us intervening... Any ideas? The flow definition is at the bottom of the snippet above.
    We think task run
    748e39b5-495e-4629-bd03-c7420cdfb581
    (run_model) is the task that had heartbeat issues (and rightly so, because the Kubernetes pod died!)
    Anna Geller

    4 months ago
    "why the task didn't go into the Failed state, and then retry automatically without us intervening... Any ideas?"
    Do you know which task it was? It's hard to say without knowing what this task does; it may happen with unclosed DB connections, long-running jobs, and generally when running processes external to Prefect.
    Oliver Mannion

    4 months ago
    It's the run_model task, which runs dbt:
    from datetime import timedelta

    import prefect
    from prefect import task
    from prefect.tasks.dbt import DbtShellTask


    @task(log_stdout=True, task_run_name="run_model_{tablename}", max_retries=3, retry_delay=timedelta(seconds=10))
    def run_model(tablename: str, runid: str) -> None:
        logger = prefect.context.get("logger")

        save_keyfile_secret()

        if runid:
            dbt_config = DbtShellTask(
                profiles_dir='./',
                return_all=True,
                log_stderr=True,
                stream_output=True
            )
            command = f"""dbt run --models tag:{tablename} --vars '{{"runids":"{runid}"}}'"""
            logger.info(command)
            dbt_config.run(command=command)
        else:
            logger.info(f"No runid for {tablename} dbt won't be launched")
    But unfortunately we have now lost all the logs and task state because we deleted the project 😞 I'll keep an eye out for it happening again...
    Anna Geller

    4 months ago
    It could be related to the fact that you run this shell task from another task; you may try moving it out into the Flow scope as shown e.g. here. Keep us posted!
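    For illustration, a minimal sketch of what moving the dbt shell task into the Flow scope could look like (assuming Prefect 1.x; the flow name and the build_dbt_command helper are made up, and the command template just mirrors the snippet above):
    from datetime import timedelta

    from prefect import Flow, Parameter, task
    from prefect.tasks.dbt import DbtShellTask

    # Shell task instantiated at module level and called in the Flow scope,
    # so Prefect manages it as its own task (heartbeat, retries, state)
    # instead of it running inside another @task.
    dbt_run = DbtShellTask(
        profiles_dir="./",
        return_all=True,
        log_stderr=True,
        stream_output=True,
        max_retries=3,
        retry_delay=timedelta(seconds=10),
    )

    @task
    def build_dbt_command(tablename: str, runid: str) -> str:
        # Hypothetical helper: only formats the dbt command string.
        return f"""dbt run --models tag:{tablename} --vars '{{"runids":"{runid}"}}'"""

    with Flow("dbt-in-flow-scope") as flow:
        tablename = Parameter("tablename", default="organisation")
        runid = Parameter("runid", default="")
        dbt_run(command=build_dbt_command(tablename, runid))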