<@ULVA73B9P> Currently, im running prefect version...
# ask-marvin
d
@Marvin Currently, im running prefect version 3.1.13 on eks. I have a issue with concurrency_limit. For the deployment, i set the concurrency_limit = 100, i try to trigger create flow run with prefect API for 1000 job, but in the same moment, it just run 4-50 concurrency jobs and other job, in the flow_run table with scheduled status but not show on prefect UI
m
thought for 56.9 seconds
Based on your description and the research, there are several potential factors that could be limiting your concurrent runs: 1. Worker Capacity: - Even though you've set
concurrency_limit=100
on the deployment, the actual number of concurrent runs is also limited by your worker configuration. - How many workers do you have running in your EKS cluster? - What is the work pool type you're using? (Kubernetes, Docker, etc.) 2. Resource Constraints: - EKS cluster capacity (CPU/memory) could be limiting the number of concurrent pods that can be scheduled - Worker pod resources and limits could be affecting how many concurrent runs can be managed 3. Work Pool Configuration: - Work pool concurrency settings might be overriding or affecting your deployment concurrency limit - You might need to check both the work pool and work queue concurrency settings To troubleshoot this, I recommend: 1. Check your work pool configuration:
Copy code
bash
prefect work-pool inspect YOUR_WORK_POOL_NAME
2. Verify your worker logs to see if there are any resource constraints or errors being reported 3. Check your EKS cluster metrics: - Available CPU/memory - Pod scheduling status - Any pod pending states 4. Verify that your flow runs are being assigned to the correct work pool and queue Could you please provide: 1. What type of work pool are you using? 2. How many workers do you have running? 3. Are you seeing any error messages in the worker logs? 4. Are the "scheduled" flow runs assigned to a specific work queue? This information would help me provide more specific guidance for your setup. Also, you mentioned the runs don't show up in the UI - are they visible in the database but not in the UI, or are they not being picked up by workers at all? Remember that while the deployment concurrency limit sets a maximum, the actual number of concurrent runs is determined by the minimum of several factors: - Deployment concurrency limit (100 in your case) - Work pool/queue concurrency limits - Worker capacity - Infrastructure resources - Global concurrency limits (if any are set)
d
@Marvin i have two workers. the limit of my cpu on eks is 2000 vcpu, and no have any limit other. total job run cpu is less than 1000vcpu
m
thought for 54.5 seconds
Based on your situation, here are a few specific things to check and potential solutions: 1. Worker Configuration - Each worker has its own limit on how many concurrent runs it can handle. Check your worker configuration for the
--limit
parameter. - Try running this command to see your current workers' status:
Copy code
bash
   prefect worker ls
2. Work Pool Settings Let's check your work pool configuration:
Copy code
bash
   prefect work-pool inspect YOUR_WORK_POOL_NAME
Pay attention to: - The work pool concurrency limit - Any queue-specific concurrency limits - The worker type (Kubernetes, Process, etc.) 3. Worker Process Limits Since you have two workers, make sure each worker is configured to handle enough concurrent flow runs. The default might be lower than you expect. You can modify this by: - Updating the worker deployment YAML to include appropriate concurrency settings - Setting the appropriate process limits in your worker configuration 4. Troubleshooting Steps: 1. Check the worker logs for any warnings or errors about concurrency or resource limits 2. Verify that flow runs are being distributed between both workers 3. Monitor the worker processes to ensure they're healthy and not restarting Could you please provide: 1. The output of
prefect work-pool inspect YOUR_WORK_POOL_NAME
? 2. Are you using a Kubernetes work pool type? 3. How are your workers deployed (deployment YAML configuration)? 4. Are you seeing any specific patterns in when/how the flow runs get picked up? Also, regarding the UI issue where scheduled runs don't show up - this might be related to the UI's default filtering. Try: 1. Changing the filter in the UI to show "Scheduled" status 2. Checking if the runs appear in the database with:
Copy code
python
   from prefect.client import get_client
   async with get_client() as client:
       flows = await client.read_flows()
