Oliver Mannion
05/12/2022, 11:33 AM
"No heartbeat detected" reported by the Zombie Killer, but we can't see the task state set to failed. Has anyone else experienced this?
Anna Geller
05/12/2022, 11:50 AM
"... Zombie Killer service is responsible for handling zombies, which Prefect defines as tasks that claim to be running but haven't updated their heartbeat in the past 2 minutes (Prefect Cloud) or 10 minutes (Prefect Server)."
Oliver Mannion
05/12/2022, 1:42 PM
Anna Geller
05/12/2022, 1:47 PM
Oliver Mannion
05/13/2022, 1:02 AM
37227baa-a361-4e80-9333-ca1f15fb7104
This is the schematic if it helps
def process_table(tablename: str,
                  maxgrab: Parameter, runids: Parameter,
                  upstream_task: Optional[Task] = None) -> Tuple[Task, Task]:
    retrieve_runid_task = retrieve_runid(tablename, maxgrab, runids)
    run_model_task = run_model(tablename, retrieve_runid_task)
    if upstream_task:
        retrieve_runid_task.set_upstream(upstream_task)
    return retrieve_runid_task, run_model_task

with Flow("flow") as flow:
    maxgrab_param = Parameter("maxgrab", default=maxgrab)
    runids_param = Parameter("runids", default=runids)
    runids_views = create_runids_views()
    runids_organisation, run_model_organisation = process_table("organisation", maxgrab_param, runids_param)
    runids_organisation.set_upstream(runids_views)
    sharded_tablename = Constant(["invoice", "lineitem", "account", "statementline"],
                                 task_run_name="sharded_tablename", checkpoint=True, result=PrefectResult())
    apply_map(
        process_table,
        tablename=sharded_tablename,
        maxgrab=unmapped(maxgrab_param),
        runids=unmapped(runids_param),
        upstream_task=unmapped(run_model_organisation))
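For anyone skimming the schematic, the ordering it seems to imply can be sketched in plain Python without Prefect. This is only a reading of the code above, not output from the real flow: the node labels such as `run_model[invoice]` are hypothetical, and the two Parameters are left out for brevity.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Table names mapped over in the schematic above.
SHARDED = ["invoice", "lineitem", "account", "statementline"]


def build_dependencies() -> dict:
    """Map each (hypothetically labelled) task to the tasks it waits on."""
    deps = {
        "create_runids_views": set(),
        "retrieve_runid[organisation]": {"create_runids_views"},
        "run_model[organisation]": {"retrieve_runid[organisation]"},
    }
    for table in SHARDED:
        # Each mapped branch starts only after the organisation model has run.
        deps[f"retrieve_runid[{table}]"] = {"run_model[organisation]"}
        deps[f"run_model[{table}]"] = {f"retrieve_runid[{table}]"}
    return deps


# One valid execution order; branches for different tables are independent.
order = list(TopologicalSorter(build_dependencies()).static_order())
for name in order:
    print(name)
```

Under this reading, a `run_model` child that dies mid-run only blocks its own branch, while the other sharded tables can still finish.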
Anna Geller
05/13/2022, 1:57 AM
pods ['prefect-job-505449d8-rgl9r'] failed for this job - then one person from your team restarted the flow run and it then succeeded.
I don't see a Flow object in your flow code definition - how do you define it? Usually, it's defined using:
with Flow("yourflow") as flow:
    your_task = your_decorated_task_fn()
    ...
but in the end, everything seems successful 🤔 is the issue solved now? were you able to detect which task caused the heartbeat issues?
Oliver Mannion
05/13/2022, 2:45 AM
748e39b5-495e-4629-bd03-c7420cdfb581 (run_model) is the task that had heartbeat issues (and rightly so because the Kubernetes pod died!)
Anna Geller
05/13/2022, 11:33 AM
"why the task didn't go into the Failed state, and then retry automatically without us intervening... Any ideas?"
Do you know which task it was? It's hard to say without knowing what this task does. It may happen with unclosed DB connections, long-running jobs, and generally when running processes external to Prefect.
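To make the Zombie Killer rule quoted earlier in this thread concrete, here is a minimal sketch of the check as described there: a task run counts as a zombie if it claims to be running and its last heartbeat is older than the threshold. This is not Prefect's actual implementation; `is_zombie`, `CLOUD_TIMEOUT` and `SERVER_TIMEOUT` are hypothetical names, and only the 2 and 10 minute values come from the thread.

```python
from datetime import datetime, timedelta, timezone

# Thresholds as quoted in the thread; the constant names are hypothetical.
CLOUD_TIMEOUT = timedelta(minutes=2)
SERVER_TIMEOUT = timedelta(minutes=10)


def is_zombie(state: str, last_heartbeat: datetime, now: datetime,
              timeout: timedelta = CLOUD_TIMEOUT) -> bool:
    """True if the run claims to be running but its heartbeat is stale."""
    return state == "Running" and (now - last_heartbeat) > timeout


now = datetime(2022, 5, 12, 11, 33, tzinfo=timezone.utc)
three_minutes_ago = now - timedelta(minutes=3)

print(is_zombie("Running", three_minutes_ago, now))                  # stale under the 2 minute rule
print(is_zombie("Running", three_minutes_ago, now, SERVER_TIMEOUT))  # still fresh under the 10 minute rule
print(is_zombie("Failed", three_minutes_ago, now))                   # not running, so never a zombie
```

The sketch only covers detection. What happens to a detected zombie afterwards (marking it Failed, scheduling a retry) is a separate step and is the part in question here.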
Oliver Mannion
05/21/2022, 12:56 AM
run_model task which runs dbt:
@task(log_stdout=True, task_run_name="run_model_{tablename}", max_retries=3, retry_delay=timedelta(seconds=10))
def run_model(tablename: str, runid: str) -> None:
    logger = prefect.context.get("logger")
    save_keyfile_secret()
    if runid:
        dbt_config = DbtShellTask(
            profiles_dir='./',
            return_all=True,
            log_stderr=True,
            stream_output=True
        )
        command = f"""dbt run --models tag:{tablename} --vars '{{"runids":"{runid}"}}'"""
        logger.info(command)
        dbt_config.run(command=command)
    else:
        logger.info(f"No runid for {tablename} dbt won't be launched")
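As a side note on the command string above, the doubled braces in the f-string render as literal braces, so the `--vars` argument ends up as a small JSON object in single quotes. A sketch of building the same string with `json.dumps` and `shlex.quote` follows. `build_dbt_command` is a hypothetical helper, not part of Prefect or dbt, and it assumes the command is ultimately executed through a shell.

```python
import json
import shlex


def build_dbt_command(tablename: str, runid: str) -> str:
    """Hypothetical helper: render the dbt command with shell-safe quoting."""
    # Compact separators reproduce the no-space JSON used in the f-string above.
    dbt_vars = json.dumps({"runids": runid}, separators=(",", ":"))
    selector = shlex.quote(f"tag:{tablename}")
    return f"dbt run --models {selector} --vars {shlex.quote(dbt_vars)}"


# For simple values this matches the hand-written f-string.
tablename, runid = "invoice", "42"
by_hand = f"""dbt run --models tag:{tablename} --vars '{{"runids":"{runid}"}}'"""
print(build_dbt_command(tablename, runid))
print(build_dbt_command(tablename, runid) == by_hand)
```

The two versions only diverge when a value contains quotes or other shell-special characters, which the helper escapes and the plain f-string does not.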
But unfortunately we have now lost all the logs and task state because we deleted the project 😞 I'll keep an eye out for it happening again...
Anna Geller
05/21/2022, 11:45 AM