Rasmus
07/14/2025, 4:56 AM

Marvin
07/14/2025, 4:56 AM
You can set different resources per deployment by overriding the `job_variables` section. Here's how to modify your configuration:
```yaml
definitions:
  tags: &common_tags
    - "eks"
  work_pool: &common_work_pool
    name: "kubernetes"
    job_variables:
      image: "{{ build-image.image }}"

deployments:
  - name: "default"
    tags: *common_tags
    schedule: null
    entrypoint: "flows/hello.py:hello"
    work_pool:
      <<: *common_work_pool
      job_variables:
        image: "{{ build-image.image }}"
        resource_requests:
          cpu: "500m"
          memory: "512Mi"
        resource_limits:
          cpu: "1000m"
          memory: "1Gi"
  - name: "arthur"
    tags: *common_tags
    schedule: null
    entrypoint: "flows/hello.py:hello"
    parameters:
      name: "Arthur"
    work_pool:
      <<: *common_work_pool
      job_variables:
        image: "{{ build-image.image }}"
        resource_requests:
          cpu: "1000m"
          memory: "1Gi"
        resource_limits:
          cpu: "2000m"
          memory: "2Gi"
```
A few key points:
- Use `<<: *common_work_pool` to merge the common work pool configuration
- Override `job_variables` at the deployment level
- Specify both `resource_requests` and `resource_limits`:
  - `requests` is what the pod is guaranteed to get
  - `limits` is the maximum the pod can use
- CPU is specified in millicores (e.g., "500m" = 0.5 CPU cores)
- Memory can use suffixes like Mi (mebibytes) or Gi (gibibytes)
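Once the deployments are applied, you can confirm the resolved job variables per deployment. A minimal check with the Prefect CLI, assuming the flow above registers under the name `hello`:

```bash
# Shows the deployment's configuration, including its job_variables overrides
prefect deployment inspect 'hello/arthur'
```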
This configuration allows you to have different resource allocations for each deployment while maintaining the common work pool configuration for other settings.

Rasmus
07/14/2025, 4:59 AM

Marvin
07/14/2025, 4:59 AM
You can include the registry as part of the `image_name` in your `prefect.yaml`. Here's how to modify the configuration:
```yaml
# build section allows you to manage and build docker images
build:
  - prefect_docker.deployments.steps.build_docker_image:
      id: build-image
      requires: prefect-docker>=0.4.0
      image_name: "registry.example.com/my-org/{{ $PREFECT_IMAGE_NAME }}"  # Include registry here
      tag: latest
      dockerfile: auto
      platform: "linux/amd64"

# push section allows you to manage if and how this project is uploaded to remote locations
push:
  - prefect_docker.deployments.steps.push_docker_image:
      requires: prefect-docker>=0.4.0
      image_name: "{{ build-image.image_name }}"  # This will include the registry
      tag: "{{ build-image.tag }}"

definitions:
  tags: &common_tags
    - "eks"
  work_pool: &common_work_pool
    name: "kubernetes"
    job_variables:
      image: "{{ build-image.image }}"  # This will reference the full image path including registry
```
You have several options for specifying the registry:

1. Directly in the yaml file:
```yaml
image_name: "registry.example.com/my-org/{{ $PREFECT_IMAGE_NAME }}"
```

2. Using environment variables:
```yaml
image_name: "{{ $DOCKER_REGISTRY }}/{{ $PREFECT_IMAGE_NAME }}"
```

3. For private registries that require authentication, you'll need to ensure you're logged in to the registry before running `prefect deploy`. You can do this with:
```bash
docker login registry.example.com
```
Common registry formats:
- Docker Hub: `docker.io/username/image-name`
- Google Container Registry (GCR): `gcr.io/project-id/image-name`
- Amazon Elastic Container Registry (ECR): `account-id.dkr.ecr.region.amazonaws.com/image-name`
- Azure Container Registry (ACR): `registryname.azurecr.io/image-name`
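Authentication differs per registry. A sketch for two common cases (the account ID, region, and registry hosts below are placeholders):

```bash
# Amazon ECR: exchange AWS credentials for a temporary registry token
aws ecr get-login-password --region eu-west-1 | \
  docker login --username AWS --password-stdin 123456789012.dkr.ecr.eu-west-1.amazonaws.com

# Google Artifact Registry: configure docker to use gcloud credentials
gcloud auth configure-docker europe-west3-docker.pkg.dev
```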
The image reference will flow through from the build step to the push step and finally to your work pool configuration, ensuring consistency throughout the deployment process.

Rasmus
07/14/2025, 5:02 AM

Marvin
07/14/2025, 5:02 AM
You can specify multiple image tags in your `prefect.yaml` file by modifying the build and push sections. Here's how:
```yaml
build:
  - prefect_docker.deployments.steps.build_docker_image:
      id: build-image
      requires: prefect-docker>=0.4.0
      image_name: "registry.example.com/my-org/{{ $PREFECT_IMAGE_NAME }}"
      tags:  # Note: changed from 'tag' to 'tags'
        - latest
        - "{{ $GIT_COMMIT_SHA }}"  # Example using git commit SHA
        - "v1.0.0"
      dockerfile: auto
      platform: "linux/amd64"

push:
  - prefect_docker.deployments.steps.push_docker_image:
      requires: prefect-docker>=0.4.0
      image_name: "{{ build-image.image_name }}"
      tags: "{{ build-image.tags }}"  # Note: changed from 'tag' to 'tags'
```
You can use a combination of:
- Static tags
- Environment variables
- Template variables
- Git-based tags
Some examples of tag combinations:
```yaml
tags:
  - latest
  - "{{ $VERSION }}"
  - "{{ $GIT_COMMIT_SHA }}"
  - "{{ $GIT_BRANCH }}"
  - "{{ $BUILD_DATE }}"
```
Or with specific values:
```yaml
tags:
  - latest
  - v1.0.0
  - dev
  - "2024-01"
```
The image will be built and pushed with all specified tags. In your work pool configuration, you'll typically want to reference a specific tag:
```yaml
definitions:
  work_pool: &common_work_pool
    name: "kubernetes"
    job_variables:
      image: "{{ build-image.image_name }}:latest"  # Specify which tag to use
```
This allows you to maintain multiple tagged versions of your image while controlling which version is actually used in your deployments.

Rasmus
07/14/2025, 5:25 AM
```json
{
  "image": "***",
  "resource_limits": {
    "cpu": "500m",
    "memory": "1024Mi"
  },
  "resource_requests": {
    "cpu": "200m",
    "memory": "512Mi"
  }
}
```
YAML file:
```yaml
# Generic metadata about this project
name: flows
prefect-version: 3.0.0

# build section allows you to manage and build docker images
build:
  - prefect_docker.deployments.steps.build_docker_image:
      id: build-image
      requires: prefect-docker>=0.4.0
      image_name: "europe-west3-docker.pkg.dev/infrastructure-464107/infrastructure-464107-docker-dev/hello"
      tag: latest
      dockerfile: auto
      platform: "linux/amd64"

# push section allows you to manage if and how this project is uploaded to remote locations
push:
  - prefect_docker.deployments.steps.push_docker_image:
      requires: prefect-docker>=0.4.0
      image_name: "{{ build-image.image_name }}"
      tag: "{{ build-image.tag }}"

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /opt/prefect/Desktop

# the definitions section allows you to define reusable components for your deployments
definitions:
  tags: &common_tags
    - "test-tag"
  work_pool: &common_work_pool
    name: "test-work-pool"
    job_variables:
      image: "{{ build-image.image }}"

# the deployments section allows you to provide configuration for deploying flows
deployments:
  - name: "rasmus"
    tags:
      - *common_tags
      - "another-tag"
    schedule: null
    entrypoint: "flows/hello.py:hello"
    parameters:
      name: "Rasmus"
    work_pool: *common_work_pool
  - name: "arthur"
    tags:
      - *common_tags
      - "High Cpu"
    schedule: null
    entrypoint: "flows/hello.py:hello"
    parameters:
      name: "Arthur"
    work_pool:
      <<: *common_work_pool
      job_variables:
        image: "{{ build-image.image }}"
        resource_requests:
          cpu: "200m"
          memory: "512Mi"
        resource_limits:
          cpu: "500m"
          memory: "1024Mi"
```

Marvin
07/14/2025, 5:26 AM
Try defining the "arthur" deployment without the merge key, setting all of its job variables explicitly:
```yaml
definitions:
  tags: &common_tags
    - "test-tag"
  work_pool: &common_work_pool
    name: "test-work-pool"
    job_variables:
      image: "{{ build-image.image }}"

deployments:
  - name: "rasmus"
    tags:
      - *common_tags
      - "another-tag"
    schedule: null
    entrypoint: "flows/hello.py:hello"
    parameters:
      name: "Rasmus"
    work_pool:
      <<: *common_work_pool
      job_variables:
        image: "{{ build-image.image }}"  # Need to repeat this
  - name: "arthur"
    tags:
      - *common_tags
      - "High Cpu"
    schedule: null
    entrypoint: "flows/hello.py:hello"
    parameters:
      name: "Arthur"
    work_pool:
      name: "test-work-pool"  # Specify the work pool name explicitly
      job_variables:  # Define all job_variables without inheritance
        image: "{{ build-image.image }}"
        resource_requests:
          cpu: "200m"
          memory: "512Mi"
        resource_limits:
          cpu: "500m"
          memory: "1024Mi"
```
The key changes are:
1. For the "arthur" deployment, we're not using the `<<: *common_work_pool` merge syntax
2. Instead, we're explicitly setting all job_variables without inheritance
3. We still need to specify the work pool name explicitly

This approach ensures that the job variables aren't being merged in a way that might cause the resource specifications to be overridden by default values.
Also, make sure that:
1. Your Kubernetes work pool configuration doesn't have any default resource settings that might be overriding these values
2. Your Kubernetes cluster doesn't have any LimitRanges or ResourceQuotas that might be modifying the pod specifications (see the check below)
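For the second point, a quick check, assuming your jobs run in the `prefect` namespace:

```bash
# Any LimitRange or ResourceQuota here can rewrite or cap pod resources
kubectl get limitrange,resourcequota -n prefect
kubectl describe limitrange -n prefect
```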
You can verify the actual pod specifications by checking the Kubernetes pod after it's created:
```bash
kubectl get pod <pod-name> -o yaml
```
Look for the `resources` section under the container specification to confirm the values are being applied correctly.

Rasmus
07/14/2025, 5:27 AM

Marvin
07/14/2025, 5:28 AM
You can also anchor the common job variables themselves, so a deployment can merge them and override only the resources:
```yaml
definitions:
  tags: &common_tags
    - "test-tag"
  work_pool: &common_work_pool
    name: "test-work-pool"
    job_variables: &common_job_variables
      image: "{{ build-image.image }}"
      # Your default resource settings from work pool will be here
      # Example of what might be in your work pool:
      # resource_requests:
      #   cpu: "100m"
      #   memory: "256Mi"
      # resource_limits:
      #   cpu: "200m"
      #   memory: "512Mi"

deployments:
  - name: "rasmus"
    tags:
      - *common_tags
      - "another-tag"
    schedule: null
    entrypoint: "flows/hello.py:hello"
    parameters:
      name: "Rasmus"
    work_pool: *common_work_pool  # Uses default resources from work pool
  - name: "arthur"
    tags:
      - *common_tags
      - "High Cpu"
    schedule: null
    entrypoint: "flows/hello.py:hello"
    parameters:
      name: "Arthur"
    work_pool:
      <<: *common_work_pool
      job_variables:
        <<: *common_job_variables  # Merge with default job variables
        resource_requests:  # Override only the resources
          cpu: "200m"
          memory: "512Mi"
        resource_limits:
          cpu: "500m"
          memory: "1024Mi"
```
Key points:
1. The `rasmus` deployment uses the work pool defaults by referencing `*common_work_pool`
2. The `arthur` deployment:
   - Merges the common work pool config with `<<: *common_work_pool`
   - Merges the common job variables with `<<: *common_job_variables`
   - Then overrides just the resource specifications

This way:
- Deployments without specific resource requirements use the work pool defaults
- Deployments that need different resources can override just those values
- All other work pool configurations remain intact
You can verify the configuration is working by:
1. Checking the job variables in the Prefect UI for both deployments
2. Inspecting the actual Kubernetes pods when they run:
```bash
kubectl get pod <pod-name> -o yaml | grep -A 10 resources
```
The `rasmus` deployment should show the work pool defaults, while the `arthur` deployment should show the overridden values.

Rasmus
07/14/2025, 5:33 AM

Marvin
07/14/2025, 5:34 AM
Another option is to expose the resources as individual variables in the work pool's base job template:

1. First, the work pool's template defines flat resource variables, for example:
```json
{
  "job_configuration": {
    "resource_requests_cpu": {"default": "100m"},
    "resource_requests_memory": {"default": "256Mi"},
    "resource_limits_cpu": {"default": "200m"},
    "resource_limits_memory": {"default": "512Mi"}
  }
}
```
2. Then in your `prefect.yaml`, you can override these variables:
```yaml
definitions:
  tags: &common_tags
    - "test-tag"
  work_pool: &common_work_pool
    name: "test-work-pool"
    job_variables:
      image: "{{ build-image.image }}"

deployments:
  - name: "rasmus"
    tags:
      - *common_tags
      - "another-tag"
    schedule: null
    entrypoint: "flows/hello.py:hello"
    parameters:
      name: "Rasmus"
    work_pool: *common_work_pool  # Uses default resources
  - name: "arthur"
    tags:
      - *common_tags
      - "High Cpu"
    schedule: null
    entrypoint: "flows/hello.py:hello"
    parameters:
      name: "Arthur"
    work_pool:
      <<: *common_work_pool
      job_variables:
        image: "{{ build-image.image }}"
        resource_requests_cpu: "200m"
        resource_requests_memory: "512Mi"
        resource_limits_cpu: "500m"
        resource_limits_memory: "1024Mi"
```
The key differences in this approach:
1. Resources are defined as individual variables rather than nested structures
2. These variables are then used in the work pool's job template to set the actual Kubernetes resources
3. The work pool template needs to be configured to use these variables
This approach should properly override the default work pool settings for specific deployments while maintaining defaults for others.
If you're still having issues, you might need to check:
1. The work pool's job template configuration to ensure it's properly using these variables (see the command below)
2. Any Kubernetes cluster-level constraints (LimitRanges or ResourceQuotas)
3. The actual pod specification when it's created to see where the resources are being set
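For the first check, you can dump the pool's current configuration, including its base job template:

```bash
# Prints the work pool's settings and base job template
prefect work-pool inspect "test-work-pool"
```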
Would you like me to provide more specific guidance on configuring the work pool's job template?

Rasmus
07/14/2025, 5:36 AM
```json
{
  "variables": {
    "type": "object",
    "properties": {
      "env": {
        "type": "object",
        "title": "Environment Variables",
        "description": "Environment variables to set when starting a flow run.",
        "additionalProperties": {
          "anyOf": [{"type": "string"}, {"type": "null"}]
        }
      },
      "name": {
        "anyOf": [{"type": "string"}, {"type": "null"}],
        "title": "Name",
        "default": "prefect-worker",
        "description": "Name given to infrastructure created by a worker."
      },
      "image": {
        "anyOf": [{"type": "string"}, {"type": "null"}],
        "title": "Image",
        "default": "prefecthq/prefect:3-latest",
        "examples": ["docker.io/prefecthq/prefect:3-latest"],
        "description": "The image reference of a container image to use for created jobs. If not set, the latest Prefect image will be used."
      },
      "labels": {
        "type": "object",
        "title": "Labels",
        "description": "Labels applied to infrastructure created by a worker.",
        "additionalProperties": {"type": "string"}
      },
      "command": {
        "anyOf": [{"type": "string"}, {"type": "null"}],
        "title": "Command",
        "description": "The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker."
      },
      "namespace": {
        "type": "string",
        "title": "Namespace",
        "default": "prefect",
        "description": "The Kubernetes namespace to create jobs within."
      },
      "cpu_request": {
        "type": "string",
        "title": "CPU Request",
        "default": "100m",
        "description": "The CPU allocation to request for this pod."
      },
      "backoff_limit": {
        "type": "integer",
        "title": "Backoff Limit",
        "default": 0,
        "minimum": 0,
        "description": "The number of times Kubernetes will retry a job after pod eviction. If set to 0, Prefect will reschedule the flow run when the pod is evicted."
      },
      "stream_output": {
        "type": "boolean",
        "title": "Stream Output",
        "default": true,
        "description": "If set, output will be streamed from the job to local standard output."
      },
      "cluster_config": {
        "anyOf": [{"$ref": "#/definitions/KubernetesClusterConfig"}, {"type": "null"}],
        "description": "The Kubernetes cluster config to use for job creation."
      },
      "memory_request": {
        "type": "string",
        "title": "Memory Request",
        "default": "100Mi",
        "description": "The Memory allocation to request for this pod."
      },
      "finished_job_ttl": {
        "anyOf": [{"type": "integer"}, {"type": "null"}],
        "title": "Finished Job TTL",
        "default": 60,
        "description": "The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely."
      },
      "image_pull_policy": {
        "enum": ["IfNotPresent", "Always", "Never"],
        "type": "string",
        "title": "Image Pull Policy",
        "default": "Always",
        "description": "The Kubernetes image pull policy to use for job containers."
      },
      "service_account_name": {
        "anyOf": [{"type": "string"}, {"type": "null"}],
        "title": "Service Account Name",
        "description": "The Kubernetes service account to use for job creation."
      },
      "job_watch_timeout_seconds": {
        "anyOf": [{"type": "integer"}, {"type": "null"}],
        "title": "Job Watch Timeout Seconds",
        "description": "Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely."
      },
      "pod_watch_timeout_seconds": {
        "type": "integer",
        "title": "Pod Watch Timeout Seconds",
        "default": 300,
        "description": "Number of seconds to watch for pod creation before timing out."
      }
    },
    "definitions": {
      "KubernetesClusterConfig": {
        "type": "object",
        "title": "KubernetesClusterConfig",
        "required": ["config", "context_name"],
        "properties": {
          "config": {
            "type": "object",
            "title": "Config",
            "description": "The entire contents of a kubectl config file.",
            "additionalProperties": true
          },
          "context_name": {
            "type": "string",
            "title": "Context Name",
            "description": "The name of the kubectl context to use."
          }
        },
        "description": "Stores configuration for interaction with Kubernetes clusters.\n\nSee `from_file` for creation.",
        "secret_fields": [],
        "block_type_slug": "kubernetes-cluster-config",
        "block_schema_references": {}
      }
    },
    "description": "Default variables for the Kubernetes worker.\n\nThe schema for this class is used to populate the `variables` section of the default\nbase job template."
  },
  "job_configuration": {
    "env": "{{ env }}",
    "name": "{{ name }}",
    "labels": "{{ labels }}",
    "command": "{{ command }}",
    "namespace": "{{ namespace }}",
    "job_manifest": {
      "kind": "Job",
      "spec": {
        "template": {
          "spec": {
            "containers": [
              {
                "env": "{{ env }}",
                "args": "{{ command }}",
                "name": "prefect-job",
                "image": "{{ image }}",
                "resources": {
                  "requests": {
                    "cpu": "{{ cpu_request }}",
                    "memory": "{{ memory_request }}"
                  }
                },
                "imagePullPolicy": "{{ image_pull_policy }}"
              }
            ],
            "completions": 1,
            "parallelism": 1,
            "restartPolicy": "Never",
            "serviceAccountName": "{{ service_account_name }}"
          }
        },
        "backoffLimit": "{{ backoff_limit }}",
        "ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
      },
      "metadata": {
        "labels": "{{ labels }}",
        "namespace": "{{ namespace }}",
        "generateName": "{{ name }}-"
      },
      "apiVersion": "batch/v1"
    },
    "stream_output": "{{ stream_output }}",
    "cluster_config": "{{ cluster_config }}",
    "job_watch_timeout_seconds": "{{ job_watch_timeout_seconds }}",
    "pod_watch_timeout_seconds": "{{ pod_watch_timeout_seconds }}"
  }
}
```
Marvin
07/14/2025, 5:36 AM

Rasmus
07/14/2025, 5:37 AM
```json
{
  "variables": {
    "type": "object",
    "properties": {
      ...,
      "cpu_request": {
        "type": "string",
        "title": "CPU Request",
        "default": "100m",
        "description": "The CPU allocation to request for this pod."
      },
      "backoff_limit": {
        "type": "integer",
        "title": "Backoff Limit",
        "default": 0,
        "minimum": 0,
        "description": "The number of times Kubernetes will retry a job after pod eviction. If set to 0, Prefect will reschedule the flow run when the pod is evicted."
      },
      "stream_output": {
        "type": "boolean",
        "title": "Stream Output",
        "default": true,
        "description": "If set, output will be streamed from the job to local standard output."
      },
      "cluster_config": {
        "anyOf": [{"$ref": "#/definitions/KubernetesClusterConfig"}, {"type": "null"}],
        "description": "The Kubernetes cluster config to use for job creation."
      },
      "memory_request": {
        "type": "string",
        "title": "Memory Request",
        "default": "100Mi",
        "description": "The Memory allocation to request for this pod."
      },
      "finished_job_ttl": {
        "anyOf": [{"type": "integer"}, {"type": "null"}],
        "title": "Finished Job TTL",
        "default": 60,
        "description": "The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely."
      },
      "image_pull_policy": {
        "enum": ["IfNotPresent", "Always", "Never"],
        "type": "string",
        "title": "Image Pull Policy",
        "default": "Always",
        "description": "The Kubernetes image pull policy to use for job containers."
      },
      "service_account_name": {
        "anyOf": [{"type": "string"}, {"type": "null"}],
        "title": "Service Account Name",
        "description": "The Kubernetes service account to use for job creation."
      },
      "job_watch_timeout_seconds": {
        "anyOf": [{"type": "integer"}, {"type": "null"}],
        "title": "Job Watch Timeout Seconds",
        "description": "Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely."
      },
      "pod_watch_timeout_seconds": {
        "type": "integer",
        "title": "Pod Watch Timeout Seconds",
        "default": 300,
        "description": "Number of seconds to watch for pod creation before timing out."
      }
    },
    "definitions": {
      "KubernetesClusterConfig": {
        "type": "object",
        "title": "KubernetesClusterConfig",
        "required": ["config", "context_name"],
        "properties": {
          "config": {
            "type": "object",
            "title": "Config",
            "description": "The entire contents of a kubectl config file.",
            "additionalProperties": true
          },
          "context_name": {
            "type": "string",
            "title": "Context Name",
            "description": "The name of the kubectl context to use."
          }
        },
        "description": "Stores configuration for interaction with Kubernetes clusters.\n\nSee `from_file` for creation.",
        "secret_fields": [],
        "block_type_slug": "kubernetes-cluster-config",
        "block_schema_references": {}
      }
    },
    "description": "Default variables for the Kubernetes worker.\n\nThe schema for this class is used to populate the `variables` section of the default\nbase job template."
  },
  "job_configuration": {
    "env": "{{ env }}",
    "name": "{{ name }}",
    "labels": "{{ labels }}",
    "command": "{{ command }}",
    "namespace": "{{ namespace }}",
    "job_manifest": {
      "kind": "Job",
      "spec": {
        "template": {
          "spec": {
            "containers": [
              {
                "env": "{{ env }}",
                "args": "{{ command }}",
                "name": "prefect-job",
                "image": "{{ image }}",
                "resources": {
                  "requests": {
                    "cpu": "{{ cpu_request }}",
                    "memory": "{{ memory_request }}"
                  }
                },
                "imagePullPolicy": "{{ image_pull_policy }}"
              }
            ], ...
          }
        }
```
Marvin
07/14/2025, 5:38 AM
You can now override these values in your `prefect.yaml` using the variables `cpu_request` and `memory_request`. Here's how to modify your deployment:
```yaml
definitions:
  tags: &common_tags
    - "test-tag"
  work_pool: &common_work_pool
    name: "test-work-pool"
    job_variables:
      image: "{{ build-image.image }}"

deployments:
  - name: "rasmus"
    tags:
      - *common_tags
      - "another-tag"
    schedule: null
    entrypoint: "flows/hello.py:hello"
    parameters:
      name: "Rasmus"
    work_pool: *common_work_pool  # Uses default resources (100m CPU, 100Mi memory)
  - name: "arthur"
    tags:
      - *common_tags
      - "High Cpu"
    schedule: null
    entrypoint: "flows/hello.py:hello"
    parameters:
      name: "Arthur"
    work_pool:
      <<: *common_work_pool
      job_variables:
        image: "{{ build-image.image }}"
        cpu_request: "500m"  # Override CPU request
        memory_request: "512Mi"  # Override memory request
```
A few important notes:
1. The variable names must match exactly what's in your work pool configuration (`cpu_request` and `memory_request`)
2. I notice your work pool configuration currently only has resource requests but no limits. If you need limits as well, you'll need to add them to your work pool configuration as variables (e.g., `cpu_limit` and `memory_limit`) and update the job manifest template accordingly
3. The default values (100m CPU, 100Mi memory) will be used for any deployment that doesn't override these values
If you also want to add resource limits, you would need to:
1. Add these variables to your work pool configuration:
```json
"cpu_limit": {
  "type": "string",
  "title": "CPU Limit",
  "default": "200m",
  "description": "The CPU allocation limit for this pod."
},
"memory_limit": {
  "type": "string",
  "title": "Memory Limit",
  "default": "200Mi",
  "description": "The Memory allocation limit for this pod."
}
```
2. Update the job manifest in your work pool configuration to include limits:
```json
"resources": {
  "requests": {
    "cpu": "{{ cpu_request }}",
    "memory": "{{ memory_request }}"
  },
  "limits": {
    "cpu": "{{ cpu_limit }}",
    "memory": "{{ memory_limit }}"
  }
}
```
3. Then you could specify limits in your deployment:
```yaml
work_pool:
  <<: *common_work_pool
  job_variables:
    image: "{{ build-image.image }}"
    cpu_request: "500m"
    memory_request: "512Mi"
    cpu_limit: "1000m"
    memory_limit: "1Gi"
```
Rasmus
07/14/2025, 5:55 AM

Marvin
07/14/2025, 5:55 AM
You can pass datetime values as deployment parameters in your `prefect.yaml` in a few formats:
```yaml
deployments:
  - name: "datetime-example"
    entrypoint: "flows/example.py:flow"
    parameters:
      # Option 1: ISO 8601 format string (recommended)
      start_date: "2024-01-15T10:30:00Z"
      # Option 2: Using YAML's built-in timestamp type
      end_date: !!timestamp 2024-01-15T10:30:00Z
      # Option 3: Date only
      just_date: "2024-01-15"
      # Option 4: With timezone specification
      timezone_date: "2024-01-15T10:30:00+01:00"
```
When Prefect processes these parameters, it will automatically convert the string representations to Python datetime objects if:
1. The parameter is type-hinted as `datetime` in your flow function
2. The string is in a recognizable datetime format

Here's an example of how your flow code should look to properly receive these parameters:
```python
from datetime import datetime

from prefect import flow


@flow
def example_flow(
    start_date: datetime,
    end_date: datetime,
    just_date: datetime,
    timezone_date: datetime,
):
    print(f"Start date: {start_date} - Type: {type(start_date)}")
    print(f"End date: {end_date} - Type: {type(end_date)}")
    print(f"Just date: {just_date} - Type: {type(just_date)}")
    print(f"Timezone date: {timezone_date} - Type: {type(timezone_date)}")
```
Key points:
- Always use quotes around datetime strings in YAML to avoid parsing issues
- The ISO 8601 format (`YYYY-MM-DDTHH:MM:SSZ`) is the most reliable
- Include the 'Z' suffix for UTC or a proper timezone offset (+01:00) when timezone matters
- Type hints in your flow function are important for proper parameter conversion

If you need to work with specific timezones, you can also use `datetime` with `pytz` or `zoneinfo`:
```python
from datetime import datetime

import pytz
from prefect import flow


@flow
def timezone_flow(event_time: datetime):
    # Convert to a specific timezone if needed
    tz = pytz.timezone('Europe/Copenhagen')
    local_time = event_time.astimezone(tz)
    print(f"Event time in Copenhagen: {local_time}")
```

```yaml
deployments:
  - name: "timezone-example"
    entrypoint: "flows/example.py:timezone_flow"
    parameters:
      event_time: "2024-01-15T10:30:00+00:00"
```
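The same string formats work when triggering runs from the CLI; a sketch, assuming the deployment above (Prefect derives the flow name `timezone-flow` from the function name):

```bash
# Pass an ISO 8601 datetime as a parameter at run time
prefect deployment run 'timezone-flow/timezone-example' --param event_time="2024-01-15T10:30:00+00:00"
```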
Rasmus
07/14/2025, 5:57 AM

Marvin
07/14/2025, 5:57 AM
There are a few good ways to pass structured configuration objects as flow parameters:

1. Using Pydantic models (recommended):
```python
# config.py
from datetime import datetime
from typing import List, Optional

from pydantic import BaseModel


class DataConfig(BaseModel):
    name: str
    start_date: datetime
    batch_size: int
    features: List[str]
    optional_param: Optional[float] = None


# flow.py
from prefect import flow

from .config import DataConfig


@flow
def process_data(config: DataConfig):
    print(f"Processing {config.name} with batch size {config.batch_size}")
    print(f"Features: {config.features}")
```
```yaml
deployments:
  - name: "data-processor"
    entrypoint: "flows/flow.py:process_data"
    parameters:
      config:
        name: "my_dataset"
        start_date: "2024-01-15T10:30:00Z"
        batch_size: 1000
        features: ["feature1", "feature2", "feature3"]
        optional_param: 0.5
```
2. Using dataclasses with a custom deserializer:
```python
# config.py
from dataclasses import dataclass
from datetime import datetime
from typing import List


@dataclass
class DataConfig:
    name: str
    start_date: datetime
    batch_size: int
    features: List[str]

    @classmethod
    def from_dict(cls, data: dict):
        # Convert string to datetime
        if isinstance(data['start_date'], str):
            data['start_date'] = datetime.fromisoformat(data['start_date'].replace('Z', '+00:00'))
        return cls(**data)


# flow.py
from prefect import flow

from .config import DataConfig


@flow
def process_data(config_dict: dict):
    # Convert dict to DataConfig object
    config = DataConfig.from_dict(config_dict)
    print(f"Processing {config.name} with batch size {config.batch_size}")
```
```yaml
deployments:
  - name: "data-processor"
    entrypoint: "flows/flow.py:process_data"
    parameters:
      config_dict:
        name: "my_dataset"
        start_date: "2024-01-15T10:30:00Z"
        batch_size: 1000
        features: ["feature1", "feature2", "feature3"]
```
3. Using a configuration file approach:
```python
# config.py
from dataclasses import dataclass
from datetime import datetime
from typing import List

import yaml


@dataclass
class DataConfig:
    name: str
    start_date: datetime
    batch_size: int
    features: List[str]

    @classmethod
    def from_yaml(cls, path: str):
        with open(path) as f:
            data = yaml.safe_load(f)
        if isinstance(data['start_date'], str):
            data['start_date'] = datetime.fromisoformat(
                data['start_date'].replace('Z', '+00:00')
            )
        return cls(**data)


# flow.py
from prefect import flow

from .config import DataConfig


@flow
def process_data(config_path: str):
    config = DataConfig.from_yaml(config_path)
    print(f"Processing {config.name} with batch size {config.batch_size}")
```
```yaml
# config.yaml
name: "my_dataset"
start_date: "2024-01-15T10:30:00Z"
batch_size: 1000
features:
  - "feature1"
  - "feature2"
  - "feature3"
```

```yaml
# prefect.yaml
deployments:
  - name: "data-processor"
    entrypoint: "flows/flow.py:process_data"
    parameters:
      config_path: "path/to/config.yaml"
```
Recommendations:
1. Use Pydantic (best option):
   - Built-in validation
   - Automatic type conversion
   - Great IDE support
   - Clear error messages
   - Native JSON/dict serialization/deserialization
2. For simpler cases:
   - Use dataclasses with custom deserializers
   - Good for when you don't need extensive validation
3. For complex configurations:
   - Use separate configuration files
   - Helps manage multiple deployment configurations
   - Easier to version control
   - Can be shared across different deployments
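With the Pydantic option, nested parameters can also be supplied at run time as JSON; a sketch against the hypothetical `data-processor` deployment above, assuming the flow name resolves to `process-data`:

```bash
# The CLI parses JSON-encoded values for nested parameters
prefect deployment run 'process-data/data-processor' \
  --param config='{"name": "my_dataset", "start_date": "2024-01-15T10:30:00Z", "batch_size": 1000, "features": ["feature1"]}'
```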
Important Notes:
- Always validate your data after deserialization
- Handle timezone information explicitly for datetime objects
- Consider using environment variables for sensitive information