The fact that you're seeing varying numbers (4-50) suggests this might be related to how the workers are processing the runs rather than a hard limit. Let's identify the exact configuration once you provide the above details, and we can then make specific recommendations for your setup.
d
@Marvin prefect worker ls Usage: prefect worker [OPTIONS] COMMAND [ARGS]... Try 'prefect worker --help' for help. ╭─ Error ───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ │ No such command 'ls'. │ ╰─────────────────────────────────
m
thought for 27.8 seconds
Based on what we know so far, here are a few potential issues that could be limiting your concurrent runs: 1. Worker Configuration - The number of concurrent runs a worker can handle might be limited by its configuration - In Kubernetes deployments, this is often controlled by the worker deployment spec 2. Work Pool Settings - There might be concurrency limits set at the work pool level - Queue-specific limits might be in place 3. Infrastructure Scaling - Even though you have plenty of CPU (2000 vCPU), there might be other Kubernetes resource quotas or limits in place - The worker pods might need specific resource requests/limits configured To properly diagnose and fix this, please provide: 1. The output of
prefect work-pool ls
2. The output of
prefect work-pool inspect YOUR_POOL_NAME
3. Your worker Kubernetes deployment YAML Once you share these details, we can: - Verify the work pool concurrency settings - Check the worker configuration - Identify any potential bottlenecks - Provide specific recommendations for your setup Also, when you say the jobs are in "scheduled" status but not showing in the UI, can you confirm: - Are these runs visible if you explicitly filter for "Scheduled" status in the UI? - Are you able to see these runs when querying the database directly? - Do you see any error messages in the worker logs?
d
@Marvin extraction) prefect work-pool inspect k8s-pool WorkPool( id='7e78905f-1df6-4ecc-a2c8-37dc86d9901d', created=DateTime(2025, 1, 15, 17, 0, 29, 923061, tzinfo=Timezone('UTC')), updated=DateTime(2025, 8, 3, 14, 3, 29, 777465, tzinfo=Timezone('UTC')), name='k8s-pool', type='kubernetes', base_job_template={ 'variables': { 'type': 'object', 'properties': { 'env': {'type': 'object', 'title': 'Environment Variables', 'description': 'Environment variables to set when starting a flow run.', 'additionalProperties': {'type': 'string'}}, 'name': {'type': 'string', 'title': 'Name', 'description': 'Name given to infrastructure created by a worker.'}, 'image': { 'type': 'string', 'title': 'Image', 'default': 'v3.1.13-prefect-67-g2f79bd3', 'description': 'The image reference of a container image to use for created jobs. If not set, the latest Prefect image will be used.' }, 'labels': {'type': 'object', 'title': 'Labels', 'description': 'Labels applied to infrastructure created by a worker.', 'additionalProperties': {'type': 'string'}}, 'command': { 'type': 'string', 'title': 'Command', 'description': 'The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker.' }, 'namespace': {'type': 'string', 'title': 'Namespace', 'default': 'prefect', 'description': 'The Kubernetes namespace to create jobs within.'}, 'node_pool': { 'enum': ['prefect-worker', 'emr-spark', 'extraction-worker'], 'type': 'string', 'title': 'Node Pool', 'default': 'prefect-worker', 'description': 'The node pool to use for job creation.' }, 'capacity_type': { 'enum': ['spot', 'on-demand'], 'type': 'string', 'title': 'Capacity for pod', 'default': 'on-demand', 'description': 'The capacity type to use for job creation. If set to "any", the worker will use any available capacity type. If set to "spot", the worker will only use spot capacity. If set to "on-demand", the worker will only use on-demand capacity.' }, 'stream_output': {'type': 'boolean', 'title': 'Stream Output', 'default': True, 'description': 'If set, output will be streamed from the job to local standard output.'}, 'cluster_config': { 'allOf': [{'$ref': '#/definitions/KubernetesClusterConfig'}], 'title': 'Cluster Config', 'description': 'The Kubernetes cluster config to use for job creation.' }, 'finished_job_ttl': { 'type': 'integer', 'title': 'Finished Job TTL', 'default': 180, 'description': 'The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely.' }, 'k8s_resource_cpu': {'type': 'string', 'title': 'Resource CPU', 'default': '500m', 'description': 'The amount of CPU to request and limit for job containers.'}, 'image_pull_policy': { 'enum': ['IfNotPresent', 'Always', 'Never'], 'type': 'string', 'title': 'Image Pull Policy', 'default': 'IfNotPresent', 'description': 'The Kubernetes image pull policy to use for job containers.' }, 'k8s_resource_memory': {'type': 'string', 'title': 'Resource Memory', 'default': '1Gi', 'description': 'The amount of memory to request and limit for job containers.'}, 'service_account_name': { 'type': 'string', 'title': 'Service Account Name', 'default': 'prefect-deployment-sa', 'description': 'The Kubernetes service account to use for job creation.' }, 'job_watch_timeout_seconds': { 'type': 'integer', 'title': 'Job Watch Timeout Seconds', 'description': 'Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely.' }, 'pod_watch_timeout_seconds': { 'type': 'integer', 'title': 'Pod Watch Timeout Seconds', 'default': 10800, 'description': 'Number of seconds to watch for pod creation before timing out.' } }, 'definitions': { 'KubernetesClusterConfig': { 'type': 'object', 'title': 'KubernetesClusterConfig', 'required': ['config', 'context_name'], 'properties': { 'config': {'type': 'object', 'title': 'Config', 'description': 'The entire contents of a kubectl config file.'}, 'context_name': {'type': 'string', 'title': 'Context Name', 'description': 'The name of the kubectl context to use.'} }, 'description': 'Stores configuration for interaction with Kubernetes clusters.\nSee
from_file
for creation.', 'secret_fields': [], 'block_type_slug': 'kubernetes-cluster-config', 'block_schema_references': {} } }, 'description': 'Default variables for the Kubernetes worker.\nThe schema for this class is used to populate the
variables
section of the default base job template.' }, 'job_configuration': { 'env': '{{ env }}', 'name': '{{ name }}', 'labels': '{{ labels }}', 'command': '{{ command }}', 'namespace': '{{ namespace }}', 'job_manifest': { 'kind': 'Job', 'spec': { 'template': { 'spec': { 'containers': [ { 'env': [ {'name': 'POD_IP', 'valueFrom': {'fieldRef': {'fieldPath': 'status.podIP'}}}, {'name': 'POD_NAME', 'valueFrom': {'fieldRef': {'fieldPath': 'metadata.name'}}}, {'name': 'POD_NAMESPACE', 'valueFrom': {'fieldRef': {'fieldPath': 'metadata.namespace'}}} ], 'args': '{{ command }}', 'name': 'prefect-job', 'image': '{{ image }}', 'resources': { 'limits': {'cpu': '{{ k8s_resource_cpu }}', 'memory': '{{ k8s_resource_memory }}'}, 'requests': {'cpu': '{{ k8s_resource_cpu }}', 'memory': '{{ k8s_resource_memory }}'} }, 'imagePullPolicy': '{{ image_pull_policy }}' } ], 'completions': 1, 'parallelism': 1, 'tolerations': [{'key': 'node-pool', 'value': '{{ node_pool }}', 'effect': 'NoSchedule', 'operator': 'Equal'}], 'nodeSelector': {'appname': '{{ node_pool }}', 'kubernetes.io/arch': 'amd64', 'karpenter.sh/capacity-type': '{{ capacity_type }}'}, 'restartPolicy': 'Never', 'serviceAccountName': '{{ service_account_name }}' } }, 'backoffLimit': 0, 'podFailurePolicy': { 'rules': [ {'action': 'Ignore', 'onPodConditions': [{'type': 'DisruptionTarget'}]}, {'action': 'FailJob', 'onExitCodes': {'values': [0], 'operator': 'NotIn', 'containerName': 'prefect-job'}} ] }, 'ttlSecondsAfterFinished': '{{ finished_job_ttl }}' }, 'metadata': {'labels': '{{ labels }}', 'namespace': '{{ namespace }}', 'generateName': '{{ name }}-'}, 'apiVersion': 'batch/v1' }, 'stream_output': '{{ stream_output }}', 'cluster_config': '{{ cluster_config }}', 'job_watch_timeout_seconds': '{{ job_watch_timeout_seconds }}', 'pod_watch_timeout_seconds': '{{ pod_watch_timeout_seconds }}' } }, status=WorkPoolStatus.READY, default_queue_id='5b0c4ebb-7080-4e74-846b-d49c504b9d81' )
m
Your message was too long, here's your message at the allowed limit: extraction) prefect work-pool inspect k8s-pool WorkPool( id='7e78905f-1df6-4ecc-a2c8-37dc86d9901d' created=DateTime(2025 1 15 17 0 29 923061 tzinfo=Timezone('UTC')) updated=DateTime(2025 8 3 14 3 29 777465 tzinfo=Timezone('UTC')) name='k8s-pool' type='kubernetes' base_job_template={ 'variables' { 'type' 'object' 'properties' { 'env' {'type' 'object' 'title' 'Environment Variables' 'description' 'Environment variables to set when starting a flow run ' 'additionalProperties' {'type' 'string'}} 'name' {'type' 'string' 'title' 'Name' 'description' 'Name given to infrastructure created by a worker '} 'image' { 'type' 'string' 'title' 'Image' 'default' '<http //272043165569 dkr ecr ap-southeast-1 amazonaws com/application/sit/dna/eod/eod-batch/eod-cloud-extraction-manager v3 1 13-prefect-67-g2f79bd3|v3 1 13-prefect-67-g2f79bd3>' 'description' 'The image reference of a container image to use for created jobs If not set the latest Prefect image will be used ' } 'labels' {'type' 'object' 'title' 'Labels' 'description' 'Labels applied to infrastructure created by a worker ' 'additionalProperties' {'type' 'string'}} 'command' { 'type' 'string' 'title' 'Command' 'description' 'The command to use when starting a flow run In most cases this should be left blank and the command will be automatically generated by the worker ' } 'namespace' {'type' 'string' 'title' 'Namespace' 'default' 'prefect' 'description' 'The Kubernetes namespace to create jobs within '} 'node_pool' { 'enum' ['prefect-worker' 'emr-spark' 'extraction-worker'] 'type' 'string' 'title' 'Node Pool' 'default' 'prefect-worker' 'description' 'The node pool to use for job creation ' } 'capacity_type' { 'enum' ['spot' 'on-demand'] 'type' 'string' 'title' 'Capacity for pod' 'default' 'on-demand' 'description' 'The capacity type to use for job creation If set to any the worker will use any available capacity type If set to spot the worker will only use spot capacity If set to on-demand the worker will only use on-demand capacity ' } 'stream_output' {'type' 'boolean' 'title' 'Stream Output' 'default' True 'description' 'If set output will be streamed from the job to local standard output '} 'cluster_config' { 'allOf' [{'$ref' '#/definitions/KubernetesClusterConfig'}] 'title' 'Cluster Config' 'description' 'The Kubernetes cluster config to use for job creation ' } 'finished_job_ttl' { 'type' 'integer' 'title' 'Finished Job TTL' 'default' 180 'description' 'The number of seconds to retain jobs after completion If set finished jobs will be cleaned up by Kubernetes after the given delay If not set jobs will be retained indefinitely ' } 'k8s_resource_cpu' {'type' 'string' 'title' 'Resource CPU' 'default' '500m' 'description' 'The amount of CPU to request and limit for job containers '} 'image_pull_policy' { 'enum' ['IfNotPresent' 'Always' 'Never'] 'type' 'string' 'title' 'Image Pull Policy' 'default' 'IfNotPresent' 'description' 'The Kubernetes image pull policy to use for job containers ' } 'k8s_resource_memory' {'type' 'string' 'title' 'Resource Memory' 'default' '1Gi' 'description' 'The amount of memory to request and limit for job containers '} 'service_account_name' { 'type' 'string' 'title' 'Service Account Name' 'default' 'prefect-deployment-sa' 'description' 'The Kubernetes service account to use for job creation ' } 'job_watch_timeout_seconds' { 'type' 'integer' 'title' 'Job Watch Timeout Seconds' 'description' 'Number of seconds to wait for each event emitted by a job before timing out If not set the worker will wait for each event indefinitely ' } 'pod_watch_timeout_seconds' { 'type' 'integer' 'title' 'Pod Watch Timeout Seconds' 'default' 10800 'description' 'Number of seconds to watch for pod creation before
d
it is my config for work pool @Marvin
m
thought for 6.3 seconds
I don't see the work pool configuration you mentioned. Could you please share: 1. The output of:
Copy code
bash
prefect work-pool ls
2. And the output of:
Copy code
bash
prefect work-pool inspect YOUR_POOL_NAME
This information will help us understand: - Your work pool type (Kubernetes, Process, etc.) - Any concurrency limits set at the work pool level - Queue configurations - Worker settings Also, it would be helpful to see: 1. Your worker Kubernetes deployment YAML 2. The configuration you use to set the concurrency_limit=100 (is it in the deployment configuration or set somewhere else?) With this information, we can better diagnose why you're only seeing 4-50 concurrent jobs instead of the expected 100, despite having ample CPU resources available.
d
@Marvin extraction) prefect work-pool inspect k8s-pool WorkPool( id='7e78905f-1df6-4ecc-a2c8-37dc86d9901d', created=DateTime(2025, 1, 15, 17, 0, 29, 923061, tzinfo=Timezone('UTC')), updated=DateTime(2025, 8, 3, 14, 3, 29, 777465, tzinfo=Timezone('UTC')), name='k8s-pool', type='kubernetes', base_job_template={ 'variables': { 'type': 'object', 'properties': { 'env': {'type': 'object', 'title': 'Environment Variables', 'description': 'Environment variables to set when starting a flow run.', 'additionalProperties': {'type': 'string'}}, 'name': {'type': 'string', 'title': 'Name', 'description': 'Name given to infrastructure created by a worker.'}, 'image': { 'type': 'string', 'title': 'Image', 'default': 'v3.1.13-prefect-67-g2f79bd3', 'description': 'The image reference of a container image to use for created jobs. If not set, the latest Prefect image will be used.' }, 'labels': {'type': 'object', 'title': 'Labels', 'description': 'Labels applied to infrastructure created by a worker.', 'additionalProperties': {'type': 'string'}}, 'command': { 'type': 'string', 'title': 'Command', 'description': 'The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker.' }, 'namespace': {'type': 'string', 'title': 'Namespace', 'default': 'prefect', 'description': 'The Kubernetes namespace to create jobs within.'}, 'node_pool': { 'enum': ['prefect-worker', 'emr-spark', 'extraction-worker'], 'type': 'string', 'title': 'Node Pool', 'default': 'prefect-worker', 'description': 'The node pool to use for job creation.' }, 'capacity_type': { 'enum': ['spot', 'on-demand'], 'type': 'string', 'title': 'Capacity for pod', 'default': 'on-demand', 'description': 'The capacity type to use for job creation. If set to "any", the worker will use any available capacity type. If set to "spot", the worker will only use spot capacity. If set to "on-demand", the worker will only use on-demand capacity.' }, 'stream_output': {'type': 'boolean', 'title': 'Stream Output', 'default': True, 'description': 'If set, output will be streamed from the job to local standard output.'}, 'cluster_config': { 'allOf': [{'$ref': '#/definitions/KubernetesClusterConfig'}], 'title': 'Cluster Config', 'description': 'The Kubernetes cluster config to use for job creation.' }, 'finished_job_ttl': { 'type': 'integer', 'title': 'Finished Job TTL', 'default': 180, 'description': 'The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely.' }, 'k8s_resource_cpu': {'type': 'string', 'title': 'Resource CPU', 'default': '500m', 'description': 'The amount of CPU to request and limit for job containers.'}, 'image_pull_policy': { 'enum': ['IfNotPresent', 'Always', 'Never'], 'type': 'string', 'title': 'Image Pull Policy', 'default': 'IfNotPresent', 'description': 'The Kubernetes image pull policy to use for job containers.' }, 'k8s_resource_memory': {'type': 'string', 'title': 'Resource Memory', 'default': '1Gi', 'description': 'The amount of memory to request and limit for job containers.'}, 'service_account_name': { 'type': 'string', 'title': 'Service Account Name', 'default': 'prefect-deployment-sa', 'description': 'The Kubernetes service account to use for job creation.' }, 'job_watch_timeout_seconds': { 'type': 'integer', 'title': 'Job Watch Timeout Seconds', 'description': 'Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely.' }, 'pod_watch_timeout_seconds': { 'type': 'integer', 'title': 'Pod Watch Timeout Seconds', 'default': 10800, 'description': 'Number of seconds to watch for pod creation before timing out.' } }, 'definitions': { 'KubernetesClusterConfig': { 'type': 'object', 'title': 'KubernetesClusterConfig', 'required': ['config', 'context_name'], 'properties': { 'config': {'type': 'object', 'title': 'Config', 'description': 'The entire contents of a kubectl config file.'}, 'context_name': {'type': 'string', 'title': 'Context Name', 'description': 'The name of the kubectl context to use.'} }, 'description': 'Stores configuration for interaction with Kubernetes clusters.\nSee
from_file
for creation.', 'secret_fields': [], 'block_type_slug': 'kubernetes-cluster-config', 'block_schema_references': {} } }, 'description': 'Default variables for the Kubernetes worker.\nThe schema for this class is used to populate the
variables
section of the default base job template.' }, 'job_configuration': { 'env': '{{ env }}', 'name': '{{ name }}', 'labels': '{{ labels }}', 'command': '{{ command }}', 'namespace': '{{ namespace }}', 'job_manifest': { 'kind': 'Job', 'spec': { 'template': { 'spec': { 'containers': [ { 'env': [ {'name': 'POD_IP', 'valueFrom': {'fieldRef': {'fieldPath': 'status.podIP'}}}, {'name': 'POD_NAME', 'valueFrom': {'fieldRef': {'fieldPath': 'metadata.name'}}}, {'name': 'POD_NAMESPACE', 'valueFrom': {'fieldRef': {'fieldPath': 'metadata.namespace'}}} ], 'args': '{{ command }}', 'name': 'prefect-job', 'image': '{{ image }}', 'resources': { 'limits': {'cpu': '{{ k8s_resource_cpu }}', 'memory': '{{ k8s_resource_memory }}'}, 'requests': {'cpu': '{{ k8s_resource_cpu }}', 'memory': '{{ k8s_resource_memory }}'} }, 'imagePullPolicy': '{{ image_pull_policy }}' } ], 'completions': 1, 'parallelism': 1, 'tolerations': [{'key': 'node-pool', 'value': '{{ node_pool }}', 'effect': 'NoSchedule', 'operator': 'Equal'}], 'nodeSelector': {'appname': '{{ node_pool }}', 'kubernetes.io/arch': 'amd64', 'karpenter.sh/capacity-type': '{{ capacity_type }}'}, 'restartPolicy': 'Never', 'serviceAccountName': '{{ service_account_name }}' } }, 'backoffLimit': 0, 'podFailurePolicy': { 'rules': [ {'action': 'Ignore', 'onPodConditions': [{'type': 'DisruptionTarget'}]}, {'action': 'FailJob', 'onExitCodes': {'values': [0], 'operator': 'NotIn', 'containerName': 'prefect-job'}} ] }, 'ttlSecondsAfterFinished': '{{ finished_job_ttl }}' }, 'metadata': {'labels': '{{ labels }}', 'namespace': '{{ namespace }}', 'generateName': '{{ name }}-'}, 'apiVersion': 'batch/v1' }, 'stream_output': '{{ stream_output }}', 'cluster_config': '{{ cluster_config }}', 'job_watch_timeout_seconds': '{{ job_watch_timeout_seconds }}', 'pod_watch_timeout_seconds': '{{ pod_watch_timeout_seconds }}' } }, status=WorkPoolStatus.READY, default_queue_id='5b0c4ebb-7080-4e74-846b-d49c504b9d81' )
m
Your message was too long, here's your message at the allowed limit: extraction) prefect work-pool inspect k8s-pool WorkPool( id='7e78905f-1df6-4ecc-a2c8-37dc86d9901d' created=DateTime(2025 1 15 17 0 29 923061 tzinfo=Timezone('UTC')) updated=DateTime(2025 8 3 14 3 29 777465 tzinfo=Timezone('UTC')) name='k8s-pool' type='kubernetes' base_job_template={ 'variables' { 'type' 'object' 'properties' { 'env' {'type' 'object' 'title' 'Environment Variables' 'description' 'Environment variables to set when starting a flow run ' 'additionalProperties' {'type' 'string'}} 'name' {'type' 'string' 'title' 'Name' 'description' 'Name given to infrastructure created by a worker '} 'image' { 'type' 'string' 'title' 'Image' 'default' '<http //272043165569 dkr ecr ap-southeast-1 amazonaws com/application/sit/dna/eod/eod-batch/eod-cloud-extraction-manager v3 1 13-prefect-67-g2f79bd3|v3 1 13-prefect-67-g2f79bd3>' 'description' 'The image reference of a container image to use for created jobs If not set the latest Prefect image will be used ' } 'labels' {'type' 'object' 'title' 'Labels' 'description' 'Labels applied to infrastructure created by a worker ' 'additionalProperties' {'type' 'string'}} 'command' { 'type' 'string' 'title' 'Command' 'description' 'The command to use when starting a flow run In most cases this should be left blank and the command will be automatically generated by the worker ' } 'namespace' {'type' 'string' 'title' 'Namespace' 'default' 'prefect' 'description' 'The Kubernetes namespace to create jobs within '} 'node_pool' { 'enum' ['prefect-worker' 'emr-spark' 'extraction-worker'] 'type' 'string' 'title' 'Node Pool' 'default' 'prefect-worker' 'description' 'The node pool to use for job creation ' } 'capacity_type' { 'enum' ['spot' 'on-demand'] 'type' 'string' 'title' 'Capacity for pod' 'default' 'on-demand' 'description' 'The capacity type to use for job creation If set to any the worker will use any available capacity type If set to spot the worker will only use spot capacity If set to on-demand the worker will only use on-demand capacity ' } 'stream_output' {'type' 'boolean' 'title' 'Stream Output' 'default' True 'description' 'If set output will be streamed from the job to local standard output '} 'cluster_config' { 'allOf' [{'$ref' '#/definitions/KubernetesClusterConfig'}] 'title' 'Cluster Config' 'description' 'The Kubernetes cluster config to use for job creation ' } 'finished_job_ttl' { 'type' 'integer' 'title' 'Finished Job TTL' 'default' 180 'description' 'The number of seconds to retain jobs after completion If set finished jobs will be cleaned up by Kubernetes after the given delay If not set jobs will be retained indefinitely ' } 'k8s_resource_cpu' {'type' 'string' 'title' 'Resource CPU' 'default' '500m' 'description' 'The amount of CPU to request and limit for job containers '} 'image_pull_policy' { 'enum' ['IfNotPresent' 'Always' 'Never'] 'type' 'string' 'title' 'Image Pull Policy' 'default' 'IfNotPresent' 'description' 'The Kubernetes image pull policy to use for job containers ' } 'k8s_resource_memory' {'type' 'string' 'title' 'Resource Memory' 'default' '1Gi' 'description' 'The amount of memory to request and limit for job containers '} 'service_account_name' { 'type' 'string' 'title' 'Service Account Name' 'default' 'prefect-deployment-sa' 'description' 'The Kubernetes service account to use for job creation ' } 'job_watch_timeout_seconds' { 'type' 'integer' 'title' 'Job Watch Timeout Seconds' 'description' 'Number of seconds to wait for each event emitted by a job before timing out If not set the worker will wait for each event indefinitely ' } 'pod_watch_timeout_seconds' { 'type' 'integer' 'title' 'Pod Watch Timeout Seconds' 'default' 10800 'description' 'Number of seconds to watch for pod creation before
d
@Marvin i want to change PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS and PREFECT_SERVER_SERVICES_SCHEDULER_MAX_RUNS
m
thought for 33.4 seconds
You can modify these settings in a few ways: 1. Using Environment Variables
Copy code
bash
   export PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS=1000
   export PREFECT_SERVER_SERVICES_SCHEDULER_MAX_RUNS=1000
2. In Kubernetes Deployment If you're running Prefect server in Kubernetes, add these to your deployment environment variables:
Copy code
yaml
   env:
     - name: PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS
       value: "1000"
     - name: PREFECT_SERVER_SERVICES_SCHEDULER_MAX_RUNS
       value: "1000"
3. Using a Prefect Profile Create or modify your profile in `~/.prefect/profiles.toml`:
Copy code
toml
   [profiles.default]
   PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS=1000
   PREFECT_SERVER_SERVICES_SCHEDULER_MAX_RUNS=1000
Important Notes: 1. These settings affect how many runs are auto-scheduled in advance 2. You'll need to restart your Prefect server after changing these values 3. Make sure to set both variables to maintain consistency 4. Consider your database capacity when setting higher values To verify the changes have taken effect, you can: 1. Check your server logs for any configuration-related messages 2. Monitor the number of scheduled runs in the UI 3. Query the database to verify the number of scheduled runs Would you like me to help you with: 1. The specific configuration for your deployment method (Kubernetes, Docker, etc.)? 2. Verifying the changes have taken effect? 3. Understanding any related settings that might need adjustment? Please let me know which deployment method you're using and if you need any specific configuration examples.
d
@Marvin i added it on the helm chart of prefect server, the deployment of prefect server given this, but the pod of prefect server not have
m
thought for 48.3 seconds
Could you please share: 1. The relevant section of your
values.yaml
where you're trying to set these variables? 2. The command you used to install/upgrade the Helm chart? Here are a few ways to properly set these environment variables in your Helm deployment: 1. Using values.yaml:
Copy code
yaml
server:
  env:
    - name: PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS
      value: "1000"
    - name: PREFECT_SERVER_SERVICES_SCHEDULER_MAX_RUNS
      value: "1000"
2. Using Helm --set:
Copy code
bash
helm upgrade --install prefect prefecthq/prefect-server \
  --set server.env[0].name=PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS,server.env[0].value="1000" \
  --set server.env[1].name=PREFECT_SERVER_SERVICES_SCHEDULER_MAX_RUNS,server.env[1].value="1000"
3. Using a ConfigMap:
Copy code
yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: prefect-server-config
data:
  PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS: "1000"
  PREFECT_SERVER_SERVICES_SCHEDULER_MAX_RUNS: "1000"
---
# In your values.yaml
server:
  envFrom:
    - configMapRef:
        name: prefect-server-config
To verify the configuration: 1. Check if the environment variables are present in the pod:
Copy code
bash
kubectl exec -it <prefect-server-pod-name> -- env | grep PREFECT
2. Check the server logs for any configuration-related messages:
Copy code
bash
kubectl logs <prefect-server-pod-name>
3. Verify the pod specification:
Copy code
bash
kubectl get pod <prefect-server-pod-name> -o yaml
Could you please: 1. Share your current Helm values configuration? 2. Confirm which method you're using to set these variables? 3. Share any error messages or unexpected behavior you're seeing? This will help me provide more specific guidance for your setup.
d
@Marvin what does this mean 2 env variable?
m
thought for 37.5 seconds
Let me break this down clearly: 1. PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS - Controls how many future flow runs can be scheduled through the API service - Default value: 100 - Example: If you have a flow scheduled to run hourly, this setting would limit how many future runs are created in advance 2. PREFECT_SERVER_SERVICES_SCHEDULER_MAX_RUNS - Controls how many future flow runs can be scheduled by the Prefect server - Default value: 100 - Serves the same purpose but for server-side scheduling Practical Example: Let's say you have a flow that runs every 5 minutes: - With the default setting (100): - The scheduler will create up to 100 future runs - This covers about 8.3 hours into the future (5 minutes × 100 runs) - If you set it to 1000: - The scheduler will create up to 1000 future runs - This covers about 83.3 hours (approximately 3.5 days) into the future Why These Settings Matter: 1. Resource Management - Higher values mean more scheduled runs stored in the database - More future runs visible in the UI - Greater advance planning capability 2. Database Impact - Higher values = more database entries - More storage space needed - Potentially slower queries 3. Visibility and Planning - Higher values give you more visibility into future scheduled runs - Helpful for long-term planning and monitoring When to Adjust These Values: 1. Increase them when: - You need to see more future scheduled runs - You have flows that run frequently and need longer advance scheduling - Your database can handle the additional records 2. Keep them lower when: - You want to minimize database load - You don't need to see far into the future - You're running in a resource-constrained environment Best Practices: 1. Keep both values the same to maintain consistency 2. Consider your database capacity when setting higher values 3. Balance between visibility needs and resource usage 4. Monitor database performance after making changes Would you like me to help you: 1. Determine the optimal values for your specific use case? 2. Set up monitoring to track the impact of these settings? 3. Configure these values in your specific deployment method?
d
@Marvin but my flow is event based, not have any scheduled, is it work with 2 params
m
thought for 54.4 seconds
For your specific case, here are the settings that actually matter for controlling concurrency of API-triggered flows: 1. Work Pool Concurrency
Copy code
bash
   prefect work-pool set-concurrency-limit POOL_NAME 100
2. Work Queue Concurrency (if using multiple queues)
Copy code
bash
   prefect work-queue set-concurrency-limit QUEUE_NAME 100
3. Worker Configuration In your Kubernetes worker deployment, ensure proper resources and limits:
Copy code
yaml
   resources:
     requests:
       memory: "1Gi"
       cpu: "500m"
     limits:
       memory: "2Gi"
       cpu: "1000m"
To troubleshoot your current situation where you're only seeing 4-50 concurrent runs instead of 100, please share: 1. Your work pool configuration:
Copy code
bash
   prefect work-pool inspect YOUR_POOL_NAME
2. Your worker Kubernetes deployment YAML 3. Are you seeing any specific errors in: - Worker logs - Prefect server logs - Kubernetes events (
kubectl get events
) The scheduler-related environment variables you asked about (
PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS
and
PREFECT_SERVER_SERVICES_SCHEDULER_MAX_RUNS
) won't help with your concurrency issues since they only affect scheduled (not event-based) flows. Let's focus on the settings that will actually impact your concurrent execution capacity.