Vugar Yagubov
07/25/2025, 10:27 AM
I have no name!@prefect-server-5db4d4457-aaaa:~$ python
Python 3.12.10 (main, May 21 2025, 23:34:56) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from prefect.settings import PREFECT_UI_URL
>>> print(PREFECT_UI_URL.value())
https://prefect.domain.tech
>>>
However, in the body of my automation I am getting None for {{ flow_run.ui_url }} and {{ flow_run|ui_url }}, while {{ flow_run.id }} works fine.
What am I missing?
In the source code I see
def get_flow_run_ui_url() -> Optional[str]:
    flow_run_id = get_id()
    if flow_run_id is None:
        return None
    return f"{PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run_id}"
However, I don't understand why I am getting None.
Could you help?
Marvin
07/25/2025, 10:28 AM
Although PREFECT_UI_URL is correctly set (as shown in your Python console output), the problem lies in how the UI URL is generated for different object types.
There are a few potential causes and solutions:
1. Object Type Mismatch: In automations, you might be receiving a FlowRunResponse object instead of a regular FlowRun object. This was actually a known issue that was recently addressed (see Issue #18401).
2. Version Check: What version of Prefect are you running? If you're on a version earlier than the fix (which was implemented in PR #18407), you might want to upgrade to the latest version.
To help diagnose the issue, could you:
1. Verify your Prefect version
2. Share the complete automation configuration you're using (the full YAML)
3. Try using {{ flow_run.id }} to construct the URL manually as a temporary workaround:
   {{ PREFECT_UI_URL }}/flow-runs/flow-run/{{ flow_run.id }}
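The manual workaround in item 3 mirrors the get_flow_run_ui_url logic quoted earlier in the thread. A minimal Python sketch of that string construction, assuming the base URL and the flow run ID are available as plain values (build_flow_run_url is a hypothetical helper, not a Prefect function):

```python
from typing import Optional
from uuid import UUID


def build_flow_run_url(ui_url: Optional[str], flow_run_id: Optional[UUID]) -> Optional[str]:
    """Join a UI base URL and a flow run ID; return None if either is missing."""
    if not ui_url or flow_run_id is None:
        return None
    # Strip a trailing slash so the result never contains "//flow-runs".
    return f"{ui_url.rstrip('/')}/flow-runs/flow-run/{flow_run_id}"


run_id = UUID("12345678-1234-5678-1234-567812345678")
print(build_flow_run_url("https://prefect.domain.tech", run_id))
print(build_flow_run_url("https://prefect.domain.tech", None))
```

The None branch matches the quoted source: without a flow run ID there is nothing to link to, so a None result points at the ID lookup or the base URL being missing in that context.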
Also, just to verify - are you seeing this in a Slack automation, or a different type of automation? The context would help provide a more specific solution.
The good news is that this is a known issue that has been fixed in recent versions. Let me know your Prefect version and automation details, and I can provide more specific guidance on how to resolve it.
Vugar Yagubov
07/30/2025, 8:56 AM
prefect-flow-run-xx7vk-ldgv4
but I want
prefect-flow-run-carmine-unicorn
where carmine-unicorn is the flow run name.
Marvin
07/30/2025, 8:56 AM
yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: "{{ flow_run.name }}"  # This will use the flow run name
spec:
  template:
    metadata:
      name: "{{ flow_run.name }}"  # This will also use the flow run name for the pod
    spec:
      containers:
        - name: prefect-job
      restartPolicy: Never
You can save this as a JSON/YAML file and use it when creating your work pool:
bash
prefect work-pool create "my-k8s-pool" --type kubernetes --base-job-template path/to/template.yaml
2. Using job customizations in the work pool settings (you can do this through the UI or CLI):
json
[
  {
    "op": "replace",
    "path": "/metadata/name",
    "value": "{{ flow_run.name }}"
  },
  {
    "op": "replace",
    "path": "/spec/template/metadata/name",
    "value": "{{ flow_run.name }}"
  }
]
A few important notes:
- The template supports Jinja templating, so you can access flow run attributes using {{ flow_run.name }}, {{ flow_run.id }}, etc.
- Make sure the generated name follows Kubernetes naming conventions (lowercase alphanumeric characters, '-' and '.', and must start and end with an alphanumeric character)
- The flow run name must be valid for Kubernetes resource names
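The naming rule in the notes above can be checked before a name ever reaches the cluster. A minimal Python sketch, assuming the RFC 1123 subdomain pattern that Kubernetes reports in its validation errors and a 253-character limit (is_valid_k8s_name is a hypothetical helper; individual resource types may apply stricter limits):

```python
import re

# RFC 1123 subdomain pattern as reported by Kubernetes validation errors.
RFC1123_SUBDOMAIN = re.compile(
    r"[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*"
)
MAX_SUBDOMAIN_LENGTH = 253  # assumption: general subdomain limit


def is_valid_k8s_name(name: str) -> bool:
    """Return True if the whole string matches the RFC 1123 subdomain rule."""
    if len(name) > MAX_SUBDOMAIN_LENGTH:
        return False
    return RFC1123_SUBDOMAIN.fullmatch(name) is not None


for candidate in ("carmine-unicorn", "Carmine_Unicorn", "{{ flow_run.name }}"):
    print(candidate, is_valid_k8s_name(candidate))
```

An unrendered placeholder such as {{ flow_run.name }} fails this check, which is a quick way to tell whether templating happened at all.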
If you want to ensure your flow runs always have nice names like "carmine-unicorn", you can set this when calling the flow using the flow_run_name option:
python
my_flow.with_options(flow_run_name="carmine-unicorn")()  # my_flow stands for your @flow-decorated function
Or for deployments, you can configure a name template in your deployment configuration that will generate nice names automatically.
Would you like me to provide more details about any of these approaches?
Vugar Yagubov
07/30/2025, 9:13 AM
{
  "variables": {
    "type": "object",
    "properties": {
      "env": {
        "type": "object",
        "title": "Environment Variables",
        "description": "Environment variables to set when starting a flow run.",
        "additionalProperties": {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ]
        }
      },
      "name": {
        "anyOf": [
          {
            "type": "string"
          },
          {
            "type": "null"
          }
        ],
        "title": "Name",
        "default": "prefect-flow-run-{{ flow_run.name }}",
        "description": "Name given to infrastructure created by a worker."
      },
...
But the created pod name is
prefect-flow-run-flow-run-name-qs79f-zbnx4
which is not what I expected, because the flow run name was solemn-albatross.
Could you help me fix that?
Marvin
07/30/2025, 9:14 AM
json
{
  "apiVersion": "batch/v1",
  "kind": "Job",
  "metadata": {
    "name": "{{ flow_run.name }}"
  },
  "spec": {
    "template": {
      "metadata": {
        "name": "{{ flow_run.name }}"
      },
      "spec": {
        "containers": [
          {
            "name": "prefect-job"
          }
        ],
        "restartPolicy": "Never"
      }
    }
  }
}
The issue with your current configuration is that you're setting it in the variables section, which is then getting combined with Prefect's default naming pattern, resulting in the double naming and additional suffix you're seeing.
A few important points:
1. Set the name directly in the metadata section of both the Job and the Pod template
2. Remove the prefect-flow-run- prefix from your template unless you specifically want it
3. Make sure to set both the Job's name (in top-level metadata) and the Pod's name (in spec.template.metadata)
If you still want to keep the prefect-flow-run prefix, you could use:
json
{
  "metadata": {
    "name": "prefect-flow-run-{{ flow_run.name }}"
  },
  "spec": {
    "template": {
      "metadata": {
        "name": "prefect-flow-run-{{ flow_run.name }}"
      }
      ...
    }
  }
}
This should give you the clean name you're looking for, like solemn-albatross or prefect-flow-run-solemn-albatross (depending on whether you want the prefix).
Would you like me to provide the complete base job template with all the necessary sections?
Vugar Yagubov
07/30/2025, 9:54 AM
json
{
  "metadata": {
    "name": "prefect-flow-run-{{ flow_run.name }}"
  },
  "spec": {
    "template": {
      "metadata": {
        "name": "prefect-flow-run-{{ flow_run.name }}"
      }
      ...
    }
  }
}
in base-job-template. However, I am getting
Value error, The variables specified in the job configuration template must be present as properties in the variables schema. Your job configuration uses the following undeclared variable(s): flow_run.name.
Marvin
07/30/2025, 9:55 AM
json
{
  "variables": {
    "type": "object",
    "properties": {
      "flow_run": {
        "type": "object",
        "properties": {
          "name": {
            "type": "string"
          }
        }
      }
    }
  },
  "job_configuration": {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {
      "name": "prefect-flow-run-{{ flow_run.name }}"
    },
    "spec": {
      "template": {
        "metadata": {
          "name": "prefect-flow-run-{{ flow_run.name }}"
        },
        "spec": {
          "containers": [
            {
              "name": "prefect-job"
            }
          ],
          "restartPolicy": "Never"
        }
      }
    }
  }
}
The key differences are:
1. We need to declare flow_run and its properties in the variables schema section
2. The actual job configuration should be under a job_configuration key
3. Both parts are required for the template to work correctly
This should resolve the validation error you're seeing. The template will now properly recognize flow_run.name as a valid variable and use it in the naming.
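The validation error quoted above can be approximated as a set difference between the placeholders used in job_configuration and the keys declared under variables.properties. A minimal Python sketch, assuming exact name matching against top-level properties (find_placeholders and undeclared_variables are hypothetical helpers; the real Prefect check may differ):

```python
import re
from typing import Any

# Matches "{{ something }}" and captures the text between the braces.
PLACEHOLDER = re.compile(r"\{\{\s*([^{}]+?)\s*\}\}")


def find_placeholders(node: Any) -> set[str]:
    """Collect placeholder names from every string in a nested dict/list."""
    if isinstance(node, str):
        return set(PLACEHOLDER.findall(node))
    if isinstance(node, dict):
        node = list(node.values())
    if isinstance(node, list):
        found: set[str] = set()
        for item in node:
            found |= find_placeholders(item)
        return found
    return set()


def undeclared_variables(template: dict) -> set[str]:
    """Placeholders used in job_configuration but absent from the schema."""
    declared = set(template.get("variables", {}).get("properties", {}))
    return find_placeholders(template.get("job_configuration", {})) - declared


template = {
    "variables": {"type": "object", "properties": {"name": {"type": "string"}}},
    "job_configuration": {
        "metadata": {"generateName": "{{ name }}-"},
        "spec": {"template": {"metadata": {"name": "prefect-flow-run-{{ flow_run.name }}"}}},
    },
}
print(undeclared_variables(template))  # {'flow_run.name'}
```

Under this exact-match assumption, a nested flow_run object in the schema would not declare flow_run.name. Whether Prefect treats nested declarations differently is not confirmed in this thread.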
Would you like me to explain any part of this structure in more detail?
Vugar Yagubov
07/30/2025, 10:07 AM
Vugar Yagubov
07/30/2025, 10:08 AM
Marvin
07/30/2025, 10:09 AM
json
{
  "job_configuration": {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {
      "name": "prefect-flow-run-{{ flow_run.name }}"
    },
    "spec": {
      "backoffLimit": 4,  # Number of retries before considering the job failed
      "template": {
        "metadata": {
          "name": "prefect-flow-run-{{ flow_run.name }}"
        },
        "spec": {
          "containers": [
            {
              "name": "prefect-job"
            }
          ],
          "restartPolicy": "OnFailure"  # Changed from "Never" to "OnFailure"
        }
      }
    }
  }
}
Key points about the retry configuration:
1. restartPolicy options:
   - `OnFailure`: Retries pods that fail during initialization or execution
   - `Never`: No retries (default)
   - `Always`: Always tries to restart (not recommended for jobs)
2. `backoffLimit`: 
   - Controls how many times the job will retry before giving up
   - Default is 6 if not specified
   - The retry delay follows an exponential backoff pattern
   - Counts all types of failures, including initialization failures
3. You can also add optional fields for more control:
json
{
  "spec": {
    "activeDeadlineSeconds": 3600,  # Time limit for the entire job (including retries)
    "ttlSecondsAfterFinished": 100,  # How long to keep the job after completion
    "template": {
      "spec": {
        "initContainers": [{
          "name": "init-container",
          "image": "your-init-image",
          "command": ["your-init-command"]
        }],
        "containers": [
          {
            "name": "prefect-job",
            "startupProbe": {  # Optional startup health check
              "failureThreshold": 30,
              "periodSeconds": 10
            }
          }
        ],
        "restartPolicy": "OnFailure"
      }
    }
  }
}
Important considerations:
1. backoffLimit applies to the entire job, counting all types of failures
2. The actual retry delay follows an exponential backoff: 10s, 20s, 40s, etc.
3. If you have activeDeadlineSeconds set, the job will fail if it exceeds this time limit, regardless of remaining retries
4. Initialization failures count towards the backoffLimit
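The backoff schedule mentioned above is easy to tabulate. A minimal Python sketch, assuming a 10-second base that doubles on each retry and a six-minute cap (the cap comes from the Kubernetes Job documentation rather than this thread, and job_backoff_delays is a hypothetical helper):

```python
def job_backoff_delays(
    backoff_limit: int, base_seconds: int = 10, cap_seconds: int = 360
) -> list[int]:
    """Approximate delay before each retry: base * 2**attempt, capped."""
    return [min(base_seconds * 2**attempt, cap_seconds) for attempt in range(backoff_limit)]


print(job_backoff_delays(4))  # [10, 20, 40, 80]
```

With backoffLimit set to 4, the retries therefore add roughly two and a half minutes of waiting in total, which matters when activeDeadlineSeconds is also set.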
Would you like me to explain any of these configurations in more detail or provide examples for specific failure scenarios?
Vugar Yagubov
07/30/2025, 10:11 AM
1. restartPolicy options:
   - OnFailure: Retries pods that fail during initialization or execution
   - Never: No retries (default)
   - Always: Always tries to restart (not recommended for jobs)
That is something you wrote.
I need a retry only during initialization. I don't need a retry during execution, because that could be a valid failure.
Marvin
07/30/2025, 10:12 AM
restartPolicy and container-specific probes. Here's how to configure it:
json
{
  "job_configuration": {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {
      "name": "prefect-flow-run-{{ flow_run.name }}"
    },
    "spec": {
      "backoffLimit": 4,
      "template": {
        "metadata": {
          "name": "prefect-flow-run-{{ flow_run.name }}"
        },
        "spec": {
          "containers": [
            {
              "name": "prefect-job",
              "startupProbe": {
                "exec": {
                  "command": [
                    "python", "-c",
                    "import sys; sys.exit(0) if True else sys.exit(1)"  # Replace with your actual initialization check
                  ]
                },
                "failureThreshold": 5,    # Number of retries for initialization
                "periodSeconds": 10,      # How often to check
                "initialDelaySeconds": 5  # Wait before first check
              }
            }
          ],
          "restartPolicy": "Never"  # Keep this as "Never" to prevent execution retries
        }
      }
    }
  }
}
Key points about this configuration:
1. restartPolicy: "Never" ensures that once the pod starts executing your flow, it won't retry on failure
2. The startupProbe specifically handles initialization, and will retry based on its own configuration
3. failureThreshold * periodSeconds determines the total time allowed for initialization
4. Once the startup probe succeeds, the container is considered initialized and won't retry anymore
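Point 3 above is simple arithmetic, sketched here in Python. This assumes the usual Kubernetes probe defaults (initialDelaySeconds 0, periodSeconds 10, failureThreshold 3) when a field is omitted, and it ignores timeoutSeconds, so the result is only an approximation (startup_budget_seconds is a hypothetical helper):

```python
def startup_budget_seconds(probe: dict) -> int:
    """Rough time a container gets to pass its startup probe."""
    initial_delay = probe.get("initialDelaySeconds", 0)
    period = probe.get("periodSeconds", 10)
    failures = probe.get("failureThreshold", 3)
    return initial_delay + period * failures


probe = {"failureThreshold": 5, "periodSeconds": 10, "initialDelaySeconds": 5}
print(startup_budget_seconds(probe))  # 55
```

For the probe settings in the template above, the container has a little under a minute to report a successful start.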
You can customize the startup probe command to check for your specific initialization requirements. For example, if you need to wait for a database connection:
json
"startupProbe": {
  "exec": {
    "command": [
      "python",
      "-c",
      "import socket; socket.create_connection(('my-database', 5432), timeout=1)"
    ]
  },
  "failureThreshold": 10,
  "periodSeconds": 5
}
This configuration gives you:
- Retries only during initialization phase
- No retries once the flow actually starts running
- Controlled initialization retry period
- Clean failure if initialization ultimately fails
Would you like me to provide more examples of initialization checks or explain any part of this configuration in more detail?
Vugar Yagubov
07/30/2025, 12:16 PM
Marvin
07/30/2025, 12:16 PM
python
from prefect import flow
from typing import Optional
def retry_on_no_pods(exception: Optional[Exception]) -> bool:
    """Return True if the error message indicates no pods were found."""
    if exception is None:
        return False
    return "No active or succeeded pods found for any job" in str(exception)
@flow(
    retries=3,  # Number of retries
    retry_delay_seconds=30,  # Wait between retries
    retry_condition_fn=retry_on_no_pods  # Only retry for specific error
)
def your_flow():
    # Your flow code here
    ...
You can also make the retry condition more sophisticated if needed:
python
from prefect import flow
from typing import Optional
import time
def retry_on_no_pods(exception: Optional[Exception]) -> bool:
    """
    Retry logic for pod not found errors with additional conditions
    """
    if exception is None:
        return False
    
    error_message = str(exception)
    
    # Check for specific error message
    if "No active or succeeded pods found for any job" in error_message:
        # You can add additional conditions here if needed
        # For example, only retry during certain hours
        current_hour = time.localtime().tm_hour
        if 8 <= current_hour <= 20:  # Only retry during business hours
            return True
    
    return False
@flow(
    name="my-k8s-flow",
    retries=3,
    retry_delay_seconds=60,  # Waiting longer between retries
    retry_condition_fn=retry_on_no_pods,
    persist_result=True  # Optional: helps with tracking retry history
)
def your_flow():
    # Your flow code here
    ...
Key points about this solution:
1. The retry_condition_fn receives the exception that caused the flow to fail
2. It returns True if the flow should be retried, False otherwise
3. You can customize the retry delay and number of retries
4. The retry condition can include additional logic (time windows, count of previous attempts, etc.)
Additional options you might want to consider:
1. Exponential backoff:
python
from datetime import timedelta
@flow(
    retries=3,
    retry_delay_seconds=lambda retry_count: 2 ** retry_count * 30,  # 30s, 60s, 120s
    retry_condition_fn=retry_on_no_pods
)
2. More detailed error tracking:
python
from typing import Optional
from prefect import get_run_logger
def retry_on_no_pods(exception: Optional[Exception]) -> bool:
    if exception is None:
        return False
    
    error_message = str(exception)
    
    # Log the error for monitoring
    logger = get_run_logger()
    logger.error(f"Flow failed with: {error_message}")
    
    if "No active or succeeded pods found for any job" in error_message:
        logger.info("Triggering retry for pod not found error")
        return True
    
    return False
Would you like me to explain any part of this in more detail or show how to implement additional retry conditions?
Vugar Yagubov
07/30/2025, 12:24 PM
await propose_state(
    client=orchestration_client,
    state=Crashed(message="No active or succeeded pods found for any job"),
    flow_run_id=uuid.UUID(flow_run_id),
)
Marvin
07/30/2025, 12:27 PM
Crashed with a specific message. For this case, you'll want to use a retry_policy_fn that checks the state message. Here's how to implement it:
python
from prefect import flow
from prefect.states import Crashed
def retry_on_no_pods_state(state) -> bool:
    """
    Retry if the state is Crashed with the specific message about no pods
    """
    if isinstance(state, Crashed) and state.message == "No active or succeeded pods found for any job":
        return True
    return False
@flow(
    retries=3,
    retry_delay_seconds=60,
    retry_policy_fn=retry_on_no_pods_state
)
def your_flow():
    # Your flow code here
    ...
You can also make the retry policy more sophisticated:
python
from prefect import flow
from prefect.states import Crashed
from prefect.logging import get_run_logger
import time
def retry_on_no_pods_state(state) -> bool:
    """
    Retry policy for pod not found state with additional conditions
    """
    logger = get_run_logger()
    
    if isinstance(state, Crashed):
        if state.message == "No active or succeeded pods found for any job":
            logger.info("Detected 'No active pods' state, considering retry...")
            
            # You can add additional conditions here
            # For example, only retry during certain hours
            current_hour = time.localtime().tm_hour
            if 8 <= current_hour <= 20:  # Only retry during business hours
                logger.info("Within retry window, will attempt retry")
                return True
            else:
                logger.info("Outside retry window, will not retry")
    
    return False
@flow(
    name="my-k8s-flow",
    retries=3,
    retry_delay_seconds=60,
    retry_policy_fn=retry_on_no_pods_state
)
def your_flow():
    # Your flow code here
    ...
Key points about this solution:
1. The retry_policy_fn receives the state object rather than an exception
2. We specifically check for Crashed state and the exact message
3. You can still configure the number of retries and delay between attempts
4. The retry policy can include additional logic based on your needs
Would you like me to explain any part of this in more detail or show how to implement additional retry conditions based on other state attributes?
Vugar Yagubov
07/30/2025, 12:49 PM
Marvin
07/30/2025, 12:52 PM
python
@flow(retries=3, retry_delay_seconds=60)
def your_flow():
    ...
2. You might need to implement this at the infrastructure level (in your Kubernetes configuration) rather than at the Prefect flow level.
I apologize for my earlier incorrect suggestions. Would you like me to:
1. Research potential infrastructure-level solutions for this specific case?
2. Look into alternative approaches for handling this specific scenario?
3. Help you implement a different strategy for managing these pod-related failures?
Vugar Yagubov
07/30/2025, 1:42 PM
{{ flow_run.name }} as the Kubernetes pod name. However, when I test it, I see that the work pool settings don't have access to runtime values.
Or the runtime (especially flow_run.name) is created a little later than the work pool creates everything.
Or we should escape {{ and }} so that flow_run.name is not resolved when we save the work pool settings, but only when we submit the Kubernetes job.
Marvin
07/30/2025, 1:43 PM
{% raw %} and {% endraw %} tags. Here's the correct way to do it:
json
{
  "job_configuration": {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {
      "name": "{% raw %}{{ flow_run.name }}{% endraw %}"
    },
    "spec": {
      "template": {
        "metadata": {
          "name": "{% raw %}{{ flow_run.name }}{% endraw %}"
        },
        "spec": {
          "containers": [
            {
              "name": "prefect-job"
            }
          ],
          "restartPolicy": "Never"
        }
      }
    }
  }
}
Or alternatively, using double braces for escaping:
json
{
  "job_configuration": {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {
      "name": "{{{{ flow_run.name }}}}"
    },
    "spec": {
      "template": {
        "metadata": {
          "name": "{{{{ flow_run.name }}}}"
        },
        "spec": {
          "containers": [
            {
              "name": "prefect-job"
            }
          ],
          "restartPolicy": "Never"
        }
      }
    }
  }
}
The double braces method ({{{{ }}}}) works because:
1. The first evaluation (when creating the work pool) reduces {{{{ to {{
2. The second evaluation (when submitting the job) processes the remaining {{}} with the actual flow run variables
This way:
1. The template variables aren't evaluated when creating/saving the work pool
2. They are preserved for evaluation when the actual Kubernetes job is created
3. The flow run context is available at job creation time
Would you like me to explain more about:
1. What other runtime variables are available at job creation time?
2. How to test if the template variables are being properly escaped?
3. Other template variable patterns that might be useful in your work pool configuration?
Vugar Yagubov
07/30/2025, 2:15 PM
{{{{}}}} and {% raw %}{{ flow_run.name }}{% endraw %}
I see that the source code has KubernetesWorkerJobConfiguration, which has a labels attribute, and those labels include flow-run-name.
But I don't understand how to get and use it.
Vugar Yagubov
07/30/2025, 2:23 PM
{{{{}}}} and {% raw %}{{ flow_run.name }}{% endraw %} didn't work. I get prefect-flow-run-flow-run-name-wmhqh-6svth and prefect-flow-run-raw-flow-run-name-endraw-r2wxb-5fbfb
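Both reported names are what results if the placeholder text is never rendered and is instead sanitized as a literal string. A minimal Python sketch, assuming a sanitizer that lowercases its input and collapses every run of other characters into one hyphen (slugify_literal is a hypothetical helper, not the worker's actual code), with the random suffixes left out:

```python
import re


def slugify_literal(raw: str) -> str:
    """Lowercase, then replace each run of non-alphanumeric characters with '-'."""
    return re.sub(r"[^a-z0-9]+", "-", raw.lower()).strip("-")


print(slugify_literal("prefect-flow-run-{{ flow_run.name }}"))
# prefect-flow-run-flow-run-name
print(slugify_literal("prefect-flow-run-{% raw %}{{ flow_run.name }}{% endraw %}"))
# prefect-flow-run-raw-flow-run-name-endraw
```

If that reading is right, flow_run.name is left untouched in every variant tried so far, so escaping the braces is unlikely to be the missing piece.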
I see that the source code has BaseJobConfiguration, which has a labels attribute, and those labels include flow-run-name.
self.labels = {
            **self._base_flow_run_labels(flow_run),
            **self._base_work_pool_labels(work_pool),
            **self._base_worker_name_label(worker_name),
            **self._base_flow_labels(flow),
            **self._base_deployment_labels(deployment),
            **self.labels,
        }
and
@staticmethod
    def _base_flow_run_labels(flow_run: "FlowRun") -> dict[str, str]:
        """
        Generate a dictionary of labels for a flow run job.
        """
        return {
            "prefect.io/flow-run-id": str(flow_run.id),
            "prefect.io/flow-run-name": flow_run.name,
            "prefect.io/version": prefect.__version__,
        }
But I don't understand how to get the labels and use them in base-job-template.
Marvin
07/30/2025, 2:24 PM
json
{
  "job_configuration": {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {
      "name": "{{ name }}"  // This will be filled from job_variables
    },
    "spec": {
      "template": {
        "metadata": {
          "name": "{{ name }}"
        },
        "spec": {
          "containers": [
            {
              "name": "prefect-job"
            }
          ],
          "restartPolicy": "Never"
        }
      }
    }
  }
}
2. When creating the flow run, you can pass the name through job_variables:
python
from prefect import get_client
with get_client() as client:
    flow_run = client.create_flow_run_from_deployment(
        deployment_id=deployment.id,
        job_variables={"name": "your-custom-name"}
    )
Alternatively, you can access the built-in labels in your base job template:
json
{
  "job_configuration": {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {
      "name": "{{ labels['<http://prefect.io/flow-run-name|prefect.io/flow-run-name>'] }}"
    },
    "spec": {
      "template": {
        "metadata": {
          "name": "{{ labels['<http://prefect.io/flow-run-name|prefect.io/flow-run-name>'] }}"
        },
        "spec": {
          "containers": [
            {
              "name": "prefect-job"
            }
          ],
          "restartPolicy": "Never"
        }
      }
    }
  }
}
The labels that are available include:
- prefect.io/flow-run-id
- prefect.io/flow-run-name
- prefect.io/flow-name
- prefect.io/deployment-id
- prefect.io/version
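The BaseJobConfiguration snippet quoted earlier in the thread builds these labels by unpacking several dictionaries in order, with the user-supplied labels last. A minimal Python sketch of that merge order, using made-up values (merge_labels is a hypothetical helper that stands in for the dict unpacking):

```python
def merge_labels(*layers: dict[str, str]) -> dict[str, str]:
    """Merge label dictionaries; later layers override earlier ones."""
    merged: dict[str, str] = {}
    for layer in layers:
        merged.update(layer)
    return merged


# Illustrative values only; the keys follow the quoted source.
flow_run_labels = {
    "prefect.io/flow-run-id": "0000",
    "prefect.io/flow-run-name": "solemn-albatross",
}
custom_labels = {"team": "data", "prefect.io/flow-run-name": "override"}

labels = merge_labels(flow_run_labels, custom_labels)
print(labels["prefect.io/flow-run-name"])  # override
```

Because the custom layer is applied last, a label set in the work pool with the same key replaces the built-in value.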
Would you like me to:
1. Explain more about how job variables work?
2. Show how to combine custom labels with the built-in ones?
3. Provide examples of other ways to customize the pod naming?
Vugar Yagubov
07/30/2025, 2:31 PM
json
{
  "job_configuration": {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {
      "name": "{{ labels['<http://prefect.io/flow-run-name|prefect.io/flow-run-name>'] }}"
    },
    "spec": {
      "template": {
        "metadata": {
          "name": "{{ labels['<http://prefect.io/flow-run-name|prefect.io/flow-run-name>'] }}"
        },
        "spec": {
          "containers": [
            {
              "name": "prefect-job"
            }
          ],
          "restartPolicy": "Never"
        }
      }
    }
  }
}
and I get an error:
File "/usr/local/lib/python3.12/site-packages/prefect_kubernetes/worker.py", line 929, in _create_job
    raise InfrastructureError(
prefect.exceptions.InfrastructureError: Unable to create Kubernetes job: Unprocessable Entity: Job.batch "prefect-flow-run-{{ labels['<http://prefect.io/flow-run-name|prefect.io/flow-run-name>'] }}-svm4f" is invalid: [metadata.generateName: Invalid value: "prefect-flow-run-{{ labels['<http://prefect.io/flow-run-name|prefect.io/flow-run-name>'] }}-": a lowercase RFC 1123 subdomain must consist of lower case alphanumeric characters, '-' or '.', and must start and end with an alphanumeric character (e.g. 'example.com', regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*'), metadata.name: Invalid value: "prefect-flow-run-{{ labels['<http://prefect.io/flow-run-name|prefect.io/flow-run-name>'] }}-svm4f": a lowercase RFC 1123 subdomain must consist of lower case alphanumeric characters, '-' or '.', and must start and end with an alphanumeric character (e.g. 'example.com', regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*'), spec.template.labels: Invalid value: "prefect-flow-run-{{ labels['<http://prefect.io/flow-run-name|prefect.io/flow-run-name>'] }}-svm4f": a valid label must be an empty string or consist of alphanumeric characters, '-', '_' or '.', and must start and end with an alphanumeric character (e.g. 'MyValue',  or 'my_value',  or '12345', regex used for validation is '(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?')]
Marvin
07/30/2025, 2:31 PM
prefect.io/flow-run-name without the URL formatting. Here's the correct base job template:
json
{
  "job_configuration": {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {
      "name": "{{ labels['prefect.io/flow-run-name'] }}"
    },
    "spec": {
      "template": {
        "metadata": {
          "name": "{{ labels['prefect.io/flow-run-name'] }}"
        },
        "spec": {
          "containers": [
            {
              "name": "prefect-job"
            }
          ],
          "restartPolicy": "Never"
        }
      }
    }
  }
}
The error occurred because:
1. The label key I provided included HTML-like formatting (<http://prefect.io/flow-run-name|prefect.io/flow-run-name>)
2. This made the resulting name invalid according to Kubernetes naming rules (which require RFC 1123 compliance)
The correct label key is simply prefect.io/flow-run-name. This should generate valid Kubernetes pod names that match your flow run names.
Would you like me to:
1. List all the available label keys you can use?
2. Show how to combine multiple labels in the name?
3. Explain how to add additional formatting to the name while keeping it Kubernetes-compliant?
Vugar Yagubov
07/30/2025, 2:34 PM
Marvin
07/30/2025, 2:34 PM
json
{
  "job_configuration": {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {
      "generateName": "{{ variables.flow_run_name }}-"
    },
    "spec": {
      "template": {
        "metadata": {
          "name": "{{ variables.flow_run_name }}"
        },
        "spec": {
          "containers": [
            {
              "name": "prefect-job"
            }
          ],
          "restartPolicy": "Never"
        }
      }
    }
  },
  "variables": {
    "type": "object",
    "properties": {
      "flow_run_name": {
        "type": "string",
        "title": "Flow Run Name",
        "description": "Name of the flow run"
      }
    }
  }
}
The key changes:
1. Properly defined the variables schema
2. Used variables.flow_run_name instead of trying to access labels directly
3. Used generateName in metadata to ensure unique names
This should properly create pods with names based on the flow run name. Would you like me to explain any part of this configuration in more detail?
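With generateName, the Kubernetes API server appends a random suffix to the given prefix, so the final name is never exactly the flow run name. A rough Python emulation, assuming a five-character suffix drawn from a consonant-and-digit alphabet (both values are assumptions here, and emulate_generate_name is a hypothetical helper, not a Kubernetes or Prefect API):

```python
import random
from typing import Optional

# Assumption: alphabet and length of the suffix Kubernetes appends.
SUFFIX_ALPHABET = "bcdfghjklmnpqrstvwxz2456789"
SUFFIX_LENGTH = 5


def emulate_generate_name(prefix: str, rng: Optional[random.Random] = None) -> str:
    """Append a random suffix to a prefix, imitating metadata.generateName."""
    rng = rng or random.Random()
    suffix = "".join(rng.choice(SUFFIX_ALPHABET) for _ in range(SUFFIX_LENGTH))
    return prefix + suffix


print(emulate_generate_name("solemn-albatross-"))
```

Suffixes seen earlier in the thread, such as qs79f and zbnx4, fit this shape. Names with two such groups would be consistent with one suffix added to the Job and another added when the pod is created from it, though the thread does not confirm that.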