# ask-community
Lucas
Hi everyone, my colleagues and I recently started migrating our internal tool to Prefect to see if it could support our use cases. So far we have had a generally good experience, especially with the simpler use cases. Recently, though, we reached a few walls that have been hard to overcome. We have tried to go through the documentation and search this Slack channel without much luck, so I am writing here to see if we can get some help. Let me quickly describe our overall setup, followed by the challenges/blockers, to see if anyone has suggestions. Please also share your thoughts on the way we organize things; perhaps a different organization could help.

Our setup and use cases
We have various different ETL-like jobs to run, some on a schedule and some on demand. We usually break big computing tasks down into smaller parts, each contained in its own Docker container. These tasks, when combined, form a computation DAG. This DAG can mostly be parallelized (mapped). We then leverage an Azure Kubernetes Service (AKS) cluster to horizontally scale the compute jobs into hundreds, sometimes thousands, of containers. So far we have been using our own internal tool to orchestrate tasks and manage dependencies, and that is what we are investigating whether Prefect can replace. Our Prefect setup uses a self-hosted server rather than the cloud solution.

Challenge 1
The first and most important challenge we are facing concerns scaling jobs. I will use the flow we tried to migrate to Prefect as the example here; I am attaching the DAG schematic as a picture. In this flow, each task uses Prefect's RunNamespacedJob to spin up a new Kubernetes job with exactly one pod, which performs a given computation. We then use other functions from the Prefect SDK to read the logs after completion and delete the Kubernetes jobs. The whole DAG can be run in parallel (mapped) for multiple inputs. Usually the flow works fine for 3-5 inputs, but as soon as we try to scale to as few as 30 inputs we start seeing a ton of heartbeat errors or no pod statuses (also attaching an image here). Running with our internal tool, we have already scaled up to around 500 inputs in parallel on the same cluster setup. Has anyone else experienced this? By the way, we are using the DaskExecutor for this, with default parameters.
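To make the pattern concrete, each mapped branch boils down to roughly the sketch below. This is heavily simplified, the job spec is trimmed, and the image and function names are placeholders rather than our real code:

from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.tasks.kubernetes import RunNamespacedJob

@task
def run_step(input_id: str):
    # One Kubernetes Job with a single pod per input; the heavy computation
    # happens inside that container, not in this orchestration pod.
    job_spec = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": f"compute-{input_id}"},
        "spec": {
            "template": {
                "spec": {
                    "containers": [
                        {"name": "compute", "image": "ourcomputeimage:latest"}
                    ],
                    "restartPolicy": "Never",
                }
            }
        },
    }
    # Calling the task library's .run() inside our own task, as we do in
    # run_job; reading the pod logs and deleting the job is omitted here.
    RunNamespacedJob(body=job_spec, namespace="default").run()

with Flow("compute-dag") as flow:
    inputs = ["a", "b", "c"]  # in reality 30-500 inputs
    run_step.map(inputs)

flow.executor = DaskExecutor()  # default parameters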
Challenge 2
We currently have a single repo where we want to keep all Prefect flows under version control. To reuse code, we also made it a package that can be imported elsewhere. The structure looks like this:
prefect-flows/
  setup.py
  requirements.txt
  prefectflows/
    flows/
      flow1.py
      flow2.py
      ...
    core/
      kubernetes.py
      ...
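(For completeness, setup.py is essentially standard setuptools boilerplate. The sketch below is approximate rather than our exact file, and the use of find_packages() is my shorthand here, not a verified detail.)

# Approximate setup.py sketch; note that find_packages() only picks up
# directories that contain an __init__.py, which becomes relevant later.
from setuptools import setup, find_packages

setup(
    name="prefectflows",
    version="0.1.0",  # placeholder version
    packages=find_packages(),
)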
The idea is that the actual flows sit under the flows folder and can import boilerplate/utils from the core folder. For instance, kubernetes.py contains all the functions that use Prefect's SDK to interact with Kubernetes. An example function is run_job, which runs a job, reads its logs, and eventually deletes the job. This all works fine when running locally, but fails once we register a flow and try to run it through the server. The error is:
Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n  ModuleNotFoundError("No module named \'prefectflows.core\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
It is important to mention that I have dockerized the package using the prefect base image, and I am telling prefect to run the flow with that image. Something like the following:
Flow(
        flow_name,
        storage=Azure(
            container=container,
            stored_as_script=False,
            connection_string=azure_connection_string,
        ),
        run_config=KubernetesRun(
            image="imagewithourpackageinstalled:latest",
...
Maybe I misunderstood where the unpickling is happening, but I would expect that if the flow runs with an image that has all the packages that were present when registering the flow, then it should work. I think that summarizes it, but please let us know if you need further details or for us to share more of the code. Thanks in advance for your help!
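For reference, the full registration call is roughly like the sketch below (trimmed, with placeholder values). As I understand it, with stored_as_script=False the flow is pickled into Azure blob storage at registration time and unpickled inside the KubernetesRun image at run time, which is why I expected the image to only need the package installed:

# Hedged registration sketch; placeholder names and values throughout.
from prefect import Flow, task
from prefect.storage import Azure
from prefect.run_configs import KubernetesRun
from prefect.executors import DaskExecutor

flow_name = "example-flow"                       # placeholder
container = "flows"                              # placeholder blob container
azure_connection_string = "<connection-string>"  # placeholder

@task
def hello():
    print("hello")

with Flow(
    flow_name,
    storage=Azure(
        container=container,
        stored_as_script=False,  # flow is serialized with cloudpickle
        connection_string=azure_connection_string,
    ),
    run_config=KubernetesRun(
        image="imagewithourpackageinstalled:latest",
    ),
    executor=DaskExecutor(),
) as flow:
    hello()

flow.register(project_name="our-project")  # placeholder project name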
Quick update here. After debugging challenge 1 a bit more, it seems that the prefect-job pod just dies after a while. I am trying to dig into why now.
Yea, hard to say. In the middle of the run, without any descriptive logs, the prefect-job pod just stops. For instance, logs from the job pod:
[2021-09-14 11:14:53+0000] INFO - prefect.CloudTaskRunner | Task 'download_s2[6]': Finished task run for task with final state: 'Success'
[2021-09-14 11:14:53+0000] INFO - prefect.RunNamespacedJob | Job 59288319-c4b5-4ae6-b1a7-c9cae8c3c1e8 has been completed.
lubc@LUBC-PC1:~$ kubectl logs --tail 50 -f prefect-job-319c1d3f-6jkxs
See the image for the UI
davzucky
For the pod dying: did you check the pod resource limits? You need to be sure you have enough memory available.
Kevin Kho
Hey Lucas, for the packages, Prefect will get the flow from Azure storage and run it on top of the container. That said I am wondering if you installed it as a module in the Docker container or did you just copy the files over? You have a setup.py, so it seems like you may have installed the libraries?
Lucas
Hey @davzucky and @Kevin Kho, Thanks for your answers.
For the pod dying: did you check the pod resource limits? You need to be sure you have enough memory available.
I will double check that, but we are not actually doing any computation in the job pod, just the orchestration. All the computation happens in the pods we spin up.
Hey Lucas, for the packages, Prefect will get the flow from Azure storage and run it on top of the container. That said I am wondering if you installed it as a module in the Docker container or did you just copy the files over? You have a setup.py, so it seems like you may have installed the libraries?
Yes, that was my understanding as well, and that is why I do not get why it does not work. I do install the package; see the Dockerfile below:
FROM prefecthq/prefect:0.15.4-python3.8

WORKDIR /app

COPY requirements.txt /app/requirements.txt
COPY prefectflows /app/prefectflows
COPY setup.py /app/setup.py

RUN pip install -r requirements.txt
RUN pip install .
If I run the image locally with python as an entry point, I can import the package normally. I can also run the flow locally with the package installed. Another interesting fact: if I only use the package within the Flow context, then all is good. The issue appears once I try to use the package within a task. This gets me thinking that it could be something related to the parallelization going wrong here? I am not really familiar with how Dask implements that under the hood. I might give it a try with the LocalExecutor to see if I get a different outcome.
Quick update here @Kevin Kho: using the LocalExecutor did not solve the problem 🤷‍♂️
@davzucky, that might have been the issue. I can see the local Dask cluster seems to use quite some resources, although I did not expect it to.
I cannot find what default memory KubernetesRun allocates to the pod.
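In any case, setting the requests/limits explicitly on the run config should sidestep the question. A sketch of what I mean (parameter names as I understand them from the docs, values are just examples):

from prefect.run_configs import KubernetesRun

run_config = KubernetesRun(
    image="imagewithourpackageinstalled:latest",
    memory_request="2Gi",  # explicit request instead of relying on the default
    memory_limit="4Gi",
    cpu_request="1",
)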
I think the scaling might be related to this open issue: https://github.com/PrefectHQ/prefect/issues/3966
Switching to the LocalDaskExecutor did the trick for challenge 1 🎉 Challenge 2 still remains.
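For anyone finding this later, the switch was just a one-liner on the flow; the num_workers value below is illustrative, not something we tuned:

from prefect.executors import LocalDaskExecutor

# Threads in a single process instead of a distributed Dask cluster
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)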
Kevin Kho
Can you try doing python setup.py bdist_wheel? That will create the wheel. You can go into the wheel and see which files make it in there; those are the files that get installed with the package. If you’re maybe missing an __init__.py somewhere, there is a chance it won’t get picked up.
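For example, something like this to peek inside the built wheel (the filename below is hypothetical):

import zipfile

# List every file that made it into the wheel; missing modules show up here
print("\n".join(zipfile.ZipFile("dist/prefectflows-0.1.0-py3-none-any.whl").namelist()))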
Also I think the default for a k8s job is like 2gb
Lucas
Ohhhh, that missing __init__.py file. You were absolutely right @Kevin Kho, thank you! I am puzzled about why this fixed it, though. Before, I had this:
mypackage/
...
  core/
    kubernetes.py
    flow.py
...
And the imports worked for flow.py within a Flow context, but did not work for kubernetes.py within a task context. What I mean is this:
...
import mypackage.core.kubernetes as kub
import mypackage.core.flow as fl

...
@task
def my_task():
  kub.dosmth() # this did not work without the __init__.py

...

with Flow(...) as flow:
  fl.do_smth() # this worked fine without the __init__.py
Why did adding the init fix this? Is it related to the Dask workers?
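My current guess (unverified) is that it is not Dask-specific at all but a packaging thing: if setup.py uses find_packages(), only directories containing an __init__.py are treated as packages, so core/ would have been silently left out of the wheel installed in the image. A tiny way to check, run from the repo root:

# Assumes setup.py uses find_packages(); without core/__init__.py this would
# typically print ['prefectflows', 'prefectflows.flows'] and skip the core
# subpackage, which matches the ModuleNotFoundError above.
from setuptools import find_packages

print(find_packages())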
Kevin Kho
When you say it did not work, do you mean during runtime? When you register, tasks are not run, but the code in the Flow block is evaluated to build the Flow and store it. That is the main difference I see so far. But it is still unexpected, because if one works I would expect the other to.
Lucas
When you say it did not work, do you mean during runtime?
Yea, registration works fine, but I get the import error/module not found thingy during flow execution
Well, anyways. I am happy it works now. Thanks for your help!
Kevin Kho
Ah, that just means the place of registration was able to access these paths but the k8s pod couldn’t, right?
Lucas
Yea, but it does not make sense, since I could import all of those paths from the Docker image the k8s pod was using.
I still suspect it was something weird going on with the DaskExecutor, but I don't really know.
Kevin Kho
Ah gotcha. I think this is because the Dask workers just need their own installation of the libraries and it wasn’t being installed correctly through the image? I assume you are using KubeCluster? You can probably import them through the regular path on the Docker image even if it’s not installed (just because of where the script is run from).
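Something like the sketch below is what I mean, in case it helps; all of the values are illustrative and I am not sure this matches your setup:

# Illustrative only: a DaskExecutor that spins up Dask workers on the cluster
# via dask-kubernetes, using the same image as the flow run so the
# prefectflows package is installed everywhere tasks actually execute.
from prefect.executors import DaskExecutor
from dask_kubernetes import KubeCluster, make_pod_spec

flow.executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(
        make_pod_spec(image="imagewithourpackageinstalled:latest")
    ),
    adapt_kwargs={"minimum": 1, "maximum": 10},
)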