Paco Ibañez
01/20/2023, 6:52 PM<http://dask.distributed.as|dask.distributed.as>_completed
https://github.com/dask/distributed/blob/main/distributed/client.py#L4955Zanie
01/20/2023, 7:03 PMas_completed
utility yet, but I figure there are a few ways we can help with this.Paco Ibañez
01/20/2023, 7:08 PMCrash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/task_runners.py", line 163, in start
yield self
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 370, in begin_flow_run
terminal_state = await orchestrate_flow_run(
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 624, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 132, in run_sync_in_interruptible_worker_thread
tg.start_soon(
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__
raise CancelledError
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/contextlib.py", line 662, in __aexit__
cb_suppress = await cb(*exc_details)
File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1460, in __aexit__
await self._close(
File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1666, in _close
await self.scheduler_comm.close()
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/contextlib.py", line 662, in __aexit__
cb_suppress = await cb(*exc_details)
File "/usr/local/lib/python3.8/contextlib.py", line 189, in __aexit__
await self.gen.athrow(typ, value, traceback)
File "/usr/local/lib/python3.8/site-packages/prefect/task_runners.py", line 166, in start
self._started = False
File "/usr/local/lib/python3.8/contextlib.py", line 679, in __aexit__
raise exc_details[1]
File "/usr/local/lib/python3.8/contextlib.py", line 662, in __aexit__
cb_suppress = await cb(*exc_details)
File "/usr/local/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 538, in __aexit__
await f
File "/usr/local/lib/python3.8/site-packages/distributed/deploy/spec.py", line 441, in _close
await self._correct_state()
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Zanie
01/20/2023, 7:09 PMPaco Ibañez
01/20/2023, 7:10 PMZanie
01/20/2023, 7:22 PMPaco Ibañez
01/20/2023, 10:07 PMimport time
from prefect.orion.schemas.states import TERMINAL_STATES
context = get_run_context()
dask_client = context.task_runner._client
task_batch_size = 2*len(dask_client.ncores())
futures = {}
ongoing_futures = set()
for src in expanded_sources:
future = ingest_csv.submit(graph_db_config, src, storage_config)
futures[future.name] = future
ongoing_futures.add(future.name)
while len(ongoing_futures) >= task_batch_size:
time.sleep(0.5)
for future_name in list(ongoing_futures):
if futures[future_name].get_state().type in TERMINAL_STATES:
ongoing_futures.remove(future_name)
# wait for tasks to finish
finished = [ future.wait() for future in futures.values() ]