Luke Segars
05/20/2022, 12:38 AM
from prefect import flow, task, get_run_logger
from prefect.task_runners import DaskTaskRunner
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner
@task
def say_hello():
    """Log a greeting from inside a Prefect task run.

    Uses the run-scoped logger from ``get_run_logger()`` so the message is
    attached to this task run's logs.
    """
    logger = get_run_logger()
    # The pasted version read ``<http://logger.info|logger.info>(...)`` -- Slack
    # auto-linking mangled the call into invalid Python.
    logger.info("hello from your favorite task")
@flow(task_runner=DaskTaskRunner(adapt_kwargs={"maximum": 10}))
def synthesize():
    """Flow entry point: log a greeting, then run the ``say_hello`` task.

    Tasks are submitted through ``DaskTaskRunner`` with adaptive scaling
    capped at ``maximum=10`` (passed via ``adapt_kwargs``).

    NOTE(review): Dask must pickle every task submission. The run logged in
    this thread failed inside ``distributed.worker.dumps_task`` while
    pickling the submission kwargs, ending in google.cloud's client
    ``__getstate__`` raising "Pickling client objects is explicitly not
    supported". This suggests a google.cloud client object was reachable from
    what Prefect submitted, so confirm what holds it. The thread suggests
    ``ConcurrentTaskRunner`` instead of ``DaskTaskRunner`` as an alternative.
    That runner avoids the Dask pickling step but gives up Dask's
    distributed execution.
    """
    logger = get_run_logger()
    # Restored from the Slack-mangled ``<http://logger.info|logger.info>(...)``.
    logger.info("Hello from Kubernetes!")
    say_hello()
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner
# Register a deployment named "TestPipeline" for the flow defined in
# flowrun.py, executed as a Kubernetes job.
DeploymentSpec(
    name="TestPipeline",
    flow_location="flowrun.py",  # the file copied above
    flow_runner=KubernetesFlowRunner(
        # Plain image reference. The pasted value had been rewritten by Slack
        # into "<http://gcr.io/...|gcr.io/...>", which is not a valid image
        # name. Bracketed segments are redaction placeholders to fill in.
        image="gcr.io/[internal-bucket]/pipeline-worker-v2:latest",
        # Re-pull on every run so the mutable ":latest" tag is picked up.
        image_pull_policy="Always",
        service_account_name="[service-account-name]",
    ),
)
ConcurrentTaskRunner instead of DaskTaskRunner
Kevin Kho
05/20/2022, 1:22 AM
Luke Segars
05/20/2022, 1:33 AM
Info 2022/05/19 05:33:28 PM Using task runner 'DaskTaskRunner'
Info 2022/05/19 05:33:33 PM Hello from Kubernetes!
Info 2022/05/19 05:33:33 PM Created task run 'say_hello-88cd6863-0' for task 'say_hello'
Error 2022/05/19 05:33:33 PM Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 40, in dumps
result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function say_hello at 0x7f7a8fd6ee50>: import of module '<run_path>' failed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 468, in orchestrate_flow_run
result = await run_sync_in_worker_thread(flow_call)
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 54, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/usr/local/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/tmp/flow-script-synthesize5109duz5.py", line 88, in synthesize
File "/usr/local/lib/python3.8/site-packages/prefect/tasks.py", line 356, in __call__
return enter_task_run_engine(
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 550, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 65, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/usr/local/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 444, in result
return self.__get_result()
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 621, in create_and_submit_task_run
future = await flow_run_context.task_runner.submit(
File "/usr/local/lib/python3.8/site-packages/prefect/task_runners.py", line 369, in submit
self._dask_futures[task_run.id] = self._client.submit(
File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1770, in submit
futures = self._graph_to_futures(
File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 2892, in _graph_to_futures
dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
File "/usr/local/lib/python3.8/site-packages/dask/highlevelgraph.py", line 1062, in __dask_distributed_pack__
"state": layer.__dask_distributed_pack__(
File "/usr/local/lib/python3.8/site-packages/dask/highlevelgraph.py", line 432, in __dask_distributed_pack__
dsk = toolz.valmap(dumps_task, dsk)
File "/usr/local/lib/python3.8/site-packages/toolz/dicttoolz.py", line 83, in valmap
rv.update(zip(d.keys(), map(func, d.values())))
File "/usr/local/lib/python3.8/site-packages/distributed/worker.py", line 4588, in dumps_task
d["kwargs"] = warn_dumps(task[3])
File "/usr/local/lib/python3.8/site-packages/distributed/worker.py", line 4600, in warn_dumps
b = dumps(obj, protocol=4)
File "/usr/local/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 51, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
return Pickler.dump(self, obj)
File "/usr/local/lib/python3.8/site-packages/google/cloud/client/__init__.py", line 194, in __getstate__
raise PicklingError(
_pickle.PicklingError: Pickling client objects is explicitly not supported.
Clients have non-trivial state that is local and unpickleable.
Info 2022/05/19 05:33:28 PM Using task runner 'DaskTaskRunner'
Info 2022/05/19 05:33:33 PM Hello from Kubernetes!
Info 2022/05/19 05:33:33 PM Created task run 'say_hello-88cd6863-0' for task 'say_hello'
Error 2022/05/19 05:33:33 PM Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 40, in dumps
result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function say_hello at 0x7f7a8fd6ee50>: import of module '<run_path>' failed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 468, in orchestrate_flow_run
result = await run_sync_in_worker_thread(flow_call)
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 54, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/usr/local/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/tmp/flow-script-synthesize5109duz5.py", line 88, in synthesize
File "/usr/local/lib/python3.8/site-packages/prefect/tasks.py", line 356, in __call__
return enter_task_run_engine(
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 550, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 65, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/usr/local/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 444, in result
return self.__get_result()
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 621, in create_and_submit_task_run
future = await flow_run_context.task_runner.submit(
File "/usr/local/lib/python3.8/site-packages/prefect/task_runners.py", line 369, in submit
self._dask_futures[task_run.id] = self._client.submit(
File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1770, in submit
futures = self._graph_to_futures(
File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 2892, in _graph_to_futures
dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
File "/usr/local/lib/python3.8/site-packages/dask/highlevelgraph.py", line 1062, in __dask_distributed_pack__
"state": layer.__dask_distributed_pack__(
File "/usr/local/lib/python3.8/site-packages/dask/highlevelgraph.py", line 432, in __dask_distributed_pack__
dsk = toolz.valmap(dumps_task, dsk)
File "/usr/local/lib/python3.8/site-packages/toolz/dicttoolz.py", line 83, in valmap
rv.update(zip(d.keys(), map(func, d.values())))
File "/usr/local/lib/python3.8/site-packages/distributed/worker.py", line 4588, in dumps_task
d["kwargs"] = warn_dumps(task[3])
File "/usr/local/lib/python3.8/site-packages/distributed/worker.py", line 4600, in warn_dumps
b = dumps(obj, protocol=4)
File "/usr/local/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 51, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
return Pickler.dump(self, obj)
File "/usr/local/lib/python3.8/site-packages/google/cloud/client/__init__.py", line 194, in __getstate__
raise PicklingError(
_pickle.PicklingError: Pickling client objects is explicitly not supported.
Clients have non-trivial state that is local and unpickleable.
Kevin Kho
05/20/2022, 3:12 PM
Luke Segars
05/20/2022, 6:14 PM
Kevin Kho
05/20/2022, 6:16 PM