Isaac Brodsky
01/06/2022, 7:36 PMRuntimeError: Task <Task pending name='Task-2051' coro=<FutureState.wait() running at /usr/local/lib/python3.8/site-packages/distributed/client.py:482> cb=[WaitIterator._done_callback()]> got Future <Future pending> attached to a different loopwith worker_client()Kevin Kho
with worker_client()Isaac Brodsky
01/06/2022, 7:40 PMdef custom_task_function(
    fn: Optional[Callable] = None, **task_kwargs
) -> Union[FunctionTask, Callable[[Callable], FunctionTask]]:
    """
    Wrapped around prefect.task that sets up the Dask client.
    """
    def create_wrapper(fn):
        @wraps(fn)
        def wrapped_fn(*args, **kwargs):
            # do something else in local
            with worker_client():
                return fn(*args, **kwargs)
        return wrapped_fn
    # from @prefect.task
    if fn is None:
        return lambda fn: FunctionTask(
            fn=create_wrapper(fn),
            **task_kwargs,
        )
    return FunctionTask(fn=create_wrapper(fn), **task_kwargs)Isaac Brodsky
01/06/2022, 7:40 PMcustom_task_functionIsaac Brodsky
01/06/2022, 7:41 PMKevin Kho
Isaac Brodsky
01/06/2022, 7:54 PMKevin Kho
Kevin Kho
Isaac Brodsky
01/06/2022, 8:14 PMKevin Kho
Isaac Brodsky
01/06/2022, 8:15 PMKevin Kho
Isaac Brodsky
01/06/2022, 8:16 PMIsaac Brodsky
01/06/2022, 8:17 PMIsaac Brodsky
01/06/2022, 8:17 PMKevin Kho
