Lucas Beck
09/14/2021, 8:20 AM
We have ETL-like jobs to run, some on a schedule and some on demand. We usually break big computing tasks down into smaller parts, each contained in its own Docker container. Combined, these tasks form a computation DAG. This DAG can mostly be parallelized (mapped). We then leverage an Azure Kubernetes Service (AKS) cluster to horizontally scale the compute jobs into hundreds, sometimes thousands, of containers. So far we have been using our own internal tool to orchestrate tasks and manage dependencies, and that is what we are currently investigating whether we can replace with Prefect. Our Prefect setup uses a self-hosted server instead of the cloud solution.
Challenge 1
The first and most important challenge we are facing concerns scaling jobs. I will use the flow we tried to migrate to Prefect as the example here. I am attaching the DAG schematic as a picture. In this flow, each task uses Prefect's RunNamespacedJob to spin up a new Kubernetes job with exactly one pod, which performs a given computation. We then use other functions from the Prefect SDK to read the logs after completion and delete the Kubernetes jobs. The whole DAG can be run in parallel (mapped) for multiple inputs. The job usually works fine for 3-5 inputs, but as soon as we try to scale to as few as 30 inputs we start seeing a ton of heartbeat errors or "no pod statuses" (also attaching an image here). With our internal tool, we have already scaled up to around 500 parallel inputs on the same cluster setup. Has anyone else experienced this? By the way, we are using the DaskExecutor for this, with default parameters.
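For reference, the pattern described above looks roughly like this (a minimal Prefect 1.x sketch; the manifest-building helper, image name, flow name, and input list are illustrative assumptions, not the actual code):

from prefect import Flow, Parameter, task
from prefect.executors import DaskExecutor
from prefect.tasks.kubernetes import RunNamespacedJob

@task
def make_job_body(input_id: int) -> dict:
    # Hypothetical helper: build a one-pod Kubernetes Job manifest per input.
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": f"compute-{input_id}"},
        "spec": {
            "template": {
                "spec": {
                    "containers": [{"name": "compute", "image": "ourimage:latest"}],
                    "restartPolicy": "Never",
                }
            },
            "backoffLimit": 0,
        },
    }

# Runs each Job to completion, then deletes it.
run_job = RunNamespacedJob(namespace="default", delete_job_after_completion=True)

with Flow("mapped-k8s-jobs") as flow:
    input_ids = Parameter("input_ids", default=list(range(30)))
    bodies = make_job_body.map(input_ids)
    run_job.map(body=bodies)  # one Kubernetes Job per input

flow.executor = DaskExecutor()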
Challenge 2
We currently have a single repo where we want to keep all Prefect flows under version control. In order to reuse code, we also made it a package that can be imported elsewhere. The structure looks like this:
prefect-flows/
    setup.py
    requirements.txt
    prefectflows/
        flows/
            flow1.py
            flow2.py
            ...
        core/
            kubernetes.py
            ...
The idea is that the actual flows sit under the flows folder and can import boilerplate/utils from the core folder. For instance, kubernetes.py contains all the functions that use Prefect's SDK to interact with Kubernetes. An example function is run_job, which will run a job, read its logs, and eventually delete that job.
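A hypothetical sketch of what such a run_job helper might look like on top of Prefect 1.x's Kubernetes tasks (the helper itself and its signature are assumptions; only the three task classes come from the Prefect SDK):

from prefect.tasks.kubernetes import (
    DeleteNamespacedJob,
    ReadNamespacedPodLogs,
    RunNamespacedJob,
)

def run_job(body: dict, pod_name: str, namespace: str = "default") -> str:
    # Run the Job to completion, but keep it around so its logs can be read.
    RunNamespacedJob(
        body=body, namespace=namespace, delete_job_after_completion=False
    ).run()
    # Read the logs of the Job's (single) pod.
    logs = ReadNamespacedPodLogs(pod_name=pod_name, namespace=namespace).run()
    # Clean up the finished Job.
    DeleteNamespacedJob(job_name=body["metadata"]["name"], namespace=namespace).run()
    return logs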
This all works fine when running locally, but fails once we register a flow and try to run it through the server. The error is:
Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'prefectflows.core\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
It is important to mention that I have Dockerized the package using the Prefect base image, and I am telling Prefect to run the flow with that image. Something like the following:
Flow(
    flow_name,
    storage=Azure(
        container=container,
        stored_as_script=False,
        connection_string=azure_connection_string,
    ),
    run_config=KubernetesRun(
        image="imagewithourpackageinstalled:latest",
        ...
    ),
)
Maybe I misunderstood where the unpickling is happening, but I would expect that if the flow uses an image that has all the packages used when registering that flow, then it should work.
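For context, with stored_as_script=False the flow is serialized with cloudpickle at registration time and deserialized inside the run_config image, so every module the flow references must be importable there. Roughly (a sketch of the mechanism, not Prefect's actual code; flow is the Flow object built above):

import cloudpickle

# At registration time, on the registering machine: tasks defined in
# modules are pickled by reference (module path + name), not by value.
blob = cloudpickle.dumps(flow)

# At run time, inside the KubernetesRun image: unpickling re-imports those
# modules, and fails with ModuleNotFoundError if any of them are missing.
flow = cloudpickle.loads(blob)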
I think that summarizes it, but please let us know if you need further description or want us to share more of the code.
Thanks in advance for your help!

Lucas Beck
09/14/2021, 9:39 AM
Regarding challenge 1, it seems that the prefect-job pod just dies after a while. I am trying to dig into why now.

Lucas Beck
09/14/2021, 11:18 AM
[2021-09-14 11:14:53+0000] INFO - prefect.CloudTaskRunner | Task 'download_s2[6]': Finished task run for task with final state: 'Success'
[2021-09-14 11:14:53+0000] INFO - prefect.RunNamespacedJob | Job 59288319-c4b5-4ae6-b1a7-c9cae8c3c1e8 has been completed.
lubc@LUBC-PC1:~$ kubectl logs --tail 50 -f prefect-job-319c1d3f-6jkxs
See the image for the UI.

davzucky
09/14/2021, 12:50 PM
For the pod dying: did you check the pod resource limits? You need to be sure you have enough memory available.

Kevin Kho
Hey Lucas, for the packages, Prefect will get the flow from Azure storage and run it on top of the container. That said, I am wondering: did you install it as a module in the Docker container, or did you just copy the files over? You have a setup.py, so it seems like you may have installed the libraries?

Lucas Beck
09/15/2021, 7:19 AM
I will double check the resource limits, but we are not actually doing any computation in the job pod, just the orchestration. All the computation happens in the pods we spin up.
As for the packages, that was my understanding as well, and that is why I do not get why it does not work. I do install the package, see the Dockerfile below:
FROM prefecthq/prefect:0.15.4-python3.8
WORKDIR /app
COPY requirements.txt /app/requirements.txt
COPY prefectflows /app/prefectflows
COPY setup.py /app/setup.py
RUN pip install -r requirements.txt
RUN pip install .
If I run the image locally with python as an entry point, I can import the package normally. I can also run the flow locally with the package installed.
Another interesting fact: if I only use the package within the Flow context, then all is good. The issue appears once I try to use the package within a task. This gets me thinking that something related to the parallelization could be going wrong here. I am not really familiar with how Dask implements that under the hood. I might give it a try with the LocalExecutor to see if I get a different outcome.

Lucas Beck
09/15/2021, 7:55 AM
Using the LocalExecutor did not solve the problem 🤷‍♂️

Lucas Beck
09/15/2021, 9:31 AM
I am looking into the resources that KubernetesRun allocates to the pod.

Lucas Beck
09/15/2021, 12:22 PM
Switching to the LocalDaskExecutor did the trick for challenge 1 🎉
Challenge 2 still remains.
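(For reference, a minimal sketch of that executor switch, assuming flow is the Flow object; the threaded scheduler and worker count are illustrative choices:)

from prefect.executors import LocalDaskExecutor

# Run mapped tasks in parallel inside the single flow-run pod instead of
# farming them out to a distributed Dask cluster.
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)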
Kevin Kho
Have you tried running python setup.py bdist_wheel? This will create the wheel. You can go into the wheel and see what files make it in there; these are the files that are installed with the package. If you're missing an __init__.py somewhere, there is a chance it won't get used.
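One way to do that inspection (a sketch; it assumes the wheel was built into dist/):

import glob
import zipfile

# List every file that made it into the built wheel; a package directory
# missing an __init__.py will simply not show up here.
wheel_path = glob.glob("dist/*.whl")[0]
print("\n".join(zipfile.ZipFile(wheel_path).namelist()))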
Lucas Beck
09/16/2021, 8:56 AM
It was indeed a missing __init__.py file. You were absolutely right @Kevin Kho. Thank you!
I am puzzled about why this fixed it though. Before, I had this:
mypackage/
    ...
    core/
        kubernetes.py
        flow.py
        ...
And the imports worked for flow.py within a Flow context, but did not work for kubernetes.py within a task context. What I mean is this:
...
import mypackage.core.kubernetes as kub
import mypackage.core.flow as fl
...

@task
def my_task():
    kub.dosmth()  # this did not work without the __init__.py

...

with Flow(...) as flow:
    fl.do_smth()  # this worked fine without the __init__.py
Why did adding the init fix this? Is it related to the Dask workers?
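(A likely explanation, sketched below rather than taken from the thread: setuptools' find_packages() only discovers directories that contain an __init__.py, so without it mypackage.core was never installed into the image at all. And because tasks are pickled by reference, the import of mypackage.core.kubernetes only happens at run time inside a task, not at registration time, which is why registration still succeeded.)

from setuptools import find_packages, setup

setup(
    name="mypackage",
    # find_packages() silently skips any directory without an __init__.py,
    # so core/ (and everything in it) would be missing from the wheel/image.
    packages=find_packages(),
)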
Kevin Kho
When you mean "not work", do you mean during runtime?

Lucas Beck
09/16/2021, 2:04 PM
Yeah, registration works fine, but I get the import error/module not found thingy during flow execution.