# ask-community
j
Dask question here: I am new to distributed systems, and want to run a large number of tasks (N = 10k+) in parallel. Afaik, Prefect+Dask is scalable to 10k+ parallel tasks (although I know it's ideal to batch many small tasks into fewer longer-running tasks if possible). However, trying different small toy tasks (and various combinations of `--nprocs` and `--nthreads` on the workers), I consistently see:
- Running Client() locally, tasks start immediately, as expected.
- Running with one worker server, starting takes much longer as N increases. For large N:
Copy code
[2019-09-30 22:59:15,781] INFO - prefect.TaskRunner | Task 'stage_0': Starting task run...
distributed.utils_perf - INFO - full garbage collection released 561.93 MB from 0 reference cycles (threshold: 10.00 MB)
distributed.core - INFO - Event loop was unresponsive in Worker for 34.31s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
- Running on multiple servers, the Dask scheduler also takes way longer than the expected ~1ms overhead/task before any activity happens (on the client servers or the Dask scheduler dashboard).
Why is scheduling taking so long? How could I adjust the scheduling policy (https://distributed.dask.org/en/latest/scheduling-policies.html) to speed up running a large number of small tasks?
c
Hi Jeff - are you using mapping to create the 10k tasks?
j
Yes
Actually, I am using the `create_product` from earlier, with e.g. len(params1) = len(params2) = 100 --> len(prod) = 10k
👍 1
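For context, a minimal sketch of this kind of fan-out (the task bodies, flow name, and 100-element parameter lists are illustrative placeholders, not the actual code from this thread):
Copy code
import itertools
from prefect import Flow, task

@task
def create_product(params1, params2):
    # full cross product: 100 x 100 -> 10k parameter pairs
    return list(itertools.product(params1, params2))

@task(name="stage_0")
def stage_0(pair):
    # stand-in for one small unit of work per (param1, param2) pair
    return pair

with Flow("many-small-tasks") as flow:
    prod = create_product(list(range(100)), list(range(100)))
    stage_0.map(prod)  # fans out into ~10k mapped task runs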
c
and what is `stage_0`, which is hanging?
j
Copy code
[2019-09-30 22:59:15,781] INFO - prefect.TaskRunner | Task 'stage_0': Starting task run...
distributed.utils_perf - INFO - full garbage collection released 561.93 MB from 0 reference cycles (threshold: 10.00 MB)
distributed.core - INFO - Event loop was unresponsive in Worker for 34.31s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
is the current log output
c
right, but what is `stage_0` doing? I’m a bit surprised to see Dask complaining before any tasks are even run - I think I might be missing some context
j
`create_product` ran immediately before, and it ran quickly and successfully. `stage_0` currently generates a small random Pandas dataframe and saves it to a tmp directory, in a file titled `f"{param1}_{param2}.csv"`. Space isn't an issue.
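Based on that description, a hedged sketch of what `stage_0` presumably does (the dataframe shape, tmp directory, and function signature here are assumptions):
Copy code
import os
import numpy as np
import pandas as pd
from prefect import task

@task(name="stage_0")
def stage_0(pair, tmpdir="/tmp/prefect-demo"):
    # write a small random dataframe to a per-parameter-pair CSV
    param1, param2 = pair
    df = pd.DataFrame(np.random.randint(0, 100, size=(10, 4)), columns=list("ABCD"))
    os.makedirs(tmpdir, exist_ok=True)
    df.to_csv(os.path.join(tmpdir, f"{param1}_{param2}.csv"), index=False)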
c
ok ok I think I’m following now -> it appears that `stage_0` begins its run but freezes, is that correct?
j
It appears so from the log. There is no activity on the Dask dashboard. If I recall correctly, with smaller N, it also freezes (for a much shorter while) before spitting out output.
c
ok yea, this is related to an issue I’ve been digging into in Prefect Cloud as well; I haven’t been able to pinpoint the root cause yet, but it seems that mapping from a worker (which is what Prefect does) causes a bottleneck somewhere in Dask
👍 1
j
Any workarounds for the parallel execution of a large number of tasks?
c
ultimately expanding your workers’ memory allocation is the only thing I’ve found that consistently works; I’ve been running workflows in Prefect Cloud that map over ~12-24k tasks, and while they do eventually run, I also run into these “zombie” tasks regularly. I’ll try to prioritize fixing this because I agree it is very frustrating. I suspect I’ll have to fundamentally change how we move data around through the Dask Executor
j
Which parameters expand worker memory allocation? `--memory-limit` in Dask?
c
oh my apologies, I’m thinking in kubernetes --> I usually have to increase the size of my nodes
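For reference, on a hand-started worker the relevant knobs are the `--memory-limit` / `--nprocs` / `--nthreads` flags already mentioned above; from Python, a local-cluster sketch (the specific values are illustrative) looks like:
Copy code
from dask.distributed import Client

# spins up a LocalCluster; each of the 4 worker processes gets its own 4GB budget
client = Client(n_workers=4, threads_per_worker=1, memory_limit="4GB")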
@Marvin archive “Prefect mapping on Dask can sometimes hang indefinitely”
c
^^ I’ll use that issue to track anything I find
d
2 random cents: Pandas is quite the memory hog with its chained assignment check. Dask notices this and gives that kind of warning. A workaround is to disable this check when working with pandas:
Copy code
with pd.option_context('mode.chained_assignment', None):
    .. your pandas code ...
I saw an issue on dask once concerning this but I could not find it immediately
👏 1
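Concretely, the option just wraps whatever pandas code runs inside a task; a toy example (the dataframe contents are illustrative):
Copy code
import pandas as pd

df = pd.DataFrame({"a": range(5), "b": range(5)})
with pd.option_context("mode.chained_assignment", None):
    sub = df[df.a > 2]
    sub["b"] = 0  # assigning to a slice would normally trigger a SettingWithCopyWarning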
j
Could adding an option to chunk the tasks before https://github.com/PrefectHQ/prefect/blob/b684c8dfb7605969e8a49a4f529e4d4cf3790351/src/prefect/engine/executors/dask.py#L166 like in https://distributed.dask.org/en/latest/efficiency.html#use-larger-tasks help? I feel DaskExecutor could support 1M+ concurrent tasks more easily by chunking into a smaller number of futures.
c
Yea that’s a great idea; I’ve identified that the primary bottleneck in Prefect mapping is the cost of serialization of the Prefect payload, so the fewer things we have to serialize the better
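Not Prefect's internals, just a quick way to eyeball that serialization cost for any payload (the helper name `payload_cost` is made up here):
Copy code
import time
import cloudpickle

def payload_cost(obj):
    # returns (serialized size in bytes, seconds spent in cloudpickle.dumps)
    start = time.perf_counter()
    data = cloudpickle.dumps(obj)
    return len(data), time.perf_counter() - start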
j
Would something like this in DaskExecutor's `.map()` work?
Copy code
import toolz

# Chunk each mapped argument into size-N groups, into at most ~1k futures
args = list(args)  # args arrives as a tuple, so make it mutable first
for i in range(len(args)):
    N = len(args[i]) // 1000
    if N > 1:
        args[i] = list(toolz.partition_all(N, args[i]))
c
possibly, although this might prevent the tasks within each of the ~1k batches from running in parallel
j
It doesn't look like DaskExecutor.map() is ever called (`engine/flow_runner.py` only ever calls `executor.submit()`)
c
It’s actually called within the task runner
j
When executing a flow with 100k concurrent tasks (definitely mapped), `DaskExecutor.map()` is never called, according to the logging statements I put into both functions; `DaskExecutor.submit()` is, however, called once for each task.
j
I still don't understand how tasks are submitted for Dask execution. Why a worker, and not the scheduler? I don't understand why my logger from within `DaskExecutor.map` never emits, if the function is called. (I set `self.logger = prefect.utilities.logging.get_logger()` in DaskExecutor's init.)
c
they are submitted to the scheduler, just from a worker; the process is:
- the flow runner submits “parent” tasks to the executor
- if a “parent” task is actually mapped, it is responsible for mapping its “children” (which is what those lines above are doing); in Dask specifically, calling “map” sends the work to the scheduler as usual
my guess is that your workers are deserializing a plain version of the DaskExecutor and not your modified version
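For anyone following along, the underlying Dask pattern of submitting work from inside a task running on a worker looks roughly like this; this is plain `distributed`, not Prefect's actual implementation, and the `parent` function is a made-up stand-in:
Copy code
from dask.distributed import get_client

def parent(items):
    # runs on a worker, yet fans out "child" futures back through the scheduler
    client = get_client()
    futures = client.map(lambda x: x + 1, items)
    return client.gather(futures)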
j
Hmmm, I removed the official `prefect` package from my Conda virtualenv, so the workers should be using my custom version
I'll look at what task_runner.py is doing as well. Thanks
👍 1
So running with `executor=None`: LocalExecutor's submit function is called for my Parameters, then map is called for my mapped tasks (according to logging statements in each function). However, with `executor=DaskExecutor`: DaskExecutor's submit is called for the mapped tasks as well (and map is not called). Strange.
For mapped tasks, I want to programmatically batch them into a dask.Bag before they're sent to the scheduler.
c
Yea, this makes sense to me; I’m very confident that your dask workers are not picking up your dask executor code changes, but they are still running the `DaskExecutor.map` function
j
Example:
Copy code
import prefect
from prefect import Flow, task

@task
def inc(x):
    return x + 1

with Flow("hi") as f:
    things = list(range(10))
    ans = inc.map(things)

from prefect.engine.executors import DaskExecutor
executor = DaskExecutor(address='xxxxx', debug=True)

fs = f.run(executor=executor)
My logger in DaskExecutor.submit prints twice, but DaskExecutor.map does not. I checked my workers' logs and they have e.g.
Copy code
INFO - prefect.TaskRunner | Task 'inc[4]': Starting task run...
INFO - prefect.TaskRunner | Task 'inc[6]': finished task run for task with final state: 'Success'
but not
Copy code
INFO - prefect.DaskExecutor | Hi, I am in DaskExecutor.map!
as expected. If DaskExecutor.map is run, where is it logged?
I can't find any evidence that my custom logic in DaskExecutor.map (batching into 2's) was implemented by my workers (I expected to see inc[0] to inc[4], not inc[0] to inc[9]), so I'm a bit confused where it's being run...
c
the custom log that you added to your Dask Executor is not surviving the cloudpickle serialization process on the worker, so when your worker receives the dask executor it’s the original prefect library version
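The key detail is that cloudpickle serializes instances of importable classes by reference, so unpickling on the worker re-imports `prefect`, and the worker's installed copy of the class is what actually runs; a minimal illustration (constructing the executor with no arguments is just for the example):
Copy code
import cloudpickle
from prefect.engine.executors import DaskExecutor

payload = cloudpickle.dumps(DaskExecutor())
# loads() re-imports prefect.engine.executors on the receiving side, so whatever
# version of DaskExecutor is installed there defines the behavior that runs
restored = cloudpickle.loads(payload)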
j
But the original library was removed from my virtualenv, so where is the serialized version coming from? And how would I go about testing my version? 😟
c
you just need to make sure that in your dask-worker process the command `import prefect` imports your edited version (you can check via `prefect.__file__`)
could you check that and see what it returns?
you could even do:
Copy code
def prefect_version():
    import prefect
    return prefect.__file__

client.gather(client.submit(prefect_version))
j
Yes, the above function does return `{MY_REPO}/prefect/__init__.py` as expected, while `$ which dask-worker` returns the dask-worker from my Conda virtualenv.
🧐 1
Ah, restarted my scheduler with a single worker, and now
Copy code
INFO - prefect.DaskExecutor | In map
appears once in worker logs
c
awesome awesome
j
Thanks
c
np, anytime
Hi @Jeff Yun - this PR: https://github.com/PrefectHQ/prefect/pull/1589 should address the behavior you were observing wherein mapped tasks just hung for a long time before any work actually began. Let me know if you get a chance to kick the tires on this branch and if you see any noticeable improvements!
j
There's still a significant hang before any individual tasks are run. Compare to my workaround, in `task_runner.py` line 760-ish:
Copy code
import dask.bag as db

MAX_PARTITIONS = 200
batched_initial_states = db.from_sequence(initial_states, npartitions=MAX_PARTITIONS)
batched_map_upstream_states = db.from_sequence(map_upstream_states, npartitions=MAX_PARTITIONS)

map_states = executor.map(
    run_fn, batched_initial_states, range(MAX_PARTITIONS), batched_map_upstream_states
)
which batches 100k trivial tasks into 200, and runs fairly quickly
I've been testing out your PR, and the following function
Copy code
import numpy as np
import pandas as pd
import dask.dataframe as dd
from prefect import task

@task(name='stage_0')
def stage_0_func(cfgdir, tmpdir, prod):
    def sim_key_data(key):
        """ Mocked function returning a random Dask DataFrame """
        df = pd.DataFrame(np.random.randint(0, 100, size=(10, 4)), columns=list('ABCD'), index=list(range(10)))
        df = dd.from_pandas(df, npartitions=1)
        return df

    date, keys = prod
    for key in keys:
        data = sim_key_data(key)
        res = data.to_csv(f'{tmpdir}/{date}_{key}-*.csv')  # , compute=False)
        print(f'{tmpdir}/{date}_{key}-*.csv')  # CONFIRM THE FILE IS INDEED BEING WRITTEN
        if type(res) == 'list' and type(res[0]) != 'str':  # note: comparing a type to a string is always False
            print(type(res), res)  # Nothing is ever printed
    return
And get the error:
Copy code
[2019-10-03 21:44:50,776] INFO - prefect.TaskRunner | Task 'stage_0[4686]': Starting task run...
[2019-10-03 21:44:50,783] ERROR - prefect.TaskRunner | Task 'stage_0[4686]': unexpected error while running task: RuntimeError('cannot join thread before it is started',)
Traceback (most recent call last):
  File "REPODIR/demos/prefect/utilities/executors.py", line 49, in inner
    timer.start()
  File "CONDADIR/envs/py36.dev/lib/python3.6/threading.py", line 846, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "REPODIR/demos/prefect/engine/task_runner.py", line 290, in run
    state, inputs=task_inputs, timeout_handler=executor.timeout_handler
  File "REPODIR/demos/prefect/utilities/executors.py", line 53, in inner
    timer.join()
  File "CONDADIR/envs/py36.dev/lib/python3.6/threading.py", line 1051, in join
    raise RuntimeError("cannot join thread before it is started")
RuntimeError: cannot join thread before it is started
[2019-10-03 21:44:50,785] INFO - prefect.TaskRunner | Task 'stage_0[4691]': Starting task run...
[2019-10-03 21:44:50,873] INFO - prefect.TaskRunner | Task 'stage_0[4686]': finished task run for task with final state: 'Failed'
$ ls TMPDIR/0* | wc -l
432
$ ls TMPDIR/1* | wc -l
3897
$ ls TMPDIR/2* | wc -l
3226
$ ls TMPDIR/3* | wc -l
3045
$ ls TMPDIR/4* | wc -l
2925
Stops at different steps with different file numbers each time
c
hm that’s really odd; I’ve reverted that PR since it did not have the intended effect, but I’m honestly surprised to see this — are you running on Windows by any chance? I don’t really see a way in which the timer thread that this traceback refers to is not started
a
Hi, any updates on this? Today I tried mapping 30 tasks, and the parent task freezes for about 10 seconds before the mapped subtasks start running...
c
Hey @alvin goh - no updates; we have a large mapping refactor on our roadmap that might speed this up a little, but is the 10s delay an actual issue for your use case, or just an annoyance?
a
It can be classified as an annoyance for now :) thanks for the update!