Thread
#prefect-community
    Isaac Brodsky

    8 months ago
    I'm getting an odd error about a Dask future that my job awaits being attached to a different loop:
    RuntimeError: Task <Task pending name='Task-2051' coro=<FutureState.wait() running at /usr/local/lib/python3.8/site-packages/distributed/client.py:482> cb=[WaitIterator._done_callback()]> got Future <Future pending> attached to a different loop
    As far as I can tell, I did not change anything about how work is submitted to Dask, so I am wondering whether this is an intermittent issue related to where the Prefect task is running. My Prefect task is wrapped in
    with worker_client()
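
For context, the "attached to a different loop" message is raised by asyncio itself when a task running on one event loop awaits a future that was created on another. The sketch below reproduces that message with the standard library only; `reproduce_loop_mismatch` and `waiter` are hypothetical names, and it makes no claim about where the mismatch arises inside Prefect or Dask.

```python
import asyncio


def reproduce_loop_mismatch() -> str:
    """Await a future created on loop A from a task running on loop B."""
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    try:
        # The future is bound to loop_a at creation time.
        fut = loop_a.create_future()

        async def waiter() -> None:
            # This coroutine runs as a task on loop_b, so awaiting fut fails.
            await fut

        try:
            loop_b.run_until_complete(waiter())
        except RuntimeError as exc:
            return str(exc)
        return "no error"
    finally:
        loop_a.close()
        loop_b.close()


message = reproduce_loop_mismatch()
print(message)
```

If the same pattern is behind the error above, the thing to look for is a future or client that was created under one event loop and reused from code running under another.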
    Kevin Kho

    8 months ago
    This is the first time I’ve seen this error, so I’m not familiar with it. You’re not putting a callback on these tasks, right? Could you show the
    with worker_client()
    syntax?
    Isaac Brodsky

    8 months ago
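    from functools import wraps
    from typing import Callable, Optional, Union

    from distributed import worker_client
    from prefect.tasks.core.function import FunctionTask

    def custom_task_function(
        fn: Optional[Callable] = None, **task_kwargs
    ) -> Union[FunctionTask, Callable[[Callable], FunctionTask]]:
        """
        Wrapper around prefect.task that sets up the Dask client.
        """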
    
        def create_wrapper(fn):
            @wraps(fn)
            def wrapped_fn(*args, **kwargs):
                # do something else in local
                with worker_client():
                    return fn(*args, **kwargs)
    
            return wrapped_fn
    
        # from @prefect.task
        if fn is None:
            return lambda fn: FunctionTask(
                fn=create_wrapper(fn),
                **task_kwargs,
            )
        return FunctionTask(fn=create_wrapper(fn), **task_kwargs)
    And then my task function is decorated with
    custom_task_function
    And my runtime environment is KubernetesRun / DaskExecutor with KubeCluster
    Kevin Kho

    8 months ago
    What is the goal of this setup compared with using the executor interface? Are you going to add more to the wrapper eventually?
    Isaac Brodsky

    8 months ago
    What is the executor interface you're referring to?
    Kevin Kho

    8 months ago
    I mean just using the DaskExecutor to send the mapped tasks to the cluster.
    Did you try turning off checkpointing for the wrapped task? Are you dealing with a Dask object like Dask DataFrame or Bag?
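
With a wrapper like the one shown earlier, turning off checkpointing would mean passing `checkpoint=False` through `**task_kwargs` so that it reaches the task constructor. The sketch below shows that forwarding in both decorator forms. It is a stand-alone illustration under stated assumptions: `StandInTask` is a hypothetical replacement for `FunctionTask`, and `nullcontext()` replaces `worker_client()` so the code runs without Prefect or Dask installed.

```python
from contextlib import nullcontext
from functools import wraps
from typing import Any, Callable, Optional, Union


class StandInTask:
    """Hypothetical stand-in for FunctionTask: stores fn and the task kwargs."""

    def __init__(self, fn: Callable, **task_kwargs: Any) -> None:
        self.fn = fn
        self.task_kwargs = task_kwargs

    def run(self, *args: Any, **kwargs: Any) -> Any:
        return self.fn(*args, **kwargs)


def custom_task_function(
    fn: Optional[Callable] = None, **task_kwargs: Any
) -> Union[StandInTask, Callable[[Callable], StandInTask]]:
    def create_wrapper(inner: Callable) -> Callable:
        @wraps(inner)
        def wrapped_fn(*args: Any, **kwargs: Any) -> Any:
            # nullcontext() is an assumption standing in for worker_client().
            with nullcontext():
                return inner(*args, **kwargs)

        return wrapped_fn

    if fn is None:
        # Used as @custom_task_function(...): return a decorator.
        return lambda inner: StandInTask(fn=create_wrapper(inner), **task_kwargs)
    # Used as a bare @custom_task_function.
    return StandInTask(fn=create_wrapper(fn), **task_kwargs)


@custom_task_function
def add(x: int, y: int) -> int:
    return x + y


@custom_task_function(checkpoint=False)
def double(x: int) -> int:
    return 2 * x
```

Here `double.task_kwargs` holds `{"checkpoint": False}`, which is what the real task constructor would receive, while `add` gets no extra kwargs.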
    Isaac Brodsky

    8 months ago
    I'm not blocked on running mapped tasks on the cluster; that is already being done. Within one of the tasks, I want to do a Dask computation (I have a dd.DataFrame and I need to do some large processing on it).
    Kevin Kho

    8 months ago
    Gotcha. I assume you’ve seen this, right? I think this issue might be related to checkpointing on the task.
    Isaac Brodsky

    8 months ago
    I did have some inconsistent settings around checkpointing before, but I'm doing some debugging now to confirm whether those settings were causing issues.
    Kevin Kho

    8 months ago
    Sorry, I meant this link
    Isaac Brodsky

    8 months ago
    I wonder if it would be simpler not to run the Prefect tasks on Dask, but rather to have a task spin up a Dask cluster for the duration of that heavy task.
    I did see that.
    I had to revert some changes I made, and I will try to confirm whether the checkpoint setting is the issue.
    Kevin Kho

    8 months ago
    Yeah, the resource manager might work for you.
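
The idea of creating compute for the duration of one heavy step and tearing it down afterwards follows a setup/cleanup lifecycle. The sketch below illustrates only that lifecycle with the standard library: `TemporaryPool`, `managed`, and `heavy_step` are hypothetical names, and a `ThreadPoolExecutor` stands in for a temporary Dask cluster. It is not Prefect's resource manager API.

```python
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Iterator, List


class TemporaryPool:
    """Hypothetical stand-in for a short-lived cluster with setup/cleanup hooks."""

    def __init__(self, n_workers: int) -> None:
        self.n_workers = n_workers
        self.events: List[str] = []

    def setup(self) -> ThreadPoolExecutor:
        self.events.append("setup")
        return ThreadPoolExecutor(max_workers=self.n_workers)

    def cleanup(self, pool: ThreadPoolExecutor) -> None:
        pool.shutdown(wait=True)
        self.events.append("cleanup")


@contextmanager
def managed(manager: TemporaryPool) -> Iterator[ThreadPoolExecutor]:
    pool = manager.setup()
    try:
        yield pool
    finally:
        # Cleanup runs even if the heavy step raises.
        manager.cleanup(pool)


def heavy_step(values: List[int], manager: TemporaryPool) -> int:
    # The pool exists only while this step is running.
    with managed(manager) as pool:
        return sum(pool.map(lambda v: v * v, values))


manager = TemporaryPool(n_workers=2)
total = heavy_step([1, 2, 3, 4], manager)
```

The design point is that the heavy computation owns its compute resource, so nothing outlives the step and no client has to be shared with the surrounding flow.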