James Brady

08/16/2022, 5:24 AM
Do tasks need to be in the same file as the flow that uses them 🤔? When I had a larger task in a separate module, I got PicklingErrors (stack trace below). If I move the task definition into my main module, it works as expected.
Encountered exception during execution:
Traceback (most recent call last):
  File "/code/.venv/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 46, in dumps
    result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function fetch_manifests at 0x7f1eee1180d0>: import of module '__prefect_loader__.fetch_manifests' failed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/code/.venv/lib/python3.10/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/code/.venv/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/code/.venv/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/code/.venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/code/.venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/code/s2ag/main.py", line 44, in main
    manifests = fetch_manifests.submit(release)
  File "/code/.venv/lib/python3.10/site-packages/prefect/tasks.py", line 491, in submit
    return enter_task_run_engine(
  File "/code/.venv/lib/python3.10/site-packages/prefect/engine.py", line 687, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/code/.venv/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/code/.venv/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/code/.venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/code/.venv/lib/python3.10/site-packages/prefect/engine.py", line 792, in create_task_run_then_submit
    future = await submit_task_run(
  File "/code/.venv/lib/python3.10/site-packages/prefect/engine.py", line 858, in submit_task_run
    future = await task_runner.submit(
  File "/code/.venv/lib/python3.10/site-packages/prefect_dask/task_runners.py", line 191, in submit
    self._dask_futures[run_key] = self._client.submit(
  File "/code/.venv/lib/python3.10/site-packages/distributed/client.py", line 1802, in submit
    futures = self._graph_to_futures(
  File "/code/.venv/lib/python3.10/site-packages/distributed/client.py", line 2924, in _graph_to_futures
    dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
  File "/code/.venv/lib/python3.10/site-packages/dask/highlevelgraph.py", line 1070, in __dask_distributed_pack__
    "state": layer.__dask_distributed_pack__(
  File "/code/.venv/lib/python3.10/site-packages/dask/highlevelgraph.py", line 432, in __dask_distributed_pack__
    dsk = toolz.valmap(dumps_task, dsk)
  File "/code/.venv/lib/python3.10/site-packages/toolz/dicttoolz.py", line 85, in valmap
    rv.update(zip(d.keys(), map(func, d.values())))
  File "/code/.venv/lib/python3.10/site-packages/distributed/worker.py", line 2825, in dumps_task
    d["kwargs"] = warn_dumps(task[3])
  File "/code/.venv/lib/python3.10/site-packages/distributed/worker.py", line 2837, in warn_dumps
    b = dumps(obj, protocol=4)
  File "/code/.venv/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 58, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/code/.venv/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/code/.venv/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
    return Pickler.dump(self, obj)
_pickle.PicklingError: Can't pickle <function manifest_files at 0x7f1eee165900>: import of module '__prefect_loader__.fetch_manifests' failed

Oscar Björhn

08/16/2022, 10:17 AM
I can't really help you other than to say it shouldn't be necessary. Our flows run a bunch of tasks that are defined in other files.

Anna Geller

08/16/2022, 10:46 AM
It's a Dask issue.
On the Prefect end, there are no issues with putting tasks in a separate file, but Dask cloudpickles everything in order to send work to workers, so this may not work with Dask, even though it works fine with just Prefect and, e.g., ConcurrentTaskRunner.
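To illustrate the kind of failure in the traceback, here is a minimal sketch that uses only the standard-library `pickle` module (not cloudpickle, Dask, or Prefect). It assumes the root cause is that a function is pickled by reference to a module name the pickler cannot import. The module name `hypothetical_loader.fetch_manifests` and the helper `try_pickle` are made up for the illustration, and the exact mechanism inside Prefect and Dask may differ.

```python
import json
import pickle


def fetch_manifests(release):
    """Stand-in for the task function from the traceback."""
    return [f"{release}/manifest.json"]


# Assumption for illustration: pretend the function was loaded under a
# synthetic module name that cannot be imported with a normal `import`.
fetch_manifests.__module__ = "hypothetical_loader.fetch_manifests"


def try_pickle(func):
    """Return "ok" if func pickles, otherwise a short error description."""
    try:
        pickle.dumps(func)
        return "ok"
    except pickle.PicklingError as exc:
        return f"PicklingError: {exc}"


# pickle stores functions by reference (module name plus qualified name),
# so it has to import the module named in __module__ and fails if it cannot.
result = try_pickle(fetch_manifests)
print(result)

# A function that lives in a normally importable module pickles fine.
print(try_pickle(json.dumps))
```

If this assumption holds for the Dask case, the fix would be about making the task's module importable under the name recorded on the function, rather than about keeping everything in one file.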

James Brady

08/18/2022, 2:44 PM
Ah, OK, thanks Anna! Obviously, if we need to put everything in a single file, that's not going to work super well 😬 I'll check with the Dask folks.