Darragh (06/30/2020, 1:48 PM)

Jim Crist-Harif (06/30/2020, 1:51 PM)

Darragh (06/30/2020, 2:14 PM)
…`n_workers=10`) so I don't have any logs other than the line above…
Is there any way to pass config to the DaskExecutor to get more debug or logging out of it?

Jim Crist-Harif (06/30/2020, 2:19 PM)
…`debug=True` to `DaskExecutor`. A few more questions:
• Are you running inside of any process-management software? Your OS normally won't kill processes that exceed a memory limit, but if you're running inside, say, a docker container, there may be some external process monitoring resource usage. Just trying to determine whether the reason the worker died could be an external kill signal.
• Is your flow particularly resource intensive in any way?

Darragh (06/30/2020, 2:26 PM)

Jim Crist-Harif (06/30/2020, 2:30 PM)
…`DaskExecutor` by passing in `cluster_kwargs={"processes": False}`. You might also add `debug=True` so if you run into a problem like this again, you'll get more log information that we can work with. …`LocalDaskExecutor`, which will use an alternative dask scheduler to run your workload (by default this one uses threads). In this case the `LocalDaskExecutor` would probably work fine for you, but for most prefect workloads the `DaskExecutor` (which uses the distributed scheduler) will be more performant.
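The two options Jim outlines can be sketched as executor configurations. This is a hedged sketch against the Prefect 0.x API used elsewhere in this thread (`prefect.engine.executors`); names and defaults may differ in later releases:

```python
# Sketch of the two executors discussed above, using the Prefect 0.x
# import path that appears later in this thread. Treat this as
# illustrative configuration, not a verified recipe.
from prefect.engine.executors import DaskExecutor, LocalDaskExecutor

# Option 1: keep the distributed scheduler, but run workers as threads
# instead of separate processes, and turn on extra debug logging.
executor = DaskExecutor(cluster_kwargs={"processes": False}, debug=True)

# Option 2: the lighter local scheduler, which defaults to threads.
executor = LocalDaskExecutor()
```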
Darragh (06/30/2020, 2:32 PM)

Jim Crist-Harif (06/30/2020, 3:21 PM)

Darragh (06/30/2020, 3:33 PM)

Jim Crist-Harif (06/30/2020, 3:37 PM)

Darragh (06/30/2020, 3:39 PM)

Jim Crist-Harif (06/30/2020, 3:41 PM)
…`DaskExecutor` or `LocalDaskExecutor`?

Darragh (06/30/2020, 3:42 PM)

Jim Crist-Harif (06/30/2020, 3:43 PM)
`debug=True` passed to `DaskExecutor` may be useful here.

Darragh (06/30/2020, 3:43 PM)
```python
flow.environment = LocalEnvironment(executor=DaskExecutor(n_workers=20, cluster_kwargs={"processes": False}, debug=True))
```
…`PREFECT__LOGGING__LEVEL=DEBUG` would do it?

Jim Crist-Harif (06/30/2020, 4:47 PM)
```python
flow.environment = LocalEnvironment(executor=DaskExecutor(n_workers=20, cluster_kwargs={"processes": False, "silence_logs": 10}))
```
`PREFECT__CLOUD__AGENT__LEVEL=DEBUG`
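A note on the `"silence_logs": 10` argument above: dask's `silence_logs` takes a numeric logging level, and 10 is the stdlib's `logging.DEBUG`, so this setting surfaces debug-level scheduler and worker logs rather than silencing them. The numeric levels come straight from Python's `logging` module:

```python
import logging

# Stdlib numeric logging levels; "silence_logs": 10 corresponds to
# logging.DEBUG, i.e. emit everything at DEBUG severity and above.
levels = {
    "DEBUG": logging.DEBUG,      # 10
    "INFO": logging.INFO,        # 20
    "WARNING": logging.WARNING,  # 30
    "ERROR": logging.ERROR,      # 40
}
print(levels)
```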
Darragh (06/30/2020, 5:01 PM)

Jim Crist-Harif (06/30/2020, 5:04 PM)
…the `LocalDaskExecutor` a try. It's much lighter weight, and fits your problem well.
```python
executor = LocalDaskExecutor(num_workers=10)  # defaults to number of cores
```

Darragh (06/30/2020, 5:09 PM)
…`prefect.client.Client` - do they need to be cleaned?
I'll try switching to the Local version now as well 👍
…`result=LocalResult()` have any implications?

Jim Crist-Harif (06/30/2020, 5:19 PM)
…`LocalResult` is disk write speed, since it will be writing to disk in your fargate task. Seems unlikely to be the culprit here.
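For context on the `result=LocalResult()` question, a hedged sketch of how that configuration typically looked in Prefect 0.x (import path and `dir` parameter per the 0.x API; verify against your version). The directory path is a hypothetical placeholder:

```python
# Sketch of attaching a LocalResult to a task in Prefect 0.x. Results
# are serialized to local disk, which is why disk write speed is the
# main implication inside a Fargate task.
from prefect import task
from prefect.engine.results import LocalResult


@task(result=LocalResult(dir="/tmp/prefect-results"))  # hypothetical dir
def transform(x):
    return x * 2
```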
Darragh (06/30/2020, 5:25 PM)
```python
flow.environment = LocalEnvironment(executor=LocalDaskExecutor(n_workers=10))
```

Jim Crist-Harif (06/30/2020, 5:25 PM)
`n_workers` -> `num_workers`

Darragh (06/30/2020, 5:25 PM)

Jim Crist-Harif (06/30/2020, 5:26 PM)

Darragh (06/30/2020, 5:26 PM)

Jim Crist-Harif (06/30/2020, 5:30 PM)
`num_workers` works fine for me - the following flow runs 10 tasks in true parallel:
```python
import time

from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor


@task
def inc(x):
    time.sleep(10)
    return x + 1


with Flow("test") as flow:
    inc.map(range(10))

state = flow.run(executor=LocalDaskExecutor(num_workers=10))
```
`num_workers` is definitely the proper keyword argument - if it doesn't work for you, that's odd. Can you try running the above flow on your infrastructure to see if that works as intended?
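Jim's flow needs a Prefect install to run, but the behavior it demonstrates, ten sleeping tasks finishing in roughly one sleep's worth of wall time on ten threads, can be reproduced with the stdlib alone. This is an analogy to the thread-based scheduler, not Prefect code:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def inc(x):
    # Stand-in for the sleeping Prefect task above (shorter sleep).
    time.sleep(0.2)
    return x + 1


start = time.perf_counter()
# max_workers=10 plays the role of num_workers=10: one thread per task.
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(inc, range(10)))
elapsed = time.perf_counter() - start

print(results)        # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
print(elapsed < 2.0)  # True: ~0.2 s concurrent, not 2 s sequential
```

If the ten tasks truly run in parallel, total wall time stays near a single task's duration; a sequential run would take roughly ten times as long.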
Darragh (06/30/2020, 5:49 PM)

Jim Crist-Harif (06/30/2020, 6:34 PM)

Darragh (06/30/2020, 7:23 PM)