https://prefect.io logo
Title
n

Noah Guilbault

05/24/2021, 6:00 PM
Hi prefect -- i'm attempting to submit a job out to a dask cluster and am encountering an error that seems somewhat obtuse. Prefect flow is registered and submitted with a LocalAgent and DaskExecutor(address="scheduler:8786") LOCAL_AGENT Submitted for execution: PID: 159 CloudFlowRunner Beginning Flow run for 'dask_test' DaskExecutor Connecting to an existing Dask cluster at scheduler:8786 CloudFlowRunner Unexpected error: CancelledError('graph_created_address_file_name-d5a25dd2f2624316b837fb11c5077a2e')
I should also note that the flow works perfectly fine when I use a localexecutor.
k

Kevin Kho

05/24/2021, 6:11 PM
Hi @Noah Guilbault! Could you move the error message to the thread? What version of Prefect are you using and is your flow small enough to share?
That sounds like you are returning something that isn’t serializeable in a task?
n

Noah Guilbault

05/24/2021, 6:22 PM
Traceback (most recent call last): File "/opt/conda/envs/test/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner new_state = method(self, state, *args, **kwargs) File "/opt/conda/envs/test/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 618, in get_flow_run_state task_states[task] = executor.submit( File "/opt/conda/envs/test/lib/python3.8/site-packages/prefect/executors/dask.py", line 395, in submit fut = self.client.submit( File "/opt/conda/envs/test/lib/python3.8/site-packages/distributed/client.py", line 1586, in submit futures = self._graph_to_futures( File "/opt/conda/envs/test/lib/python3.8/site-packages/distributed/client.py", line 2554, in _graph_to_futures dsk = dsk.__dask_distributed_pack__(self, keyset) File "/opt/conda/envs/test/lib/python3.8/site-packages/dask/highlevelgraph.py", line 955, in __dask_distributed_pack__ "state": layer.__dask_distributed_pack__( File "/opt/conda/envs/test/lib/python3.8/site-packages/dask/highlevelgraph.py", line 365, in __dask_distributed_pack__ raise CancelledError(stringify(future.key)) concurrent.futures._base.CancelledError: graph_created_address_file_name-d5a25dd2f2624316b837fb11c5077a2e graph_created_address_file_name is a parameter (string) for the flow.
k

Kevin Kho

05/24/2021, 6:24 PM
Would you have a minimal example?
n

Noah Guilbault

05/24/2021, 6:25 PM
hmm. i'll have to see if i can make on.
one
the item mentioned graph_created_address_file_name-d5a25dd2f2624316b837fb11c5077a2e
is a string parameter for a task.
k

Kevin Kho

05/24/2021, 6:27 PM
That should be fine. Is it a parameter for a mapped task?
n

Noah Guilbault

05/24/2021, 6:27 PM
it is not a mapped task
z

Zanie

05/24/2021, 9:19 PM
Hey @Noah Guilbault -- it looks like Dask Distributed is cancelling your task run. Perhaps take a look at the dask dashboard? Does your flow work with a
DaskExecutor()
that spins up a temporary cluster?
n

Noah Guilbault

05/25/2021, 4:09 PM
i'll check there -- see if there is anything of note. i've not tried with a no arg dask executor
will do that as well
z

Zanie

05/25/2021, 4:10 PM
👍
k

Kevin Kho

05/26/2021, 5:28 AM
Hi @Noah Guilbault, I ran into a similar issue and for me it seemed to be incompatible versions of Dask and Prefect. I took out my Dask pinned versions and let Prefect install Dask and then my Flow started working. I was submitting a job out to Coiled.