# prefect-community
m
Hi All, we are using Prefect Server 0.15.7 and running the flows on a YARN cluster. I have 20 mapped tasks; each mapped task loops over a list, does some processing, and saves output to Hadoop. I am intermittently getting the error below. Any help would be appreciated.
a
Are you running Dask on a Hadoop cluster? The error mentions unmanaged history, which looks like a Dask issue. Can you share the code for this flow (I'm mainly interested in the flow structure and the configuration of the Flow object, not what the tasks are doing)? Which agent did you use?
k
The KilledWorker error indicates you have some kind of package mismatch between Client/Scheduler/Workers.
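One quick way to confirm or rule that out is Dask's built-in version check; a minimal sketch, assuming you can reach the scheduler (the address here is illustrative):

```python
from distributed import Client

client = Client("tcp://scheduler:8786")  # assumed scheduler address

# check=True raises an error if Python or package versions differ
# between the client, the scheduler, and the workers -- a common
# root cause of KilledWorker errors.
versions = client.get_versions(check=True)
print(versions["scheduler"]["packages"])
```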
m
@Marvin I am trying to run a mapped task that imports some common tasks from a utils folder. I am unable to run the mapped task and it gives me:

```
Crash details: Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/prefect_dask/task_runners.py", line 298, in wait
    return await future.result(timeout=timeout)
  File "/usr/local/lib/python3.8/dist-packages/distributed/client.py", line 299, in _result
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.8/dist-packages/distributed/protocol/pickle.py", line 96, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'utils'
```
m
It appears you have a `ModuleNotFoundError`, indicating that the module named `utils` which you're attempting to import in your Prefect flow cannot be found. When you run Prefect tasks in a distributed environment, such as with a Dask executor, all required modules must be available in the environment where the workers are running. To troubleshoot this issue, check the following:

1. The `utils` module is actually a Python package with an `__init__.py` file in the location where Python expects it. Confirm that it's either part of your Python environment's site-packages or on the `PYTHONPATH`.
2. If `utils` is a custom module from your own project, ensure it gets distributed to your Dask workers. This often means packaging your code so it can be transferred to the workers, for example via a Docker image that contains your code and is used by all workers.
3. If you are using local imports, make sure the workers' execution environment has access to the same filesystem structure. With Dask, this typically means ensuring the same codebase is present on all nodes of the cluster.
4. Ensure you are using the correct import statements. If `utils` is a directory with multiple Python files, you may need to be more specific with your imports (e.g., `from utils import my_module`).
5. If you are running Dask on a cluster, you may need to install your `utils` module on all Dask workers, either by making sure it is present in each worker's environment or by using Dask's ability to upload the module.

It can be useful to run a simple debugging task that imports `utils` and logs some output before running your main tasks, to confirm the import works at all (see the sketch below). If the `utils` folder is part of your project, package your project and install it in your Dask workers' environments, or make sure the `PYTHONPATH` on the workers includes the path to your project. If these steps don't solve the problem, please share more context or code so the advice can be more specific. Finally, if you have a `requirements.txt` file, make sure it lists all the necessary modules and is used to set up the environment on your Dask workers.
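A minimal version of that debugging task, assuming Prefect 2.x with `prefect-dask` (matching the `prefect_dask` path in the traceback); the scheduler address is illustrative:

```python
from prefect import flow, task, get_run_logger
from prefect_dask import DaskTaskRunner

@task
def check_utils_import():
    logger = get_run_logger()
    import utils  # the module that fails to import on the workers
    logger.info("utils imported from %s", utils.__file__)

# Assumed scheduler address; DaskTaskRunner can also spin up a
# temporary local cluster if no address is given.
@flow(task_runner=DaskTaskRunner(address="tcp://scheduler:8786"))
def debug_flow():
    # Run the import probe on a Dask worker before any real work.
    check_utils_import.submit()

if __name__ == "__main__":
    debug_flow()
```

If this probe fails while the same import works in a non-mapped task running on the agent, the module is missing from the worker environments specifically.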
m
@Marvin All other tasks except the mapped task have access to utils; it fails only in the mapped task.
m
The documentation excerpts don't provide specific information about a `ModuleNotFoundError` that affects only mapped tasks, and the absence of related GitHub issues suggests this is either not widely reported or specific to your setup. Given that only your mapped tasks fail to import the `utils` module, consider the following:

1. Ensure your `utils` module is in a location all workers can access, such as distributed storage, or bake it into the Docker image if you're using Docker for your Dask workers.
2. Review how you package and deploy your flow code, and make sure the `utils` module is included in any packages or ZIP files you deploy.
3. Check the import statements inside the mapped task. If they refer to `utils` differently than your other tasks do, that could be the issue.
4. Verify that environment variables such as `PYTHONPATH` are set consistently across all tasks and workers, so that Python can locate your `utils` module everywhere.
5. This could also be a Dask worker initialization issue: a freshly spawned worker may not be able to import `utils` because of the environment it starts in. One workaround is to upload the module to the workers at runtime (see the sketch below).

Could you share more details about your setup: how you deploy your flow and the Dask cluster, the structure of your project, and how you import the `utils` module? With that, I might be able to give a more precise answer. In the meantime, if none of the above helps, consider asking in the Prefect community or filing an issue on the Prefect GitHub repository.