# ask-community
i
I'm getting some odd error about a Dask future my job awaits being attached to a different loop:
```
RuntimeError: Task <Task pending name='Task-2051' coro=<FutureState.wait() running at /usr/local/lib/python3.8/site-packages/distributed/client.py:482> cb=[WaitIterator._done_callback()]> got Future <Future pending> attached to a different loop
```
As far as I can tell, I did not change anything about how work is submitted to Dask, so I'm wondering whether this is an intermittent issue related to where the Prefect task is running. My Prefect task is wrapped in `with worker_client()`.
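The `RuntimeError` above is raised by asyncio itself when a `Task` running on one event loop awaits a `Future` that is bound to a different event loop. The sketch below reproduces that message with plain asyncio (no Dask or Prefect involved). It only illustrates the mechanism; it does not confirm how `distributed` ended up with two loops in this job.

```python
import asyncio


def reproduce_cross_loop_error() -> str:
    # Create a Future that belongs to a second, separate event loop.
    other_loop = asyncio.new_event_loop()
    foreign_future = other_loop.create_future()

    async def wait_on_foreign_future() -> None:
        # This coroutine runs as a Task on the loop created by asyncio.run(),
        # so awaiting a Future bound to other_loop is rejected by asyncio
        # with "... attached to a different loop".
        await foreign_future

    try:
        asyncio.run(wait_on_foreign_future())
    except RuntimeError as exc:
        return str(exc)
    finally:
        other_loop.close()
    return "no error"


if __name__ == "__main__":
    print(reproduce_cross_loop_error())
```

The Future never completes here; the error is raised as soon as the Task notices the loop mismatch.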
k
This is the first I’ve seen this error, so I’m not familiar with it. You’re not putting a callback on these tasks, right? Could you show the `with worker_client()` syntax?
i
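```python
from functools import wraps
from typing import Callable, Optional, Union

from dask.distributed import worker_client
from prefect.tasks.core.function import FunctionTask


def custom_task_function(
    fn: Optional[Callable] = None, **task_kwargs
) -> Union[FunctionTask, Callable[[Callable], FunctionTask]]:
    """
    Wrapper around prefect.task that sets up the Dask client.
    """

    def create_wrapper(fn):
        @wraps(fn)
        def wrapped_fn(*args, **kwargs):
            # do something else in local
            # worker_client() gives this task a Client connected to the cluster
            # it is running on and secedes its thread from the worker's pool
            with worker_client():
                return fn(*args, **kwargs)

        return wrapped_fn

    # from @prefect.task: supports both @custom_task_function and
    # @custom_task_function(**task_kwargs)
    if fn is None:
        return lambda fn: FunctionTask(
            fn=create_wrapper(fn),
            **task_kwargs,
        )
    return FunctionTask(fn=create_wrapper(fn), **task_kwargs)
```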
And then my task function is decorated with `@custom_task_function`. My runtime environment is KubernetesRun with a DaskExecutor backed by a KubeCluster.
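The wrapper above follows the same "decorator with or without arguments" pattern as `@prefect.task`. The sketch below exercises that pattern with hypothetical stand-ins: `FakeTask` replaces Prefect's `FunctionTask` and `fake_worker_client` replaces `worker_client`, so it runs without Prefect or a Dask cluster. It also shows that a keyword such as `checkpoint=False` (the setting discussed later in the thread) would simply be forwarded through `task_kwargs`.

```python
from contextlib import contextmanager
from functools import wraps
from typing import Any, Callable, List, Optional

events: List[str] = []


@contextmanager
def fake_worker_client():
    # Hypothetical stand-in for dask.distributed.worker_client: records enter/exit.
    events.append("enter")
    try:
        yield
    finally:
        events.append("exit")


class FakeTask:
    # Hypothetical stand-in for Prefect's FunctionTask: stores fn and its kwargs.
    def __init__(self, fn: Callable, **task_kwargs: Any):
        self.fn = fn
        self.task_kwargs = task_kwargs

    def run(self, *args: Any, **kwargs: Any) -> Any:
        return self.fn(*args, **kwargs)


def custom_task_function(fn: Optional[Callable] = None, **task_kwargs: Any):
    def create_wrapper(inner: Callable) -> Callable:
        @wraps(inner)
        def wrapped_fn(*args: Any, **kwargs: Any) -> Any:
            # Every call runs inside the (stand-in) client context.
            with fake_worker_client():
                return inner(*args, **kwargs)

        return wrapped_fn

    if fn is None:
        # Used with arguments, e.g. @custom_task_function(checkpoint=False)
        return lambda f: FakeTask(create_wrapper(f), **task_kwargs)
    # Used bare: @custom_task_function
    return FakeTask(create_wrapper(fn), **task_kwargs)


@custom_task_function
def add(x: int, y: int) -> int:
    return x + y


@custom_task_function(checkpoint=False)
def double(x: int) -> int:
    return 2 * x
```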
k
What is the goal of this setup, as opposed to just using the executor interface? Are you going to add more to the wrapper eventually?
i
What is the executor interface you're referring to?
k
I mean just using the DaskExecutor to send the mapped tasks to the cluster.
Did you try turning off checkpointing for the wrapped task? Are you dealing with a Dask object like a Dask DataFrame or Bag?
i
I'm not blocked on running mapped tasks on the cluster; that is already working. Within one of the tasks I want to run a Dask computation (I have a `dd.DataFrame` that needs some heavy processing).
k
Gotcha. I assume you’ve seen this, right? I think this issue might be related to checkpointing on the task.
i
I did have some inconsistent checkpointing settings before, but I'm debugging now to confirm whether those settings were causing the issue.
k
Sorry, I meant this link.
i
I wonder if it would be simpler not to run the Prefect tasks on Dask, and instead have one task spin up a Dask cluster for the duration of that heavy task.
I did see that.
I had to revert some changes I made; I'll try to confirm whether the checkpoint setting is the issue.
k
Yeah, the resource manager might work for you.
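The idea of spinning up a cluster only for the duration of the heavy task is a setup/cleanup lifecycle, which is what Prefect 1.x's resource manager (a class with `setup` and `cleanup` methods) formalizes; check the Prefect docs for its exact usage. The sketch below shows only the lifecycle, with a swapped-in technique: a stdlib `ThreadPoolExecutor` stands in for a temporary Dask cluster, and `temporary_cluster` and `heavy_task` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Iterator, List

lifecycle: List[str] = []


@contextmanager
def temporary_cluster(n_workers: int) -> Iterator[ThreadPoolExecutor]:
    # Setup: start the stand-in "cluster" only when the heavy task needs it.
    lifecycle.append("setup")
    pool = ThreadPoolExecutor(max_workers=n_workers)
    try:
        yield pool
    finally:
        # Cleanup: always shut the pool down, even if the task raised.
        pool.shutdown(wait=True)
        lifecycle.append("cleanup")


def heavy_task(partitions: List[List[int]], n_workers: int = 4) -> int:
    # Only this task pays for the cluster; the rest of the flow runs without it.
    with temporary_cluster(n_workers) as pool:
        partial_sums = list(pool.map(sum, partitions))
    return sum(partial_sums)
```

With this shape, the flow itself can run on a plain executor, and only the task that needs Dask owns a cluster and its client.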