David Martin Calalang
08/20/2025, 4:06 PM
File "/usr/local/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 63, in dumps
result = pickle.dumps(x, **dump_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_pickle.PicklingError: Can't pickle <function find_connected_components at 0x7f83d10f3f60>: it's not the same object as dask_distributed.find_connected_component
Marvin
08/20/2025, 4:09 PM
1) Define find_connected_components in a real module, not inside a flow/task/notebook cell/script body.
- Import and use it where needed.
# src/myproj/graph.py
def find_connected_components(G):
    ...

# flow.py
from prefect import flow, task
from prefect_dask import DaskTaskRunner
from myproj.graph import find_connected_components

@task
def compute_components(G):
    return find_connected_components(G)

@flow(task_runner=DaskTaskRunner())
def main(G):
    return compute_components.submit(G)
2) Avoid collisions and duplicate loads
- If you have a file/folder named dask_distributed.py or a package named dask_distributed, rename it. Then delete any .pyc/__pycache__ leftovers.
- Use absolute imports consistently. Don’t run the same code once as a module and once as a script.
- In scripts, guard execution with if __name__ == "__main__": so imports on workers don’t execute top-level code.
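A minimal sketch of that guard (the file name and function body here are illustrative, not your real project layout):

```python
# my_script.py -- illustrative names, not the real project
def find_connected_components(G):
    """Placeholder body so the example is self-contained."""
    return sorted(G)

if __name__ == "__main__":
    # Runs only when executed directly (python my_script.py).
    # When a Dask worker imports this module, this block is skipped,
    # so no cluster setup or flow run is triggered on import.
    print(find_connected_components({"b": [], "a": []}))
```

With the guard in place, workers can import the module to resolve the function by name without re-running your script's entry point.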
3) Keep client and workers on the same code
- If you’re on a remote Dask cluster, package your code or deploy it so workers import the exact same module version.
- In Prefect 3.x deployments, prefer sourcing your code so workers pull it:
from prefect import flow

f = flow.from_source("https://github.com/owner/repo", entrypoint="flows/flow.py:main")
f.deploy(name="my-deploy", work_pool_name="...")  # etc.
- Or use a container image with your code preinstalled. Just don’t let workers guess.
4) Restart the world
- Restart your Python session and the Dask cluster/workers after code changes to avoid stale modules in memory.
Quick diagnostics (optional but useful):
import inspect, sys
print(find_connected_components.__module__, find_connected_components.__qualname__)
print(sys.modules.get(find_connected_components.__module__))
[k for k in sys.modules if "dask_distributed" in k]
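If you want to see this exact failure in isolation, the sketch below reproduces the "not the same object" condition on purpose: pickle serializes a function by looking it up via its __module__ and __qualname__, and errors out when that lookup returns a different object. All names here are made up for the demo:

```python
import pickle
import sys
import types

# Build a fake module and register it so pickle can find it by name.
fake = types.ModuleType("fake_mod")
sys.modules["fake_mod"] = fake

def f():
    pass

def g():
    pass

# Claim f lives in fake_mod, but bind the name "f" there to a DIFFERENT function.
# This is what a naming collision or duplicate module load does to your code.
f.__module__ = "fake_mod"
fake.f = g

caught = None
try:
    pickle.dumps(f)
except pickle.PicklingError as e:
    caught = e

# caught carries the same kind of "it's not the same object as ..." message
# as the traceback above.
print(caught)
```

In your case, the two copies of find_connected_components come from the module being loaded twice (e.g. once as a script, once as dask_distributed), so fixing the import layout makes the lookup consistent again.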
If you share:
- Where find_connected_components is defined
- How you’re running the flow (local, deployment, K8s)
- Prefect/Dask versions
…I can point to the exact culprit instead of waving at all the likely ones like a weather forecast.
References:
- Prefect + Dask integration (Prefect 3.x)
- Dask serialization docs
- Debugging serialization errors