j

Jeff Yun

09/30/2019, 11:15 PM
Dask question here: I am new to distributed systems and want to run a large number of tasks (N = 10k+) in parallel. AFAIK, Prefect + Dask is scalable to 10k+ parallel tasks (although I know it's ideal to batch many small tasks into fewer longer-running tasks if possible). However, trying different small toy tasks (and various combinations of --nprocs and --nthreads on the workers), I consistently see:
- Running Client() locally, tasks start immediately, as expected.
- Running with one worker server, starting takes much longer as N increases. For large N:
[2019-09-30 22:59:15,781] INFO - prefect.TaskRunner | Task 'stage_0': Starting task run...
distributed.utils_perf - INFO - full garbage collection released 561.93 MB from 0 reference cycles (threshold: 10.00 MB)
distributed.core - INFO - Event loop was unresponsive in Worker for 34.31s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
- Running on multiple servers, the Dask scheduler also takes way longer than the expected ~1ms of overhead per task before any activity happens (on the client servers or the Dask scheduler dashboard).
Why is scheduling taking so long? How could I adjust the scheduling policy (https://distributed.dask.org/en/latest/scheduling-policies.html) to speed up running a large number of small tasks?
c

Chris White

09/30/2019, 11:19 PM
Hi Jeff - are you using mapping to create the 10k tasks?
j

Jeff Yun

09/30/2019, 11:19 PM
Yes
Actually, I am using create_product from earlier, with e.g. len(params1) = len(params2) = 100 --> len(prod) = 10k
👍 1
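For context, a minimal sketch of the kind of flow being described (using the Prefect 0.x API; create_product, params1/params2, and stage_0 here are illustrative stand-ins for Jeff's actual code, not taken from this thread):
from itertools import product

from prefect import Flow, task

@task
def create_product(params1, params2):
    # 100 x 100 parameter combinations -> 10k items to map over
    return list(product(params1, params2))

@task(name="stage_0")
def stage_0(pair):
    param1, param2 = pair
    return f"{param1}_{param2}"

with Flow("large-mapping") as flow:
    prod = create_product(list(range(100)), list(range(100)))
    results = stage_0.map(prod)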
c

Chris White

09/30/2019, 11:21 PM
and what is stage_0, which is hanging?
j

Jeff Yun

09/30/2019, 11:22 PM
[2019-09-30 22:59:15,781] INFO - prefect.TaskRunner | Task 'stage_0': Starting task run...
distributed.utils_perf - INFO - full garbage collection released 561.93 MB from 0 reference cycles (threshold: 10.00 MB)
distributed.core - INFO - Event loop was unresponsive in Worker for 34.31s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
is the current log output
c

Chris White

09/30/2019, 11:22 PM
right, but what is stage_0 doing? I'm a bit surprised to see Dask complaining before any tasks are even run - I think I might be missing some context
j

Jeff Yun

09/30/2019, 11:22 PM
create_product ran immediately before, and it ran quickly and successfully. stage_0 currently generates a small random Pandas dataframe and saves it to a tmp directory in a file titled f"{param1}_{param2}.csv". Space isn't an issue.
c

Chris White

09/30/2019, 11:25 PM
ok ok I think I'm following now -> it appears that stage_0 begins its run but freezes, is that correct?
j

Jeff Yun

09/30/2019, 11:26 PM
It appears so from the log. There is no activity on the Dask dashboard. If I recall correctly, with smaller N, it also freezes (for a much shorter while) before spitting out output.
c

Chris White

09/30/2019, 11:30 PM
ok yea, this is related to an issue I’ve been digging into in Prefect Cloud as well; I haven’t been able to pinpoint the root cause yet, but it seems that mapping from a worker (which is what Prefect does) causes a bottleneck somewhere in Dask
👍 1
j

Jeff Yun

09/30/2019, 11:32 PM
Any workarounds for the parallel execution of a large number of tasks?
c

Chris White

09/30/2019, 11:35 PM
ultimately, expanding your workers' memory allocation is the only thing I've found that consistently works; I've been running workflows in Prefect Cloud that map over ~12-24k tasks, and while they do eventually run, I also run into these "zombie" tasks regularly. I'll try to prioritize fixing this because I agree it is very frustrating. I suspect I'll have to fundamentally change how we move data around through the Dask Executor
j

Jeff Yun

09/30/2019, 11:36 PM
Which parameters expand worker memory allocation? --memory-limit in Dask?
c

Chris White

09/30/2019, 11:37 PM
oh my apologies, I’m thinking in kubernetes --> I usually have to increase the size of my nodes
@Marvin archive “Prefect mapping on Dask can sometimes hang indefinitely”
c

Chris White

09/30/2019, 11:47 PM
^^ I’ll use that issue to track anything I find
d

David Ojeda

10/01/2019, 1:20 PM
2 random cents: Pandas is quite the memory hog with its chained assignment check. Dask notices this and gives that kind of warning. A workaround is to disable this check when working with pandas:
import pandas as pd

with pd.option_context('mode.chained_assignment', None):
    ...  # your pandas code here
I saw an issue on dask once concerning this but I could not find it immediately
👏 1
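A slightly fuller, self-contained sketch of David's suggestion (the DataFrame and column names below are made up purely for illustration):
import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

# Temporarily disable pandas' chained-assignment check for this block only
with pd.option_context('mode.chained_assignment', None):
    subset = df[df["a"] > 1]
    subset["b"] = 0  # would normally emit SettingWithCopyWarning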
j

Jeff Yun

10/01/2019, 4:09 PM
Could adding an option to chunk the tasks before https://github.com/PrefectHQ/prefect/blob/b684c8dfb7605969e8a49a4f529e4d4cf3790351/src/prefect/engine/executors/dask.py#L166 like in https://distributed.dask.org/en/latest/efficiency.html#use-larger-tasks help? I feel DaskExecutor could support 1M+ concurrent tasks more easily by chunking into smaller number of futures.
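The "use larger tasks" pattern from that Dask docs page, as a standalone sketch outside of Prefect (process_one and process_batch are hypothetical helpers, and the scheduler address is a placeholder):
from dask.distributed import Client
from toolz import partition_all

def process_one(item):
    return item + 1

def process_batch(batch):
    # One future handles a whole batch, so 100k items become ~100 futures
    return [process_one(item) for item in batch]

client = Client()  # or Client("scheduler-address:8786")
items = list(range(100_000))
batches = list(partition_all(1_000, items))

futures = client.map(process_batch, batches)
results = [r for batch in client.gather(futures) for r in batch]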
c

Chris White

10/01/2019, 4:11 PM
Yea that’s a great idea; I’ve identified that the primary bottleneck in Prefect mapping is the cost of serialization of the Prefect payload, so the fewer things we have to serialize the better
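A rough way to see that serialization cost in isolation (cloudpickle is what distributed uses to ship work to workers; the payload below is a made-up stand-in, not Prefect's actual runtime payload):
import time
import cloudpickle

# Stand-in payload; the real Prefect payload (task, state, inputs, context) is larger
payload = {"state": "Pending", "inputs": {"x": list(range(100))}}

start = time.time()
blobs = [cloudpickle.dumps(payload) for _ in range(10_000)]
print(f"10k payloads, {len(blobs[0])} bytes each, in {time.time() - start:.2f}s")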
j

Jeff Yun

10/01/2019, 4:24 PM
Would something like this in DaskExecutor's .map() work?
import toolz

# Chunk each argument sequence into at most ~1k futures
args = list(args)
for i in range(len(args)):
    chunk_size = len(args[i]) // 1000
    if chunk_size > 1:
        args[i] = list(toolz.partition_all(chunk_size, args[i]))
c

Chris White

10/01/2019, 5:09 PM
possibly, although this might prevent the 1k futures within each batch from running in parallel
j

Jeff Yun

10/01/2019, 6:26 PM
It doesn't look like DaskExecutor.map() is ever called (engine/flow_runner.py only ever calls executor.submit())
c

Chris White

10/01/2019, 6:27 PM
It’s actually called within the task runner
j

Jeff Yun

10/01/2019, 6:43 PM
When executing a flow with 100k concurrent tasks (definitely mapped), DaskExecutor.map() is never called, according to the logging statements I put into both functions; DaskExecutor.submit() is, however, called once for each task.
c

Chris White

10/01/2019, 6:45 PM
j

Jeff Yun

10/01/2019, 6:46 PM
I still don't understand how tasks are submitted for Dask execution. Why a worker, and not the scheduler? I don't understand why my logger from within DaskExecutor.map never emits, if the function is called. (I set self.logger = prefect.utilities.logging.get_logger() in DaskExecutor's init)
c

Chris White

10/01/2019, 6:49 PM
they are submitted to the scheduler, just from a worker; the process is:
- the flow runner submits "parent" tasks to the executor
- if a "parent" task is actually mapped, it is responsible for mapping its "children" (which is what those lines above are doing); in Dask specifically, calling "map" sends the work to the scheduler as usual
My guess is that your workers are deserializing a plain version of the DaskExecutor and not your modified version
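The generic Dask pattern for submitting work from inside a worker looks something like this sketch (using dask.distributed's worker_client; this illustrates the mechanism only, not Prefect's actual internals):
from dask.distributed import Client, worker_client

def child(x):
    return x + 1

def parent(items):
    # Runs on a worker, but the child submissions still go through the scheduler
    with worker_client() as client:
        futures = client.map(child, items)
        return client.gather(futures)

client = Client()
result = client.submit(parent, list(range(10))).result()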
j

Jeff Yun

10/01/2019, 6:50 PM
Hmmm, I removed the official prefect package from my Conda virtualenv, so the workers should be using my custom version
I'll look at what task_runner.py is doing as well. Thanks
👍 1
So running with `executor=None`: LocalExecutor's submit function is called for my Parameters, then map is called for my mapped tasks (according to logging statements in each function). However, with `executor=DaskExecutor`, DaskExecutor's submit is called for the mapped tasks as well (and map is not called). Strange.
For mapped tasks, I want to programmatically batch them into dask.Bag before they're sent to the scheduler.
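A standalone sketch of that dask.bag idea, outside of Prefect (process_item is a hypothetical per-item function):
import dask.bag as db
from dask.distributed import Client

def process_item(item):
    return item + 1

client = Client()

items = list(range(100_000))
# 100k items collapse into 200 partitions, i.e. roughly 200 scheduler-side tasks
bag = db.from_sequence(items, npartitions=200)
results = bag.map(process_item).compute()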
c

Chris White

10/02/2019, 8:47 PM
Yea, this makes sense to me; I'm very confident that your dask workers are not picking up your dask executor code changes, but they are still running the DaskExecutor.map function
j

Jeff Yun

10/02/2019, 8:57 PM
Example:
import prefect
from prefect import Flow, task

@task
def inc(x):
    return x + 1

with Flow("hi") as f:
    things = list(range(10))
    ans = inc.map(things)

from prefect.engine.executors import DaskExecutor
executor = DaskExecutor(address='xxxxx', debug=True)

fs = f.run(executor=executor)
My logger in DaskExecutor.submit prints twice, but DaskExecutor.map does not. I checked my workers' logs and they have e.g.
INFO - prefect.TaskRunner | Task 'inc[4]': Starting task run...
INFO - prefect.TaskRunner | Task 'inc[6]': finished task run for task with final state: 'Success'
but not
INFO - prefect.DaskExecutor | Hi, I am in DaskExecutor.map!
as expected. If DaskExecutor.map is run, where is it logged?
I can't find any evidence that my custom logic in DaskExecutor.map (batching into 2's) was picked up by my workers (I expected to see inc[0] to inc[4], not inc[0] to inc[9]), so I'm a bit confused where it's being run...
c

Chris White

10/02/2019, 8:59 PM
the custom log that you added to your Dask Executor is not surviving the cloudpickle serialization process on the worker, so when your worker receives the dask executor it’s the original prefect library version
j

Jeff Yun

10/02/2019, 9:01 PM
But the original library was removed from my virtualenv, so where is the serialized version coming from? And how would I go about testing my version? 😟
c

Chris White

10/02/2019, 9:06 PM
you just need to make sure that in your dask-worker process the command import prefect imports your edited version (you can check via prefect.__file__)
could you check that and see what it returns?
you could even do:
def prefect_version():
    import prefect
    return prefect.__file__

# client is a dask.distributed.Client connected to your scheduler
client.gather(client.submit(prefect_version))
j

Jeff Yun

10/02/2019, 9:13 PM
Yes, the above function does return {MY_REPO}/prefect/__init__.py as expected, while $ which dask-worker returns the dask-worker executable in my Conda virtualenv.
🧐 1
Ah, restarted my scheduler with a single worker, and now
INFO - prefect.DaskExecutor | In map
appears once in worker logs
c

Chris White

10/02/2019, 9:25 PM
awesome awesome
j

Jeff Yun

10/02/2019, 9:25 PM
Thanks
c

Chris White

10/02/2019, 9:25 PM
np, anytime
Hi @Jeff Yun - this PR: https://github.com/PrefectHQ/prefect/pull/1589 should address the behavior you were observing wherein mapped tasks just hung for a long time before any work actually began. Let me know if you get a chance to kick the tires on this branch and whether you see any noticeable improvements!
j

Jeff Yun

10/02/2019, 11:47 PM
There's still a significant hang before any individual tasks are run. Compare to my workaround in task_runner.py, around line 760:
import dask.bag as db

MAX_PARTITIONS = 200

batched_initial_states = db.from_sequence(initial_states, npartitions=MAX_PARTITIONS)
batched_map_upstream_states = db.from_sequence(map_upstream_states, npartitions=MAX_PARTITIONS)

map_states = executor.map(
    run_fn, batched_initial_states, range(MAX_PARTITIONS), batched_map_upstream_states
)
which batches 100k trivial tasks into 200, and runs fairly quickly
I've been testing out your PR, and the following function
import numpy as np
import pandas as pd
import dask.dataframe as dd
from prefect import task

@task(name='stage_0')
def stage_0_func(cfgdir, tmpdir, prod):
    def sim_key_data(key):
        """ Mocked function returning a random Dask DataFrame """
        df = pd.DataFrame(np.random.randint(0, 100, size=(10, 4)), columns=list('ABCD'), index=list(range(10)))
        df = dd.from_pandas(df, npartitions=1)
        return df

    date, keys = prod
    for key in keys:
        data = sim_key_data(key)
        res = data.to_csv(f'{tmpdir}/{date}_{key}-*.csv')  # , compute=False)
        print(f'{tmpdir}/{date}_{key}-*.csv')  # CONFIRM THE FILE IS INDEED BEING WRITTEN
        if isinstance(res, list) and not isinstance(res[0], str):
            print(type(res), res)  # Nothing is ever printed
    return
gives me this error:
[2019-10-03 21:44:50,776] INFO - prefect.TaskRunner | Task 'stage_0[4686]': Starting task run...
[2019-10-03 21:44:50,783] ERROR - prefect.TaskRunner | Task 'stage_0[4686]': unexpected error while running task: RuntimeError('cannot join thread before it is started',)
Traceback (most recent call last):
  File "REPODIR/demos/prefect/utilities/executors.py", line 49, in inner
    timer.start()
  File "CONDADIR/envs/py36.dev/lib/python3.6/threading.py", line 846, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "REPODIR/demos/prefect/engine/task_runner.py", line 290, in run
    state, inputs=task_inputs, timeout_handler=executor.timeout_handler
  File "REPODIR/demos/prefect/utilities/executors.py", line 53, in inner
    timer.join()
  File "CONDADIR/envs/py36.dev/lib/python3.6/threading.py", line 1051, in join
    raise RuntimeError("cannot join thread before it is started")
RuntimeError: cannot join thread before it is started
[2019-10-03 21:44:50,785] INFO - prefect.TaskRunner | Task 'stage_0[4691]': Starting task run...
[2019-10-03 21:44:50,873] INFO - prefect.TaskRunner | Task 'stage_0[4686]': finished task run for task with final state: 'Failed'
$ ls TMPDIR/0* | wc -l
432
$ ls TMPDIR/1* | wc -l
3897
$ ls TMPDIR/2* | wc -l
3226
$ ls TMPDIR/3* | wc -l
3045
$ ls TMPDIR/4* | wc -l
2925
Stops at different steps, with different file counts, each time
c

Chris White

10/03/2019, 10:05 PM
hm that’s really odd; I’ve reverted that PR since it did not have the intended effect, but I’m honestly surprised to see this — are you running on Windows by any chance? I don’t really see a way in which the timer thread that this traceback refers to is not started
a

alvin goh

12/12/2019, 3:24 PM
Hi, any updates on this? Today I tried mapping 30 tasks, and the parent task freezes for about 10 seconds before the mapped subtasks start running...
c

Chris White

12/12/2019, 7:02 PM
Hey @alvin goh - no updates; we have a large mapping refactor on our roadmap that might speed this up a little, but is the 10s delay an actual issue for your use case, or just an annoyance?
a

alvin goh

12/13/2019, 9:51 AM
It can be classified as an annoyance for now :) thanks for the update!