# prefect-community
j
I’m running a Prefect Flow on a Docker Agent with a DaskExecutor. The flow runs fine for a while, then I get an error from the CloudFlowRunner:
```
Unexpected error: KilledWorker('{{TaskRunName}}', <WorkerState '<tcp://127.0.0.1:44295>', name: 1, status: closed, memory: 0, processing: 72>)
```
Any idea what might be causing this?
k
This is vague, but the Coiled article on KilledWorker is a good read on this. For me personally, I see it most often with package version mismatches.
j
A couple of minutes after getting that flow runner error, I get some more error logs saying there was no heartbeat detected, likely because the flow was cancelled.
What package versions should I be checking?
k
Dask and distributed, but even pandas or cloudpickle.
I think a mismatch in any package might cause it though.
j
What would the mismatches be between? Do you mean a mismatch between the code being executed in the flow/task runs and what it was originally compiled/pickled with?
k
Not necessarily Prefect related, but yes: the task (the decorated function) is sent for execution, Dask pickles it to ship it to a worker, and the worker unpickles it to execute it but can’t due to a version mismatch. The number two reason is running out of memory, btw, in case that’s a potential cause.
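As an aside, distributed ships a built-in version check that can confirm or rule out a mismatch; a minimal sketch, assuming you can open a Client against the same cluster the flow uses (the address below is just a placeholder):
```python
from dask.distributed import Client

# Placeholder address: point this at the scheduler the flow actually uses.
client = Client("tcp://127.0.0.1:8786")

# check=True raises if the client, scheduler, and workers disagree on key
# packages (python, dask, distributed, and friends); check=False just
# returns the full report for manual inspection.
versions = client.get_versions(check=True)
print(versions["scheduler"]["packages"])
```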
Could you trim the traceback when you get a chance, btw, to keep the main channel cleaner? I think we only need the KilledWorker line.
j
Full traceback here for posterity:
```
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 543, in get_flow_run_state
    {e: state for e, state in upstream_states.items()}
  File "/usr/local/lib/python3.7/site-packages/prefect/executors/dask.py", line 448, in wait
    return self.client.gather(futures)
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 2152, in gather
    asynchronous=asynchronous,
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 310, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 376, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 349, in f
    result = yield future
  File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 2009, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('{{TaskRunName}}', <WorkerState '<tcp://127.0.0.1:44295>', name: 1, status: closed, memory: 0, processing: 72>)
```
k
Thank you!
j
My hunch is that it wouldn’t be a pickling error, since the code was just packaged and redistributed as a Docker image. Would the DaskExecutor be executing a different version of dask on the agent?
k
How is your DaskExecutor set up? LocalCluster (if I remember the last thread right)?
j
Yeah I just switched to using
```python
import logging
from prefect.executors import DaskExecutor

DaskExecutor(
    cluster_kwargs={"processes": True, "threads_per_worker": 1, "silence_logs": logging.DEBUG}
)
```
per your recommendation
Not sure if it was a memory error either; CPU peaked at about 75% utilization while the flow was running, and memory didn’t go past 25%.
k
But LocalDaskExecutor works fine?
I don’t know enough about how the worker processes work, but my guess is that a worker can still fail if it has, say, 2 GB allocated and goes over that, even if the machine as a whole has headroom. Completely guessing here, but I could see those numbers reflecting the machine-wide metrics rather than any single worker process. Maybe you can specify n_workers in cluster_kwargs and lower it as a test?
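For instance, a quick sketch reusing the executor config from above (the worker count of 4 is just an example): fewer processes means each one gets a larger share of memory under LocalCluster’s default per-worker limit.
```python
import logging
from prefect.executors import DaskExecutor

# Example only: 4 workers instead of one per CPU, so each worker process
# gets a bigger slice of system memory under LocalCluster's default
# memory_limit (roughly total memory divided across the workers).
executor = DaskExecutor(
    cluster_kwargs={
        "processes": True,
        "n_workers": 4,
        "threads_per_worker": 1,
        "silence_logs": logging.DEBUG,
    }
)
```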
j
Kevin, you were spot on. The VM is provisioned with 64 GB of RAM and 8 CPUs, so each worker process should theoretically only get 8 GB under the defaults. But once one process hit 9 GB, the flow run was shut down. Is there any way to have each worker get a 16 GB limit by dynamically doing SYSTEM_MEM / NUM_CPU * 2?
k
If you have access to those variables at runtime, maybe you can do this?
As in, use a function to declare the LocalCluster.
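A minimal sketch of that idea, assuming Prefect 1.x’s DaskExecutor (whose cluster_class accepts a callable that returns a cluster) and that psutil is available in the image; make_cluster is a hypothetical helper implementing the SYSTEM_MEM / NUM_CPU * 2 sizing from above:
```python
import multiprocessing

import psutil  # assumption: psutil is installed in the flow's Docker image
from dask.distributed import LocalCluster
from prefect.executors import DaskExecutor


def make_cluster():
    """Hypothetical helper: size each worker's memory limit at runtime."""
    n_cpus = multiprocessing.cpu_count()
    total_mem = psutil.virtual_memory().total
    # SYSTEM_MEM / NUM_CPU * 2, as discussed above. Note this oversubscribes
    # memory if several workers approach their limit at the same time.
    memory_limit = int(total_mem / n_cpus * 2)
    return LocalCluster(
        n_workers=n_cpus,
        threads_per_worker=1,
        processes=True,
        memory_limit=memory_limit,
    )


executor = DaskExecutor(cluster_class=make_cluster)
```
Since the callable runs when the flow run starts, the CPU and memory figures it sees come from the environment the flow actually executes in.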
j
oh that’s really nifty