# ask-marvin
@Marvin I'm using the Dask Task Runner and I'm getting this
```
File "/usr/local/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_pickle.PicklingError: Can't pickle <function find_connected_components at 0x7f83d10f3f60>: it's not the same object as dask_distributed.find_connected_component
```
thought for 214.5 seconds
That pickling error usually means Dask is serializing your function by reference (module name + qualified name), but the object it finds when it looks that path back up isn't the same object you handed it. In practice, it boils down to one of these:
- Your function isn't importable (defined in `__main__`, a notebook cell, or nested inside another function)
- Two copies of the same module are loaded under different names/paths (relative vs. absolute imports, module reloading, running code both as a package and as a script)
- A name collision or shadowing (e.g., a local module/package named `dask_distributed`… which your error suspiciously mentions)
- Workers running different code than the client (stale/edited code, different installs, different containers)
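You can reproduce that exact failure mode with nothing but the standard library. This is a self-contained sketch (the throwaway module `mymod` and its function are made up for illustration) showing how pickling by reference breaks as soon as the name no longer resolves to the same object:

```python
# Minimal repro of "it's not the same object as ..." -- no Dask involved.
# A throwaway module file is written to disk purely for illustration.
import importlib
import pathlib
import pickle
import sys

pathlib.Path("mymod.py").write_text("def find_connected_components(G):\n    return G\n")
sys.path.insert(0, ".")

import mymod
f = mymod.find_connected_components   # hold a reference to the original function object

importlib.reload(mymod)               # reload rebinds mymod.find_connected_components to a new object

# pickle serializes plain functions by reference (module + qualname) and checks that
# looking the name up again yields the very same object; after the reload it doesn't:
try:
    pickle.dumps(f)
except pickle.PicklingError as exc:
    print(exc)  # Can't pickle <function find_connected_components ...>: it's not the same object as mymod.find_connected_components
```

Anything that makes the lookup return a different object (reloads, shadowed module names, different code on the workers) produces the same error.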
Concrete fixes to try:

1) Make your function importable and top-level
- Define `find_connected_components` in a real module, not inside a flow/task/notebook cell/script body.
- Import and use it where needed.
```python
# src/myproj/graph.py
def find_connected_components(G):
    ...

# flow.py
from prefect import flow, task
from prefect_dask import DaskTaskRunner
from myproj.graph import find_connected_components

@task
def compute_components(G):
    return find_connected_components(G)

@flow(task_runner=DaskTaskRunner())
def main(G):
    return compute_components.submit(G)
```
2) Avoid collisions and duplicate loads
- If you have a file/folder named `dask_distributed.py` or a package named `dask_distributed`, rename it, then delete any stale `.pyc` files / `__pycache__` directories (see the check sketched below).
- Use absolute imports consistently, and don't run the same code once as a module and once as a script.
- In scripts, guard execution with `if __name__ == "__main__":` so that importing the module on workers doesn't execute top-level code.
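To check whether a local file or package is shadowing the name from your traceback, a quick stdlib-only probe like this (run in the client environment, and ideally on a worker too) shows where Python actually resolves `dask_distributed`:

```python
import importlib.util

# Where does Python resolve the suspicious module name from the traceback?
# A path inside your project (rather than site-packages, or nothing at all)
# points at a local file shadowing or colliding with that name.
spec = importlib.util.find_spec("dask_distributed")
print(spec.origin if spec else "nothing importable as 'dask_distributed'")
```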
3) Keep client and workers on the same code
- If you're on a remote Dask cluster, package your code or deploy it so workers import the exact same module version.
- In Prefect 3.x deployments, prefer sourcing your code so workers pull it:
```python
from prefect import flow
f = flow.from_source(
    "https://github.com/owner/repo",
    entrypoint="flows/flow.py:main",
)
f.deploy(
    name="my-deploy",
    work_pool_name="...",  # etc.
)
```
- Or use a container image with your code preinstalled. Just don't let workers guess.

4) Restart the world
- Restart your Python session and the Dask cluster/workers after code changes to avoid stale modules in memory.

Quick diagnostics (optional but useful):
```python
import inspect, sys

print(find_connected_components.__module__, find_connected_components.__qualname__)
print(inspect.getsourcefile(find_connected_components))       # which file defines it?
print(sys.modules.get(find_connected_components.__module__))  # which module object is loaded under that name?
print([k for k in sys.modules if "dask_distributed" in k])    # any suspiciously named modules already loaded?
```
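If your flow points at an existing Dask cluster, you can run the same kind of check on the workers themselves (the mismatch is often only on their side). A rough sketch with a plain `dask.distributed` client; the scheduler address is a placeholder:

```python
from dask.distributed import Client

def module_report():
    # Runs on each worker: where (if anywhere) does "dask_distributed" resolve,
    # and which suspiciously named modules are already loaded?
    import importlib.util
    import sys
    spec = importlib.util.find_spec("dask_distributed")
    return {
        "dask_distributed_file": spec.origin if spec else None,
        "loaded_suspects": [k for k in sys.modules if "dask_distributed" in k],
    }

client = Client("tcp://my-scheduler:8786")  # placeholder address for your cluster
print(client.run(module_report))            # one result dict per worker address
```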
If you share:
- Where `find_connected_components` is defined
- How you're running the flow (local, deployment, K8s)
- Prefect/Dask versions

…I can point to the exact culprit instead of waving at all the likely ones like a weather forecast.

References:
- Prefect + Dask integration (Prefect 3.x)
- Dask serialization docs
- Debugging serialization errors