Hello, I am running `DaskExecutor` with `YarnClus...
# prefect-community
d
Hello, I am running `DaskExecutor` with `YarnCluster`. This gives me a `KilledWorker` error. I am not able to log more data, as the `debug=True` option doesn't add any more information. However, when I try the same tasks on `DaskExecutor` with `address=None`, I do not get any error. What am I missing?
n
Hi @Dnyaneshwar - can you provide some more info on your setup and maybe a min reproducible example? When you say the DaskExecutor without an address has no errors, does that mean your flow executes as expected? Or nothing is output?
The `KilledWorker` sounds like something misconfigured on the Hadoop end.
d
When `address=None`, the flow executes as expected without any errors or warnings. (I really liked the way the logs were structured, so I could actually see how each worker performed. Thanks 🙂) When I use the same `YarnCluster` in plain Python (without any flow), it runs as expected. With the flow, however, I get the `KilledWorker` error before the tasks are even mapped.
n
Glad to hear you like the logs! Ok, since I'm not super familiar with Yarn let me poll the team to see if I can get some info on that @Dnyaneshwar 🙂
d
I checked some logs and it seems the error is coming from the `self.client.gather()` call with the `asynchronous` argument.
j
Hi @Dnyaneshwar, are you able to successfully use `YarnCluster` outside of prefect? You might try:
```python
from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster(...)  # Create a cluster, with whatever configuration you want
cluster.scale(1)  # Scale to one worker
client = Client(cluster)
client.submit(lambda x: x + 1, 1).result()  # Should return 2
```
> I am not able to log more data as `debug=True` option also doesn't add more information.
The `debug` option only applies when running with a local cluster (which happens if `address=None`). It would be helpful to get the logs from the failed cluster. You can do this with:
```shell
yarn logs -applicationId <your cluster application id>
```
d
Hi @Jim Crist-Harif, I am able to use `YarnCluster` successfully outside of Prefect. I am using `client.map()` and `as_completed()` from dask to map and reduce the tasks, and that works without any error. Whenever I use the `YarnCluster` inside Prefect, I get the `KilledWorker` error.
j
Can you report the logs from the failed cluster when used with prefect? Without knowing why your worker is being killed it's hard for us to debug. My first guess is excess memory usage, but there may be another reason.
d
It seems that this particular error is resolved after I add more memory to the nodes. However, I am now getting a new error: `ERROR:prefect.FlowRunner:Unexpected error: TypeError("can't pickle _mysql_connector.MySQL objects",)`. I get this error with both `DaskExecutor` with `address=None` and `YarnCluster`.
j
When running with dask, the outputs of your tasks may be pickled between worker nodes. If you want to use multiple processes, you need to ensure that the output of your task is pickleable. It looks like you're returning a `MySQL` object from one task, probably as a database connection used by other tasks? One option would be to recreate the connection in every task that needs it, and close the connection after use. We're still working on good patterns for these use cases.