Jeff Yun
09/30/2019, 11:15 PM
When running flows with a large number N of mapped tasks (with various --nprocs and --nthreads on workers), it seems that consistently:
- Running Client() locally starts immediately, as expected.
- Running with one worker server, startup takes much longer as N increases.
For large N:
[2019-09-30 22:59:15,781] INFO - prefect.TaskRunner | Task 'stage_0': Starting task run...
distributed.utils_perf - INFO - full garbage collection released 561.93 MB from 0 reference cycles (threshold: 10.00 MB)
distributed.core - INFO - Event loop was unresponsive in Worker for 34.31s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
- Running on multiple servers, the Dask scheduler also takes far longer than the expected ~1ms overhead/task before any activity happens (on the client servers or the Dask scheduler dashboard).
Why is scheduling taking so long? How could I adjust the scheduling policy (https://distributed.dask.org/en/latest/scheduling-policies.html) to speed up running a large number of small tasks?
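(For context: the usual workaround for per-task scheduler overhead is to batch inputs so the scheduler sees fewer, larger tasks. A minimal sketch; the names and sizes here are illustrative, not from the thread:)

import toolz
from prefect import Flow, task

@task
def process_batch(batch):
    # Stand-in for the real per-item work, applied to a whole chunk at
    # once: 10k items become 100 mapped tasks of 100 items each.
    return [x + 1 for x in batch]

items = list(range(10_000))
batches = [list(b) for b in toolz.partition_all(100, items)]

with Flow("batched") as flow:
    results = process_batch.map(batches)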
Chris White
09/30/2019, 11:19 PM

Jeff Yun
09/30/2019, 11:19 PM
I ran create_product earlier with e.g. len(params1) = len(params2) = 100 --> len(prod) = 10k.
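(create_product itself isn't shown in the thread; a hypothetical version consistent with those sizes is just a cross product:)

import itertools
from prefect import task

@task
def create_product(params1, params2):
    # 100 x 100 parameter values --> 10,000 (param1, param2) pairs,
    # i.e. one downstream mapped task per pair.
    return list(itertools.product(params1, params2))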
Chris White
09/30/2019, 11:21 PM
And it's stage_0 which is hanging?

Jeff Yun
09/30/2019, 11:22 PM
[2019-09-30 22:59:15,781] INFO - prefect.TaskRunner | Task 'stage_0': Starting task run...
distributed.utils_perf - INFO - full garbage collection released 561.93 MB from 0 reference cycles (threshold: 10.00 MB)
distributed.core - INFO - Event loop was unresponsive in Worker for 34.31s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
Chris White
09/30/2019, 11:22 PM
What is stage_0 doing? I'm a bit surprised to see Dask complaining before any tasks are even run - I think I might be missing some context.

Jeff Yun
09/30/2019, 11:22 PM
create_product ran immediately before, and it ran quickly and successfully. stage_0 currently generates a small random Pandas dataframe and saves it to a tmp directory, in a file titled f"{param1}_{param2}.csv". Space isn't an issue.

Chris White
09/30/2019, 11:25 PM
stage_0 begins its run but freezes, is that correct?

Jeff Yun
09/30/2019, 11:26 PM

Chris White
09/30/2019, 11:30 PM

Jeff Yun
09/30/2019, 11:32 PM

Chris White
09/30/2019, 11:35 PM

Jeff Yun
09/30/2019, 11:36 PM
--memory-limit in Dask?
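(For reference, the Dask worker memory limit can be set on the dask-worker command line or in code; a minimal sketch assuming a local cluster, with illustrative numbers:)

from dask.distributed import Client, LocalCluster

# Roughly equivalent to `dask-worker ... --nprocs 4 --nthreads 1 --memory-limit 4GB`:
# each worker process targets a 4 GB cap and spills/pauses as it approaches it.
cluster = LocalCluster(n_workers=4, threads_per_worker=1, memory_limit="4GB")
client = Client(cluster)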
Chris White
09/30/2019, 11:37 PM

Marvin
09/30/2019, 11:46 PM

Chris White
09/30/2019, 11:47 PM

David Ojeda
10/01/2019, 1:20 PM
with pd.option_context('mode.chained_assignment', None):
    ... your pandas code ...
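(Filled out into a runnable example with a made-up frame, the context manager temporarily disables pandas' chained-assignment warning for just that block:)

import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
sub = df[df["a"] > 1]  # a slice that would normally trigger the warning

with pd.option_context('mode.chained_assignment', None):
    sub['b'] = 0  # no SettingWithCopyWarning inside this block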
Jeff Yun
10/01/2019, 4:09 PM

Chris White
10/01/2019, 4:11 PM

Jeff Yun
10/01/2019, 4:24 PM
Would chunking the args before calling .map() work?
# Chunk into at most 1k futures
import toolz
for i in range(len(args)):
    N = -(-len(args[i]) // 1000)  # ceiling division, so at most ~1000 chunks result
    if N > 1:
        args[i] = list(toolz.partition_all(N, args[i]))  # partition_all yields tuples of up to N items
Chris White
10/01/2019, 5:09 PM

Jeff Yun
10/01/2019, 6:26 PM
engine/flow_runner.py only ever calls executor.submit().

Chris White
10/01/2019, 6:27 PM

Jeff Yun
10/01/2019, 6:43 PM
DaskExecutor.map() is never called. According to logging statements I put into both functions, DaskExecutor.submit() is however called once for each task.

Chris White
10/01/2019, 6:45 PM
DaskExecutor.map is called on a worker: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/task_runner.py#L756-L758
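(This isn't Prefect's literal code, but the underlying Dask pattern: a function running on a worker opens a client back to the scheduler and submits more work from there, so the "map" happens worker-side. A hedged sketch; the scheduler address is a placeholder:)

from distributed import Client, worker_client

def fan_out(xs):
    # Runs on a worker: worker_client() connects back to the scheduler,
    # so the fan-out is submitted from the worker, not the client process.
    with worker_client() as c:
        futures = c.map(lambda x: x + 1, xs)
        return c.gather(futures)

client = Client('xxxxx')  # placeholder address, as elsewhere in the thread
print(client.gather(client.submit(fan_out, list(range(5)))))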
Jeff Yun
10/01/2019, 6:46 PM
DaskExecutor.map never emits, if the function is called. (I set self.logger = prefect.utilities.logging.get_logger() in DaskExecutor's init.)

Chris White
10/01/2019, 6:49 PM

Jeff Yun
10/01/2019, 6:50 PM
My edited prefect shadows the official package in my Conda virtualenv, so the workers should be using my custom version.

Chris White
10/02/2019, 8:47 PM
the DaskExecutor.map function

Jeff Yun
10/02/2019, 8:57 PM
import prefect
from prefect import Flow, task

@task
def inc(x):
    return x + 1

with Flow("hi") as f:
    things = list(range(10))
    ans = inc.map(things)

from prefect.engine.executors import DaskExecutor
executor = DaskExecutor(address='xxxxx', debug=True)
fs = f.run(executor=executor)
My logger in DaskExecutor.submit prints twice, but DaskExecutor.map does not.
I checked my workers' logs and they have e.g.
INFO - prefect.TaskRunner | Task 'inc[4]': Starting task run...
INFO - prefect.TaskRunner | Task 'inc[6]': finished task run for task with final state: 'Success'
but not
INFO - prefect.DaskExecutor | Hi, I am in DaskExecutor.map!
as expected.
If DaskExecutor.map is run, where is it logged?
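(Since the map call runs on a worker, any logging it does lands in that worker's process output rather than the client's. One way to pull those lines back from the client side, assuming your distributed version has Client.get_worker_logs:)

from distributed import Client

client = Client('xxxxx')  # same placeholder scheduler address as above
# get_worker_logs() returns {worker address: [(level, message), ...]};
# DaskExecutor.map's log lines, if emitted, should show up here.
for worker, lines in client.get_worker_logs().items():
    print(worker)
    for level, message in lines:
        print(' ', level, message)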
Chris White
10/02/2019, 8:59 PM

Jeff Yun
10/02/2019, 9:01 PM

Chris White
10/02/2019, 9:06 PM
Worth double-checking that, on the workers, import prefect imports your edited version (you can check via prefect.__file__):

def prefect_version():
    import prefect
    return prefect.__file__

# `client` is a distributed.Client connected to the same scheduler
client.gather(client.submit(prefect_version))
Jeff Yun
10/02/2019, 9:13 PM
It returns {MY_REPO}/prefect/__init__.py as expected, while $ which dask-worker returns the dask-worker package in my Conda virtualenv.
INFO - prefect.DaskExecutor | In map appears once in worker logs.

Chris White
10/02/2019, 9:25 PM

Jeff Yun
10/02/2019, 9:25 PM

Chris White
10/02/2019, 9:25 PM

Jeff Yun
10/02/2019, 11:47 PM
task_runner.py, line 760-ish:
import dask.bag as db

# Batch the per-task states into a fixed number of partitions before mapping
MAX_PARTITIONS = 200
batched_initial_states = db.from_sequence(initial_states, npartitions=MAX_PARTITIONS)
batched_map_upstream_states = db.from_sequence(map_upstream_states, npartitions=MAX_PARTITIONS)
map_states = executor.map(
    run_fn, batched_initial_states, range(MAX_PARTITIONS), batched_map_upstream_states
)
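(For context on the batching above: db.from_sequence packs a sequence into a fixed number of partitions, so the executor sees MAX_PARTITIONS work items rather than one per state. A standalone illustration:)

import dask.bag as db

b = db.from_sequence(range(10), npartitions=3)
print(b.npartitions)  # 3 partitions backing the 10 elements
print(b.compute())    # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]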
@task(name='stage_0')
def stage_0_func(cfgdir, tmpdir, prod):
    def sim_key_data(key):
        """Mocked function returning a random Dask DataFrame."""
        df = pd.DataFrame(np.random.randint(0, 100, size=(10, 4)), columns=list('ABCD'), index=list(range(10)))
        df = dd.from_pandas(df, npartitions=1)
        return df

    date, keys = prod
    for key in keys:
        data = sim_key_data(key)
        res = data.to_csv(f'{tmpdir}/{date}_{key}-*.csv')  #, compute=False)
        print(f'{tmpdir}/{date}_{key}-*.csv')  # CONFIRM THE FILE IS INDEED BEING WRITTEN
        # Note: the original check `type(res) == 'list'` compared a type to a string and was always False
        if isinstance(res, list) and not isinstance(res[0], str):
            print(type(res), res)  # Nothing is ever printed
    return
[2019-10-03 21:44:50,776] INFO - prefect.TaskRunner | Task 'stage_0[4686]': Starting task run...
[2019-10-03 21:44:50,783] ERROR - prefect.TaskRunner | Task 'stage_0[4686]': unexpected error while running task: RuntimeError('cannot join thread before it is started',)
Traceback (most recent call last):
File "REPODIR/demos/prefect/utilities/executors.py", line 49, in inner
timer.start()
File "CONDADIR/envs/py36.dev/lib/python3.6/threading.py", line 846, in start
_start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "REPODIR/demos/prefect/engine/task_runner.py", line 290, in run
state, inputs=task_inputs, timeout_handler=executor.timeout_handler
File "REPODIR/demos/prefect/utilities/executors.py", line 53, in inner
timer.join()
File "CONDADIR/envs/py36.dev/lib/python3.6/threading.py", line 1051, in join
raise RuntimeError("cannot join thread before it is started")
RuntimeError: cannot join thread before it is started
[2019-10-03 21:44:50,785] INFO - prefect.TaskRunner | Task 'stage_0[4691]': Starting task run...
[2019-10-03 21:44:50,873] INFO - prefect.TaskRunner | Task 'stage_0[4686]': finished task run for task with final state: 'Failed'
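(On the "can't start new thread" error: per the traceback, Prefect's timeout handler in utilities/executors.py starts a helper thread per task run, so if the worker process exhausts the OS's per-user thread/process limit, timer.start() fails and the later timer.join() raises exactly as above. A quick, illustrative way to inspect that limit on Linux:)

import resource
import threading

soft, hard = resource.getrlimit(resource.RLIMIT_NPROC)  # per-user process/thread cap
print('RLIMIT_NPROC (soft, hard):', soft, hard)
print('threads alive in this process:', threading.active_count())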
$ ls TMPDIR/0* | wc -l
432
$ ls TMPDIR/1* | wc -l
3897
$ ls TMPDIR/2* | wc -l
3226
$ ls TMPDIR/3* | wc -l
3045
$ ls TMPDIR/4* | wc -l
2925
Stops at different steps, with different file counts, each time.

Chris White
10/03/2019, 10:05 PM

alvin goh
12/12/2019, 3:24 PM

Chris White
12/12/2019, 7:02 PM

alvin goh
12/13/2019, 9:51 AM