Darragh
06/30/2020, 1:48 PM

Jim Crist-Harif
06/30/2020, 1:51 PM

Darragh
06/30/2020, 2:14 PM
…n_workers=10) so I don’t have any logs other than the line above…
Is there any way to pass config to the DaskExecutor to get more debug or logging out of it?

Jim Crist-Harif
06/30/2020, 2:19 PM
…debug=True to DaskExecutor. A few more questions:
• Are you running inside of any process-management software? Your OS normally won't kill processes that exceed a memory limit, but if you're running inside, say, a Docker container, there may be some external process monitoring resource usage. Just trying to determine whether the worker could have died from an external kill signal.
• Is your flow particularly resource intensive in any way?

Darragh
06/30/2020, 2:26 PM

Jim Crist-Harif
06/30/2020, 2:30 PM
…DaskExecutor by passing in cluster_kwargs={"processes": False}. You might also add debug=True so that if you run into a problem like this again, you'll get more log information that we can work with.

Jim Crist-Harif
06/30/2020, 2:32 PM
…LocalDaskExecutor, which will use an alternative dask scheduler to run your workload (by default this one uses threads). In this case the LocalDaskExecutor would probably work fine for you, but for most Prefect workloads the DaskExecutor (which uses the distributed scheduler) will be more performant.
Darragh
06/30/2020, 2:32 PM

Darragh
06/30/2020, 3:13 PM

Jim Crist-Harif
06/30/2020, 3:21 PM

Darragh
06/30/2020, 3:33 PM

Jim Crist-Harif
06/30/2020, 3:37 PM

Jim Crist-Harif
06/30/2020, 3:39 PM

Darragh
06/30/2020, 3:39 PM

Jim Crist-Harif
06/30/2020, 3:41 PM
…DaskExecutor or LocalDaskExecutor?

Darragh
06/30/2020, 3:42 PM

Jim Crist-Harif
06/30/2020, 3:43 PM
…debug=True passed to DaskExecutor may be useful here.

Darragh
06/30/2020, 3:43 PM

Darragh
06/30/2020, 4:40 PM
flow.environment = LocalEnvironment(executor=DaskExecutor(n_workers=20, cluster_kwargs={"processes": False}, debug=True))

Darragh
06/30/2020, 4:44 PM
…PREFECT__LOGGING__LEVEL=DEBUG would do it?

Jim Crist-Harif
06/30/2020, 4:47 PM
flow.environment = LocalEnvironment(executor=DaskExecutor(n_workers=20, cluster_kwargs={"processes": False, "silence_logs": 10}))
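Worth noting for readers: the bare 10 passed as "silence_logs" is a stdlib logging level. Dask's LocalCluster accepts a numeric level for this kwarg, and 10 is DEBUG, so this setting asks the cluster to emit everything down to debug-level logs:

```python
import logging

# The stdlib logging level table: DEBUG=10, INFO=20, WARNING=30, ...
print(logging.DEBUG)             # 10
print(logging.getLevelName(10))  # DEBUG
```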
Jim Crist-Harif
06/30/2020, 4:47 PM

Jim Crist-Harif
06/30/2020, 4:49 PM
…PREFECT__CLOUD__AGENT__LEVEL=DEBUG
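The two logging knobs mentioned in this thread are plain environment variables read by Prefect's config system (double underscores separate config sections; the keys below are the ones from this thread and apply to Prefect 0.x):

```shell
# Flow/task-level log verbosity (config key logging.level)
export PREFECT__LOGGING__LEVEL=DEBUG

# Agent-level log verbosity (config key cloud.agent.level)
export PREFECT__CLOUD__AGENT__LEVEL=DEBUG
```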
Darragh
06/30/2020, 5:01 PM

Darragh
06/30/2020, 5:02 PM

Darragh
06/30/2020, 5:03 PM

Jim Crist-Harif
06/30/2020, 5:04 PM

Jim Crist-Harif
06/30/2020, 5:06 PM
…LocalDaskExecutor a try. It's much lighter weight, and fits your problem well.
executor = LocalDaskExecutor(num_workers=10)  # defaults to number of cores

Darragh
06/30/2020, 5:09 PM
…prefect.client.Client - do they need to be cleaned?
I’ll try switching to the Local version now as well 👍

Darragh
06/30/2020, 5:10 PM
…result=LocalResult() have any implications?

Jim Crist-Harif
06/30/2020, 5:19 PM

Jim Crist-Harif
06/30/2020, 5:20 PM
…LocalResult is disk write speed, since it will be writing to disk in your Fargate task. Seems unlikely to be the culprit here.
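LocalResult's cost being "disk write speed" follows from what it does: it serializes each task's return value to a file on the local filesystem. A rough stdlib sketch of the idea, not Prefect's actual implementation (which uses cloudpickle plus configurable directories and filename templates):

```python
import pickle
import tempfile
from pathlib import Path

def write_result(value, directory):
    # Persist a task's return value to disk, LocalResult-style.
    path = Path(directory) / "task-result.pkl"
    path.write_bytes(pickle.dumps(value))
    return path

def read_result(path):
    # Restore the value on a later read (e.g. a retry or a downstream task).
    return pickle.loads(path.read_bytes())

with tempfile.TemporaryDirectory() as tmp:
    saved = write_result({"rows": 123}, tmp)
    restored = read_result(saved)

print(restored)  # {'rows': 123}
```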
Jim Crist-Harif
06/30/2020, 5:21 PM

Darragh
06/30/2020, 5:25 PM

Darragh
06/30/2020, 5:25 PM
flow.environment = LocalEnvironment(executor=LocalDaskExecutor(n_workers=10))

Jim Crist-Harif
06/30/2020, 5:25 PM
n_workers -> num_workers

Darragh
06/30/2020, 5:25 PM

Jim Crist-Harif
06/30/2020, 5:26 PM

Darragh
06/30/2020, 5:26 PM

Darragh
06/30/2020, 5:27 PM

Jim Crist-Harif
06/30/2020, 5:30 PM
num_workers works fine for me - the following flow runs 10 tasks in true parallel:

import time

from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor

@task
def inc(x):
    time.sleep(10)
    return x + 1

with Flow("test") as flow:
    inc.map(range(10))

state = flow.run(executor=LocalDaskExecutor(num_workers=10))
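The "true parallel" claim is easy to check with a stdlib analogue of the flow above (concurrent.futures instead of Prefect, and a much shorter sleep): ten sleeping tasks on ten workers should take roughly one task's duration of wall-clock time, not ten times that.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def inc(x):
    time.sleep(0.2)  # stand-in for the 10-second sleep in the flow above
    return x + 1

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=10) as pool:  # like num_workers=10
    results = list(pool.map(inc, range(10)))
elapsed = time.perf_counter() - start

print(results)        # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
print(elapsed < 1.0)  # True when the ten 0.2s sleeps actually overlap
```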
Jim Crist-Harif
06/30/2020, 5:39 PM
num_workers is definitely the proper keyword argument - if it doesn't work for you, that's odd. Can you try running the above flow on your infrastructure to see if it works as intended?

Darragh
06/30/2020, 5:49 PM

Darragh
06/30/2020, 6:32 PM

Jim Crist-Harif
06/30/2020, 6:34 PM

Darragh
06/30/2020, 7:23 PM