# prefect-community
p
Hello! I am using a Dask task runner, and I have noticed that if I create a bunch of tasks at once, some of them end up crashing and the flow fails. I was wondering if there is a way to initially create as many tasks as there are Dask workers and only submit new tasks as previous tasks finish. Does Prefect provide a method similar to `dask.distributed.as_completed`?
https://github.com/dask/distributed/blob/main/distributed/client.py#L4955
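For context, `dask.distributed.as_completed` yields futures in the order they finish and can accept new futures while it is being consumed. A minimal sketch, assuming a local cluster and a placeholder `double` function:

```python
from dask.distributed import Client, as_completed

def double(x):
    return x * 2

if __name__ == "__main__":
    client = Client()  # local cluster, for illustration
    futures = [client.submit(double, i) for i in range(10)]

    # futures are yielded as they complete, not in submission order
    for future in as_completed(futures):
        print(future.result())
```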
z
Would you mind opening an issue with a reproduction / traceback for that failure?
We don’t have an `as_completed` utility yet, but I figure there are a few ways we can help with this.
p
Other people have reported a similar traceback:
```
Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/task_runners.py", line 163, in start
    yield self
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 370, in begin_flow_run
    terminal_state = await orchestrate_flow_run(
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 624, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 132, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__
    raise CancelledError
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/contextlib.py", line 662, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1460, in __aexit__
    await self._close(
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1666, in _close
    await self.scheduler_comm.close()
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/contextlib.py", line 662, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/usr/local/lib/python3.8/contextlib.py", line 189, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/usr/local/lib/python3.8/site-packages/prefect/task_runners.py", line 166, in start
    self._started = False
  File "/usr/local/lib/python3.8/contextlib.py", line 679, in __aexit__
    raise exc_details[1]
  File "/usr/local/lib/python3.8/contextlib.py", line 662, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/usr/local/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 538, in __aexit__
    await f
  File "/usr/local/lib/python3.8/site-packages/distributed/deploy/spec.py", line 441, in _close
    await self._correct_state()
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:
```
z
Hm, none of those look like the root cause of the issue. They’re just symptoms of cancellation.
p
I assumed it has to do with Dask; I have worked with it in the past, and I had to control how many jobs I submitted at once.
The flow works fine when I submit 10 to 20 tasks and starts failing as I submit more.
z
It seems like Dask would also be interested in supporting high levels of submission; maybe we can contribute a fix upstream.
We can also add some sort of rate limiting in the Dask task runner itself, but an issue with an MRE is the first step.
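A plain-Dask sketch of that kind of rate limiting keeps at most a fixed number of futures in flight and backfills from `as_completed`; the `process` function and the two-per-core cap here are illustrative assumptions:

```python
from itertools import islice

from dask.distributed import Client, as_completed

def process(x):
    # placeholder for the real per-item work
    return x + 1

if __name__ == "__main__":
    client = Client()
    # assumption: cap in-flight tasks at twice the total core count
    max_in_flight = 2 * sum(client.ncores().values())

    items = iter(range(1000))
    # prime the pipeline with the first max_in_flight items
    ac = as_completed(
        client.submit(process, x) for x in islice(items, max_in_flight)
    )
    for finished in ac:
        finished.result()
        # backfill: one new submission per completed future
        for x in islice(items, 1):
            ac.add(client.submit(process, x))
```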
p
I believe this code got me what I need; I’m not sure if it is the best way to achieve my goal:
```python
import time

from prefect.context import get_run_context
from prefect.orion.schemas.states import TERMINAL_STATES

# runs inside a @flow-decorated function; expanded_sources, ingest_csv,
# graph_db_config, and storage_config are defined elsewhere in the flow
context = get_run_context()
dask_client = context.task_runner._client  # note: _client is private API
# allow twice as many in-flight tasks as there are Dask workers
task_batch_size = 2 * len(dask_client.ncores())

futures = {}
ongoing_futures = set()
for src in expanded_sources:
    future = ingest_csv.submit(graph_db_config, src, storage_config)
    futures[future.name] = future
    ongoing_futures.add(future.name)
    # block until the number of in-flight tasks drops below the cap
    while len(ongoing_futures) >= task_batch_size:
        time.sleep(0.5)
        for future_name in list(ongoing_futures):
            if futures[future_name].get_state().type in TERMINAL_STATES:
                ongoing_futures.remove(future_name)

# wait for all remaining tasks to finish
finished = [future.wait() for future in futures.values()]
```