j

John Ramirez

03/06/2020, 5:42 PM
hey everyone - I'm dealing with an unusual issue in Dask. I need to clear memory after a workflow run (success or failure), but I can't use `client.restart` inside the workflow because the workflow will fail. Any thoughts or ideas on how to clear memory?
c

Chris White

03/06/2020, 6:13 PM
Hi @John Ramirez - you can call `client.restart` in the `on_exit` callback of your Flow's execution environment: https://docs.prefect.io/cloud/execution/overview.html#environments
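For reference, a minimal sketch of that pattern, assuming a RemoteEnvironment with a DaskExecutor pointed at the in-cluster scheduler address discussed later in this thread; the import paths and function name here are assumptions, since the thread does not show them:
from distributed import Client
from prefect.environments import RemoteEnvironment

SCHEDULER = "tcp://dask-scheduler.default.svc.cluster.local:8786"

def clear_dask_memory():
    # Connects to the scheduler from inside the cluster and clears worker memory
    # once the flow run has finished (success or failure).
    client = Client(SCHEDULER)
    client.restart()
    client.close()

ENV = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={"address": SCHEDULER},
    on_exit=clear_dask_memory,  # pass the callable; do not call it here
)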
j

John Ramirez

03/06/2020, 6:19 PM
so I found that, but my Dask cluster is inside EKS and isn't open to the public. When I try to deploy my workflow via Docker, the function is validated and breaks.
The function also runs at deployment, which I also do not want
c

Chris White

03/06/2020, 6:21 PM
I don't think I understand -> if your workflow is successfully running against your Dask cluster, then it must have access to that cluster, so this command should work
j

John Ramirez

03/06/2020, 6:24 PM
yes, I do have access to the cluster, but I use the Kubernetes internal IP address in the workflow, not the external address
c

Chris White

03/06/2020, 6:26 PM
Sure, but regardless, if you can access the cluster then you can call restart via the `on_exit` hook whenever the workflow has completed its run. Did I maybe misunderstand the goal?
j

John Ramirez

03/06/2020, 6:27 PM
no, you are right, but I don't understand why the function is executed during deployment. Is there a way to turn that off?
c

Chris White

03/06/2020, 7:01 PM
I'm sorry, I don't think I understand your question; at a high level, Cloud deployments on K8s work like this: the Agent creates a K8s job -> the Environment class spins up and calls setup and execute -> the Flow runs and submits work to whatever executor is configured -> on_exit is called -> the job is cleaned up
j

John Ramirez

03/06/2020, 7:03 PM
sorry, I meant that when I run `flow.register()` the `on_start` or `on_exit` function also runs
c

Chris White

03/06/2020, 7:31 PM
Hmm that shouldn’t happen
j

John Ramirez

03/06/2020, 7:35 PM
do you want an error log?
c

Chris White

03/06/2020, 7:35 PM
Yea, that would be great
j

John Ramirez

03/06/2020, 7:38 PM
Traceback (most recent call last):
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/comm/core.py", line 218, in connect
    _raise(error)
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/comm/core.py", line 203, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://dask-scheduler.default.svc.cluster.local:8786' after 10 s: connect() didn't finish in time

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "main.py", line 92, in <module>
    on_start=clear_memory()
  File "main.py", line 79, in clear_memory
    client = Client('dask-scheduler.default.svc.cluster.local:8786')
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/client.py", line 723, in __init__
    self.start(timeout=timeout)
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/client.py", line 896, in start
    sync(self.loop, self._start, **kwargs)
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/utils.py", line 348, in sync
    raise exc.with_traceback(tb)
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/utils.py", line 332, in f
    result[0] = yield future
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/client.py", line 991, in _start
    await self._ensure_connected(timeout=timeout)
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/client.py", line 1048, in _ensure_connected
    connection_args=self.connection_args,
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/comm/core.py", line 227, in connect
    _raise(error)
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/comm/core.py", line 203, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://dask-scheduler.default.svc.cluster.local:8786' after 10 s: Timed out trying to connect to 'tcp://dask-scheduler.default.svc.cluster.local:8786' after 10 s: connect() didn't finish in time
here is the Python code as well:
def clear_memory():
    client = Client('dask-scheduler.default.svc.cluster.local:8786')
    client.restart()

ENV = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={
        "address": "tcp://dask-scheduler.default.svc.cluster.local:8786"
    },
    on_start=clear_memory()
)
c

Chris White

03/06/2020, 7:40 PM
You are running your `clear_memory` function in this code; you should instead be providing that function to `on_start`:
ENV = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={
        "address": "tcp://dask-scheduler.default.svc.cluster.local:8786"
    },
    on_start=clear_memory # <--- the change I made
)
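This is also why the function ran at registration time: `on_start=clear_memory()` calls the function immediately, while `main.py` is being imported on the laptop where the in-cluster scheduler address does not resolve (hence the connection timeout above), whereas `on_start=clear_memory` only stores the callable for Prefect to invoke when the flow run actually starts inside the cluster. A quick way to see the difference in plain Python, using hypothetical names purely to illustrate the semantics:
def hook():
    print("hook ran")

def register(on_start=None):
    # Prefect-style: store the callable now, invoke it later at run time.
    print("registering...")
    return on_start

callback = register(on_start=hook)   # prints "registering..." only; hook has not run
callback()                           # prints "hook ran" - only when explicitly invoked

# register(on_start=hook()) would print "hook ran" during registration itself,
# which is exactly what happened with clear_memory() above.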
j

John Ramirez

03/06/2020, 7:57 PM
thank you that worked.
c

Chris White

03/06/2020, 7:57 PM
👍 np