<@ULVA73B9P> Using a prefect.yaml file for creatin...
# ask-marvin
r
@Marvin Using a prefect.yaml file for creating deployments like so:
```yaml
# Generic metadata about this project
name: flows
prefect-version: 3.0.0

# build section allows you to manage and build docker images
build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build-image
    requires: prefect-docker>=0.4.0
    image_name: "{{ $PREFECT_IMAGE_NAME }}"
    tag: latest
    dockerfile: auto
    platform: "linux/amd64"

# push section allows you to manage if and how this project is uploaded to remote locations
push:
- prefect_docker.deployments.steps.push_docker_image:
    requires: prefect-docker>=0.4.0
    image_name: "{{ build-image.image_name }}"
    tag: "{{ build-image.tag }}"

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect.deployments.steps.set_working_directory:
    directory: /opt/prefect/flows

# the definitions section allows you to define reusable components for your deployments
definitions:
  tags: &common_tags
    - "eks"
  work_pool: &common_work_pool
    name: "kubernetes"
    job_variables:
      image: "{{ build-image.image }}"

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: "default"
  tags: *common_tags
  schedule: null
  entrypoint: "flows/hello.py:hello"
  work_pool: *common_work_pool

- name: "arthur"
  tags: *common_tags
  schedule: null
  entrypoint: "flows/hello.py:hello"
  parameters:
    name: "Arthur"
  work_pool: *common_work_pool
```
How do I specify deployment-specific CPU and memory resources?
m
thought for 8.2 seconds
In a Prefect 3.x deployment using a Kubernetes work pool, you can specify CPU and memory resources at the deployment level by adding them to the `job_variables` section. Here's how to modify your configuration:
```yaml
definitions:
  tags: &common_tags
    - "eks"
  work_pool: &common_work_pool
    name: "kubernetes"
    job_variables:
      image: "{{ build-image.image }}"

deployments:
- name: "default"
  tags: *common_tags
  schedule: null
  entrypoint: "flows/hello.py:hello"
  work_pool:
    <<: *common_work_pool
    job_variables:
      image: "{{ build-image.image }}"
      resource_requests:
        cpu: "500m"
        memory: "512Mi"
      resource_limits:
        cpu: "1000m"
        memory: "1Gi"

- name: "arthur"
  tags: *common_tags
  schedule: null
  entrypoint: "flows/hello.py:hello"
  parameters:
    name: "Arthur"
  work_pool:
    <<: *common_work_pool
    job_variables:
      image: "{{ build-image.image }}"
      resource_requests:
        cpu: "1000m"
        memory: "1Gi"
      resource_limits:
        cpu: "2000m"
        memory: "2Gi"
```

A few key points:
- Use `<<: *common_work_pool` to merge the common work pool configuration
- Override `job_variables` at the deployment level
- Specify both `resource_requests` and `resource_limits`:
  - `requests` is what the pod is guaranteed to get
  - `limits` is the maximum the pod can use
- CPU is specified in millicores (e.g., "500m" = 0.5 CPU cores)
- Memory can use suffixes like Mi (mebibytes) or Gi (gibibytes)

This configuration allows you to have different resource allocations for each deployment while maintaining the common work pool configuration for other settings.
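If it helps to see where these values are meant to end up, this is roughly the container spec the worker would render for the "default" deployment above. This is a sketch, not output from this thread, and it assumes the work pool's base job template actually maps `resource_requests`/`resource_limits` through to the container:

```yaml
# Sketch: container section of the Kubernetes Job the worker creates,
# assuming the base job template wires the two job variables into
# the standard Kubernetes resources schema
containers:
  - name: prefect-job
    image: "<your built image>"   # resolved from {{ build-image.image }}
    resources:
      requests:                   # guaranteed allocation used for scheduling
        cpu: "500m"
        memory: "512Mi"
      limits:                     # hard ceiling; exceeding memory gets the pod OOM-killed
        cpu: "1000m"
        memory: "1Gi"
```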
r
@Marvin - How do I specify which Docker registry the Docker image should be pushed to? If I make the deployment directly in Python I have to specify the full image name (with the Docker registry) - how do I do this here?
m
thought for 8.9 seconds
You can specify the Docker registry by including it in the `image_name` in your `prefect.yaml`. Here's how to modify the configuration:
```yaml
# build section allows you to manage and build docker images
build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build-image
    requires: prefect-docker>=0.4.0
    image_name: "<http://registry.example.com/my-org/{{|registry.example.com/my-org/{{> $PREFECT_IMAGE_NAME }}"  # Include registry here
    tag: latest
    dockerfile: auto
    platform: "linux/amd64"

# push section allows you to manage if and how this project is uploaded to remote locations
push:
- prefect_docker.deployments.steps.push_docker_image:
    requires: prefect-docker>=0.4.0
    image_name: "{{ build-image.image_name }}"  # This will include the registry
    tag: "{{ build-image.tag }}"

definitions:
  tags: &common_tags
    - "eks"
  work_pool: &common_work_pool
    name: "kubernetes"
    job_variables:
      image: "{{ build-image.image }}"  # This will reference the full image path including registry
```

You have several options for specifying the registry:

1. Directly in the YAML file:
```yaml
image_name: "<http://registry.example.com/my-org/{{|registry.example.com/my-org/{{> $PREFECT_IMAGE_NAME }}"
2. Using environment variables:
```yaml
image_name: "{{ $DOCKER_REGISTRY }}/{{ $PREFECT_IMAGE_NAME }}"
```

3. For private registries that require authentication, you'll need to ensure you're logged in to the registry before running `prefect deploy`. You can do this with:
```bash
docker login registry.example.com
```
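For managed registries, the login step is usually wrapped by the cloud provider's CLI. A rough sketch (the regions, project, and account IDs below are placeholders, not values from this thread):

```bash
# Google Artifact Registry: configure Docker to use gcloud credentials
gcloud auth configure-docker europe-west3-docker.pkg.dev

# Amazon ECR: fetch a short-lived token and pipe it to docker login
aws ecr get-login-password --region eu-west-1 \
  | docker login --username AWS --password-stdin 123456789012.dkr.ecr.eu-west-1.amazonaws.com

# Azure Container Registry
az acr login --name registryname
```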
Common registry formats:
- Docker Hub: `docker.io/username/image-name`
- Google Container Registry (GCR): `gcr.io/project-id/image-name`
- Amazon Elastic Container Registry (ECR): `account-id.dkr.ecr.region.amazonaws.com/image-name`
- Azure Container Registry (ACR): `registryname.azurecr.io/image-name`
The image reference will flow through from the build step to the push step and finally to your work pool configuration, ensuring consistency throughout the deployment process.
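For reference, the `build-image` step outputs referenced throughout the file resolve roughly as follows (a sketch; the values are illustrative, but prefect-docker's build step does expose `image_name`, `tag`, and a combined `image`):

```yaml
# Assuming image_name: "registry.example.com/my-org/flows" and tag: latest
#   {{ build-image.image_name }} -> registry.example.com/my-org/flows
#   {{ build-image.tag }}        -> latest
#   {{ build-image.image }}      -> registry.example.com/my-org/flows:latest
```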
r
@Marvin - What if I wanted to specify more than one tag?
m
thought for 8.5 seconds
You can specify multiple tags for your Docker image in the `prefect.yaml` file by modifying the build and push sections. Here's how:
```yaml
build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build-image
    requires: prefect-docker>=0.4.0
    image_name: "<http://registry.example.com/my-org/{{|registry.example.com/my-org/{{> $PREFECT_IMAGE_NAME }}"
    tags:  # Note: changed from 'tag' to 'tags'
      - latest
      - "{{ $GIT_COMMIT_SHA }}"  # Example using git commit SHA
      - "v1.0.0"
    dockerfile: auto
    platform: "linux/amd64"

push:
- prefect_docker.deployments.steps.push_docker_image:
    requires: prefect-docker>=0.4.0
    image_name: "{{ build-image.image_name }}"
    tags: "{{ build-image.tags }}"  # Note: changed from 'tag' to 'tags'
```

You can use a combination of:
- Static tags
- Environment variables
- Template variables
- Git-based tags

Some examples of tag combinations:
```yaml
tags:
  - latest
  - "{{ $VERSION }}"
  - "{{ $GIT_COMMIT_SHA }}"
  - "{{ $GIT_BRANCH }}"
  - "{{ $BUILD_DATE }}"
```

Or with specific values:
```yaml
tags:
  - latest
  - v1.0.0
  - dev
  - "2024-01"
```

The image will be built and pushed with all specified tags. In your work pool configuration, you'll typically want to reference a specific tag:
```yaml
definitions:
  work_pool: &common_work_pool
    name: "kubernetes"
    job_variables:
      image: "{{ build-image.image_name }}:latest"  # Specify which tag to use
```

This allows you to maintain multiple tagged versions of your image while controlling which version is actually used in your deployments.
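The `{{ $GIT_COMMIT_SHA }}`-style references are resolved from environment variables when you run `prefect deploy`, so one way to populate them is to export the values first. A sketch (the variable names are just the ones used above):

```bash
# Export the values the templates reference, then deploy everything
export GIT_COMMIT_SHA=$(git rev-parse --short HEAD)
export GIT_BRANCH=$(git rev-parse --abbrev-ref HEAD)
export BUILD_DATE=$(date -u +%Y-%m-%d)

prefect deploy --all
```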
r
@Marvin - Setting my resource requests as you specified doesn't seem to translate into updated resources on the actual run of the deployment - looking at the job variables the resources seem to be correctly specified, but when I run the pod, it still defaults to the resources set in the work pool configuration. Job variables in the Prefect UI:
```json
{
  "image": "***",
  "resource_limits": {
    "cpu": "500m",
    "memory": "1024Mi"
  },
  "resource_requests": {
    "cpu": "200m",
    "memory": "512Mi"
  }
}
```
YAML file:
```yaml
# Generic metadata about this project
name: flows
prefect-version: 3.0.0

# build section allows you to manage and build docker images
build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build-image
    requires: prefect-docker>=0.4.0
    image_name: "europe-west3-docker.pkg.dev/infrastructure-464107/infrastructure-464107-docker-dev/hello"
    tag: latest
    dockerfile: auto
    platform: "linux/amd64"

# push section allows you to manage if and how this project is uploaded to remote locations
push:
- prefect_docker.deployments.steps.push_docker_image:
    requires: prefect-docker>=0.4.0
    image_name: "{{ build-image.image_name }}"
    tag: "{{ build-image.tag }}"

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect.deployments.steps.set_working_directory:
    directory: /opt/prefect/Desktop

# the definitions section allows you to define reusable components for your deployments
definitions:
  tags: &common_tags
    - "test-tag"
  work_pool: &common_work_pool
    name: "test-work-pool"
    job_variables:
      image: "{{ build-image.image }}"

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: "rasmus"
  tags:
    - *common_tags
    - "another-tag"
  schedule: null
  entrypoint: "flows/hello.py:hello"
  parameters:
    name: "Rasmus"
  work_pool: *common_work_pool

- name: "arthur"
  tags:
    - *common_tags
    - "High Cpu"
  schedule: null
  entrypoint: "flows/hello.py:hello"
  parameters:
    name: "Arthur"
  work_pool:
    <<: *common_work_pool
    job_variables:
      image: "{{ build-image.image }}"
      resource_requests:
        cpu: "200m"
        memory: "512Mi"
      resource_limits:
        cpu: "500m"
        memory: "1024Mi"
```
m
thought for 10.6 seconds
Let me help you fix this issue. The problem is in how the job variables are being merged. When using Kubernetes work pools, you need to ensure you're not just inheriting the common work pool configuration but properly overriding the job variables. Here's the corrected version of your YAML:
```yaml
definitions:
  tags: &common_tags
    - "test-tag"
  work_pool: &common_work_pool
    name: "test-work-pool"
    job_variables:
      image: "{{ build-image.image }}"

deployments:
- name: "rasmus"
  tags:
    - *common_tags
    - "another-tag"
  schedule: null
  entrypoint: "flows/hello.py:hello"
  parameters:
    name: "Rasmus"
  work_pool: 
    <<: *common_work_pool
    job_variables:
      image: "{{ build-image.image }}"  # Need to repeat this

- name: "arthur"
  tags:
    - *common_tags
    - "High Cpu"
  schedule: null
  entrypoint: "flows/hello.py:hello"
  parameters:
    name: "Arthur"
  work_pool:
    name: "test-work-pool"  # Specify the work pool name explicitly
    job_variables:  # Define all job_variables without inheritance
      image: "{{ build-image.image }}"
      resource_requests:
        cpu: "200m"
        memory: "512Mi"
      resource_limits:
        cpu: "500m"
        memory: "1024Mi"
```

The key changes are:
1. For the "arthur" deployment, we're not using the `<<: *common_work_pool` merge syntax
2. Instead, we're explicitly setting all job_variables without inheritance
3. We still need to specify the work pool name explicitly

This approach ensures that the job variables aren't being merged in a way that might cause the resource specifications to be overridden by default values. Also, make sure that:
1. Your Kubernetes work pool configuration doesn't have any default resource settings that might be overriding these values
2. Your Kubernetes cluster doesn't have any LimitRanges or ResourceQuotas that might be modifying the pod specifications

You can verify the actual pod specifications by checking the Kubernetes pod after it's created:
```bash
kubectl get pod <pod-name> -o yaml
```

Look for the `resources` section under the container specification to confirm the values are being applied correctly.
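If the full manifest is noisy, a narrower query also works (standard kubectl JSONPath; the pod name is a placeholder):

```bash
# Print only the resources block of the first container
kubectl get pod <pod-name> -o jsonpath='{.spec.containers[0].resources}'
```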
r
@Marvin - My work pool does have specific configurations - I would like these as defaults. Is there no way to have both defaults, and specific values for just some deployments?
m
thought for 11.9 seconds
Yes, you can have both default configurations in your work pool and override them for specific deployments! The key is to properly structure your YAML to merge and override only the specific values you want to change. Here's how:
```yaml
definitions:
  tags: &common_tags
    - "test-tag"
  work_pool: &common_work_pool
    name: "test-work-pool"
    job_variables: &common_job_variables
      image: "{{ build-image.image }}"
      # Your default resource settings from work pool will be here
      # Example of what might be in your work pool:
      # resource_requests:
      #   cpu: "100m"
      #   memory: "256Mi"
      # resource_limits:
      #   cpu: "200m"
      #   memory: "512Mi"

deployments:
- name: "rasmus"
  tags:
    - *common_tags
    - "another-tag"
  schedule: null
  entrypoint: "flows/hello.py:hello"
  parameters:
    name: "Rasmus"
  work_pool: *common_work_pool  # Uses default resources from work pool

- name: "arthur"
  tags:
    - *common_tags
    - "High Cpu"
  schedule: null
  entrypoint: "flows/hello.py:hello"
  parameters:
    name: "Arthur"
  work_pool:
    <<: *common_work_pool
    job_variables:
      <<: *common_job_variables  # Merge with default job variables
      resource_requests:  # Override only the resources
        cpu: "200m"
        memory: "512Mi"
      resource_limits:
        cpu: "500m"
        memory: "1024Mi"
```

Key points:
1. The `rasmus` deployment uses the work pool defaults by referencing `*common_work_pool`
2. The `arthur` deployment:
   - Merges the common work pool config with `<<: *common_work_pool`
   - Merges the common job variables with `<<: *common_job_variables`
   - Then overrides just the resource specifications

This way:
- Deployments without specific resource requirements use the work pool defaults
- Deployments that need different resources can override just those values
- All other work pool configurations remain intact

You can verify the configuration is working by:
1. Checking the job variables in the Prefect UI for both deployments
2. Inspecting the actual Kubernetes pods when they run:
```bash
kubectl get pod <pod-name> -o yaml | grep -A 10 resources
```

The `rasmus` deployment should show the work pool defaults, while the `arthur` deployment should show the overridden values.
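You can also check what Prefect has stored for each deployment without going through the UI. A sketch, assuming the flow function is named `hello` as in the entrypoint above:

```bash
# Show each deployment's configuration, including its job_variables
prefect deployment inspect 'hello/rasmus'
prefect deployment inspect 'hello/arthur'
```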
r
@Marvin - It still defaults to my work pool configuration
m
thought for 50.5 seconds
I understand the issue now. For Kubernetes work pools, you need to modify the base job template to override the default resources. Here's how to do it:

1. First, modify your work pool configuration to expose the resource settings as variables. In the Prefect UI:
   - Go to your work pool
   - Click "Edit"
   - Under "Job Configuration", add these variables to the template:
```json
{
    "job_configuration": {
        "resource_requests_cpu": {"default": "100m"},
        "resource_requests_memory": {"default": "256Mi"},
        "resource_limits_cpu": {"default": "200m"},
        "resource_limits_memory": {"default": "512Mi"}
    }
}
```

2. Then in your `prefect.yaml`, you can override these variables:
```yaml
definitions:
  tags: &common_tags
    - "test-tag"
  work_pool: &common_work_pool
    name: "test-work-pool"
    job_variables:
      image: "{{ build-image.image }}"

deployments:
- name: "rasmus"
  tags:
    - *common_tags
    - "another-tag"
  schedule: null
  entrypoint: "flows/hello.py:hello"
  parameters:
    name: "Rasmus"
  work_pool: *common_work_pool  # Uses default resources

- name: "arthur"
  tags:
    - *common_tags
    - "High Cpu"
  schedule: null
  entrypoint: "flows/hello.py:hello"
  parameters:
    name: "Arthur"
  work_pool:
    <<: *common_work_pool
    job_variables:
      image: "{{ build-image.image }}"
      resource_requests_cpu: "200m"
      resource_requests_memory: "512Mi"
      resource_limits_cpu: "500m"
      resource_limits_memory: "1024Mi"
```

The key differences in this approach:
1. Resources are defined as individual variables rather than nested structures
2. These variables are then used in the work pool's job template to set the actual Kubernetes resources
3. The work pool template needs to be configured to use these variables

This approach should properly override the default work pool settings for specific deployments while maintaining defaults for others. If you're still having issues, you might need to check:
1. The work pool's job template configuration to ensure it's properly using these variables
2. Any Kubernetes cluster-level constraints (LimitRanges or ResourceQuotas)
3. The actual pod specification when it's created to see where the resources are being set

Would you like me to provide more specific guidance on configuring the work pool's job template?
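If you'd rather edit the base job template outside the UI, the Prefect CLI can round-trip it. A sketch using the work pool name from this thread:

```bash
# Dump the default Kubernetes base job template to a file
prefect work-pool get-default-base-job-template --type kubernetes > base-job-template.json

# ...edit the variables / job_configuration sections in the file...

# Apply the edited template to the existing work pool
prefect work-pool update "test-work-pool" --base-job-template base-job-template.json
```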
r
@Marvin - I have exposed those settings in my work pool - my total work pool config looks like this:
```json
{
  "variables": {
    "type": "object",
    "properties": {
      "env": {
        "type": "object",
        "title": "Environment Variables",
        "description": "Environment variables to set when starting a flow run.",
        "additionalProperties": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ]
        }
      },
      "name": {
        "anyOf": [
          {
            "type": "string"
          },
          {
            "type": "null"
          }
        ],
        "title": "Name",
        "default": "prefect-worker",
        "description": "Name given to infrastructure created by a worker."
      },
      "image": {
        "anyOf": [
          {
            "type": "string"
          },
          {
            "type": "null"
          }
        ],
        "title": "Image",
        "default": "prefecthq/prefect:3-latest",
        "examples": [
          "<http://docker.io/prefecthq/prefect:3-latest|docker.io/prefecthq/prefect:3-latest>"
        ],
        "description": "The image reference of a container image to use for created jobs. If not set, the latest Prefect image will be used."
      },
      "labels": {
        "type": "object",
        "title": "Labels",
        "description": "Labels applied to infrastructure created by a worker.",
        "additionalProperties": {
          "type": "string"
        }
      },
      "command": {
        "anyOf": [
          {
            "type": "string"
          },
          {
            "type": "null"
          }
        ],
        "title": "Command",
        "description": "The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker."
      },
      "namespace": {
        "type": "string",
        "title": "Namespace",
        "default": "prefect",
        "description": "The Kubernetes namespace to create jobs within."
      },
      "cpu_request": {
        "type": "string",
        "title": "CPU Request",
        "default": "100m",
        "description": "The CPU allocation to request for this pod."
      },
      "backoff_limit": {
        "type": "integer",
        "title": "Backoff Limit",
        "default": 0,
        "minimum": 0,
        "description": "The number of times Kubernetes will retry a job after pod eviction. If set to 0, Prefect will reschedule the flow run when the pod is evicted."
      },
      "stream_output": {
        "type": "boolean",
        "title": "Stream Output",
        "default": true,
        "description": "If set, output will be streamed from the job to local standard output."
      },
      "cluster_config": {
        "anyOf": [
          {
            "$ref": "#/definitions/KubernetesClusterConfig"
          },
          {
            "type": "null"
          }
        ],
        "description": "The Kubernetes cluster config to use for job creation."
      },
      "memory_request": {
        "type": "string",
        "title": "Memory Request",
        "default": "100Mi",
        "description": "The Memory allocation to request for this pod."
      },
      "finished_job_ttl": {
        "anyOf": [
          {
            "type": "integer"
          },
          {
            "type": "null"
          }
        ],
        "title": "Finished Job TTL",
        "default": 60,
        "description": "The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely."
      },
      "image_pull_policy": {
        "enum": [
          "IfNotPresent",
          "Always",
          "Never"
        ],
        "type": "string",
        "title": "Image Pull Policy",
        "default": "Always",
        "description": "The Kubernetes image pull policy to use for job containers."
      },
      "service_account_name": {
        "anyOf": [
          {
            "type": "string"
          },
          {
            "type": "null"
          }
        ],
        "title": "Service Account Name",
        "description": "The Kubernetes service account to use for job creation."
      },
      "job_watch_timeout_seconds": {
        "anyOf": [
          {
            "type": "integer"
          },
          {
            "type": "null"
          }
        ],
        "title": "Job Watch Timeout Seconds",
        "description": "Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely."
      },
      "pod_watch_timeout_seconds": {
        "type": "integer",
        "title": "Pod Watch Timeout Seconds",
        "default": 300,
        "description": "Number of seconds to watch for pod creation before timing out."
      }
    },
    "definitions": {
      "KubernetesClusterConfig": {
        "type": "object",
        "title": "KubernetesClusterConfig",
        "required": [
          "config",
          "context_name"
        ],
        "properties": {
          "config": {
            "type": "object",
            "title": "Config",
            "description": "The entire contents of a kubectl config file.",
            "additionalProperties": true
          },
          "context_name": {
            "type": "string",
            "title": "Context Name",
            "description": "The name of the kubectl context to use."
          }
        },
        "description": "Stores configuration for interaction with Kubernetes clusters.\n\nSee `from_file` for creation.",
        "secret_fields": [],
        "block_type_slug": "kubernetes-cluster-config",
        "block_schema_references": {}
      }
    },
    "description": "Default variables for the Kubernetes worker.\n\nThe schema for this class is used to populate the `variables` section of the default\nbase job template."
  },
  "job_configuration": {
    "env": "{{ env }}",
    "name": "{{ name }}",
    "labels": "{{ labels }}",
    "command": "{{ command }}",
    "namespace": "{{ namespace }}",
    "job_manifest": {
      "kind": "Job",
      "spec": {
        "template": {
          "spec": {
            "containers": [
              {
                "env": "{{ env }}",
                "args": "{{ command }}",
                "name": "prefect-job",
                "image": "{{ image }}",
                "resources": {
                  "requests": {
                    "cpu": "{{ cpu_request }}",
                    "memory": "{{ memory_request }}"
                  }
                },
                "imagePullPolicy": "{{ image_pull_policy }}"
              }
            ],
            "completions": 1,
            "parallelism": 1,
            "restartPolicy": "Never",
            "serviceAccountName": "{{ service_account_name }}"
          }
        },
        "backoffLimit": "{{ backoff_limit }}",
        "ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
      },
      "metadata": {
        "labels": "{{ labels }}",
        "namespace": "{{ namespace }}",
        "generateName": "{{ name }}-"
      },
      "apiVersion": "batch/v1"
    },
    "stream_output": "{{ stream_output }}",
    "cluster_config": "{{ cluster_config }}",
    "job_watch_timeout_seconds": "{{ job_watch_timeout_seconds }}",
    "pod_watch_timeout_seconds": "{{ pod_watch_timeout_seconds }}"
  }
}
```
m
Your message was too long, here's your message at the allowed limit: …
r
@Marvin I have updated my work pool configuration, and it looks like this
```json
{
  "variables": {
    "type": "object",
    "properties": {
      ...,
      "cpu_request": {
        "type": "string",
        "title": "CPU Request",
        "default": "100m",
        "description": "The CPU allocation to request for this pod."
      },
      "backoff_limit": {
        "type": "integer",
        "title": "Backoff Limit",
        "default": 0,
        "minimum": 0,
        "description": "The number of times Kubernetes will retry a job after pod eviction. If set to 0, Prefect will reschedule the flow run when the pod is evicted."
      },
      "stream_output": {
        "type": "boolean",
        "title": "Stream Output",
        "default": true,
        "description": "If set, output will be streamed from the job to local standard output."
      },
      "cluster_config": {
        "anyOf": [
          {
            "$ref": "#/definitions/KubernetesClusterConfig"
          },
          {
            "type": "null"
          }
        ],
        "description": "The Kubernetes cluster config to use for job creation."
      },
      "memory_request": {
        "type": "string",
        "title": "Memory Request",
        "default": "100Mi",
        "description": "The Memory allocation to request for this pod."
      },
      "finished_job_ttl": {
        "anyOf": [
          {
            "type": "integer"
          },
          {
            "type": "null"
          }
        ],
        "title": "Finished Job TTL",
        "default": 60,
        "description": "The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely."
      },
      "image_pull_policy": {
        "enum": [
          "IfNotPresent",
          "Always",
          "Never"
        ],
        "type": "string",
        "title": "Image Pull Policy",
        "default": "Always",
        "description": "The Kubernetes image pull policy to use for job containers."
      },
      "service_account_name": {
        "anyOf": [
          {
            "type": "string"
          },
          {
            "type": "null"
          }
        ],
        "title": "Service Account Name",
        "description": "The Kubernetes service account to use for job creation."
      },
      "job_watch_timeout_seconds": {
        "anyOf": [
          {
            "type": "integer"
          },
          {
            "type": "null"
          }
        ],
        "title": "Job Watch Timeout Seconds",
        "description": "Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely."
      },
      "pod_watch_timeout_seconds": {
        "type": "integer",
        "title": "Pod Watch Timeout Seconds",
        "default": 300,
        "description": "Number of seconds to watch for pod creation before timing out."
      }
    },
    "definitions": {
      "KubernetesClusterConfig": {
        "type": "object",
        "title": "KubernetesClusterConfig",
        "required": [
          "config",
          "context_name"
        ],
        "properties": {
          "config": {
            "type": "object",
            "title": "Config",
            "description": "The entire contents of a kubectl config file.",
            "additionalProperties": true
          },
          "context_name": {
            "type": "string",
            "title": "Context Name",
            "description": "The name of the kubectl context to use."
          }
        },
        "description": "Stores configuration for interaction with Kubernetes clusters.\n\nSee `from_file` for creation.",
        "secret_fields": [],
        "block_type_slug": "kubernetes-cluster-config",
        "block_schema_references": {}
      }
    },
    "description": "Default variables for the Kubernetes worker.\n\nThe schema for this class is used to populate the `variables` section of the default\nbase job template."
  },
  "job_configuration": {
    "env": "{{ env }}",
    "name": "{{ name }}",
    "labels": "{{ labels }}",
    "command": "{{ command }}",
    "namespace": "{{ namespace }}",
    "job_manifest": {
      "kind": "Job",
      "spec": {
        "template": {
          "spec": {
            "containers": [
              {
                "env": "{{ env }}",
                "args": "{{ command }}",
                "name": "prefect-job",
                "image": "{{ image }}",
                "resources": {
                  "requests": {
                    "cpu": "{{ cpu_request }}",
                    "memory": "{{ memory_request }}"
                  }
                },
                "imagePullPolicy": "{{ image_pull_policy }}"
              }
            ], ...
  }
}
```
m
thought for 13.4 seconds
Now I see the complete picture. With this work pool configuration, you can override the CPU and memory requests in your `prefect.yaml` using the variables `cpu_request` and `memory_request`. Here's how to modify your deployment:
```yaml
definitions:
  tags: &common_tags
    - "test-tag"
  work_pool: &common_work_pool
    name: "test-work-pool"
    job_variables:
      image: "{{ build-image.image }}"

deployments:
- name: "rasmus"
  tags:
    - *common_tags
    - "another-tag"
  schedule: null
  entrypoint: "flows/hello.py:hello"
  parameters:
    name: "Rasmus"
  work_pool: *common_work_pool  # Uses default resources (100m CPU, 100Mi memory)

- name: "arthur"
  tags:
    - *common_tags
    - "High Cpu"
  schedule: null
  entrypoint: "flows/hello.py:hello"
  parameters:
    name: "Arthur"
  work_pool:
    <<: *common_work_pool
    job_variables:
      image: "{{ build-image.image }}"
      cpu_request: "500m"      # Override CPU request
      memory_request: "512Mi"  # Override memory request
```

A few important notes:
1. The variable names must match exactly what's in your work pool configuration (`cpu_request` and `memory_request`)
2. I notice your work pool configuration currently only has resource requests but no limits. If you need limits as well, you'll need to add them to your work pool configuration as variables (e.g., `cpu_limit` and `memory_limit`) and update the job manifest template accordingly
3. The default values (100m CPU, 100Mi memory) will be used for any deployment that doesn't override these values

If you also want to add resource limits, you would need to:

1. Add these variables to your work pool configuration:
```json
"cpu_limit": {
    "type": "string",
    "title": "CPU Limit",
    "default": "200m",
    "description": "The CPU allocation limit for this pod."
},
"memory_limit": {
    "type": "string",
    "title": "Memory Limit",
    "default": "200Mi",
    "description": "The Memory allocation limit for this pod."
}
```

2. Update the job manifest in your work pool configuration to include limits:
```json
"resources": {
    "requests": {
        "cpu": "{{ cpu_request }}",
        "memory": "{{ memory_request }}"
    },
    "limits": {
        "cpu": "{{ cpu_limit }}",
        "memory": "{{ memory_limit }}"
    }
}
```

3. Then you could specify limits in your deployment:
```yaml
work_pool:
    <<: *common_work_pool
    job_variables:
      image: "{{ build-image.image }}"
      cpu_request: "500m"
      memory_request: "512Mi"
      cpu_limit: "1000m"
      memory_limit: "1Gi"
r
@Marvin - What if I needed to specify a datetime object in the YAML file as a parameter rather than just a string? How would that work?
m
thought for 12.5 seconds
In a Prefect YAML file, you can specify datetime parameters in several ways. Here are the recommended approaches:
```yaml
deployments:
- name: "datetime-example"
  entrypoint: "flows/example.py:flow"
  parameters:
    # Option 1: ISO 8601 format string (recommended)
    start_date: "2024-01-15T10:30:00Z"
    
    # Option 2: Using YAML's built-in timestamp type
    end_date: !!timestamp 2024-01-15T10:30:00Z
    
    # Option 3: Date only
    just_date: "2024-01-15"
    
    # Option 4: With timezone specification
    timezone_date: "2024-01-15T10:30:00+01:00"
```

When Prefect processes these parameters, it will automatically convert the string representations to Python datetime objects if:
1. The parameter is type-hinted as `datetime` in your flow function
2. The string is in a recognizable datetime format

Here's an example of how your flow code should look to properly receive these parameters:
```python
from datetime import datetime
from prefect import flow

@flow
def example_flow(
    start_date: datetime,
    end_date: datetime,
    just_date: datetime,
    timezone_date: datetime
):
    print(f"Start date: {start_date} - Type: {type(start_date)}")
    print(f"End date: {end_date} - Type: {type(end_date)}")
    print(f"Just date: {just_date} - Type: {type(just_date)}")
    print(f"Timezone date: {timezone_date} - Type: {type(timezone_date)}")
```

Key points:
- Always use quotes around datetime strings in YAML to avoid parsing issues
- The ISO 8601 format (`YYYY-MM-DDTHH:MM:SSZ`) is the most reliable
- Include the 'Z' suffix for UTC or a proper timezone offset (+01:00) when timezone matters
- Type hints in your flow function are important for proper parameter conversion

If you need to work with specific timezones, you can also use `datetime` with `pytz` or `zoneinfo`:
```python
from datetime import datetime
import pytz
from prefect import flow

@flow
def timezone_flow(event_time: datetime):
    # Convert to specific timezone if needed
    tz = pytz.timezone('Europe/Copenhagen')
    local_time = event_time.astimezone(tz)
    print(f"Event time in Copenhagen: {local_time}")
```
```yaml
deployments:
- name: "timezone-example"
  entrypoint: "flows/example.py:timezone_flow"
  parameters:
    event_time: "2024-01-15T10:30:00+00:00"
r
@Marvin Is it possible to parse a Python object via YAML? For instance, a data config class?
m
thought for 18.9 seconds
Yes, you can work with Python objects in YAML, but it requires some additional setup since YAML doesn't natively understand Python objects. Here are a few approaches:

1. Using Pydantic Models (Recommended):
```python
# config.py
from pydantic import BaseModel
from datetime import datetime
from typing import List, Optional

class DataConfig(BaseModel):
    name: str
    start_date: datetime
    batch_size: int
    features: List[str]
    optional_param: Optional[float] = None

# flow.py
from prefect import flow
from .config import DataConfig

@flow
def process_data(config: DataConfig):
    print(f"Processing {config.name} with batch size {config.batch_size}")
    print(f"Features: {config.features}")
Copy code
yaml
deployments:
- name: "data-processor"
  entrypoint: "flows/flow.py:process_data"
  parameters:
    config:
      name: "my_dataset"
      start_date: "2024-01-15T10:30:00Z"
      batch_size: 1000
      features: ["feature1", "feature2", "feature3"]
      optional_param: 0.5
```
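One convenience of the Pydantic route is that you can check locally that a YAML `parameters` block coerces cleanly before deploying. A sketch, assuming Pydantic v2 (use `parse_obj` instead of `model_validate` on v1):

```python
from config import DataConfig  # the model defined above

# Simulate the dict Prefect would pass from the YAML parameters section
raw = {
    "name": "my_dataset",
    "start_date": "2024-01-15T10:30:00Z",
    "batch_size": 1000,
    "features": ["feature1", "feature2", "feature3"],
    "optional_param": 0.5,
}

config = DataConfig.model_validate(raw)  # raises a clear error if the shape is off
print(config.start_date, type(config.start_date))
```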
2. Using dataclasses with a custom deserializer:
```python
# config.py
from dataclasses import dataclass
from datetime import datetime
from typing import List
import yaml

@dataclass
class DataConfig:
    name: str
    start_date: datetime
    batch_size: int
    features: List[str]

    @classmethod
    def from_dict(cls, data: dict):
        # Convert string to datetime
        if isinstance(data['start_date'], str):
            data['start_date'] = datetime.fromisoformat(data['start_date'].replace('Z', '+00:00'))
        return cls(**data)

# flow.py
from prefect import flow
from .config import DataConfig

@flow
def process_data(config_dict: dict):
    # Convert dict to DataConfig object
    config = DataConfig.from_dict(config_dict)
    print(f"Processing {config.name} with batch size {config.batch_size}")
```
```yaml
deployments:
- name: "data-processor"
  entrypoint: "flows/flow.py:process_data"
  parameters:
    config_dict:
      name: "my_dataset"
      start_date: "2024-01-15T10:30:00Z"
      batch_size: 1000
      features: ["feature1", "feature2", "feature3"]
```

3. Using a configuration file approach:
```python
# config.py
from dataclasses import dataclass
from datetime import datetime
from typing import List
import yaml

@dataclass
class DataConfig:
    name: str
    start_date: datetime
    batch_size: int
    features: List[str]

    @classmethod
    def from_yaml(cls, path: str):
        with open(path) as f:
            data = yaml.safe_load(f)
            if isinstance(data['start_date'], str):
                data['start_date'] = datetime.fromisoformat(
                    data['start_date'].replace('Z', '+00:00')
                )
            return cls(**data)

# flow.py
from prefect import flow
from .config import DataConfig

@flow
def process_data(config_path: str):
    config = DataConfig.from_yaml(config_path)
    print(f"Processing {config.name} with batch size {config.batch_size}")
```
```yaml
# config.yaml
name: "my_dataset"
start_date: "2024-01-15T10:30:00Z"
batch_size: 1000
features:
  - "feature1"
  - "feature2"
  - "feature3"

# prefect.yaml
deployments:
- name: "data-processor"
  entrypoint: "flows/flow.py:process_data"
  parameters:
    config_path: "path/to/config.yaml"
Recommendations:
1. Use Pydantic (Best Option):
   - Built-in validation
   - Automatic type conversion
   - Great IDE support
   - Clear error messages
   - Native JSON/dict serialization/deserialization
2. For Simpler Cases:
   - Use dataclasses with custom deserializers
   - Good for when you don't need extensive validation
3. For Complex Configurations:
   - Use separate configuration files
   - Helps manage multiple deployment configurations
   - Easier to version control
   - Can be shared across different deployments

Important notes:
- Always validate your data after deserialization
- Handle timezone information explicitly for datetime objects
- Consider using environment variables for sensitive information
- Use type hints to help with IDE support and documentation
- Consider adding validation methods to your config classes

The Pydantic approach is generally recommended because it handles most edge cases and provides good validation out of the box, but any of these approaches can work depending on your specific needs.