# ask-marvin
a
@Marvin can you show me an example of a prefect.yaml that creates 1 workpool with multiple work queues?
m
thought for 92.8 seconds
You can create the work pool and its queues using the CLI as well, though the YAML approach is often clearer for version control. The work pool can be created with:
```bash
prefect work-pool create "my-process-pool" --type process
```
Then you can manage the work queues through the Prefect UI or API. Remember that different work pool types (process, kubernetes, docker) might have different configuration options available in the `base_job_template`, but the work queue configuration remains consistent across all types.
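If you want to see which options a given pool type actually exposes before editing its `base_job_template`, one quick way (assuming a recent Prefect 3.x CLI) is to dump the default template for that type, or inspect an existing pool:

```bash
# Print the default base job template for a pool type (here: process)
prefect work-pool get-default-base-job-template --type process

# Inspect a work pool that already exists on the server
prefect work-pool inspect "my-process-pool"
```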
a
@Marvin yes, can you show me an example of a yaml version? i would like to create definitions and reference it for many deployments
m
thought for 82.9 seconds
```bash
prefect deploy
```
This will use your `prefect.yaml` file to create all the defined deployments. The YAML references make it easy to maintain consistent configurations across your entire deployment setup while keeping the configuration file clean and maintainable.
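If the file defines several deployments, you can deploy all of them at once or target a single one by name; for example (flags from a recent 3.x CLI, the deployment name here is just a placeholder):

```bash
prefect deploy --all               # deploy everything defined in prefect.yaml
prefect deploy -n my-deployment    # deploy only one named deployment
```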
a
@Marvin can you give me a sample prefect.yaml? i dont know how to structure it
m
thought for 25.1 seconds
I'll provide you with a complete, well-structured `prefect.yaml` file with comments explaining each section. This example will show a common real-world setup with multiple deployments sharing configurations.
```yaml
# prefect.yaml
version: 3

# The definitions section is where you define reusable components
definitions:
  # Define your work queues that can be reused across work pools
  work_queues: &default_queues
    - name: "high-priority"
      priority: 1
      concurrency_limit: 5
    - name: "medium-priority"
      priority: 2
      concurrency_limit: 10
    - name: "low-priority"
      priority: 3
      concurrency_limit: 15

  # Define common environment variables
  env_vars: &default_env
    POSTGRES_URL: "postgresql://user:pass@localhost:5432/db"
    AWS_REGION: "us-east-1"
    LOG_LEVEL: "INFO"

  # Define common tags for your deployments
  tags: &default_tags
    - "production"
    - "data-team"

  # Define common job configurations
  job_configs:
    default: &default_job_config
      env: *default_env  # Reference the environment variables defined above
      labels:
        team: "data-engineering"
        environment: "production"
      tags: *default_tags  # Reference the tags defined above

# Define your work pools
work_pools:
  # A process-based work pool for local development
  - name: "my-process-pool"
    type: "process"
    base_job_template:
      job_configuration: *default_job_config  # Reference the job config defined above
    work_queues: *default_queues  # Reference the work queues defined above

  # A Docker-based work pool for containerized workloads
  - name: "my-docker-pool"
    type: "docker"
    base_job_template:
      job_configuration:
        <<: *default_job_config  # Merge in the default job config
        # Add Docker-specific configurations
        image: "prefecthq/prefect:2-python3.9"
        volumes:
          - "/local/path:/container/path"
    work_queues: *default_queues

# Define your deployments
deployments:
  # Daily ETL flow
  - name: "etl-daily"
    entrypoint: "flows/etl.py:daily_etl_flow"  # path/to/file.py:function_name
    schedule:
      cron: "0 0 * * *"  # Run daily at midnight
    work_pool_name: "my-process-pool"
    work_queue_name: "medium-priority"
    tags: *default_tags
    description: "Daily ETL process for customer data"

  # Real-time data processing flow
  - name: "realtime-processing"
    entrypoint: "flows/realtime.py:process_stream"
    work_pool_name: "my-docker-pool"
    work_queue_name: "high-priority"
    tags:
      - "realtime"
      - "critical"
    description: "Real-time data processing pipeline"
    parameters:
      batch_size: 1000
      timeout_seconds: 3600

  # Data cleanup flow
  - name: "cleanup-flow"
    entrypoint: "flows/maintenance.py:cleanup_data"
    schedule:
      interval: 86400  # Run every 24 hours (in seconds)
    work_pool_name: "my-process-pool"
    work_queue_name: "low-priority"
    description: "Clean up old data from temporary tables"
    parameters:
      retention_days: 30
```

Let's break down the structure:

1. Version Declaration: Always start with `version: 3` for Prefect 3.x.
2. Definitions Section: Contains reusable components using YAML anchors (`&`). Define common configurations once and reference them later with `*`.
3. Work Pools Section: Defines different types of work pools (process, docker, etc.). Each work pool can use the predefined work queues and contains base job templates that apply to all jobs in the pool.
4. Deployments Section: Lists all your flow deployments. Each deployment specifies:
   - `name`: Unique identifier
   - `entrypoint`: Path to your flow file and function
   - `work_pool_name`: Which work pool to use
   - `work_queue_name`: Which queue to use
   - Optional: `schedule`, `tags`, `parameters`, etc.

To use this configuration:

1. Save it as `prefect.yaml` in your project root directory
2. Make sure your flow files exist in the specified locations
3. Deploy all flows with:

```bash
prefect deploy
```

Or deploy a specific flow:

```bash
prefect deploy --name "etl-daily"
```

Some tips for structuring your `prefect.yaml`:

1. Keep Related Items Together: Group related deployments and their configurations
2. Use Meaningful Names: Choose descriptive names for work pools, queues, and deployments
3. Leverage YAML References: Use `&` to define and `*` to reference common configurations
4. Include Comments: Document non-obvious configurations or requirements
5. Consistent Indentation: Use 2 or 4 spaces consistently (the example uses 2)

You can customize this template based on your needs:

- Add more work pools with different configurations
- Define different environment variables for dev/staging/prod
- Add more deployment configurations
- Customize schedules, parameters, and other settings

Remember that all paths in `entrypoint` are relative to where your `prefect.yaml` file is located. Make sure your project structure matches the paths you specify in the deployments.
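For reference, each `entrypoint` points at a Python file and the flow function inside it. A minimal sketch of what `flows/etl.py` could contain for the `flows/etl.py:daily_etl_flow` entrypoint (the task body is only a placeholder) is:

```python
# flows/etl.py - minimal flow matching the "flows/etl.py:daily_etl_flow" entrypoint
from prefect import flow, task


@task
def extract() -> list[int]:
    # placeholder extract step
    return [1, 2, 3]


@flow(log_prints=True)
def daily_etl_flow():
    data = extract()
    print(f"processed {len(data)} records")


if __name__ == "__main__":
    # quick local run; deployments will invoke this flow through the work pool
    daily_etl_flow()
```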
a
@Marvin is it possible to use a YAML anchor reference for the work queues?
m
thought for 26.3 seconds
Yes, absolutely! In fact, that's a great practice for DRY (Don't Repeat Yourself) configuration. Let me show you how to properly structure the YAML with anchor references for work queues.
```yaml
# prefect.yaml
version: 3

definitions:
  # Define work queue configurations that can be reused
  queues:
    # Define a standard set of work queues
    standard: &standard_queues
      - name: "high-priority"
        priority: 1
        concurrency_limit: 5
      - name: "medium-priority"
        priority: 2
        concurrency_limit: 10
      - name: "low-priority"
        priority: 3
        concurrency_limit: 15
    
    # Define a different set of queues for development
    development: &dev_queues
      - name: "dev-high"
        priority: 1
        concurrency_limit: 2
      - name: "dev-low"
        priority: 2
        concurrency_limit: 5

  # Define common environment variables
  environments:
    prod: &prod_env
      DATABASE_URL: "postgresql://prod-db:5432/db"
      LOG_LEVEL: "INFO"
    
    dev: &dev_env
      DATABASE_URL: "postgresql://dev-db:5432/db"
      LOG_LEVEL: "DEBUG"

  # Define job configurations
  job_configs:
    prod: &prod_config
      env: *prod_env
      labels:
        environment: "production"
    
    dev: &dev_config
      env: *dev_env
      labels:
        environment: "development"

# Work pool definitions
work_pools:
  # Production work pools
  - name: "prod-process-pool"
    type: "process"
    base_job_template:
      job_configuration: *prod_config
    work_queues: *standard_queues  # Reference to standard queues

  - name: "prod-docker-pool"
    type: "docker"
    base_job_template:
      job_configuration: *prod_config
    work_queues: *standard_queues  # Same reference to standard queues

  # Development work pool
  - name: "dev-process-pool"
    type: "process"
    base_job_template:
      job_configuration: *dev_config
    work_queues: *dev_queues  # Reference to development queues

# Deployment definitions
deployments:
  - name: "etl-daily-prod"
    entrypoint: "flows/etl.py:daily_etl_flow"
    schedule:
      cron: "0 0 * * *"
    work_pool_name: "prod-process-pool"
    work_queue_name: "medium-priority"  # References queue name defined in standard_queues

  - name: "etl-daily-dev"
    entrypoint: "flows/etl.py:daily_etl_flow"
    work_pool_name: "dev-process-pool"
    work_queue_name: "dev-low"  # References queue name defined in dev_queues

  - name: "critical-process"
    entrypoint: "flows/critical.py:process_flow"
    work_pool_name: "prod-docker-pool"
    work_queue_name: "high-priority"
```

Key points about this structure:

1. Queue Definitions: Queues are defined once in the `definitions.queues` section. Use `&standard_queues` and `&dev_queues` as anchors and reference them later with `*standard_queues` and `*dev_queues`.
2. Multiple Queue Sets: You can define different sets of queues for different purposes. Each set can have its own naming convention and limits, and they can be referenced independently in different work pools.
3. Reusability: The same queue configuration can be used across multiple work pools. Changes to queue configurations only need to be made in one place, which keeps work pools that should share the same queue structure consistent.
4. Deployment References: Deployments reference the queue names defined in the queue sets. Make sure the `work_queue_name` matches a name defined in the referenced queue set.

You can even create more complex structures, like having different queue sets for different environments or use cases:

```yaml
definitions:
  queues:
    # Standard production queues
    standard: &standard_queues
      - name: "high-priority"
        priority: 1
        concurrency_limit: 5
      - name: "medium-priority"
        priority: 2
        concurrency_limit: 10
      - name: "low-priority"
        priority: 3
        concurrency_limit: 15

    # Development queues
    dev: &dev_queues
      - name: "dev-high"
        priority: 1
        concurrency_limit: 2
      - name: "dev-low"
        priority: 2
        concurrency_limit: 5

    # Special queues for batch processing
    batch: &batch_queues
      - name: "batch-large"
        priority: 1
        concurrency_limit: 3
      - name: "batch-small"
        priority: 2
        concurrency_limit: 8

work_pools:
  - name: "prod-process-pool"
    type: "process"
    base_job_template:
      job_configuration: *prod_config
    work_queues: *standard_queues

  - name: "batch-process-pool"
    type: "process"
    base_job_template:
      job_configuration: *prod_config
    work_queues: *batch_queues

  - name: "dev-pool"
    type: "process"
    base_job_template:
      job_configuration: *dev_config
    work_queues: *dev_queues
```

This approach makes it easy to:
1. Maintain consistent queue configurations across similar work pools
2. Make changes to queue configurations in one place
3. Have different queue setups for different environments or use cases
4. Keep your configuration DRY and maintainable

Remember that when you reference a queue name in a deployment, it needs to exist in the queue set that's referenced by the specified work pool. For example, you can't use a queue named "batch-large" with a work pool that uses the `*standard_queues` configuration.
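If you want to sanity-check how the anchors and aliases expand (independently of Prefect), you can load the file with PyYAML, assuming it is installed, and print the resolved structure:

```python
# check_yaml.py - hypothetical helper to see how the anchors resolve
import yaml  # PyYAML

with open("prefect.yaml") as f:
    config = yaml.safe_load(f)

# After parsing, aliases like *standard_queues are already expanded into plain lists
for pool in config.get("work_pools", []):
    print(pool["name"], [q["name"] for q in pool.get("work_queues", [])])
```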
a
@Marvin what if i want to define work queues within a single work pool?
m
thought for 25.9 seconds
I'll show you how to define work queues directly within a single work pool. This is a simpler approach when you don't need to reuse queue configurations across multiple work pools.
```yaml
# prefect.yaml
version: 3

definitions:
  # Define common environment variables if needed
  env_vars: &default_env
    DATABASE_URL: "postgresql://user:pass@localhost:5432/db"
    LOG_LEVEL: "INFO"

  # Define common job configuration
  job_configs: &default_job_config
    env: *default_env
    labels:
      team: "data-engineering"
      environment: "production"

# Work pool with inline work queue definitions
work_pools:
  - name: "my-process-pool"
    type: "process"
    base_job_template:
      job_configuration: *default_job_config
    # Define work queues directly within the work pool
    work_queues:
      - name: "high-priority"
        priority: 1
        concurrency_limit: 5
      - name: "medium-priority"
        priority: 2
        concurrency_limit: 10
      - name: "low-priority"
        priority: 3
        concurrency_limit: 15

# Deployments using the work queues
deployments:
  - name: "critical-etl"
    entrypoint: "flows/etl.py:critical_flow"
    work_pool_name: "my-process-pool"
    work_queue_name: "high-priority"
    description: "Critical ETL process that needs priority"

  - name: "daily-reports"
    entrypoint: "flows/reporting.py:daily_reports"
    work_pool_name: "my-process-pool"
    work_queue_name: "medium-priority"
    schedule:
      cron: "0 0 * * *"  # Daily at midnight
    description: "Daily reporting process"

  - name: "data-cleanup"
    entrypoint: "flows/maintenance.py:cleanup"
    work_pool_name: "my-process-pool"
    work_queue_name: "low-priority"
    schedule:
      interval: 86400  # Every 24 hours
    description: "Low priority cleanup tasks"
```

This approach is cleaner when you:
- Only need one work pool
- Have a specific set of queues for this work pool
- Don't need to reuse queue configurations elsewhere

You can also add more complex configurations within the same structure:

```yaml
version: 3

definitions:
  env_vars: &default_env
    DATABASE_URL: "postgresql://user:pass@localhost:5432/db"
    LOG_LEVEL: "INFO"
    AWS_REGION: "us-east-1"

  job_configs: &default_job_config
    env: *default_env
    labels:
      team: "data-engineering"
      environment: "production"
    tags:
      - "production"
      - "v2"

work_pools:
  - name: "my-process-pool"
    type: "process"
    base_job_template:
      job_configuration: *default_job_config
    work_queues:
      # Critical queue for urgent tasks
      - name: "critical"
        priority: 1
        concurrency_limit: 3
        description: "For urgent, time-sensitive tasks"
        tags:
          - "critical"
          - "high-priority"
      # Standard processing queue
      - name: "standard"
        priority: 2
        concurrency_limit: 8
        description: "For regular daily processing"
        tags:
          - "standard"
          - "daily"
      # Batch processing queue
      - name: "batch"
        priority: 3
        concurrency_limit: 15
        description: "For large batch processes"
        tags:
          - "batch"
          - "high-concurrency"
      # Background tasks queue
      - name: "background"
        priority: 4
        concurrency_limit: 5
        description: "For non-time-critical background tasks"
        tags:
          - "background"
          - "low-priority"

deployments:
  - name: "urgent-data-sync"
    entrypoint: "flows/sync.py:urgent_sync"
    work_pool_name: "my-process-pool"
    work_queue_name: "critical"
    description: "Urgent data synchronization process"
    parameters:
      timeout_seconds: 1800
      retry_attempts: 3

  - name: "daily-etl"
    entrypoint: "flows/etl.py:daily_process"
    work_pool_name: "my-process-pool"
    work_queue_name: "standard"
    schedule:
      cron: "0 1 * * *"  # Daily at 1 AM
    description: "Daily ETL processing"

  - name: "large-batch-process"
    entrypoint: "flows/batch.py:process_batch"
    work_pool_name: "my-process-pool"
    work_queue_name: "batch"
    description: "Large batch data processing"
    parameters:
      batch_size: 10000

  - name: "cleanup-routine"
    entrypoint: "flows/maintenance.py:cleanup"
    work_pool_name: "my-process-pool"
    work_queue_name: "background"
    schedule:
      interval: 43200  # Every 12 hours
    description: "Regular cleanup routine"
```

Key benefits of this approach:
1. Clarity: All queue configurations are visible in one place
2. Simplicity: No need for YAML anchors and references for queues
3. Maintainability: Easy to add or modify queues for this specific work pool
4. Documentation: Can include descriptions and tags for each queue

Remember:
- Lower priority numbers mean higher priority (1 is highest)
- Each queue can have its own concurrency limit
- Queue names must be unique within a work pool
- Deployments must reference queue names exactly as they're defined
- You can add descriptions and tags to queues for better organization

This structure is particularly useful when you have a single work pool with well-defined queue requirements and don't need to replicate the queue structure elsewhere in your configuration.
a
@Marvin it doesnt seem to work?
m
thought for 49.8 seconds
I apologize for my previous responses. You're right - the work queues are actually created separately from the YAML file. Here's the correct approach:

1. First, create a basic `prefect.yaml` file:
```yaml
version: 3

work_pools:
  - name: "my-process-pool"
    type: "process"
    base_job_template:
      job_configuration:
        env:
          MY_ENV_VAR: "value"

deployments:
  - name: "etl-daily"
    entrypoint: "flows/etl.py:daily_etl_flow"
    work_pool_name: "my-process-pool"
    work_queue_name: "high-priority"  # Reference the queue name you'll create

  - name: "data-cleanup"
    entrypoint: "flows/maintenance.py:cleanup_flow"
    work_pool_name: "my-process-pool"
    work_queue_name: "low-priority"  # Reference the queue name you'll create
```

2. Then, create the work pool and its queues using the CLI:
```bash
# Create the work pool
prefect work-pool create "my-process-pool" --type process

# Create the work queues
prefect work-queue create "high-priority" --pool "my-process-pool" --priority 1 --limit 5
prefect work-queue create "medium-priority" --pool "my-process-pool" --priority 2 --limit 10
prefect work-queue create "low-priority" --pool "my-process-pool" --priority 3 --limit 15
```

The work queues are managed through the CLI or the UI, not directly in the YAML file. The YAML file only references these queues in the deployments section.

You can also manage work queues through the Prefect UI:
1. Go to the Work Pools page
2. Click on your work pool
3. Use the "Work Queues" tab to create and manage queues

This is the correct way to set up work queues in Prefect 3.x. I apologize for my earlier incorrect examples. The YAML file is used to define deployments and their relationships to work pools and queues, but the actual queue creation and management is done through the CLI or UI.

Would you like me to provide more details about managing work queues through the CLI or UI?
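If you want to confirm what actually exists on the server after running these commands, you can list the pools and their queues from the CLI (the `--pool` flag is assumed from a recent 3.x CLI):

```bash
prefect work-pool ls                              # list all work pools
prefect work-queue ls --pool "my-process-pool"    # list the queues in one pool
```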