
Espen Overbye

07/11/2020, 10:51 AM
Hi, we have a set of workflows that we are in the process of prefectifying, but we have stumbled on a problem. We are processing weather data, and the preferred Python libraries for handling meteorological data (NetCDF) expect files to be accessible on a mounted filesystem. Our preferred option is to run flows on Kubernetes, but it is not clear to us how to mount a volume on the pods that execute the flows. All tips greatly appreciated.

Chris White

07/12/2020, 11:06 PM
Hi @Espen Overbye - if you are using a backend API, then the KubernetesJobEnvironment allows you to specify a job spec for the flow which can include mounted volumes: https://docs.prefect.io/api/latest/environments/execution.html#kubernetesjobenvironment

Espen Overbye

07/14/2020, 12:35 PM
Hi @Chris White, I'm eager to try out with https://github.com/PrefectHQ/prefect/pull/2950 now merged
🚀 1
Hi @Chris White - after having tested this a fair bit more, I'm still stuck: the docs are not in sync with what I'm experiencing. Registering the flow works fine, but the job is still executed with the job_spec.yaml that appears to be the default on the agent (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/kubernetes/job_spec.yaml) and not the one I've defined. It could be that I misunderstand how agent/kubernetes relates to execution/k8s (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/environments/execution/k8s/job.py).
Having the code/image in a private repo makes it rather hard to share a full running example.

josh

08/10/2020, 7:01 PM
@Espen Overbye could you post an example of your flow/environment and how you are creating it?

Espen Overbye

08/10/2020, 7:02 PM
# flows/my_flow.py

from prefect import task, Flow
from prefect.environments.storage import GitHub
from prefect.environments import KubernetesJobEnvironment

@task
def get_data():
    return [1, 2, 3, 4, 5]

@task
def print_data(data):
    print(data)

with Flow("test") as flow:
    data = get_data()
    print_data(data)

flow.storage = GitHub(
    repo="airmine-ai/prefect",        # name of repo
    path="flows/my_flow.py",          # location of flow file in repo
    secrets=["GITHUB_ACCESS_TOKEN"],  # name of personal access token secret
)

flow.environment = KubernetesJobEnvironment(
    job_spec_file="job.yaml",
    metadata={"image": "airmineacrprod.azurecr.io/griddr-prefect:2020-08-11"},
)
# job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: prefect-job-UUID
  labels:
    app: prefect-job-UUID
    identifier: UUID
spec:
  template:
    metadata:
      labels:
        app: prefect-job-UUID
        identifier: UUID
    spec:
      containers:
        - name: flow
          image: airmineacrprod.azurecr.io/griddr-prefect:2020-08-10
          imagePullPolicy: IfNotPresent
          volumeMounts:
            - name: azure
              mountPath: /mnt/data
      restartPolicy: Never
      volumes:
        - name: azure
          azureFile:
            secretName: griddeddata
            shareName: gridded-data
            readOnly: false
The agent was deployed with the prefect agent install CLI, and griddr-prefect was updated to prefect 0.13.1.

josh

08/10/2020, 7:06 PM
In your image airmineacrprod.azurecr.io/griddr-prefect:2020-08-11, is there a file called job.yaml? Generally, with storage options in the past, the job spec file is loaded at registration time and shipped with the flow pickle to wherever you store it. However, I'm not 100% sure how this plays with file-based storage (like GitHub), because the flow is not initialized until it goes to run, at which point the job spec YAML must be present there.
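The run-time behaviour josh describes can be sketched with plain stdlib code. FakeEnvironment below is a hypothetical stand-in for KubernetesJobEnvironment: because file-based storage re-executes the flow file inside the job pod, any module-level constructor that eagerly resolves and opens its job_spec_file will fail there if the file is not present locally:

```python
import os
import tempfile

# Source of a flow file, as file-based storage would fetch it from GitHub.
# FakeEnvironment is a hypothetical stand-in: like KubernetesJobEnvironment,
# it resolves and opens job_spec_file at exec time.
FLOW_SOURCE = """
import os

class FakeEnvironment:
    def __init__(self, job_spec_file):
        self.job_spec_file = os.path.abspath(job_spec_file)
        with open(self.job_spec_file):  # fails if the file is absent locally
            pass

environment = FakeEnvironment("job.yaml")
"""

def load_flow_in_empty_dir():
    """Re-execute the flow source in a directory that lacks job.yaml."""
    cwd = os.getcwd()
    with tempfile.TemporaryDirectory() as scratch:
        os.chdir(scratch)
        try:
            exec(FLOW_SOURCE, {})
            return "loaded"
        except FileNotFoundError:
            return "FileNotFoundError: job.yaml missing in the job pod"
        finally:
            os.chdir(cwd)

print(load_flow_in_empty_dir())
```

Registration succeeds on a machine where the file exists, yet the same source fails when re-executed in a pod without it, which matches the symptoms in this thread.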

Espen Overbye

08/10/2020, 7:09 PM
No, my understanding was that https://github.com/PrefectHQ/prefect/pull/2950 solved that issue, in other words ensured that the job_spec file was pulled from GitHub?
And from the pod-job logs it seems like prefect is looking for job_spec.yaml, even though I specified job.yaml:
PS C:\Users\espen\source\repos\airmine-ai\griddr> kc logs prefect-job-7fa422e0-5tvkq
[Errno 2] No such file or directory: '/job_spec.yaml'
Traceback (most recent call last):
  File "/usr/local/bin/prefect", line 8, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python3.8/dist-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.8/dist-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.8/dist-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.8/dist-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.8/dist-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/prefect/cli/execute.py", line 46, in cloud_flow
    return _execute_flow_run()
  File "/usr/local/lib/python3.8/dist-packages/prefect/cli/execute.py", line 93, in _execute_flow_run
    raise exc
  File "/usr/local/lib/python3.8/dist-packages/prefect/cli/execute.py", line 83, in _execute_flow_run
    flow = storage.get_flow(storage.flows[flow_data.name])
  File "/usr/local/lib/python3.8/dist-packages/prefect/environments/storage/github.py", line 89, in get_flow
    return extract_flow_from_file(file_contents=decoded_contents)
  File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/storage.py", line 74, in extract_flow_from_file
    exec(contents, exec_vals)
  File "<string>", line 25, in <module>
  File "/usr/local/lib/python3.8/dist-packages/prefect/environments/execution/k8s/job.py", line 87, in __init__
    self._job_spec = self._load_spec_from_file()
  File "/usr/local/lib/python3.8/dist-packages/prefect/environments/execution/k8s/job.py", line 337, in _load_spec_from_file
    with open(self.job_spec_file) as job_spec_file:
FileNotFoundError: [Errno 2] No such file or directory: '/job_spec.yaml'

josh

08/10/2020, 7:16 PM
Hmm, is there a chance you haven't updated the flow file in GitHub and it still says job_spec.yaml? That error shows it initializing your environment and attempting to load the YAML file it has listed under self.job_spec_file, and that value is initialized as:
self.job_spec_file = os.path.abspath(job_spec_file) if job_spec_file
So job_spec.yaml is being passed into the environment somewhere.
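That eager os.path.abspath call also explains why the traceback complains about '/job_spec.yaml': abspath resolves against the current working directory, which is / inside the job pod. A small illustration (posixpath is used so the behaviour is identical on any OS; the directory names are made up):

```python
import posixpath

def resolve_spec(job_spec_file, cwd):
    # Mimics os.path.abspath(job_spec_file) under a given working directory.
    return posixpath.normpath(posixpath.join(cwd, job_spec_file))

# At registration time on a laptop the relative name resolves locally...
print(resolve_spec("job_spec.yaml", "/home/espen/flows"))  # /home/espen/flows/job_spec.yaml
# ...but inside the job pod, whose working directory is /, it becomes:
print(resolve_spec("job_spec.yaml", "/"))                  # /job_spec.yaml
```

So the same relative filename points at two different files depending on where the flow source is executed.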

Espen Overbye

08/10/2020, 7:29 PM
Maybe the problem is that I have assumed the Kubernetes agent unpickles the flow and the job_spec, but there could be an extra step here: the agent reads the flow spec, but does not read the flow from GitHub. Instead it runs the job at https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/kubernetes/job_spec.yaml (this is in line with the job definition I see created in Kubernetes). That job runs a new pod, which pulls the flow spec from GitHub. Because I'm running a custom image without the job_spec.yaml (or, ideally, job.yaml), this step fails.
The relationship between the Kubernetes agent and the KubernetesJobEnvironment is confusing.
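Given that diagnosis, one workaround would be to bake the spec file into the custom image so the path resolved inside the job pod actually exists. A hypothetical Dockerfile sketch (the base image, tag, and paths are assumptions, not something confirmed in this thread):

```dockerfile
# Hypothetical: include the job spec in the custom flow image, at the
# path it will resolve to inside the pod (working directory is /).
FROM prefecthq/prefect:0.13.1
COPY job.yaml /job.yaml
```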

josh

08/10/2020, 7:33 PM
Due to the separation of user data and flow metadata, there is an intermediate step happening here. The agent creates an initial prefect-job in which the flow's storage is used to grab the flow from GitHub. Then the flow is loaded up and the KubernetesJobEnvironment is executed to create the job with your custom specs. Yeah, this pattern is confusing, and that's why we are planning to deprecate it in the future for something much cleaner 🙂 https://github.com/PrefectHQ/prefect/issues/2928

Espen Overbye

08/10/2020, 7:39 PM
That sounds good, any idea of the timeline for this? I might fall back to skipping GitHub and using Docker as storage for now.
And thanks for helping out!