Isaac Brodsky
01/06/2022, 7:36 PMRuntimeError: Task <Task pending name='Task-2051' coro=<FutureState.wait() running at /usr/local/lib/python3.8/site-packages/distributed/client.py:482> cb=[WaitIterator._done_callback()]> got Future <Future pending> attached to a different loop
As far as I can tell I did not change anything about how work was submitted to Dask so I am wondering if this is some intermittent issue about where the Prefect task is running? My Prefect task is wrapped in with worker_client()
Kevin Kho
with worker_client()
syntax?Isaac Brodsky
01/06/2022, 7:40 PMdef custom_task_function(
fn: Optional[Callable] = None, **task_kwargs
) -> Union[FunctionTask, Callable[[Callable], FunctionTask]]:
"""
Wrapped around prefect.task that sets up the Dask client.
"""
def create_wrapper(fn):
@wraps(fn)
def wrapped_fn(*args, **kwargs):
# do something else in local
with worker_client():
return fn(*args, **kwargs)
return wrapped_fn
# from @prefect.task
if fn is None:
return lambda fn: FunctionTask(
fn=create_wrapper(fn),
**task_kwargs,
)
return FunctionTask(fn=create_wrapper(fn), **task_kwargs)
Isaac Brodsky
01/06/2022, 7:40 PMcustom_task_function
Isaac Brodsky
01/06/2022, 7:41 PMKevin Kho
Isaac Brodsky
01/06/2022, 7:54 PMKevin Kho
Kevin Kho
Isaac Brodsky
01/06/2022, 8:14 PMKevin Kho
Isaac Brodsky
01/06/2022, 8:15 PMKevin Kho
Isaac Brodsky
01/06/2022, 8:16 PMIsaac Brodsky
01/06/2022, 8:17 PMIsaac Brodsky
01/06/2022, 8:17 PMKevin Kho