Thread
#prefect-community

    Dnyaneshwar

    2 years ago
    Hello, I am running `DaskExecutor` with `YarnCluster`. This gives me a `KilledWorker` error. I am not able to log more information, as the `debug=True` option also doesn't add anything. However, when I try the same tasks on `DaskExecutor` with `address=None`, I do not get any error. What am I missing?

    nicholas

    2 years ago
    Hi @Dnyaneshwar - can you provide some more info on your setup and maybe a min reproducible example? When you say the DaskExecutor without an address has no errors, does that mean your flow executes as expected? Or nothing is output?
    The `KilledWorker` sounds like something misconfigured on the Hadoop end.

    Dnyaneshwar

    2 years ago
    When `address=None`, the flow executes as expected without any errors or warnings. (I really liked the way the logs were structured, so I could actually see how each worker performed. Thanks 🙂) When I use the same YarnCluster in Python (without any flow), it runs as expected. However, before the tasks are even mapped, I get the `KilledWorker` error.

    nicholas

    2 years ago
    Glad to hear you like the logs! Ok, since I'm not super familiar with Yarn let me poll the team to see if I can get some info on that @Dnyaneshwar 🙂

    Dnyaneshwar

    2 years ago
    I checked some logs, and it seems the error is coming from the `self.client.gather()` function with the `asynchronous` argument.

    Jim Crist-Harif

    2 years ago
    Hi @Dnyaneshwar, are you able to successfully use `YarnCluster` outside of Prefect? You might try:
    from dask_yarn import YarnCluster
    from dask.distributed import Client
    
    cluster = YarnCluster(...)  # Create a cluster, with whatever configuration you want
    cluster.scale(1)  # Scale to one worker
    client = Client(cluster)
    client.submit(lambda x: x + 1, 1).result()  # Should return 2
    > I am not able to log more data as `debug=True` option also doesn't add more information.
    The `debug` option only applies when running with a local cluster (which happens if `address=None`). It would be helpful to get the logs from the failed cluster. You can do this with:
    yarn logs -applicationId <your cluster application id>

    Dnyaneshwar

    2 years ago
    Hi @Jim Crist-Harif, I am able to use `YarnCluster` successfully outside of Prefect. I am using `client.map()` and `as_completed()` from dask to map and reduce the tasks, and that works without any error. Whenever I use the `YarnCluster` inside Prefect, I get the `KilledWorker` error.
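    For context, the map-and-reduce pattern described above can be sketched with the standard library's `concurrent.futures`, whose `as_completed()` mirrors dask's. The `square` task and its inputs are made-up placeholders; in the real setup you would call `client.map()` against the `YarnCluster`-backed client instead:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def square(x):
    # Placeholder task; stands in for whatever work each dask worker does.
    return x * x

with ThreadPoolExecutor(max_workers=2) as executor:
    # Map: one future per input, analogous to client.map(square, range(5))
    futures = [executor.submit(square, x) for x in range(5)]
    # Reduce: consume each result as soon as its task finishes
    total = sum(f.result() for f in as_completed(futures))

print(total)  # 0 + 1 + 4 + 9 + 16 = 30
```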

    Jim Crist-Harif

    2 years ago
    Can you report the logs from the failed cluster when used with prefect? Without knowing why your worker is being killed it's hard for us to debug. My first guess is excess memory usage, but there may be another reason.

    Dnyaneshwar

    2 years ago
    It seems that this particular error gets resolved after I add more memory to the nodes. However, I am now getting a new error: `ERROR:prefect.FlowRunner:Unexpected error: TypeError("can't pickle _mysql_connector.MySQL objects",)`. I am getting this error for both `DaskExecutor` with `address=None` and `YarnCluster`.

    Jim Crist-Harif

    2 years ago
    When running with dask, the outputs of your tasks may be pickled and sent between worker nodes. If you want to use multiple processes, you need to ensure that the output of every task is pickleable. It looks like you're returning a `MySQL` object from one task, probably as a database connection used by other tasks? One option would be to recreate the connection in every task that needs it, and close the connection after use. We're still working on good patterns for these use cases.
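    To illustrate the recreate-per-task suggestion, here is a minimal sketch using the standard library's `sqlite3` as a stand-in for the MySQL connector (the `fetch_rows` name and the query are made up for illustration). The connection lives only inside the task, and only plain, pickleable rows are returned:

```python
import pickle
import sqlite3  # stdlib stand-in for the MySQL connector in this sketch

def fetch_rows(db_path, query):
    # Open the connection inside the task, use it, and close it before
    # returning, so the task's output contains no live connection object.
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(query).fetchall()  # plain tuples: picklable
    finally:
        conn.close()
    return rows

rows = fetch_rows(":memory:", "SELECT 1, 2")
# The result round-trips through pickle, unlike a connection object would.
assert pickle.loads(pickle.dumps(rows)) == [(1, 2)]
```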