# prefect-community
l
Hi all - I'm trying out Orion w/ Kubernetes + Dask, and Google Cloud Storage for storage. I'm running into a pickling error (see thread) - any idea what I'm doing wrong?
Flow code:
```python
from prefect import flow, task, get_run_logger
from prefect.task_runners import DaskTaskRunner
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner

@task
def say_hello():
    logger = get_run_logger()
    logger.info("hello from your favorite task")

@flow(task_runner=DaskTaskRunner(
    adapt_kwargs={"maximum": 10}
))
def synthesize():
    logger = get_run_logger()
    logger.info("Hello from Kubernetes!")

    say_hello()
```
Deployment spec:
```python
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner

DeploymentSpec(
    name="TestPipeline",
    flow_location='flowrun.py',  # the file copied above
    flow_runner=KubernetesFlowRunner(
        image='gcr.io/[internal-bucket]/pipeline-worker-v2:latest',
        image_pull_policy='Always',
        service_account_name='[service-account-name]',
    ),
)
```
It seems like Dask or Prefect is trying to pickle the Cloud Storage client object, which can't be pickled. I'm not sure how to change that, since I'm pretty sure Prefect is doing it on my behalf. Thoughts?
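The last frame of the traceback shows the mechanism: Google Cloud client classes define `__getstate__` to raise `PicklingError` on purpose, and Dask has to pickle a task's keyword arguments (`warn_dumps(task[3])` in the trace) before shipping them to a worker, so a client reachable from anything in that payload breaks the submit. A minimal sketch of that behaviour follows. It uses a hypothetical stand-in class, `FakeCloudClient`, instead of the real `google.cloud` client so it runs without credentials; `task_kwargs_are_picklable` and `upload` are likewise illustrative names. It also shows the usual pattern for your own tasks: pass plain configuration such as a bucket name and build the client inside the task. Note that the flow code here never creates a client itself, so the pickled client must come from Prefect's side, and this pattern alone would not have helped in this case.

```python
import pickle


class FakeCloudClient:
    """Hypothetical stand-in for a google.cloud client (needs no credentials)."""

    def __init__(self, bucket: str):
        self.bucket = bucket

    def __getstate__(self):
        # Mirrors the deliberate guard in google/cloud/client/__init__.py
        raise pickle.PicklingError(
            "Pickling client objects is explicitly not supported."
        )


def task_kwargs_are_picklable(kwargs: dict) -> bool:
    """Mimic Dask pickling a task's kwargs before sending them to a worker."""
    try:
        pickle.dumps(kwargs, protocol=4)
        return True
    except pickle.PicklingError:
        return False


def upload(bucket: str, data: bytes) -> str:
    # Pattern for your own tasks: accept plain config, build the client here
    client = FakeCloudClient(bucket)
    return f"uploaded {len(data)} bytes to {client.bucket}"


if __name__ == "__main__":
    # A client nested anywhere in the payload makes the whole payload fail
    print(task_kwargs_are_picklable({"storage": {"client": FakeCloudClient("b")}}))  # False
    # Plain strings and bytes serialize without trouble
    print(task_kwargs_are_picklable({"bucket": "b", "data": b"abc"}))  # True
    print(upload("b", b"abc"))  # uploaded 3 bytes to b
```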
The flow run completes successfully if I switch to `ConcurrentTaskRunner` instead of `DaskTaskRunner`.
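That split is consistent with the traceback: the error is raised inside `distributed`'s `Client.submit`, while the task is being serialized and before anything reaches a worker. As I understand it, `ConcurrentTaskRunner` runs tasks inside the flow-run process, so their inputs are handed over by reference and never pickled. Here is a rough standard-library analogy, not Prefect's implementation: the hypothetical `run_in_thread` stands in for a thread-based runner, the hypothetical `run_after_serializing` for a runner that must pickle arguments first (real Dask also pickles the function, which this sketch skips), and a `threading.Lock` plays the role of the unpicklable client.

```python
import pickle
import threading
from concurrent.futures import ThreadPoolExecutor


def use_lock(lock) -> str:
    # Stand-in task body that needs the live object it was given
    with lock:
        return "used"


def run_in_thread(fn, *args):
    """Thread-based analogy: args are passed by reference, nothing is pickled."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(fn, *args).result()


def run_after_serializing(fn, *args):
    """Distributed analogy: args must survive pickling before the call."""
    payload = pickle.dumps(args, protocol=4)  # raises for unpicklable args
    return fn(*pickle.loads(payload))  # the "worker" receives copies


if __name__ == "__main__":
    lock = threading.Lock()  # a real object that pickle refuses
    print(run_in_thread(use_lock, lock))  # used
    try:
        run_after_serializing(use_lock, lock)
    except (pickle.PicklingError, TypeError) as exc:
        print("serialization failed:", exc)
```

Picklable inputs go through both paths unchanged (for example `run_after_serializing(len, "abc")` returns `3`); only the serializing path rejects the lock.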
k
This does seem like something is wrong. I'd have to ask another engineer. In the meantime, though, could you move the traceback into the thread to keep the main channel a bit neater?
l
Happily - thanks Kevin!
Full stack trace:
```text
Info	2022/05/19 05:33:28 PM	Using task runner 'DaskTaskRunner'
Info	2022/05/19 05:33:33 PM	Hello from Kubernetes!
Info	2022/05/19 05:33:33 PM	Created task run 'say_hello-88cd6863-0' for task 'say_hello'
Error	2022/05/19 05:33:33 PM	Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 40, in dumps
    result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function say_hello at 0x7f7a8fd6ee50>: import of module '<run_path>' failed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 468, in orchestrate_flow_run
    result = await run_sync_in_worker_thread(flow_call)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 54, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/tmp/flow-script-synthesize5109duz5.py", line 88, in synthesize
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks.py", line 356, in __call__
    return enter_task_run_engine(
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 550, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 65, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 621, in create_and_submit_task_run
    future = await flow_run_context.task_runner.submit(
  File "/usr/local/lib/python3.8/site-packages/prefect/task_runners.py", line 369, in submit
    self._dask_futures[task_run.id] = self._client.submit(
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1770, in submit
    futures = self._graph_to_futures(
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 2892, in _graph_to_futures
    dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
  File "/usr/local/lib/python3.8/site-packages/dask/highlevelgraph.py", line 1062, in __dask_distributed_pack__
    "state": layer.__dask_distributed_pack__(
  File "/usr/local/lib/python3.8/site-packages/dask/highlevelgraph.py", line 432, in __dask_distributed_pack__
    dsk = toolz.valmap(dumps_task, dsk)
  File "/usr/local/lib/python3.8/site-packages/toolz/dicttoolz.py", line 83, in valmap
    rv.update(zip(d.keys(), map(func, d.values())))
  File "/usr/local/lib/python3.8/site-packages/distributed/worker.py", line 4588, in dumps_task
    d["kwargs"] = warn_dumps(task[3])
  File "/usr/local/lib/python3.8/site-packages/distributed/worker.py", line 4600, in warn_dumps
    b = dumps(obj, protocol=4)
  File "/usr/local/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 51, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
  File "/usr/local/lib/python3.8/site-packages/google/cloud/client/__init__.py", line 194, in __getstate__
    raise PicklingError(
_pickle.PicklingError: Pickling client objects is explicitly not supported.
Clients have non-trivial state that is local and unpickleable.
```
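The first, chained error in the traceback (`import of module '<run_path>' failed`) is worth decoding. `'<run_path>'` is the default module name that Python's `runpy.run_path` gives to the code it executes, which suggests the flow file (`/tmp/flow-script-...py`) was loaded that way. Plain `pickle` stores a function by reference (module name plus qualified name), so it cannot pickle `say_hello` from a module that cannot be re-imported; `distributed` then falls back to `cloudpickle` (the `cloudpickle.dumps` frame), and it was during that second attempt that the Cloud Storage client was reached. A small standard-library reproduction of the first step, where `SCRIPT` and `load_like_run_path` are hypothetical stand-ins for the flow script and for however Prefect loads it:

```python
import os
import pickle
import runpy
import tempfile

# Hypothetical flow-script body; only the function definition matters here
SCRIPT = "def say_hello():\n    return 'hello'\n"


def load_like_run_path(source: str) -> dict:
    """Write source to a temp .py file, execute it via runpy.run_path, return its globals."""
    with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
        f.write(source)
        path = f.name
    try:
        return runpy.run_path(path)  # run_name defaults to "<run_path>"
    finally:
        os.remove(path)


if __name__ == "__main__":
    say_hello = load_like_run_path(SCRIPT)["say_hello"]
    print(say_hello.__module__)  # <run_path>
    try:
        pickle.dumps(say_hello)  # by-reference pickling needs an importable module
    except pickle.PicklingError as exc:
        print("plain pickle failed:", exc)
```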
k
I was able to reproduce it.
An issue has been opened here
l
fantastic - thank you! subscribed
k
looks like the fix was already merged to master (but there's no nightly build with it yet)