# marvin-ai
t
@Marvin I have a Kubernetes work pool named default. I was using the following code to register a deployment:
Copy code
from prefect_github.repository import GitHubRepository

github_repository_block = GitHubRepository.load("minerva-prefect-scripts")

hello_flow.deploy(
    name="hello-k8s",
    work_pool_name="default",  # your Kubernetes work pool
    version="1.0",
    entrypoint="example_flow:hello_flow",  # format: module:function
    parameters={"name": "Prefect"},
    storage=github_repository_block  # GitHub repo block
)
But I am getting the error ->
Copy code
ValueError: Either an image or remote storage location must be provided when deploying a deployment.
Can you please help me figure out the correct way to register a deployment for flow code that lives in a GitHub repo?
k
it'll be something closer to
Copy code
hello_flow.from_source(
   ...
).deploy(
   ...
)
where
from_source
is where you define the repository and
deploy
is where you define everything else
m
Thanks for sharing the snippet — the error makes sense. In Prefect 3.x, GitHub “storage blocks” are removed and
Flow.deploy
requires either: - a remote source (i.e., the flow code can be fetched at run time), or - a Docker image that already contains your code. Your code is using a 2.x-style
GitHubRepository
block via
storage=...
, which 3.x no longer supports. That’s why you’re seeing:
ValueError: Either an image or remote storage location must be provided when deploying a deployment.
Here are the supported ways to deploy flows that live in GitHub. Option A: Deploy directly from your GitHub repo (no image) - Use
Flow.from_source(...)
and pass an entrypoint in the “file.py:flow_func” format (path is relative to the repo root). - Note: don’t use module notation like `example_flow:hello_flow`; it should be the file path, e.g.,
example_flow.py:hello_flow
. Example (public repo):
Copy code
from prefect.flows import Flow

remote_flow = Flow.from_source(
    "https://github.com/<owner>/<repo>",
    entrypoint="example_flow.py:hello_flow",  # file.py:flow_func
)

remote_flow.deploy(
    name="hello-k8s",
    work_pool_name="default",
    version="1.0",
    parameters={"name": "Prefect"},
)
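The entrypoint rule above (file path, not module path) can be checked before you deploy. This is a minimal sketch using only the standard library; `check_entrypoint` is a hypothetical helper name, not part of Prefect, and it only inspects the string without touching the repo.

```python
from pathlib import PurePosixPath


def check_entrypoint(entrypoint: str) -> tuple[str, str]:
    """Split 'path/to/file.py:flow_func' and reject module-style entrypoints."""
    file_part, sep, func_name = entrypoint.rpartition(":")
    if not sep or not file_part or not func_name:
        raise ValueError(f"expected 'file.py:flow_func', got {entrypoint!r}")
    # A module-style value such as 'example_flow:hello_flow' has no .py suffix.
    if PurePosixPath(file_part).suffix != ".py":
        raise ValueError(f"{file_part!r} looks like a module name; use a .py file path")
    if not func_name.isidentifier():
        raise ValueError(f"{func_name!r} is not a valid function name")
    return file_part, func_name


if __name__ == "__main__":
    print(check_entrypoint("example_flow.py:hello_flow"))
```

Calling it with `example_flow:hello_flow` raises a `ValueError`, which is the mistake in the original snippet.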
Private repo tips: - The runner that executes your deployment must be able to clone your repo. Common patterns: - Provide a GitHub token via environment variables on your Kubernetes work pool/worker (e.g.,
GITHUB_TOKEN
) and use HTTPS with a token in the URL, or configure SSH deploy keys via your pod/job template. - If you prefer the CLI, the
prefect.yaml
“pull” step supports a
git_clone
step with an
access_token
(see CLI example below). - Docs: Store flow code for deployments (Prefect 3) Option B: Build and use a Docker image - Build an image that includes your code and dependencies, then pass
image=
to
deploy
. Example:
Copy code
from prefect.flows import Flow
from prefect.docker import DockerImage

# Build an image locally and push it (or set build=False if already pushed)
image = DockerImage(
    name="ghcr.io/<owner>/<image>:1.0.0",
)

flow = Flow.from_source(
    "https://github.com/<owner>/<repo>",
    entrypoint="example_flow.py:hello_flow",
)

flow.deploy(
    name="hello-k8s",
    work_pool_name="default",
    image=image,   # satisfies the "image or remote source" requirement
    version="1.0",
    parameters={"name": "Prefect"},
)
CLI alternative (recommended for private repos) - Prefect 3.x uses
prefect deploy
(not
prefect deployment build
). - In
prefect.yaml
, add a
pull
step to clone your repo. You can reference a token from an environment variable or Prefect secret. Example `prefect.yaml`:
Copy code
name: my-project
pull:
  - prefect.deployments.steps.set_working_directory:
      directory: null
  - prefect.deployments.steps.git_clone:
      repository: https://github.com/<owner>/<repo>.git
      branch: main
      access_token: "{{ $GITHUB_TOKEN }}"   # Prefect template syntax for an environment variable
deployments:
  - name: hello-k8s
    entrypoint: example_flow.py:hello_flow
    work_pool:
      name: default
    parameters:
      name: Prefect
Then run:
Copy code
prefect deploy
Useful docs: - Store flow code for deployments: Prefect 3 docs: Store flow code - Work pools (Kubernetes): Manage work pools - CLI help verified:
prefect deploy --help
and
prefect work-pool create --help
A couple quick checks:
- Is your repo public or private?
- If private, how do you prefer to authenticate (HTTPS token vs SSH key)?
I can tailor the exact config for your Kubernetes work pool/job template.
t
@Marvin I switched to local process instead of kubernetes and wrote the script
Copy code
from prefect.flows import Flow
from prefect_github.repository import GitHubRepository

github_repository_block = GitHubRepository.load("minerva-prefect-scripts")

remote_flow = Flow.from_source(
    source=github_repository_block,
    entrypoint="example_flow.py:hello_flow",  # file.py:flow_func
)

remote_flow.deploy(
    name="hello-k8s",
    work_pool_name="Test",
    version="1.0",
    parameters={"name": "Prefect"},
)
got the error:
Copy code
No block class found for slug 'github-repository'. Please ensure that the block class is available in the current environment.
m
You’re hitting that error because
GitHubRepository
is a 2.x-style block and it isn’t available/registered in Prefect 3.x. In 3.x, repository storage blocks are removed. Instead, you must either: - point the deployment at a remote source via
Flow.from_source(...)
, or - provide a Docker image containing your code. Since you switched to a local (process) work pool, here’s the minimal 3.x approach that deploys straight from your GitHub repo without blocks: Public GitHub repo
Copy code
from prefect.flows import Flow

remote_flow = Flow.from_source(
    "https://github.com/<owner>/<repo>",
    entrypoint="example_flow.py:hello_flow",  # file.py:flow_function
)

remote_flow.deploy(
    name="hello-local",
    work_pool_name="Test",
    version="1.0",
    parameters={"name": "Prefect"},
)
Notes: - The entrypoint must be a file path from the repo root, not a module path. For example,
example_flow.py:hello_flow
, not
example_flow:hello_flow
. - Make sure a worker is running for your process pool:
Copy code
prefect worker start --pool Test
Private GitHub repo For private repos, the worker needs credentials to clone. Easiest is to use the CLI with a
prefect.yaml
pull step and provide a token via env var: prefect.yaml
Copy code
name: my-project
pull:
  - prefect.deployments.steps.set_working_directory:
      directory: null
  - prefect.deployments.steps.git_clone:
      repository: https://github.com/<owner>/<repo>.git
      branch: main
      access_token: "{{ $GITHUB_TOKEN }}"   # Prefect template syntax for an environment variable
deployments:
  - name: hello-local
    entrypoint: example_flow.py:hello_flow
    work_pool:
      name: Test
    parameters:
      name: Prefect
Then:
Copy code
export GITHUB_TOKEN=<your_token>  # make sure the worker inherits this
prefect deploy
prefect worker start --pool Test
Why your current code fails -
GitHubRepository.load(...)
refers to a 2.x block (slug
github-repository
). In 3.x, that block doesn’t exist, hence “No block class found for slug ‘github-repository’.” - Use
Flow.from_source("https://github.com/...")
instead of
storage=...
. Docs for reference: - Storing flow code for deployments: Store flow code (Prefect 3) - Managing work pools: Manage work pools If your repo is private, do you prefer HTTPS with a token or SSH deploy keys? I can share the exact configuration for your worker environment and job template.
t
@Marvin this is my flow code
Copy code
from prefect import flow

@flow
def hello_flow(name: str = "world"):
    print(f"👋 Hello, {name}!")
    return f"👋 Hello, {name}!"
Where can I see this output in the UI?
m
Short answer: in the Prefect UI, open your flow run and check the Logs tab. A few important details to actually see your “print” output there: - Print statements are not sent to the UI by default. Either: - enable print capture with
@flow(log_prints=True)
, or - use the Prefect logger (
get_run_logger().info(...)
), which always goes to logs. - Make sure you’re running the flow as a Prefect flow run (e.g., via a deployment with a running worker). If you just run the script locally outside a Prefect run, logs won’t appear in the UI. - Ensure your worker is configured to send logs to the API (generally on by default). If needed:
Copy code
export PREFECT_LOGGING_TO_API_ENABLED=true
prefect worker start --pool Test
Updated flow example:
Copy code
from prefect import flow, get_run_logger

@flow(log_prints=True)
def hello_flow(name: str = "world"):
    # This will appear in the UI Logs when log_prints=True
    print(f"👋 Hello, {name}!")

    # This will appear in the UI Logs regardless of log_prints
    logger = get_run_logger()
    logger.info(f"👋 Hello, {name}!")

    return f"👋 Hello, {name}!"
UI navigation:
- Go to your Deployment → open a specific Flow Run → Logs tab.
- Or go to Flow Runs → select your run → Logs tab.
If you want a visual record besides logs, you can also create an artifact:
Copy code
from prefect.artifacts import create_markdown_artifact

create_markdown_artifact(markdown=f"Result: 👋 Hello, {name}!")
Then view it on the run’s Artifacts tab.
t
@Marvin I got this error from kubernetes work pool and worker deployment run
Copy code
kubernetes_asyncio.config.kube_config - [Errno 2] No such file or directory: 'kubelogin'
m
Thanks for sharing the error. That message means your worker is trying to authenticate to the cluster using the Azure AAD exec plugin, but the
kubelogin
binary is not present in the worker’s environment. Why this happens - Your kubeconfig likely has a user that uses
exec
with
kubelogin
(common for AKS AAD auth). - The Prefect Kubernetes worker uses that kubeconfig to talk to the API server; without the
kubelogin
binary in PATH, you get
[Errno 2] No such file or directory: 'kubelogin'
. Ways to fix it Option A — Install kubelogin in the worker image (recommended if using kubeconfig/AAD) - Add kubelogin to the Docker image your worker runs with. Example Dockerfile snippet (multi-arch):
Copy code
FROM prefecthq/prefect:3-python3.11

ARG DEBIAN_FRONTEND=noninteractive
ARG KUBELOGIN_VERSION=v0.0.35
ARG TARGETARCH

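# The release zip is assumed to unpack to bin/linux_<arch>/kubelogin; adjust the path if the layout differs.
RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl unzip \
 && curl -L -o /tmp/kubelogin.zip https://github.com/Azure/kubelogin/releases/download/${KUBELOGIN_VERSION}/kubelogin-linux-${TARGETARCH}.zip \
 && unzip /tmp/kubelogin.zip -d /tmp/kubelogin \
 && install -m 0755 /tmp/kubelogin/bin/linux_${TARGETARCH}/kubelogin /usr/local/bin/kubelogin \
 && rm -rf /var/lib/apt/lists/* /tmp/kubelogin /tmp/kubelogin.zip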
- Use this image for your Kubernetes work pool jobs (set the image in your work pool’s base job template or when you deploy your flow). Option B — Use in-cluster auth (no kubelogin needed) - If your worker runs inside the same cluster, you can avoid kubeconfig/AAD entirely and use the pod’s service account. - Ensure you are NOT mounting/pointing
KUBECONFIG
in the worker pod. - Give the worker’s service account RBAC to create Jobs/Pods in your target namespace. Minimal example:
Copy code
apiVersion: v1
kind: ServiceAccount
metadata:
  name: prefect-worker
  namespace: <ns>
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: prefect-worker
  namespace: <ns>
rules:
- apiGroups: ["", "batch"]
  resources: ["pods", "pods/log", "pods/exec", "jobs"]
  verbs: ["create", "get", "list", "watch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: prefect-worker
  namespace: <ns>
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: prefect-worker
subjects:
- kind: ServiceAccount
  name: prefect-worker
  namespace: <ns>
Option C — If running the worker outside Kubernetes (on a VM/laptop) - Install kubelogin on that machine so it’s available in PATH for the worker process: - macOS:
brew install Azure/kubelogin/kubelogin
- Windows:
choco install kubelogin
- Linux: download the release zip for your arch and put
kubelogin
in
/usr/local/bin
- Kubelogin releases: Azure kubelogin releases Quick checks - Confirm your kubeconfig is using kubelogin: -
kubectl config view --minify -o jsonpath='{.users[0].user.exec.command}'
→ should show
kubelogin
- Inside the worker environment: -
which kubelogin
and
kubelogin --version
- Ensure
KUBECONFIG
points to a file the worker can read (or is unset if using in-cluster auth) Docs - Prefect 3.x Kubernetes work pools and workers: Prefect 3.x: Kubernetes deployments and workers A couple clarifying questions to make sure we point you to the best fix: - Is your worker running inside the AKS cluster or outside it? - Are you mounting/setting a kubeconfig for the worker that uses AAD exec with kubelogin? - Which image is your worker using right now? If you share it, I can suggest the smallest change to add kubelogin.
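If you'd rather script the "is kubelogin on PATH" check than run `which` by hand, here is a minimal sketch using only the standard library. `missing_binaries` is a hypothetical helper name; run it in the same environment (image, VM, or shell) that the worker process uses, since PATH differs between them.

```python
import shutil


def missing_binaries(commands: list[str]) -> list[str]:
    """Return the commands that cannot be found on the current PATH."""
    return [cmd for cmd in commands if shutil.which(cmd) is None]


if __name__ == "__main__":
    # 'kubelogin' is the exec command named in the error; add others if your kubeconfig needs them.
    missing = missing_binaries(["kubelogin"])
    print("missing:", missing if missing else "none")
```

An empty result only shows the binary is discoverable; it does not confirm that AAD authentication itself succeeds.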
t
@Marvin I am using prefect.yaml to build image and deploy my flow, here is the code used to deploy ->
Copy code
name: axia
prefect-version: 3.4.21

build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build-image
    requires: prefect-docker>=0.4.0
    image_name: "{{ $PREFECT_IMAGE_NAME }}"
    tag: latest-1
    dockerfile: auto
    platform: "linux/amd64"

push:
- prefect_docker.deployments.steps.push_docker_image:
    requires: prefect-docker>=0.4.0
    image_name: "{{ build-image.image_name }}"
    tag: "{{ build-image.tag }}"

deployments
definitions:
  work_pool:
    name: "default"
    job_variables:
      image: "{{ build-image.image }}"

deployments:
- name: "default"
  schedule: null
  entrypoint: "flows/example_flow.py:hello_flow"
The image is working fine on local but on k8s, I am getting the following error ->
Copy code
[Errno 2] No such file or directory: '/opt/prefect/axia'
08:46:07 PM
prefect.flow_runs
Unexpected exception encountered when trying to load flow
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/deployments/steps/core.py", line 166, in run_steps
    step_output = await run_step(step, upstream_outputs)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/deployments/steps/core.py", line 137, in run_step
    result = await from_async.call_soon_in_new_thread(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 374, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 399, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
m
Thanks for the details. The key part of your error is this path:
[Errno 2] No such file or directory: '/opt/prefect/axia'
What that means - In Prefect 3.x, when you deploy a project (using prefect.yaml), the runtime expects your project to be available inside the container at
/opt/prefect/<project_name>
. Here,
<project_name>
is the
name
in your prefect.yaml, so
/opt/prefect/axia
. - With
dockerfile: auto
, Prefect generates a Dockerfile that copies your project into the image so that path exists at runtime. Why it works locally but fails on Kubernetes The most common reasons the project directory is missing in K8s are: 1) The K8s pod is running an old image due to imagePullPolicy/caching - You’re using a static tag
latest-1
. K8s will reuse a cached image if
imagePullPolicy
is
IfNotPresent
(the default in many clusters), so the pod may not have your latest build that contains
/opt/prefect/axia
. - Fix: use unique tags per build and force pulls in your work pool/job variables. 2) A volume mount is shadowing
/opt/prefect
- If your K8s job template mounts a volume at
/opt/prefect
(or at
/opt/prefect/axia
), it will hide the project files that were baked into the image. - Fix: do not mount at
/opt/prefect
. If you need a workspace, mount elsewhere (e.g.,
/opt/prefect/data
). 3) Build context or .dockerignore excluded files - If your
.dockerignore
excludes
flows/
(or the entire project), the generated image will not contain your code even though it “built fine.” - Fix: check
.dockerignore
and confirm your
flows/example_flow.py
is copied into the image. What to change/check - Use a unique tag for each build and force K8s to pull: - Set
tag
to something unique (timestamp, commit SHA). - Set
image_pull_policy: Always
in your work pool job variables. - Optionally, persist the generated Dockerfile so you can see exactly what’s being copied: - Add
persist_dockerfile: true
and inspect
Dockerfile.generated
. Here’s a tightened-up prefect.yaml you can adapt:
Copy code
name: axia
prefect-version: 3.4.21

build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build-image
    requires: prefect-docker>=0.4.0
    image_name: "{{ $PREFECT_IMAGE_NAME }}"
    tag: "{{ $.timestamp }}"    # unique tag per build
    dockerfile: auto
    persist_dockerfile: true     # optional: inspect generated Dockerfile
    platform: "linux/amd64"

push:
- prefect_docker.deployments.steps.push_docker_image:
    requires: prefect-docker>=0.4.0
    image_name: "{{ build-image.image_name }}"
    tag: "{{ build-image.tag }}"

definitions:
  work_pool: &default_work_pool
    name: "default"
    job_variables:
      image: "{{ build-image.image }}"
      image_pull_policy: "Always"          # ensure fresh pull in K8s

deployments:
- name: "default"
  schedule: null
  entrypoint: "flows/example_flow.py:hello_flow"
  work_pool: *default_work_pool            # YAML alias to the anchor above
Quick diagnostics you can run: - In the running pod, check the path: -
kubectl exec -it <pod> -- ls -la /opt/prefect
-
kubectl exec -it <pod> -- ls -la /opt/prefect/axia
If it’s missing, it’s almost always due to image caching or a volume mount shadowing the directory. - Verify what image and pull policy the worker is using (work pool job template). Make sure it matches your freshly pushed tag and has
image_pull_policy: Always
. - Check
.dockerignore
for anything that might exclude
flows/
or your project files. If you can share: - The work pool type (Kubernetes?) and its job variables/template (especially volumes and image_pull_policy) - Whether you have any volume mounts at
/opt/prefect
- The output of
ls /opt/prefect
inside the pod …I can pinpoint the exact cause. References: - prefect.yaml reference - Kubernetes work pools and job variables
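If you generate the unique tag yourself (for example in CI, then pass it in through an environment variable), a minimal sketch could look like this. The format is an assumption chosen for illustration; any string that is unique per build and uses only characters Docker allows in tags would do. `unique_image_tag` is a hypothetical helper, not a Prefect API.

```python
import re
from datetime import datetime, timezone
from typing import Optional


def unique_image_tag(now: Optional[datetime] = None) -> str:
    """Build a tag from a UTC timestamp, replacing characters that are not lowercase alphanumerics."""
    now = now or datetime.now(timezone.utc)
    # e.g. 2025-10-04T16:55:59.936069+00:00 -> 2025-10-04t16-55-59-936069-00-00
    return re.sub(r"[^a-z0-9]+", "-", now.isoformat().lower()).strip("-")


if __name__ == "__main__":
    print(unique_image_tag())
```

A commit SHA works just as well and has the advantage of tying the image back to the exact code that was built.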
t
@Marvin
Copy code
FROM prefecthq/prefect:3.4.21-python3.12
COPY . /opt/prefect/axia/
WORKDIR /opt/prefect/axia/
This is the generated dockerfile and following is deployment inspect
Copy code
tmahajan@Tushars-MacBook-Pro-9 axia % prefect deployment inspect hello-flow/default
{
    'id': '86662f35-5d44-4f8c-9077-88f24af1eb95',
    'created': '2025-10-04T15:09:10.393567Z',
    'updated': '2025-10-04T15:47:10.127434Z',
    'name': 'default',
    'version': '9148bb5a',
    'version_id': None,
    'version_info': None,
    'branch': None,
    'base': None,
    'root': None,
    'description': None,
    'flow_id': 'ed0a0a50-dfba-4c52-aadb-cb3a87f64748',
    'concurrency_limit': None,
    'global_concurrency_limit': None,
    'concurrency_options': None,
    'paused': False,
    'schedules': [],
    'job_variables': {},
    'parameters': {},
    'pull_steps': [{'prefect.deployments.steps.set_working_directory': {'directory': '/opt/prefect/axia'}}],
    'tags': [],
    'labels': {'prefect.flow.id': 'ed0a0a50-dfba-4c52-aadb-cb3a87f64748'},
    'work_queue_name': 'default',
    'last_polled': '2025-10-04T15:47:10.123610Z',
    'parameter_openapi_schema': {
        'type': 'object',
        'title': 'Parameters',
        'properties': {'name': {'type': 'string', 'title': 'name', 'default': 'world', 'position': 0}}
    },
    'path': None,
    'entrypoint': 'flows/example_flow.py:hello_flow',
    'storage_document_id': None,
    'infrastructure_document_id': None,
    'created_by': None,
    'updated_by': None,
    'work_queue_id': None,
    'enforce_parameter_schema': True,
    'work_pool_name': 'default',
    'status': 'READY',
    'automations': []
}
m
Thanks — this is super helpful. What we know from your artifacts: - Your generated Dockerfile copies the project to
/opt/prefect/axia
and sets the workdir there:
Copy code
FROM prefecthq/prefect:3.4.21-python3.12
COPY . /opt/prefect/axia/
WORKDIR /opt/prefect/axia/
-
prefect deployment inspect
shows Prefect will set the working directory to
/opt/prefect/axia
at runtime:
Copy code
'pull_steps': [
  {'prefect.deployments.steps.set_working_directory': {'directory': '/opt/prefect/axia'}}
]
Given that, the K8s error
Copy code
[Errno 2] No such file or directory: '/opt/prefect/axia'
almost certainly means the container that ran in Kubernetes did not actually have your project files at that path. The two most common causes: 1) The K8s job is not using your built image - Your deployment’s
job_variables
are empty (
'job_variables': {}
). Unless your work pool’s base job template has been updated to use your built image, the K8s job likely ran the default base image
prefecthq/prefect:3.4.21-python3.12
— which does not include
/opt/prefect/axia
. - Action: - Inspect the work pool to see what image it’s using:
Copy code
prefect work-pool inspect default
- Also confirm the pod’s image:
Copy code
kubectl describe pod <podname> | grep -i image
- If it’s not your built image, set the image in your deployment or work pool. 2) A volume mount is shadowing
/opt/prefect
(less common, but worth checking) - If your K8s job template mounts a volume at
/opt/prefect
or
/opt/prefect/axia
, it will hide the files baked into the image. - Action: - Check your pool’s base job template for volume mounts at
/opt/prefect
. - If needed, mount volumes elsewhere (e.g.,
/opt/prefect/data
). How to fix in your prefect.yaml Make sure your deployment attaches the image to the work pool job variables and force pulls in K8s. For example:
Copy code
name: axia
prefect-version: 3.4.21

build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build-image
    requires: prefect-docker>=0.4.0
    image_name: "{{ $PREFECT_IMAGE_NAME }}"
    tag: "{{ $.timestamp }}"     # unique tag per build to avoid cache
    dockerfile: auto
    persist_dockerfile: true      # optional, for visibility
    platform: "linux/amd64"

push:
- prefect_docker.deployments.steps.push_docker_image:
    requires: prefect-docker>=0.4.0
    image_name: "{{ build-image.image_name }}"
    tag: "{{ build-image.tag }}"

deployments:
- name: "default"
  entrypoint: "flows/example_flow.py:hello_flow"
  work_pool:
    name: "default"
    job_variables:
      image: "{{ build-image.image }}"
      image_pull_policy: "Always"
Quick diagnostics - Inside a running pod:
Copy code
kubectl exec -it <pod> -- ls -la /opt/prefect
kubectl exec -it <pod> -- ls -la /opt/prefect/axia
If the directory is missing, you’re not running the correct image or it’s being shadowed. - Verify you’re actually pulling the latest image (unique tag +
image_pull_policy: Always
). - Double-check
.dockerignore
isn’t excluding your
flows/
or other project files. If you can share: - Output of
prefect work-pool inspect default
(especially the image and any volume mounts) - The image printed by
kubectl describe pod
for the flow-run pod …I can pinpoint which of the two causes it is. Docs: - prefect.yaml reference - Kubernetes work pools and job variables
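To make the "empty job_variables" check repeatable, here is a minimal sketch that reads the deployment data as a plain Python dict (the `prefect deployment inspect` output you pasted is already a valid dict literal). `summarize_deployment` is a hypothetical helper; it only looks at the two fields discussed above.

```python
STEP = "prefect.deployments.steps.set_working_directory"


def summarize_deployment(deployment: dict) -> dict:
    """Report the image override (if any) and the directories the pull steps expect to exist."""
    job_vars = deployment.get("job_variables") or {}
    workdirs = [
        step[STEP]["directory"]
        for step in deployment.get("pull_steps") or []
        if STEP in step
    ]
    return {
        # None means the work pool's default image will be used for the job.
        "image_override": job_vars.get("image"),
        "expected_directories": workdirs,
    }


if __name__ == "__main__":
    sample = {
        "job_variables": {},
        "pull_steps": [{STEP: {"directory": "/opt/prefect/axia"}}],
    }
    print(summarize_deployment(sample))
```

If `image_override` is `None` while `expected_directories` lists a path that only exists in your custom image, that matches the failure you're seeing.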
t
@Marvin I used this to add image pull secrets
Copy code
deployments:
- name: "default"
  schedule: null
  entrypoint: "flows/example_flow.py:hello_flow"
  work_pool:
    name: "default"
    job_variables:
      image: "{{ build-image.image }}"
      image_pull_policy: "Always"
      imagePullSecrets:
        - dxminerva_cloud_artifactory
but got error
Copy code
message: 'initializing source docker://docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t15-55-04-523025-00-00:
          unable to retrieve auth token: invalid username/password: unknown: Authentication
m
Thanks for sharing the details — this helps pinpoint it. What’s happening - The error is coming from Kubernetes failing to pull your private image:
Copy code
unable to retrieve auth token: invalid username/password: unknown: Authentication
- In your deployment, you set
imagePullSecrets
in
job_variables
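, but the Kubernetes worker expects snake_case and a specific structure, and a job variable only takes effect if the work pool's base job template defines it. With the current key/format, the pod likely isn’t getting any imagePullSecrets, so the pull fails. How to fix 1) Use snake_case and the correct structure for image pull secrets: - This only works if the work pool's base job template (visible via prefect work-pool inspect default) maps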
image_pull_secrets
to
pod.spec.imagePullSecrets
. It must be a list of objects with a
name
field. - Update your deployment block like this:
Copy code
deployments:
- name: "default"
  schedule: null
  entrypoint: "flows/example_flow.py:hello_flow"
  work_pool:
    name: "default"
    job_variables:
      image: "{{ build-image.image }}"
      image_pull_policy: "Always"
      image_pull_secrets:
        - name: dxminerva-cloud-artifactory
Notes: - Use
image_pull_secrets
(snake_case), not
imagePullSecrets
. - The secret name must be valid K8s DNS-1123 (no underscores). If your actual secret uses underscores, recreate it with hyphens. 2) Ensure the secret exists in the same namespace as the flow-run pods and is the right type - Create (or recreate) your secret as a docker registry secret:
Copy code
kubectl create secret docker-registry dxminerva-cloud-artifactory \
  --docker-server=docker-minerva-release.dr-uw2.adobeitc.com \
  --docker-username=<username> \
  --docker-password=<password-or-api-key> \
  --docker-email=<email> \
  -n <namespace>
- Make sure the
--docker-server
exactly matches the registry in your image URL. - For Artifactory, confirm the correct username/API key format (some setups use a robot account or API key). Alternative: bind the secret to a ServiceAccount and reference it - Create a ServiceAccount with imagePullSecrets:
Copy code
apiVersion: v1
kind: ServiceAccount
metadata:
  name: prefect-worker
imagePullSecrets:
- name: dxminerva-cloud-artifactory
- Then set in job_variables:
Copy code
service_account_name: prefect-worker
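On the naming note above: a quick way to check a secret name before creating it is to test it against the DNS-1123 subdomain rule (lowercase alphanumerics, `-` and `.`, starting and ending with an alphanumeric, at most 253 characters). This is a minimal sketch; `is_valid_secret_name` is a hypothetical helper, and the API server remains the final authority.

```python
import re

# DNS-1123 subdomain: dot-separated labels of lowercase alphanumerics and '-'.
_DNS1123_SUBDOMAIN = re.compile(
    r"[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*"
)


def is_valid_secret_name(name: str) -> bool:
    """Return True if the name satisfies the DNS-1123 subdomain rule."""
    return len(name) <= 253 and _DNS1123_SUBDOMAIN.fullmatch(name) is not None


if __name__ == "__main__":
    for candidate in ("dxminerva-cloud-artifactory", "dxminerva_cloud_artifactory"):
        print(candidate, is_valid_secret_name(candidate))
```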
Verify it’s applied - Check the effective work pool/job variables:
Copy code
prefect work-pool inspect default
- Check the running pod has the secret:
Copy code
kubectl get pod <pod> -o yaml | grep -A3 imagePullSecrets
- Confirm the image used by the pod and that the pull policy is Always:
Copy code
kubectl describe pod <pod> | grep -i image
Also keep the unique image tag + Always pull - Keep unique tags (e.g., timestamp) and
image_pull_policy: "Always"
to avoid stale cached images. Docs - Kubernetes work pools and job variables: Kubernetes work pools guide - prefect.yaml reference: prefect.yaml reference If you share: - The namespace you’re running in - The output of
prefect work-pool inspect default
-
kubectl get secret dxminerva-cloud-artifactory -o yaml -n <namespace>
(redact data) …I can confirm the mapping and whether K8s is seeing the secret correctly.
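If grepping YAML gets tedious, the same verification can be done on the JSON form of the pod (`kubectl get pod <pod> -n <namespace> -o json`). This is a minimal sketch, assuming you pass that JSON in as a string; `pod_pull_info` is a hypothetical helper name.

```python
import json


def pod_pull_info(pod_json: str) -> dict:
    """Extract the namespace, imagePullSecrets names, and container images from a pod's JSON."""
    pod = json.loads(pod_json)
    spec = pod.get("spec", {})
    return {
        "namespace": pod.get("metadata", {}).get("namespace"),
        # An empty list here means no pull secret reached the pod spec.
        "image_pull_secrets": [s["name"] for s in spec.get("imagePullSecrets", [])],
        "images": [c["image"] for c in spec.get("containers", [])],
    }


if __name__ == "__main__":
    import sys

    # Usage: kubectl get pod <pod> -n <namespace> -o json | python pod_pull_info.py
    print(pod_pull_info(sys.stdin.read()))
```

An empty `image_pull_secrets` list on the flow-run pod would point at the job variables not being applied, rather than at the credentials themselves.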
t
@Marvin Secret ->
Copy code
apiVersion: v1
data:
  .dockerconfigjson: ****
kind: Secret
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"v1","data":{".dockerconfigjson":"***"},"kind":"Secret","metadata":{"annotations":{},"name":"dxminerva-cloud-artifactory","namespace":"ns-team-dx-minerva-prod-va7"},"type":"kubernetes.io/dockerconfigjson"}
  creationTimestamp: "2025-10-02T10:28:02Z"
  name: dxminerva-cloud-artifactory
  namespace: ns-team-dx-minerva-prod-va7
  resourceVersion: "5340990357"
  uid: a2523252-a63d-41b2-8c52-7aa93fc9529f
type: kubernetes.io/dockerconfigjson
Prefect.yaml
Copy code
name: axia
prefect-version: 3.4.21

build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build-image
    requires: prefect-docker>=0.4.0
    image_name: "{{ $PREFECT_IMAGE_NAME }}"
    tag: "{{ $.timestamp }}"    # unique tag per build
    dockerfile: auto
    persist_dockerfile: true     # optional: inspect generated Dockerfile
    platform: "linux/amd64"

push:
- prefect_docker.deployments.steps.push_docker_image:
    requires: prefect-docker>=0.4.0
    image_name: "{{ build-image.image_name }}"
    tag: "{{ build-image.tag }}"

definitions:
  work_pool:
    name: "default"
    job_variables:
      image: "{{ build-image.image }}"
      image_pull_policy: "Always"          # ensure fresh pull in K8s

deployments:
- name: "default"
  schedule: null
  entrypoint: "flows/example_flow.py:hello_flow"
  work_pool:
    name: "default"
    job_variables:
      image: "{{ build-image.image }}"
      image_pull_policy: "Always"
      image_pull_secrets:
        - name: dxminerva-cloud-artifactory
Error ->
Copy code
state:
      waiting:
        message: 'Back-off pulling image "docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00":
          ErrImagePull: initializing source docker://docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00:
          unable to retrieve auth token: invalid username/password: unknown: Authentication
m
Thanks for the extra context — your secret looks like the right type (`kubernetes.io/dockerconfigjson`) and your deployment is using the correct snake_case key
image_pull_secrets
. Since you’re still getting “invalid username/password”, here are the most common causes and how to pinpoint the exact issue. What to verify first 1) The flow-run pod is in the same namespace as the secret - Your secret is in
ns-team-dx-minerva-prod-va7
. If your flow-run pod is in a different namespace, it cannot use that secret. - Ensure your deployment sets the namespace:
Copy code
work_pool:
    name: "default"
    job_variables:
      namespace: ns-team-dx-minerva-prod-va7
      image: "{{ build-image.image }}"
      image_pull_policy: "Always"
      image_pull_secrets:
        - name: dxminerva-cloud-artifactory
- Confirm the namespace of the flow-run pod:
Copy code
kubectl get pods -A | grep <flow-run-id-or-deployment-name>
2) The pod actually has imagePullSecrets attached - Inspect the pod YAML:
Copy code
kubectl get pod <pod-name> -n ns-team-dx-minerva-prod-va7 -o yaml | grep -A3 imagePullSecrets
- You should see:
Copy code
imagePullSecrets:
  - name: dxminerva-cloud-artifactory
If it’s missing, the worker isn’t applying your job_variables. In that case: - Confirm your work pool type is Kubernetes:
Copy code
prefect work-pool inspect default
Look for something like
"type": "kubernetes"
. If it isn’t, your deployment won’t create K8s Jobs. - If it is Kubernetes but still not applying, you can bake
imagePullSecrets
into the work pool’s base job template as a fallback. 3) The dockerconfigjson matches the image registry host exactly - Decode and inspect the secret:
Copy code
kubectl get secret dxminerva-cloud-artifactory -n ns-team-dx-minerva-prod-va7 \
    -o jsonpath='{.data.\.dockerconfigjson}' | base64 -d
- In the output, under
auths
, the key must match the host of your image exactly: - Your image:
docker-minerva-release.dr-uw2.adobeitc.com/hello:...
- Required auths key:
"docker-minerva-release.dr-uw2.adobeitc.com"
(no scheme, no path) - If it doesn’t match, recreate the secret with the exact server:
Copy code
kubectl delete secret dxminerva-cloud-artifactory -n ns-team-dx-minerva-prod-va7

kubectl create secret docker-registry dxminerva-cloud-artifactory \
  --docker-server=docker-minerva-release.dr-uw2.adobeitc.com \
  --docker-username='<username>' \
  --docker-password='<password-or-api-key>' \
  --docker-email='<email>' \
  -n ns-team-dx-minerva-prod-va7
Notes for Artifactory:
- If using a robot account, the username may need to be `robot$<name>` (a naming convention used by some registries such as Harbor; check what your Artifactory instance expects).
- API keys/personal access tokens are commonly used as the password.
4) Credentials are actually valid for that repo
- Try a simple test pull outside Prefect using the same secret:
  - Create a test Pod in the same namespace that uses the same image and `imagePullSecrets`:
Copy code
apiVersion: v1
kind: Pod
metadata:
  name: pull-test
  namespace: ns-team-dx-minerva-prod-va7
spec:
  imagePullSecrets:
  - name: dxminerva-cloud-artifactory
  containers:
  - name: test
    image: docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00
    command: ["sleep","3600"]
  restartPolicy: Never
Then:
Copy code
kubectl apply -f pull-test.yaml
kubectl describe pod pull-test -n ns-team-dx-minerva-prod-va7 | grep -i image
kubectl get pod pull-test -n ns-team-dx-minerva-prod-va7
- If this test pod also fails with invalid username/password, the credentials/registry mapping are the issue (not Prefect). Recreate the secret as above and/or verify the account can pull that repository.
5) Ensure you’re actually using a Kubernetes work pool and the right image
- Verify:
Copy code
prefect work-pool inspect default
- Confirm `"type": "kubernetes"`.
- Confirm the pod’s image:
Copy code
kubectl describe pod <pod-name> -n ns-team-dx-minerva-prod-va7 | grep -i image
It should be the unique tag you built and pushed. Keep using unique tags and `image_pull_policy: "Always"`.

If you prefer attaching the secret via ServiceAccount
- Create a ServiceAccount with the secret:
Copy code
apiVersion: v1
kind: ServiceAccount
metadata:
  name: prefect-worker
  namespace: ns-team-dx-minerva-prod-va7
imagePullSecrets:
- name: dxminerva-cloud-artifactory
- Then set in job_variables:
Copy code
service_account_name: prefect-worker
namespace: ns-team-dx-minerva-prod-va7
If it still fails, please share:
- `prefect work-pool inspect default`
- `kubectl get pod <pod> -n ns-team-dx-minerva-prod-va7 -o yaml | grep -A3 imagePullSecrets`
- The decoded `.dockerconfigjson` “auths” keys (redact credentials)
That will let me pinpoint whether it’s a namespace mismatch, missing imagePullSecrets on the pod, or a registry credential/server mismatch.
References:
- Kubernetes work pools and job variables
- prefect.yaml reference
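The host-matching check in step 3 can also be scripted. Below is a minimal sketch, assuming you paste in the base64 value of `.dockerconfigjson` from the secret; the helper names (`registry_host`, `auths_keys`, `secret_matches_image`) are hypothetical and not part of Prefect or kubectl. It applies the strict "exact host" rule described above, which may be stricter than what the kubelet itself accepts.

```python
import base64
import json


def registry_host(image: str) -> str:
    """Return the registry host of an image reference such as 'host/repo:tag'."""
    first, sep, _rest = image.partition("/")
    # The first path component counts as a registry only if it looks like a host.
    if sep and ("." in first or ":" in first or first == "localhost"):
        return first
    return "docker.io"


def auths_keys(dockerconfigjson_b64: str) -> list[str]:
    """Decode the base64 `.dockerconfigjson` value and list its `auths` keys."""
    config = json.loads(base64.b64decode(dockerconfigjson_b64))
    return sorted(config.get("auths", {}))


def secret_matches_image(dockerconfigjson_b64: str, image: str) -> bool:
    """True when some `auths` key equals the image's registry host exactly."""
    return registry_host(image) in auths_keys(dockerconfigjson_b64)


if __name__ == "__main__":
    # Illustrative secret content only; real values come from `kubectl get secret`.
    sample = base64.b64encode(
        json.dumps(
            {"auths": {"docker-minerva-release.dr-uw2.adobeitc.com": {"auth": "redacted"}}}
        ).encode()
    ).decode()
    image = "docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00"
    print(auths_keys(sample), secret_matches_image(sample, image))
```

If the function returns False, compare the printed `auths` keys against the image host by eye: a scheme prefix or a trailing path on the key is the usual difference.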
t
Copy code
(sshuttle) tmahajan@Tushars-MacBook-Pro-9 Documents % kubectl get pod lavender-vole-z6f8c-b2vkx  --context ethos44-prod-va7  -n ns-team-dx-minerva-prod-va7 -o yaml | grep -A3 imagePullSecrets 
(sshuttle) tmahajan@Tushars-MacBook-Pro-9 Documents %
No image pull secrets. Inspect output:
Copy code
tmahajan@Tushars-MacBook-Pro-9 axia %   prefect work-pool inspect default

WorkPool(
    id='5c7f6f71-6f86-4f85-a15a-e1546bc7bf87',
    created=DateTime(2025, 10, 2, 17, 31, 19, 185796, tzinfo=Timezone('UTC')),
    updated=DateTime(2025, 10, 4, 17, 16, 27, 43205, tzinfo=Timezone('UTC')),
    name='default',
    type='kubernetes',
    base_job_template={
        'variables': {
            'type': 'object',
            'properties': {
                'env': {
                    'type': 'object',
                    'title': 'Environment Variables',
                    'description': 'Environment variables to set when starting a flow run.',
                    'additionalProperties': {'anyOf': [{'type': 'string'}, {'type': 'null'}]}
                },
                'name': {
                    'anyOf': [{'type': 'string'}, {'type': 'null'}],
                    'title': 'Name',
                    'description': 'Name given to infrastructure created by a worker.'
                },
                'image': {
                    'anyOf': [{'type': 'string'}, {'type': 'null'}],
                    'title': 'Image',
                    'examples': ['docker.io/prefecthq/prefect:3-latest'],
                    'description': 'The image reference of a container image to use for created jobs. If not set, the latest Prefect image will be used.'
                },
                'labels': {
                    'type': 'object',
                    'title': 'Labels',
                    'description': 'Labels applied to infrastructure created by a worker.',
                    'additionalProperties': {'type': 'string'}
                },
                'command': {
                    'anyOf': [{'type': 'string'}, {'type': 'null'}],
                    'title': 'Command',
                    'description': 'The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker.'
                },
                'namespace': {
                    'type': 'string',
                    'title': 'Namespace',
                    'default': 'ns-team-dx-minerva-prod-va7',
                    'description': 'The Kubernetes namespace to create jobs within.'
                },
                'backoff_limit': {
                    'type': 'integer',
                    'title': 'Backoff Limit',
                    'default': 0,
                    'minimum': 0,
                    'description': 'The number of times Kubernetes will retry a job after pod eviction. If set to 0, Prefect will reschedule the flow run when the pod is evicted.'
                },
                'stream_output': {
                    'type': 'boolean',
                    'title': 'Stream Output',
                    'default': True,
                    'description': 'If set, output will be streamed from the job to local standard output.'
                },
                'cluster_config': {
                    'anyOf': [{'$ref': '#/definitions/KubernetesClusterConfig'}, {'type': 'null'}],
                    'description': 'The Kubernetes cluster config to use for job creation.'
                },
                'finished_job_ttl': {
                    'anyOf': [{'type': 'integer'}, {'type': 'null'}],
                    'title': 'Finished Job TTL',
                    'default': 60,
                    'description': 'The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely.'
                },
                'image_pull_policy': {
                    'enum': ['IfNotPresent', 'Always', 'Never'],
                    'type': 'string',
                    'title': 'Image Pull Policy',
                    'default': 'IfNotPresent',
                    'description': 'The Kubernetes image pull policy to use for job containers.'
                },
                'service_account_name': {
                    'anyOf': [{'type': 'string'}, {'type': 'null'}],
                    'title': 'Service Account Name',
                    'description': 'The Kubernetes service account to use for job creation.'
                },
                'job_watch_timeout_seconds': {
                    'anyOf': [{'type': 'integer'}, {'type': 'null'}],
                    'title': 'Job Watch Timeout Seconds',
                    'description': 'Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely.'
                },
                'pod_watch_timeout_seconds': {
                    'type': 'integer',
                    'title': 'Pod Watch Timeout Seconds',
                    'default': 60,
                    'description': 'Number of seconds to watch for pod creation before timing out.'
                }
            },
            'definitions': {
                'KubernetesClusterConfig': {
                    'type': 'object',
                    'title': 'KubernetesClusterConfig',
                    'required': ['config', 'context_name'],
                    'properties': {
                        'config': {
                            'type': 'object',
                            'title': 'Config',
                            'description': 'The entire contents of a kubectl config file.',
                            'additionalProperties': True
                        },
                        'context_name': {'type': 'string', 'title': 'Context Name', 'description': 'The name of the kubectl context to use.'}
                    },
                    'description': 'Stores configuration for interaction with Kubernetes clusters.\n\nSee `from_file` for creation.',
                    'secret_fields': [],
                    'block_type_slug': 'kubernetes-cluster-config',
                    'block_schema_references': {}
                }
            },
            'description': 'Default variables for the Kubernetes worker.\n\nThe schema for this class is used to populate the `variables` section of the default\nbase job template.'
        },
        'job_configuration': {
            'env': '{{ env }}',
            'name': '{{ name }}',
            'labels': '{{ labels }}',
            'command': '{{ command }}',
            'namespace': '{{ namespace }}',
            'job_manifest': {
                'kind': 'Job',
                'spec': {
                    'template': {
                        'spec': {
                            'containers': [
                                {
                                    'env': '{{ env }}',
                                    'args': '{{ command }}',
                                    'name': 'prefect-job',
                                    'image': '{{ image }}',
                                    'imagePullPolicy': '{{ image_pull_policy }}'
                                }
                            ],
                            'completions': 1,
                            'parallelism': 1,
                            'restartPolicy': 'Never',
                            'serviceAccountName': '{{ service_account_name }}'
                        }
                    },
                    'backoffLimit': '{{ backoff_limit }}',
                    'ttlSecondsAfterFinished': '{{ finished_job_ttl }}'
                },
                'metadata': {'labels': '{{ labels }}', 'namespace': '{{ namespace }}', 'generateName': '{{ name }}-'},
                'apiVersion': 'batch/v1'
            },
            'stream_output': '{{ stream_output }}',
            'cluster_config': '{{ cluster_config }}',
            'job_watch_timeout_seconds': '{{ job_watch_timeout_seconds }}',
            'pod_watch_timeout_seconds': '{{ pod_watch_timeout_seconds }}'
        }
    },
    status=WorkPoolStatus.READY,
    storage_configuration=WorkPoolStorageConfiguration(),
    default_queue_id='6007965a-e625-42fd-8284-c311e607c5cb'
)
Image grep:
Copy code
(sshuttle) tmahajan@Tushars-MacBook-Pro-9 Documents % kubectl get pod lavender-vole-z6f8c-b2vkx  --context ethos44-prod-va7  -n ns-team-dx-minerva-prod-va7 -o yaml | grep -A3 image     
    image: docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00
    imagePullPolicy: Always
    name: prefect-job
    resources:
      limits:
--
  - image: docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00
    imageID: ""
    lastState: {}
    name: prefect-job
    ready: false
--
        message: 'Back-off pulling image "docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00":
          ErrImagePull: initializing source docker://docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00:
          unable to retrieve auth token: invalid username/password: unknown: Authentication
          is required'
(sshuttle) tmahajan@Tushars-MacBook-Pro-9 Documents %
@Marvin
m
Your message was too long and was truncated at the allowed limit.
t
@Marvin
Copy code
(sshuttle) tmahajan@Tushars-MacBook-Pro-9 Documents % kubectl get pod lavender-vole-z6f8c-b2vkx  --context ethos44-prod-va7  -n ns-team-dx-minerva-prod-va7 -o yaml | grep -A3 imagePullSecrets 
(sshuttle) tmahajan@Tushars-MacBook-Pro-9 Documents %
No image pull secrets. Image grep:
Copy code
(sshuttle) tmahajan@Tushars-MacBook-Pro-9 Documents % kubectl get pod lavender-vole-z6f8c-b2vkx  --context ethos44-prod-va7  -n ns-team-dx-minerva-prod-va7 -o yaml | grep -A3 image     
    image: docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00
    imagePullPolicy: Always
    name: prefect-job
    resources:
      limits:
--
  - image: docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00
    imageID: ""
    lastState: {}
    name: prefect-job
    ready: false
--
        message: 'Back-off pulling image "docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00":
          ErrImagePull: initializing source docker://docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00:
          unable to retrieve auth token: invalid username/password: unknown: Authentication
          is required'
(sshuttle) tmahajan@Tushars-MacBook-Pro-9 Documents %
m
thought for 60.7 seconds
Thanks, that confirms the issue: the flow-run pod does not have any imagePullSecrets attached, so the registry auth isn’t being applied.
Why this happens
- Your deployment YAML shows `image_pull_secrets` set, but the pod YAML does not include `imagePullSecrets`. That means the job variables didn’t make it through to the Kubernetes Job/Pod spec used by the worker.
- Common causes:
  - The work pool is not a Kubernetes pool, so K8s-only fields are ignored.
  - The key/structure doesn’t match what the Kubernetes worker expects.
  - The deployment wasn’t re-created after editing prefect.yaml.
  - The field is set at deployment, but the pool’s base job template is overriding/ignoring it.
Do this to fix and verify
1) Confirm your work pool is Kubernetes and see its template
Copy code
prefect work-pool inspect default
- Ensure `"type": "kubernetes"`.
- If it’s not Kubernetes, create a new Kubernetes pool and target that pool in your deployment.
2) Set namespace explicitly and try image_pull_secrets as strings
Some versions may expect a list of strings (names) rather than objects, so it is worth trying that form. Also set the namespace.
Copy code
deployments:
- name: "default"
  entrypoint: "flows/example_flow.py:hello_flow"
  work_pool:
    name: "default"
    job_variables:
      namespace: ns-team-dx-minerva-prod-va7
      image: "{{ build-image.image }}"
      image_pull_policy: "Always"
      image_pull_secrets:
        - dxminerva-cloud-artifactory
Then redeploy:
Copy code
prefect deploy
Verify the deployment has job variables:
Copy code
prefect deployment inspect hello-flow/default
You should see those keys under `job_variables`.
3) Verify the pod includes imagePullSecrets
After a new run starts:
Copy code
kubectl get pod <pod> -n ns-team-dx-minerva-prod-va7 -o yaml | grep -A3 imagePullSecrets
You should see:
Copy code
imagePullSecrets:
- name: dxminerva-cloud-artifactory
4) If it still doesn’t surface, bind the secret via a ServiceAccount (robust approach)
Create a ServiceAccount in the same namespace:
Copy code
apiVersion: v1
kind: ServiceAccount
metadata:
  name: prefect-worker
  namespace: ns-team-dx-minerva-prod-va7
imagePullSecrets:
- name: dxminerva-cloud-artifactory
Then set in job variables:
Copy code
service_account_name: prefect-worker
namespace: ns-team-dx-minerva-prod-va7
Redeploy and confirm the pod’s `spec.serviceAccountName` is `prefect-worker`.
5) Double-check the registry host in the secret
Decode and ensure the `.dockerconfigjson` “auths” key exactly matches your image registry host:
- Image host: `docker-minerva-release.dr-uw2.adobeitc.com`
- Secret should contain:
Copy code
"auths": {
  "docker-minerva-release.dr-uw2.adobeitc.com": { ... }
}
If not, recreate:
Copy code
kubectl delete secret dxminerva-cloud-artifactory -n ns-team-dx-minerva-prod-va7

kubectl create secret docker-registry dxminerva-cloud-artifactory \
  --docker-server=docker-minerva-release.dr-uw2.adobeitc.com \
  --docker-username='<username>' \
  --docker-password='<password-or-api-key>' \
  --docker-email='<email>' \
  -n ns-team-dx-minerva-prod-va7
6) Optional: bake into the work pool base job template
If you prefer it as the pool default, edit the work pool’s base job template in the UI to include `imagePullSecrets` and/or `serviceAccountName`, or update via CLI with `--base-job-template`. This ensures all deployments on the pool inherit the pull secret.
Once `imagePullSecrets` is present on the pod and the auth matches the registry host, the pull should succeed.
Refs:
- Kubernetes work pools and job variables
- prefect.yaml reference
If it still doesn’t attach, please share:
- `prefect work-pool inspect default`
- `prefect deployment inspect hello-flow/default` (confirm job_variables present)
- `kubectl get pod <pod> -n ns-team-dx-minerva-prod-va7 -o yaml | grep -A3 -i imagePullSecrets`
t
@Marvin
Copy code
'job_variables': {
        'image': 'docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t17-30-02-640215-00-00',
        'namespace': 'ns-team-dx-minerva-prod-va7',
        'image_pull_policy': 'Always',
        'image_pull_secrets': ['dxminerva-cloud-artifactory']
    },
Still same error
m
thought for 59.4 seconds
Great debugging. The smoking gun is that the pod has no imagePullSecrets, even though your deployment’s job_variables include them. The reason: your Kubernetes work pool’s base job template doesn’t reference image_pull_secrets, so the value you set in job_variables is ignored when rendering the pod spec.
Two reliable ways to fix it:
Option A (recommended): Use a ServiceAccount that has the pull secret
1) Create ServiceAccount with the secret:
Copy code
apiVersion: v1
kind: ServiceAccount
metadata:
  name: prefect-worker
  namespace: ns-team-dx-minerva-prod-va7
imagePullSecrets:
- name: dxminerva-cloud-artifactory
2) Point your deployment at this SA (and namespace):
Copy code
work_pool:
  name: "default"
  job_variables:
    namespace: ns-team-dx-minerva-prod-va7
    service_account_name: prefect-worker
    image: "{{ build-image.image }}"
    image_pull_policy: "Always"
3) Redeploy (`prefect deploy`) and confirm the pod has `serviceAccountName: prefect-worker` and can pull the image.
Option B: Add imagePullSecrets to the work pool’s base job template
1) Export the default job template:
Copy code
prefect work-pool get-default-base-job-template --type kubernetes --file k8s-template.json
2) Edit k8s-template.json, adding imagePullSecrets to the pod spec:
- Find: `job_manifest.spec.template.spec`
- Add this line (comma separate appropriately):
Copy code
"imagePullSecrets": "{{ image_pull_secrets }}",
Full shape (context):
Copy code
"spec": {
  "completions": 1,
  "containers": [
    {
      "args": "{{ command }}",
      "env": "{{ env }}",
      "image": "{{ image }}",
      "imagePullPolicy": "{{ image_pull_policy }}",
      "name": "prefect-job"
    }
  ],
  "parallelism": 1,
  "restartPolicy": "Never",
  "serviceAccountName": "{{ service_account_name }}",
  "imagePullSecrets": "{{ image_pull_secrets }}"
}
3) Update the work pool to use this template:
Copy code
prefect work-pool update default --base-job-template k8s-template.json
4) Keep your deployment job_variables as you have them:
Copy code
job_variables:
  namespace: ns-team-dx-minerva-prod-va7
  image: "{{ build-image.image }}"
  image_pull_policy: "Always"
  image_pull_secrets:
    - name: dxminerva-cloud-artifactory
5) Redeploy and verify the pod YAML now shows:
Copy code
spec:
  imagePullSecrets:
  - name: dxminerva-cloud-artifactory
Also verify
- Work pool type is Kubernetes:
Copy code
prefect work-pool inspect default
- The flow-run pod is in the `ns-team-dx-minerva-prod-va7` namespace.
- The secret’s dockerconfigjson “auths” key matches the exact registry host:
  - Image host: docker-minerva-release.dr-uw2.adobeitc.com
  - Secret auths must include: "docker-minerva-release.dr-uw2.adobeitc.com"
Once the pod has imagePullSecrets (or the SA approach), the pull error should resolve. If it persists, it’s almost always a registry host mismatch in the secret or bad credentials.
References:
- Kubernetes work pools and job variables
- prefect.yaml reference
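If you would rather not hand-edit the JSON for Option B, the edit can be sketched in Python. This is only a sketch: `add_image_pull_secrets` and `patch_file` are hypothetical helper names, and it assumes the exported file has the same `job_configuration.job_manifest.spec.template.spec` layout as the work pool output shown earlier in this thread.

```python
import copy
import json
from pathlib import Path

PLACEHOLDER = "{{ image_pull_secrets }}"


def add_image_pull_secrets(template: dict) -> dict:
    """Return a copy of a base job template whose pod spec references image_pull_secrets."""
    patched = copy.deepcopy(template)
    pod_spec = patched["job_configuration"]["job_manifest"]["spec"]["template"]["spec"]
    # Leave an existing imagePullSecrets entry alone so the edit is repeatable.
    pod_spec.setdefault("imagePullSecrets", PLACEHOLDER)
    return patched


def patch_file(path: str) -> None:
    """Rewrite an exported template file (for example k8s-template.json) in place."""
    file = Path(path)
    template = json.loads(file.read_text())
    file.write_text(json.dumps(add_image_pull_secrets(template), indent=2))
```

After running `patch_file("k8s-template.json")`, apply the file with the `prefect work-pool update default --base-job-template k8s-template.json` command from step 3.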
t
@Marvin I want a deployment-specific pip install requirements step. I have a separate directory for each flow, each with its own requirements.txt file. Help me modify the prefect.yaml file below
Copy code
name: axia
prefect-version: 3.4.21
pull:
- prefect.deployments.steps.git_clone:
    id: clone-step # needed to be referenced in subsequent steps
    repository: https://github.com/OneAdobe/minerva.git
    branch: main
    access_token: "{{ prefect.blocks.secret.deployment-default-hello-flow-repo-token }}"
- prefect.deployments.steps.pip_install_requirements:
    directory: "{{ clone-step.directory }}" # `clone-step` is a user-provided `id` field
    requirements_file: requirements.txt


deployments:
- name: "default"
  schedule: null
  entrypoint: "example_flow.py:hello_flow"
  work_pool:
    name: "default"
m
thought for 57.4 seconds
Yes, you can do deployment-specific installs with `git_clone` + `pip_install_requirements`. The key is to put the pull steps on each deployment and set the working directory to the flow’s subdirectory so you can target that flow’s own `requirements.txt`. Here’s a pattern you can use for multiple flows in the same repo, each with its own requirements:
Copy code
name: axia
prefect-version: 3.4.21

deployments:
- name: hello-flow
  entrypoint: "example_flow.py:hello_flow"      # relative to the working directory below
  pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: https://github.com/OneAdobe/minerva.git
      branch: main
      access_token: "{{ prefect.blocks.secret.deployment-default-hello-flow-repo-token }}"
      # Optional: limit checkout size
      # directories:
      #   - flows/hello-flow
  - prefect.deployments.steps.set_working_directory:
      directory: "{{ clone.directory }}/flows/hello-flow"
  - prefect.deployments.steps.pip_install_requirements:
      directory: "{{ clone.directory }}/flows/hello-flow"
      requirements_file: requirements.txt
  work_pool:
    name: default
    job_variables:
      namespace: ns-team-dx-minerva-prod-va7
      image: "<your-runtime-image>"
      image_pull_policy: "Always"
      # If using a ServiceAccount for pulls, also:
      # service_account_name: prefect-worker

- name: other-flow
  entrypoint: "example_flow.py:other_flow"
  pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: https://github.com/OneAdobe/minerva.git
      branch: main
      access_token: "{{ prefect.blocks.secret.deployment-default-hello-flow-repo-token }}"
  - prefect.deployments.steps.set_working_directory:
      directory: "{{ clone.directory }}/flows/other-flow"
  - prefect.deployments.steps.pip_install_requirements:
      directory: "{{ clone.directory }}/flows/other-flow"
      requirements_file: requirements.txt
  work_pool:
    name: default
    job_variables:
      namespace: ns-team-dx-minerva-prod-va7
      image: "<your-runtime-image>"
      image_pull_policy: "Always"
Notes and tips:
- `git_clone` returns `directory` (the clone path). Use `set_working_directory` to point Prefect to the specific subdir for the flow so your `entrypoint` and `requirements.txt` are relative and simple.
- You can keep `requirements_file: requirements.txt` if you set `directory` to the flow’s folder; otherwise you can point `requirements_file` to a subpath from the repo root.
- Make sure your runtime image has `pip` (and `git` if you’re cloning at runtime). If you’re using a custom image, set it via `job_variables.image`.
- If your repo is private, the `access_token` reference is fine using a Secret block. Alternatively, you can set `credentials` to a block that holds git creds.
- This installs at run time (per flow run). If the dependencies are stable, consider baking them into the image for faster, reproducible runs.
Docs:
- prefect.yaml reference
- Kubernetes work pools and job variables
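With many flows, the repeated per-deployment block above can be generated rather than copied. Here is a rough sketch, assuming each flow lives in its own directory with a `requirements.txt`; `deployment_for` is a hypothetical helper, not a Prefect API, and it only builds the dictionary that you would then serialize into `prefect.yaml` with a YAML library of your choice.

```python
from typing import Optional


def deployment_for(
    flow_dir: str,
    entrypoint: str,
    repository: str,
    branch: str = "main",
    access_token: Optional[str] = None,
) -> dict:
    """Build one prefect.yaml deployment entry with its own pull steps."""
    clone_args = {"id": "clone", "repository": repository, "branch": branch}
    if access_token is not None:
        # Pass the template string for a Secret block here, never a raw token.
        clone_args["access_token"] = access_token
    workdir = "{{ clone.directory }}/" + flow_dir
    return {
        "name": flow_dir.rstrip("/").rsplit("/", 1)[-1],
        "entrypoint": entrypoint,
        "pull": [
            {"prefect.deployments.steps.git_clone": clone_args},
            {"prefect.deployments.steps.set_working_directory": {"directory": workdir}},
            {
                "prefect.deployments.steps.pip_install_requirements": {
                    "directory": workdir,
                    "requirements_file": "requirements.txt",
                }
            },
        ],
        "work_pool": {"name": "default"},
    }


if __name__ == "__main__":
    flows = {
        "flows/hello-flow": "example_flow.py:hello_flow",
        "flows/other-flow": "example_flow.py:other_flow",
    }
    deployments = [
        deployment_for(d, e, "https://github.com/OneAdobe/minerva.git")
        for d, e in flows.items()
    ]
    print([d["name"] for d in deployments])
```

The `job_variables` section is left out on purpose; add the namespace, image and service account values that apply to your pool.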
t
@Marvin suppose a Kubernetes job fails and we have set a TTL of 60 seconds on the work pool, but failed/ErrImagePull jobs are not getting cleaned up (completed ones are cleaned). How do we clean them up?
m
thought for 64.3 seconds
Short answer: Kubernetes TTL only deletes “finished” Jobs (Complete or Failed). ErrImagePull/ImagePullBackOff Jobs are not “finished” (they stay Active/Pending), so TTL will not clean them. You need to either cause the Job to finish (then TTL works) or proactively delete stuck Jobs.
Practical options
1) Add an active deadline so stuck Jobs become Failed, then TTL cleans them
- Add activeDeadlineSeconds to the Job spec in your work pool’s base job template. This marks the Job Failed after N seconds even if the Pod never starts, making it eligible for TTL.
- Steps:
  - Export the default K8s template:
Copy code
prefect work-pool get-default-base-job-template --type kubernetes --file k8s-template.json
- Edit k8s-template.json: add activeDeadlineSeconds (in seconds; 600 is 10 minutes) under job_manifest.spec. JSON does not allow comments, so keep the file comment-free.
Copy code
"spec": {
  "backoffLimit": "{{ backoff_limit }}",
  "activeDeadlineSeconds": 600,
  "template": {
    "spec": {
      ...
- Update the work pool:
Copy code
prefect work-pool update <your-pool-name> --base-job-template k8s-template.json
- Keep your existing finished_job_ttl (Prefect variable) so once the Job is marked Failed by the deadline, TTL removes it.
2) Clean up stuck Jobs periodically (CronJob or script)
- If you just need a one-off cleanup for ErrImagePull/ImagePullBackOff, you can delete Jobs whose Pods are in those states. Example using jq:
Copy code
NS=ns-team-dx-minerva-prod-va7
kubectl get pods -n "$NS" -o json \
  | jq -r '.items[]
      | select(.status.phase=="Pending")
      | select(.status.containerStatuses[0].state.waiting.reason=="ImagePullBackOff"
            or .status.containerStatuses[0].state.waiting.reason=="ErrImagePull")
      | .metadata.ownerReferences[]?
      | select(.kind=="Job")
      | .name' \
  | sort -u \
  | xargs -r -n1 kubectl delete job -n "$NS"
- To make future cleanup easier, add a label via job_variables and target that label:
Copy code
job_variables:
  labels:
    app: prefect
Then you can scope to `-l app=prefect` in your cleanup.
3) Fix the root cause so these don’t accumulate
- Ensure image pull secrets are applied (serviceAccountName or image_pull_secrets in a base job template that actually wires them into `pod.spec.imagePullSecrets`).
- Consider setting `service_account_name` in job_variables and attaching the pull secret to that ServiceAccount; it’s robust and avoids per-deployment wiring.
Key notes
- finished_job_ttl only applies after the Job is Complete or Failed; pending ImagePullBackOff Jobs won’t be deleted by TTL.
- activeDeadlineSeconds lives at job.spec (not the pod template) and will force the Job to Failed after the deadline.
- backoffLimit=0 prevents retries but does not mark a pending ImagePullBackOff as Failed; you need the active deadline or manual cleanup.
References
- Prefect Kubernetes work pools and job variables
- Kubernetes Job termination and TTL
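The jq filter from option 2 can also be expressed in Python, which may be easier to extend inside a cleanup script. This is a sketch under assumptions: `stuck_job_names` is a hypothetical helper that takes the parsed output of `kubectl get pods -n <namespace> -o json`, and unlike the jq version it inspects every container status rather than only the first.

```python
STUCK_REASONS = {"ImagePullBackOff", "ErrImagePull"}


def stuck_job_names(pod_list: dict) -> list[str]:
    """Return sorted names of Jobs that own Pending pods stuck on an image pull."""
    names = set()
    for pod in pod_list.get("items", []):
        status = pod.get("status", {})
        if status.get("phase") != "Pending":
            continue
        # Collect the waiting reason of each container; running containers give None.
        reasons = {
            (cs.get("state", {}).get("waiting") or {}).get("reason")
            for cs in status.get("containerStatuses") or []
        }
        if not reasons & STUCK_REASONS:
            continue
        for owner in pod.get("metadata", {}).get("ownerReferences") or []:
            if owner.get("kind") == "Job":
                names.add(owner["name"])
    return sorted(names)
```

Each returned name can then be passed to `kubectl delete job -n <namespace> <name>`, as in the shell pipeline above.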
t
@Marvin I have a self hosted prefect server and worker using helm chart, can you compare (pros and cons) of deployment method for kubernetes work pool using git clone and building docker images (for multiple flow deployments)
m
thought for 55.8 seconds
Great question. Here’s a practical comparison of the two patterns for Prefect 3 on Kubernetes with a self-hosted server/workers. Approach A: Runtime git clone + pip install (per deployment) What it looks like - Use deployment pull steps:
git_clone
+
set_working_directory
+
pip_install_requirements
- One shared runtime image for many flows; code fetched at run time Pros - Fast iteration: deploy code changes without rebuilding images - Fewer images to manage for many flows (good for monorepos) - Smaller registry footprint; avoids duplicate deps across many images - Easy to parameterize per-flow directories and per-flow requirements Cons - Cold start latency: clone + pip install during each run (can be minutes for heavy deps) - Requires cluster egress to Git and valid tokens; more secret management at runtime - Reproducibility varies: pip resolve can drift unless pinned and cached - Pod image must include git and build tools (C compilers for native deps) - Harder to “airgap” (no network) clusters Mitigations - Pin versions and use constraints files; consider wheels cache - Pre-bake heavy deps in a base image;
pip_install_requirements
just adds small flow-specific deps - Mount a pip cache (emptyDir) and set
PIP_CACHE_DIR
- Use a ServiceAccount for git (if using Git over K8s-private infra) or store token in a Secret block - Ensure your runtime image has git and build tooling Approach B: Build/push Docker images (per flow or per group) What it looks like - Use
build_docker_image
+
push_docker_image
steps in prefect.yaml - Bake code and deps into immutable images; worker only pulls and runs Pros - Fast, predictable startup; no runtime install - Strong reproducibility/auditability (tagged images, SBOM/scanning) - Works without cluster egress to Git/ PyPI - Avoids runtime git tokens; less secret sprawl in pods - Easier to meet regulated environment requirements Cons - More CI/CD complexity; need build pipeline and registry auth/imagePullSecrets - More images to build/manage for many flows unless you share an image - Registry storage and image churn if you tag per commit - Fixes require an image rebuild/push cycle Mitigations - Use a shared base image for common deps; build slim per-flow layers (copy code only) - Multi-stage builds and caching (buildx) to speed CI - Consolidate flows that share deps into shared images - Enforce unique tags and
image_pull_policy: Always
to avoid stale pods - Attach image pull secrets via ServiceAccount or base job template When to choose which - Choose Runtime git clone + pip when: - You have many flows in a monorepo and want minimal image sprawl - You push frequent code changes and want ultra-fast deploys without builds - Dependencies are light or you can pre-bake heavy ones into a base image - Choose Docker images when: - You need reproducibility, fast cold-starts, offline/egress-restricted clusters - Dependencies are heavy/compiled; you want predictable build-time failures - Security/compliance requires image scanning and immutable artifacts Hybrid pattern (often best for many teams) - Build a “platform” base image with Python + git + system deps + heavy Python libs (pandas, numpy, db drivers) - For flows: - Option 1: Runtime
git_clone
+
pip_install_requirements
for small per-flow deps only - Option 2: Build slim per-flow images that copy just code on top of the base - This gives fast startup, fewer images than fully-per-flow, and still flexible iteration Operational tips for K8s work pools - If using image pulls: bind a ServiceAccount with
imagePullSecrets
(more reliable than per-deployment injection) - Add
activeDeadlineSeconds
in the pool’s base job template so “stuck” Jobs fail and TTL can clean them - Use labels in
job_variables
for easy cleanup/debugging - Ensure
namespace
,
service_account_name
, and
image_pull_policy
are set in
job_variables
Docs
- prefect.yaml reference - Kubernetes work pools guide - Docker deployment guide If you share how many flows, how heavy the deps are, and your CI/CD comfort level, I can recommend a concrete layout (e.g., one base image + runtime pulls vs per-flow images) and provide example prefect.yaml templates for both.
t
@Marvin is it possible to ref our poetry project dependencies in prefect.yaml ?
m
thought for 76.0 seconds
Short answer: there’s no built-in “poetry install” step in prefect.yaml, and
pip_install_requirements
only accepts a requirements.txt. But you can use Poetry in two good ways: Option 1: Export from Poetry and use pip_install_requirements (simple, no Poetry needed at runtime) - Export a requirements file from your pyproject/lock, then let Prefect’s pip step install it. - Works even if your runtime image doesn’t have Poetry preinstalled. Example:
Copy code
deployments:
- name: hello-flow
  entrypoint: "example_flow.py:hello_flow"
  pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: https://github.com/your-org/your-repo.git
      branch: main
      access_token: "{{ prefect.blocks.secret.repo-token }}"
  - prefect.deployments.steps.set_working_directory:
      directory: "{{ clone.directory }}/flows/hello-flow"
  - prefect.deployments.steps.run_shell_script:
      directory: "{{ clone.directory }}/flows/hello-flow"
      # Requires Poetry on PATH where this step runs; if it is not, see Option 2 or bake it into the image.
      script: |
        poetry export -f requirements.txt --without-hashes -o .prefect-reqs.txt
  - prefect.deployments.steps.pip_install_requirements:
      directory: "{{ clone.directory }}/flows/hello-flow"
      requirements_file: .prefect-reqs.txt
  work_pool:
    name: default
Tip: If your runtime image doesn’t have Poetry, you can export in CI and commit or attach the exported file (e.g.,
.prefect-reqs.txt
) to your repo, then skip the export step at runtime. Option 2: Run Poetry install at runtime (requires Poetry in the image or install it first) - Install Poetry (if needed) and install deps into the container’s environment. Disable virtualenv creation so packages land in the container’s Python.
Copy code
deployments:
- name: hello-flow
  entrypoint: "example_flow.py:hello_flow"
  pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: https://github.com/your-org/your-repo.git
      branch: main
      access_token: "{{ prefect.blocks.secret.repo-token }}"
  - prefect.deployments.steps.set_working_directory:
      directory: "{{ clone.directory }}/flows/hello-flow"
  - prefect.deployments.steps.run_shell_script:
      directory: "{{ clone.directory }}/flows/hello-flow"
      # Installs Poetry on every run; drop the first command if the image already ships Poetry.
      script: |
        python -m pip install --no-cache-dir poetry==1.8.3
        poetry config virtualenvs.create false
        poetry install --no-interaction --no-ansi --only main
  work_pool:
    name: default
Notes: - The image must have build tools for any native deps (gcc, musl-dev, etc.) if needed. - For reproducibility, always commit and use a lock file (poetry.lock). Option 3: Bake Poetry deps into the image (fastest and most reproducible) - Use a custom Dockerfile in your build step that installs from pyproject/poetry.lock.
Copy code
build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build-image
    image_name: "{{ $PREFECT_IMAGE_NAME }}"
    tag: "{{ $.timestamp }}"
    dockerfile: Dockerfile   # use your custom Dockerfile below

# Dockerfile
# ---
# FROM prefecthq/prefect:3.4.21-python3.12
# WORKDIR /opt/prefect/axia
# RUN python -m pip install --no-cache-dir poetry==1.8.3
# COPY pyproject.toml poetry.lock ./
# RUN poetry config virtualenvs.create false \
#  && poetry install --no-interaction --no-ansi --only main
# COPY . .
# ENV PYTHONPATH=/opt/prefect/axia
- Pro: very fast startup, deterministic, no runtime network access to PyPI required.
- Con: requires CI/CD to build/push images per change.
Which to use?
- Many flows, frequent code-only updates, light deps: Option 1 (export + pip) is a nice balance.
- Locked-down/air-gapped clusters or heavy deps: Option 3 (bake into image).
- If you truly want Poetry at runtime, Option 2 works; just ensure Poetry is installed in the image or install it at run time.
t
@Marvin this is my deployment file
Copy code
name: common
prefect-version: 3.4.21

build:

pull:
  - prefect.deployments.steps.git_clone:
      id: clone-step
      repository: https://github.com/OneAdobe/minerva-prefect.git
      branch: main
      access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
  - prefect.deployments.steps.pip_install_requirements:
      directory: "{{ clone-step.directory }}/flows/common/enrich_ims_org_info_table"
      requirements_file: requirements.txt

deployments:
  - name: "enrich_ims_org_info_table"
    schedule: null
    entrypoint: "flows/common/enrich_ims_org_info_table/flow.py:enrich_ims_org_info_table"
    work_pool:
      name: "Local Worker"
I have written the entrypoint relative to the code cloned from git, but when I try to deploy, it searches for the flow code in local directories and the deployment fails
m
thought for 176.0 seconds
You’re running into a key behavior of Prefect 3.x: pull steps only run on the worker at flow run time. They do not run during prefect deploy. During deployment, Prefect still needs to import your flow to build the parameter schema, so the entrypoint must exist on the machine running prefect deploy. That’s why your deploy fails: the CLI looks for flows/common/enrich_ims_org_info_table/flow.py locally and can’t find it, because the git_clone step hasn’t run yet. You have three good options: Option A — Run deploy from a local checkout of the repo - Locally clone https://github.com/OneAdobe/minerva-prefect.git - Run prefect deploy from the repo root so the entrypoint path exists locally:
Copy code
entrypoint: "flows/common/enrich_ims_org_info_table/flow.py:enrich_ims_org_info_table"
- Keep your runtime pull steps to clone/install in the pod:
Copy code
deployments:
  - name: "enrich_ims_org_info_table"
    entrypoint: "flows/common/enrich_ims_org_info_table/flow.py:enrich_ims_org_info_table"
    pull:
    - prefect.deployments.steps.git_clone:
        id: clone
        repository: https://github.com/OneAdobe/minerva-prefect.git
        branch: main
        access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
    - prefect.deployments.steps.set_working_directory:
        directory: "{{ clone.directory }}/flows/common/enrich_ims_org_info_table"
    - prefect.deployments.steps.pip_install_requirements:
        directory: "{{ clone.directory }}/flows/common/enrich_ims_org_info_table"
        requirements_file: requirements.txt
Option B — Clone during “build” for deploy-time, and also clone during “pull” for run-time - Use a build step to clone the repo for the deploy command, then reference its output in entrypoint. Keep pull steps for runtime. - Example:
Copy code
name: common
prefect-version: 3.4.21

build:
- prefect.deployments.steps.git_clone:
    id: clone_for_deploy
    repository: https://github.com/OneAdobe/minerva-prefect.git
    branch: main
    access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"

deployments:
- name: "enrich_ims_org_info_table"
  entrypoint: "{{ clone_for_deploy.directory }}/flows/common/enrich_ims_org_info_table/flow.py:enrich_ims_org_info_table"
  pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: https://github.com/OneAdobe/minerva-prefect.git
      branch: main
      access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
  - prefect.deployments.steps.set_working_directory:
      directory: "{{ clone.directory }}/flows/common/enrich_ims_org_info_table"
  - prefect.deployments.steps.pip_install_requirements:
      directory: "{{ clone.directory }}/flows/common/enrich_ims_org_info_table"
      requirements_file: requirements.txt
  work_pool:
    name: "Local Worker"
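- The intent is for the flow to exist at deploy time (so the CLI can import it) and at run time. Caveat: this only works if prefect deploy runs build steps before it imports the entrypoint; check that on your Prefect version, and fall back to Option A or C if the import happens first. Option C: Recommended programmatic deployment with flow.from_source(...) - Skip YAML for this case and deploy directly from Git without having the code locally: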
Copy code
from prefect import flow
from prefect.blocks.system import Secret
from prefect.runner.storage import GitRepository

# flow.from_source() has no access_token argument; credentials go on the GitRepository.
source = GitRepository(
    url="https://github.com/OneAdobe/minerva-prefect.git",
    branch="main",
    credentials={"access_token": Secret.load("dxminerva-git-pat")},
)

deployment_id = flow.from_source(
    source=source,
    entrypoint="flows/common/enrich_ims_org_info_table/flow.py:enrich_ims_org_info_table",
).deploy(
    name="enrich_ims_org_info_table",
    work_pool_name="Local Worker",
)
- This is the cleanest for “remote source” deployments in Prefect 3.x. Why this is necessary - prefect deploy imports your flow to build parameter schema and metadata. Since pull steps don’t run at deploy time, the CLI must find the entrypoint locally (Options A/B) or you must deploy programmatically from the source (Option C). Docs - Prefect YAML reference: prefect.yaml reference
- Work pools (Kubernetes, Process, etc.): Kubernetes work pools guide If you tell me which option you prefer, I can tailor the exact YAML or Python needed for your setup.
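Since the failure here comes from the entrypoint not resolving on the machine that runs prefect deploy, a small pre-flight check can surface it before Prefect does. A minimal sketch, assuming the "path/to/file.py:function" entrypoint format discussed above; `check_entrypoint` is a hypothetical helper, not part of Prefect, and it only inspects the file statically (it does not import it).

```python
import ast
from pathlib import Path


def check_entrypoint(entrypoint: str, root: str = ".") -> None:
    """Raise if 'path/to/file.py:function' cannot be resolved under root."""
    path_part, sep, func_name = entrypoint.rpartition(":")
    if not sep or not path_part or not func_name:
        raise ValueError(f"Expected 'file.py:function', got {entrypoint!r}")
    file_path = Path(root) / path_part
    if not file_path.is_file():
        raise FileNotFoundError(f"{file_path} does not exist; are you in the repo root?")
    # Parse without importing, so missing dependencies do not get in the way.
    tree = ast.parse(file_path.read_text())
    defined = {
        node.name
        for node in ast.walk(tree)  # note: this also sees nested functions
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
    }
    if func_name not in defined:
        raise AttributeError(f"{func_name!r} is not defined in {file_path}")
```

Calling `check_entrypoint("flows/common/enrich_ims_org_info_table/flow.py:enrich_ims_org_info_table")` from the directory where you run prefect deploy should raise FileNotFoundError in the situation described above.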
t
@Marvin how to set a global variable for a flow that will be referenced by a function defined in a separate file from the flow/task? For example, we fetch responses for multiple jobs, which means hitting the API multiple times, and we want the access token to be generated once and reused
m
thought for 72.1 seconds
Great question. In Prefect 3, there are a few solid patterns to generate an access token once per flow run and reuse it across multiple tasks/functions (even when those functions live in separate modules). Recommended patterns 1) Pass the token/session from the flow to tasks (simplest and explicit) - Compute the token/session at the start of the flow and pass it to tasks as an argument. This avoids hidden globals and works well with threads. Note: if you switch to process/distributed executors, passing a requests.Session may not serialize; pass the token string instead.
Copy code
# auth.py
import requests

def get_token(client_id, client_secret):
    r = requests.post("https://auth.example.com/oauth/token", data={
        "client_id": client_id,
        "client_secret": client_secret,
        "grant_type": "client_credentials",
    })
    r.raise_for_status()
    data = r.json()
    return data["access_token"], int(data.get("expires_in", 3600))

# flows.py
from prefect import flow, task
from prefect.blocks.system import Secret
from auth import get_token

@task
def fetch_job(job_id: str, token: str):
    import requests
    r = requests.get(f"https://api.example.com/jobs/{job_id}",
                     headers={"Authorization": f"Bearer {token}"})
    r.raise_for_status()
    return r.json()

@flow
def my_flow(job_ids: list[str]):
    creds = Secret.load("api-client-credentials").get()
    token, _ = get_token(creds["client_id"], creds["client_secret"])

    results = [fetch_job.submit(j, token) for j in job_ids]
    return [r.result() for r in results]
2) Use a module-level singleton with TTL (shared within the flow-run process) - Prefect’s default task runner is thread-based; a module-level cache is shared by tasks in the same flow run process. This avoids passing tokens to every task, while still generating once and refreshing on expiry. Add a lock for thread-safety.
Copy code
# auth.py
import time, threading, requests
from typing import Optional, Tuple
from prefect.blocks.system import Secret

_lock = threading.Lock()
_token: Optional[str] = None
_expiry_ts: float = 0.0

def _fetch_token() -> Tuple[str, float]:
    creds = Secret.load("api-client-credentials").get()
    r = requests.post("https://auth.example.com/oauth/token", data={
        "client_id": creds["client_id"],
        "client_secret": creds["client_secret"],
        "grant_type": "client_credentials",
    })
    r.raise_for_status()
    data = r.json()
    token = data["access_token"]
    ttl = int(data.get("expires_in", 3600))
    return token, time.monotonic() + max(0, ttl - 30)  # refresh 30s early

def get_token() -> str:
    global _token, _expiry_ts
    now = time.monotonic()
    with _lock:
        if _token is None or now >= _expiry_ts:
            _token, _expiry_ts = _fetch_token()
        return _token
Use from any module:
Copy code
# tasks.py
from prefect import task
from auth import get_token
import requests

@task
def fetch_job(job_id: str):
    token = get_token()
    r = requests.get(f"https://api.example.com/jobs/{job_id}",
                     headers={"Authorization": f"Bearer {token}"})
    r.raise_for_status()
    return r.json()
Notes and caveats - Thread vs process runners: The module-level cache works great with the default thread-based runner. If you switch to a process-based runner (or distributed executors), each process will have its own cache and may fetch its own token. Passing tokens explicitly (pattern 1) is most robust across execution backends. - Token security: Avoid storing tokens in Prefect result caches. Use blocks (e.g., Secret) for credentials and keep tokens in memory only. - Expiry: Implement a TTL/refresh window to avoid using an expired token mid-flow. - Multiple concurrent flows: Each flow run is typically a separate process/pod; a module-level cache is per-process, so flows don’t share tokens with each other (usually desired).
- Sessions: If you prefer a shared requests.Session with auth headers, cache the Session similarly to the token. Be mindful of serialization if you pass it between processes. If you share whether you’re sticking with the default thread runner or plan to use process/distributed execution, I can tailor the pattern and code to exactly match your setup. Docs: - Blocks (Secret) for credentials: https://docs-3.prefect.io/v3/blocks/ - Tasks and runners overview: https://docs-3.prefect.io/v3/concepts/tasks
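To see that the lock in pattern 2 really limits the fetch to one call under concurrent access, here is a self-contained variation you can run without any API. It is a sketch under assumptions: `TokenCache` and `fake_fetch` are illustrative names, and the fake fetcher stands in for the real OAuth request.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional, Tuple


class TokenCache:
    """Thread-safe cache that calls fetch() only when the token is missing or expired."""

    def __init__(self, fetch: Callable[[], Tuple[str, float]], skew: float = 30.0):
        self._fetch = fetch      # returns (token, ttl_seconds)
        self._skew = skew        # refresh this many seconds before expiry
        self._lock = threading.Lock()
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        with self._lock:
            if self._token is None or time.monotonic() >= self._expires_at:
                token, ttl = self._fetch()
                self._token = token
                self._expires_at = time.monotonic() + max(0.0, ttl - self._skew)
            return self._token


calls = []


def fake_fetch() -> Tuple[str, float]:
    # Stand-in for the real token request; records each call and simulates latency.
    calls.append(1)
    time.sleep(0.05)
    return f"token-{len(calls)}", 3600.0


cache = TokenCache(fake_fetch)
with ThreadPoolExecutor(max_workers=8) as pool:
    tokens = list(pool.map(lambda _: cache.get(), range(50)))

print(len(calls), set(tokens))  # prints: 1 {'token-1'}
```

Injecting the fetcher also makes the cache easy to unit test, which the module-level version is not. The same per-process caveat applies: each process would hold its own `TokenCache`.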
t
@Marvin I have around 3k rows and I use the following function to enrich them, is the existing flow alright or all the rows should be enriched as a single task ?
Copy code
@task(name="enrich_default_merge_policy")
def enrich_default_merge_policy(row: Dict) -> Dict:
    logger = get_run_logger()
    row = row.copy()

    env = row.get("env")
    region = row.get("region")
    ims_org_id = row.get("ims_org_id")
    sandbox_name = row.get("sandbox_name")
    scan_job_id = row.get("scan_job_id")
    model = row.get("model")

    row.update(MERGE_POLICY_FIELDS)

    try:
        ims_api_client = get_ims_api_client()
        logos_api_client = get_logos_api_client(ims_api_client)

        resp = logos_api_client.get_merge_policy(environment=env, region=region, ims_org_id=ims_org_id,
                                                 sandbox_name=sandbox_name,
                                                 schema=model, default=True, client_id="acp_core_unifiedProfile_feeds",
                                                 limit=1)
        if resp.status_code != 200:
            logger.warning(f"Job {scan_job_id}: got non-200 ({resp.status_code}) while fetching default merge policy")
            return row

        default_merge_policy = resp.json()
        default_merge_policy_id = safe_get(default_merge_policy["children"][0], ["id"])

        row["default_merge_policy_id"] = default_merge_policy_id

        return row

    except Exception as e:
        logger.error(f"Error enriching default merge policy id for job: {scan_job_id}: {e}")
        return row
m
thought for 363.6 seconds
Great question. Short answer: don’t put all 3k rows into a single task. Your per-row task is a good fit; just submit them concurrently from a flow and control concurrency to respect API limits.
Guidance for Prefect 3.x:
- Keep tasks small and idempotent: one row per task gives you per-row observability, retries, and timeouts. A single “mega-task” loses that.
- Use a thread pool for I/O-bound work: HTTP calls are I/O-bound, so use ThreadPoolTaskRunner and set a reasonable max_workers (e.g., 10–50 depending on the API).
- Add retries/timeouts: let exceptions propagate so retries can kick in (don’t swallow exceptions and return a “success” row).
- Cap parallelism with task tags: tag the task and set a concurrency limit on that tag so only that many tagged tasks hit the upstream API at once. This bounds concurrent requests, not requests per second.
Example pattern
Copy code
from typing import Dict, List
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner

# Tag this task so we can use a concurrency limit
@task(
    name="enrich_default_merge_policy",
    retries=3,
    retry_delay_seconds=5,
    timeout_seconds=30,
    tags=["merge-policy"]  # used for rate limiting
)
def enrich_default_merge_policy(row: Dict) -> Dict:
    logger = get_run_logger()
    r = row.copy()

    env = r.get("env")
    region = r.get("region")
    ims_org_id = r.get("ims_org_id")
    sandbox_name = r.get("sandbox_name")
    scan_job_id = r.get("scan_job_id")
    model = r.get("model")

    r.update(MERGE_POLICY_FIELDS)

    # Let errors raise so retries happen
    ims_api_client = get_ims_api_client()
    logos_api_client = get_logos_api_client(ims_api_client)

    resp = logos_api_client.get_merge_policy(
        environment=env,
        region=region,
        ims_org_id=ims_org_id,
        sandbox_name=sandbox_name,
        schema=model,
        default=True,
        client_id="acp_core_unifiedProfile_feeds",
        limit=1,
    )
    if resp.status_code != 200:
        logger.warning(f"Job {scan_job_id}: got non-200 ({resp.status_code}) while fetching default merge policy")
        return r  # consider raising if you want retries on non-200 too

    default_merge_policy = resp.json()
    default_merge_policy_id = safe_get(default_merge_policy["children"][0], ["id"])
    r["default_merge_policy_id"] = default_merge_policy_id
    return r

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def enrich_all(rows: List[Dict]) -> List[Dict]:
    futures = [enrich_default_merge_policy.submit(row) for row in rows]
    return [f.result() for f in futures]
Enforce a safe concurrency limit on the tag - Create a limit (e.g., only 10 tasks with tag “merge-policy” can run at once):
Copy code
prefect concurrency-limit create merge-policy 10
- Inspect/update as needed:
Copy code
prefect concurrency-limit ls
prefect concurrency-limit inspect merge-policy
prefect concurrency-limit reset merge-policy
When to batch/chunk - 3k tasks is typically fine. If your orchestration overhead or memory footprint becomes a concern, chunk into batches (e.g., 100–300 rows per batch) and submit one task per batch. You’ll lose per-row retries/visibility but reduce orchestration overhead. - Example batch pattern:
Copy code
def chunk(seq, size):
    for i in range(0, len(seq), size):
        yield seq[i:i+size]

@task(tags=["merge-policy"])
def enrich_batch(batch: List[Dict]) -> List[Dict]:
    # If APIs are strict, keep this sequential inside the batch
    return [enrich_default_merge_policy.fn(r) for r in batch]

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
def enrich_all_batched(rows: List[Dict], batch_size: int = 200) -> List[Dict]:
    futures = [enrich_batch.submit(b) for b in chunk(rows, batch_size)]
    batches = [f.result() for f in futures]
    return [r for batch in batches for r in batch]
A few practical tips - Let exceptions raise to trigger retries (log and re-raise). If you catch and return, Prefect will mark the task Successful.
- Pick max_workers based on the upstream API and your infra. Start small (e.g., 10–20) and measure.
- If you need strict throughput control beyond concurrency (e.g., 10 req/sec), consider adding a lightweight throttle inside the task or implementing a token bucket service. Concurrency limits alone cap parallelism, not per-second rate.
- If creating API clients is expensive, consider reusing one client (or its underlying requests.Session) across tasks within the process, keeping thread safety in mind, but avoid sharing mutable clients across processes.
If you can share:
- Typical API latency and rate limits
- Where this runs (machine size, worker type)
I can suggest concrete values for max_workers, batch size, and concurrency limit.
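For the per-second throttle mentioned above, a token bucket is one common choice. The sketch below is an in-process version using only the standard library; the class name and the injectable `clock`/`sleep` parameters are my own choices for illustration, and it only limits calls made within one Python process (separate pods or processes would each get their own bucket).

```python
import threading
import time


class TokenBucket:
    """Allow about `rate` acquisitions per second on average, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: int, clock=time.monotonic, sleep=time.sleep):
        self.rate = rate
        self.capacity = capacity
        self._tokens = float(capacity)
        self._clock = clock
        self._sleep = sleep
        self._last = clock()
        self._lock = threading.Lock()

    def acquire(self) -> None:
        """Block until one token is available, then consume it."""
        while True:
            with self._lock:
                now = self._clock()
                # Refill in proportion to elapsed time, capped at capacity.
                self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
                self._last = now
                if self._tokens >= 1:
                    self._tokens -= 1
                    return
                wait = (1 - self._tokens) / self.rate
            # Sleep outside the lock so other threads are not blocked while we wait.
            self._sleep(wait)
```

A module-level `bucket = TokenBucket(rate=10, capacity=10)` with `bucket.acquire()` called right before each API request would keep thread-based tasks in one flow run to roughly 10 requests per second, independent of the tag concurrency limit.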
t
@Marvin I have already added retry logic on the API call side, so what is your recommendation given that?
m
thought for 34.8 seconds
Got it. If your API client already has its own retry/backoff, I’d avoid “double retrying” at the Prefect layer. Recommended setup - Keep one row per task and submit them concurrently from the flow. - Prefer minimal or conditional Prefect retries: - Either set
retries=0
and rely entirely on the client’s retry logic, or - Use a very small number of Prefect retries (e.g., 1) with a
retry_condition_fn
that only retries on non-HTTP/network/infrastructure issues. - Set a task timeout slightly above the worst-case total time of the client’s internal retries, so Prefect doesn’t cancel valid long-running retries. - Use tag-based concurrency limits to respect the upstream API’s rate. Keep a reasonable
max_workers
on the task runner. Example (minimal Prefect retry, rely on client)
Copy code
from typing import Dict, List
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner

@task(
    name="enrich_default_merge_policy",
    retries=0,
    timeout_seconds=60,  # set > max total time your client can spend retrying
    tags=["merge-policy"]
)
def enrich_default_merge_policy(row: Dict) -> Dict:
    logger = get_run_logger()
    r = row.copy()

    env = r.get("env")
    region = r.get("region")
    ims_org_id = r.get("ims_org_id")
    sandbox_name = r.get("sandbox_name")
    scan_job_id = r.get("scan_job_id")
    model = r.get("model")

    r.update(MERGE_POLICY_FIELDS)

    # Let the client handle retry/backoff; let exceptions bubble if it ultimately fails
    ims_api_client = get_ims_api_client()
    logos_api_client = get_logos_api_client(ims_api_client)

    resp = logos_api_client.get_merge_policy(
        environment=env,
        region=region,
        ims_org_id=ims_org_id,
        sandbox_name=sandbox_name,
        schema=model,
        default=True,
        client_id="acp_core_unifiedProfile_feeds",
        limit=1,
    )

    if resp.status_code != 200:
        logger.warning(f"Job {scan_job_id}: non-200 ({resp.status_code}) fetching default merge policy")
        # Option A: return best-effort
        return r
        # Option B (preferred for data quality): raise to mark failure and review/re-run
        # raise RuntimeError(f"Non-200 from API: {resp.status_code}")

    default_merge_policy = resp.json()
    default_merge_policy_id = safe_get(default_merge_policy["children"][0], ["id"])
    r["default_merge_policy_id"] = default_merge_policy_id
    return r

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def enrich_all(rows: List[Dict]) -> List[Dict]:
    futures = [enrich_default_merge_policy.submit(row) for row in rows]
    return [f.result() for f in futures]
Optional: conditional Prefect retries (guardrails only) - Use this if you want a single “second chance” for infra/transient errors that your client might not cover (e.g., worker hiccups).
Copy code
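def should_retry(task, task_run, state) -> bool:
    # Prefect calls this with (task, task_run, state), not with the exception itself.
    # Retry only on transient/infra signals, not on HTTP status-based failures.
    transient = (TimeoutError, ConnectionError)  # extend with your client's exceptions
    try:
        state.result()  # re-raises the exception that failed the task
    except transient:
        return True
    except Exception:
        return False
    return False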

@task(
    name="enrich_default_merge_policy",
    retries=1,
    retry_delay_seconds=30,
    retry_condition_fn=should_retry,
    timeout_seconds=60,
    tags=["merge-policy"],
)
def enrich_default_merge_policy(row: Dict) -> Dict:
    ...
Rate limiting via tags - Cap concurrent tasks hitting the API so you don’t exceed limits (independent of your client’s retry/backoff).
Copy code
prefect concurrency-limit create merge-policy 10
prefect concurrency-limit ls
prefect concurrency-limit inspect merge-policy
A couple of tweaks to your current code - Avoid broad
try/except
that returns a “successful” row on failure; it hides real errors and blocks retries. If you want to keep the log, catch, log, then
raise
to let the task fail and be visible/retryable.
- Align
timeout_seconds
with your client’s retry envelope. For example, if the client can take up to ~45s on worst-case backoff, set
timeout_seconds
to ~60–90s. If you can share your API’s rate limits and your client’s max retry/backoff profile, I can suggest concrete values for
max_workers
, the tag concurrency limit, and
timeout_seconds
.
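To put a number on the "retry envelope" when picking timeout_seconds, you can add up the worst case by hand. A minimal sketch, assuming the client retries with exponential backoff and a fixed per-request timeout; the parameter names and the example values are hypothetical, so substitute your client's real settings.

```python
def worst_case_seconds(attempts: int, request_timeout: float,
                       backoff_base: float, backoff_factor: float = 2.0,
                       backoff_cap: float = float("inf")) -> float:
    """Upper bound on wall time for `attempts` tries with exponential backoff between them."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    # Every attempt may run until its own timeout.
    total = attempts * request_timeout
    # A delay is slept between consecutive attempts, so there are attempts - 1 delays.
    for i in range(attempts - 1):
        total += min(backoff_cap, backoff_base * backoff_factor ** i)
    return total


# 4 attempts of up to 10s each, with 1s, 2s, 4s sleeps in between
envelope = worst_case_seconds(attempts=4, request_timeout=10, backoff_base=1)
timeout_seconds = envelope * 1.25  # headroom so Prefect does not cut off a valid last attempt
print(envelope, timeout_seconds)  # prints: 47.0 58.75
```

That lines up with the rule of thumb above: a client that can spend about 45s retrying wants a task timeout of roughly 60s or more.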
t
@Marvin if I create a task for each row, around 8k tasks are created and the system hangs
m
thought for 89.0 seconds
Thanks for the detail — the “hang” is usually from creating too many futures at once and keeping them all in memory. 8k tasks in a single flow run can overwhelm the Python process and the UI, even if concurrency limits are low. Recommendations - Don’t submit all rows at once. Batch them and wait between batches to keep a bounded number of futures in memory. - Keep one-row-per-task inside each batch for per-row observability, but only a few hundred tasks in-flight at a time. - Use ThreadPoolTaskRunner for I/O, and keep your tag-based concurrency limit to respect the API. Simple batch pattern
Copy code
from typing import Dict, List, Iterable
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

def chunk(seq: List[Dict], size: int) -> Iterable[List[Dict]]:
    for i in range(0, len(seq), size):
        yield seq[i:i+size]

@task(name="enrich_default_merge_policy", tags=["merge-policy"], retries=0, timeout_seconds=90)
def enrich_default_merge_policy(row: Dict) -> Dict:
    # your existing logic (let client handle retries; let final exceptions raise if needed)
    ...

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def enrich_all(rows: List[Dict], batch_size: int = 200) -> List[Dict]:
    results: List[Dict] = []
    for batch in chunk(rows, batch_size):
        futures = [enrich_default_merge_policy.submit(r) for r in batch]
        results.extend(f.result() for f in futures)  # drain this batch before moving on
    return results
Notes - Start with batch_size=100–300 and max_workers=10–30. Tune based on memory and API latency. - Tag concurrency limit keeps you within API limits, but batching is what prevents the local process from holding 8k futures. Create/adjust if needed: - Create:
prefect concurrency-limit create merge-policy 10
- Inspect/list:
prefect concurrency-limit inspect merge-policy
/
prefect concurrency-limit ls
- If results are large, consider returning only needed fields and/or
persist_result=False
on the task to reduce storage overhead. - If you still see pressure, move batches into subflows so each batch is its own flow run:
Copy code
@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def process_batch(rows: List[Dict]) -> List[Dict]:
    # Tasks are submitted here, so the thread pool belongs on this flow
    futures = [enrich_default_merge_policy.submit(r) for r in rows]
    return [f.result() for f in futures]

@flow
def enrich_all(rows: List[Dict], batch_size: int = 200) -> List[Dict]:
    results = []
    for b in chunk(rows, batch_size):
        results.extend(process_batch(b))  # each call is its own subflow run
    return results
If you can share where this is running (local machine vs worker, CPU/RAM) and typical API latency, I can suggest concrete values for batch_size, max_workers, and the concurrency limit.
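Fixed batches leave workers idle while the slowest row in each batch finishes. An alternative way to bound in-flight work is a sliding window: submit a new item as soon as any earlier one completes. The sketch below shows the idea with plain concurrent.futures so it runs anywhere; it is an analogue of the batching pattern above, not a Prefect API, and `bounded_map` is a name invented for this example. Adapting it to Prefect futures is left as an exercise and would need checking against your Prefect version.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def bounded_map(fn: Callable[[T], R], items: Iterable[T],
                max_in_flight: int = 200, max_workers: int = 20) -> List[R]:
    """Apply fn to items while holding at most max_in_flight unfinished futures.

    Results come back in completion order, not input order.
    """
    results: List[R] = []
    pending = set()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for item in items:
            if len(pending) >= max_in_flight:
                # Block until at least one future finishes, then free its slot.
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                results.extend(f.result() for f in done)
            pending.add(pool.submit(fn, item))
        # Drain whatever is still running.
        for f in pending:
            results.append(f.result())
    return results
```

Because results arrive out of order, each result should carry its own row identifier (for example scan_job_id) if you need to join it back to the input.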
t
@Marvin won't per-row tasks take a lot of storage space?
m
thought for 86.8 seconds
Short answer: per-row tasks don’t have to consume lots of storage. In Prefect 3.x, results are not persisted by default unless you ask for it. The main “cost” of many tasks is metadata/logs, not storing the actual return values. How to keep storage light with per-row tasks - Don’t persist task results: - Default is no persistence; you can make it explicit:
Copy code
@task(persist_result=False)
def enrich_default_merge_policy(...): ...
- You can also set it at the flow level to apply to all tasks:
Copy code
@flow(persist_result=False)
def enrich_all(...): ...
- Avoid keeping large results in memory: - If a task’s return value isn’t used downstream, turn off in-memory caching:
Copy code
@task(persist_result=False, cache_result_in_memory=False)
def enrich_default_merge_policy(...): ...
- Return small payloads: - Only return the fields you need (e.g., a small dict with row id and the new value). - If you need to store full rows, write them to your own storage (S3, DB) inside the task and just return a small reference (path/id). - Keep logs concise: - Logs are stored per task; avoid logging full rows or large payloads. - Use warnings/errors sparingly; consider summary logs at batch boundaries. - Persist only the final aggregated result if needed: - Keep per-row tasks ephemeral; write/return a compact list and persist just once at the end (or skip persisting entirely). Example pattern
Copy code
@task(persist_result=False, cache_result_in_memory=False, tags=["merge-policy"])
def enrich_default_merge_policy(row: dict) -> dict:
    # compute small result: only id + enriched field
    ...

@flow(persist_result=False)
def enrich_all(rows: list[dict], batch_size: int = 200) -> list[dict]:
    enriched = []
    for batch in chunk(rows, batch_size):
        futures = [enrich_default_merge_policy.submit(r) for r in batch]
        # collect small results then optionally write once at the end
        enriched.extend(f.result() for f in futures)
    # optionally persist final output (to S3/DB) here
    return enriched  # or return None to avoid storing any large result
If you’re still concerned about storage or UI overhead with many tasks: - Use batching (100–300 per batch) to limit in-flight futures. - Consider subflows per batch to split orchestration records across runs. - Tune logging down to essentials. If you share how large each row/result is and whether you need to persist final outputs, I can suggest exact task/flow settings and an output-writing pattern that keeps storage usage minimal.
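To make the "write the full row to your own storage and return a small reference" idea concrete, here is a minimal sketch. It uses a local directory as a stand-in for S3 or a database, and `enrich_and_store` and the placeholder policy id are hypothetical names, not part of your code or of Prefect. In a flow you would put `@task(persist_result=False)` on top of it.

```python
import json
import tempfile
from pathlib import Path


def enrich_and_store(row: dict, out_dir: Path) -> dict:
    """Write the full enriched row to disk and return only a small reference."""
    # Stand-in for the real API lookup (assumption for illustration only)
    enriched = {**row, "default_merge_policy_id": "placeholder-id"}
    out_path = out_dir / f"{row['id']}.json"
    out_path.write_text(json.dumps(enriched), encoding="utf-8")
    # Only the id and the location travel back to the caller
    return {"id": row["id"], "path": str(out_path)}
```

Called as `enrich_and_store(row, Path(tempfile.mkdtemp()))`, it returns a two-key dict regardless of how large the row is, so whatever the flow collects stays small.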
t
@Marvin all the rows are being stored in the parameters of flows/tasks
m
thought for 68.3 seconds
You’re right: if you pass the entire list of rows as parameters to flows/tasks, Prefect will serialize and store those parameters for the run. That can bloat storage and slow things down. Ways to avoid storing big payloads as parameters - Pass references, not data - Upload the data to external storage (S3/DB/file) and pass just a pointer (URI/ID/query). - Each task/flow loads only what it needs using that pointer. - Pass indices only (keep data in memory locally) - Load the dataset once inside the flow and keep it in process memory. - Submit tasks with just an integer index; tasks read the row from the in-memory dataset. - Important: use ThreadPoolTaskRunner (same process) for this pattern, not process pool/distributed workers. Example: indices-only pattern (no large params stored)
Copy code
from typing import List, Dict
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

_DATASET: List[Dict] = []  # module-level cache; same process only

@task(persist_result=False, cache_result_in_memory=False, tags=["merge-policy"])
def enrich_row(i: int) -> Dict:
    row = _DATASET[i]  # small param: just the index
    # ... your existing enrichment logic ...
    # return a small result or write to external storage and return a reference
    return {"id": row["id"], "default_merge_policy_id": "..."}  # keep it small

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20), persist_result=False)
def enrich_all(dataset_uri: str, batch_size: int = 200) -> List[Dict]:
    global _DATASET
    _DATASET = load_rows(dataset_uri)  # load once; do not pass rows as a param
    results: List[Dict] = []
    for start in range(0, len(_DATASET), batch_size):
        end = min(start + batch_size, len(_DATASET))
        futures = [enrich_row.submit(i) for i in range(start, end)]
        results.extend(f.result() for f in futures)
    return results  # or write to storage and return nothing
Example: pointer-per-batch (works with any runner/worker)
Copy code
@task(persist_result=False, cache_result_in_memory=False, tags=["merge-policy"])
def process_batch(dataset_uri: str, start: int, end: int) -> int:
    # Load just the slice from external storage
    rows = stream_rows(dataset_uri, start, end)  # implement streaming/partial load
    # Option A: process sequentially inside the task to avoid per-row params
    # Option B: use plain threads here (not Prefect tasks) for small local parallelism
    count = 0
    for row in rows:
        # ... enrichment logic ...
        # write output to S3/DB; avoid returning large data
        count += 1
    return count  # small summary

@flow(persist_result=False)
def enrich_all(dataset_uri: str, batch_size: int = 500) -> int:
    # Only small params are passed: uri + indices
    totals = []
    size = get_dataset_size(dataset_uri)
    for start in range(0, size, batch_size):
        end = min(start + batch_size, size)
        totals.append(process_batch.submit(dataset_uri, start, end))
    return sum(t.result() for t in totals)
Other tips to keep storage light - Results - Use
persist_result=False
and
cache_result_in_memory=False
on tasks;
@flow(persist_result=False)
at the flow level. - Return only small payloads (IDs, counts). Write large outputs to your own storage and return a reference. - Logging - Don’t log full rows. Keep logs concise. - Concurrency - Keep batching to avoid thousands of in-flight futures. - Use tag-based concurrency limits to respect API limits, but remember they don’t reduce parameter size—batching and reference-passing do. If you can share where the data lives today (file/DB/S3) and how you trigger the flow, I can tailor a pattern (pointer-per-row vs indices-only vs batch references) that fits your runtime and avoids parameter bloat.
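The pointer-per-batch example relies on `get_dataset_size` and `stream_rows`, which you would have to implement yourself. As a rough sketch, assuming the dataset is a JSON Lines file (one JSON object per line, no blank lines), they might look like this. `batch_ranges` is an extra hypothetical helper that produces the (start, end) pairs.

```python
import json
from itertools import islice
from typing import Dict, Iterator, List, Tuple


def batch_ranges(size: int, batch_size: int) -> Iterator[Tuple[int, int]]:
    """Yield half-open (start, end) index pairs that cover range(size)."""
    for start in range(0, size, batch_size):
        yield start, min(start + batch_size, size)


def get_dataset_size(path: str) -> int:
    """Count the rows in a JSON Lines file."""
    with open(path, encoding="utf-8") as fh:
        return sum(1 for _ in fh)


def stream_rows(path: str, start: int, end: int) -> List[Dict]:
    """Parse only rows [start, end); earlier lines are skipped, not kept in memory."""
    with open(path, encoding="utf-8") as fh:
        return [json.loads(line) for line in islice(fh, start, end)]
```

With this shape, each task receives only a path and two integers. For S3 or a database you would swap the file reads for a ranged query.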
t
@Marvin can we use ThreadPoolTaskRunner in flows on functions that are not decorated with @task?
m
thought for 51.0 seconds
Short answer: No. The ThreadPoolTaskRunner only schedules Prefect tasks. Plain Python functions called from a flow run synchronously in the flow’s main thread unless you decorate them with @task and use .submit.
Your options:
- Use Prefect-level concurrency
  - Decorate the function with @task and run it with .submit under a ThreadPoolTaskRunner.
  - This gives you Prefect observability, retries, and tag-based concurrency limits.
- Keep the function “plain” and use local threads inside a single Prefect task
  - Good if you want fewer Prefect task runs/metadata but still some parallelism.
  - Prefect won’t track each row, and you can’t use Prefect concurrency limits per row.
Example: threads inside one task (no per-row task runs)
Copy code
from concurrent.futures import ThreadPoolExecutor
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

def enrich_fn(row: dict) -> dict:
    # your non-task function
    ...

@task(persist_result=False, cache_result_in_memory=False)
def enrich_batch(rows: list[dict], workers: int = 20) -> list[dict]:
    with ThreadPoolExecutor(max_workers=workers) as ex:
        return list(ex.map(enrich_fn, rows))

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20), persist_result=False)
def enrich_all(rows_uri: str) -> int:
    rows = load_rows(rows_uri)  # load by reference to avoid big params
    results = enrich_batch(rows)  # single Prefect task
    write_results(results)        # or return a small summary
    return len(results)
Example: Prefect-level concurrency (per-row tasks)
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task(persist_result=False, cache_result_in_memory=False, tags=["merge-policy"])
def enrich_row(row: dict) -> dict:
    return enrich_fn(row)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20), persist_result=False)
def enrich_all(rows: list[dict], batch_size: int = 200) -> list[dict]:
    out = []
    for i in range(0, len(rows), batch_size):
        batch = rows[i:i+batch_size]
        futures = [enrich_row.submit(r) for r in batch]
        out.extend(f.result() for f in futures)
    return out
Rule of thumb: - Need Prefect retries/visibility/rate limits per item? Use @task + .submit. - Want minimal Prefect metadata/storage and are okay with less visibility? Do local threads inside a single @task.
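If you go with local threads inside a single task, Prefect no longer sees per-row failures, so you may want to collect them yourself. Here is a minimal sketch using only the standard library. `run_rows` is a hypothetical helper name, and the results come back in completion order, not input order.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List, Tuple


def run_rows(
    fn: Callable[[Dict], Dict], rows: List[Dict], workers: int = 20
) -> Tuple[List[Dict], List[Tuple[int, str]]]:
    """Run fn over rows in local threads and return (results, failures)."""
    results: List[Dict] = []
    failures: List[Tuple[int, str]] = []
    with ThreadPoolExecutor(max_workers=workers) as ex:
        # Remember which input index each future belongs to
        future_to_index = {ex.submit(fn, row): i for i, row in enumerate(rows)}
        for fut in as_completed(future_to_index):
            i = future_to_index[fut]
            try:
                results.append(fut.result())
            except Exception as exc:
                # Keep going; record the row index and the error text
                failures.append((i, repr(exc)))
    return results, failures
```

The wrapping task can then log one summary line (for example the failure count and the first few indices) instead of one log entry per row.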
t
@Marvin can you analyze code file attachments ?
m
thought for 21.8 seconds
I can analyze code, but I can’t open Slack file attachments directly. Please do one of the following: - Paste the relevant code sections in this thread (preferred for quick reviews) - Share a link to a GitHub repo/branch, GitHub Gist, or Pastebin that I can access If the file is large, a minimal snippet is ideal. For Prefect issues, include: - Flow and task definitions - Task runner setup and any concurrency limits/tags - Deployment/worker configuration (if applicable) - Any error messages/logs you’re seeing Share whichever you can and I’ll take a look.
t
Copy code
@flow(name="get-export-job-stats", log_prints=True)
def get_export_job_stats(
        envs: List[str],
        regions: List[str],
        models: List[str],
        sources: List[str],
        target_date: Optional[datetime] = None
) -> None:
    logger = get_run_logger()

    date, start_ms, end_ms = query_date_window_ms(target_date)
    logger.info(f"Time window: start_ms={start_ms}, end_ms={end_ms}")

    for env in envs:
        all_export_jobs_env: List[Dict] = []

        # Step 1: Fetch all export jobs in parallel
        fetch_futures: List = []
        for region in regions:
            for model in models:
                for source in sources:
                    fetch_futures.append(
                        query_export_jobs_from_psql.submit(
                            env=env,
                            region=region,
                            model=model,
                            source=source,
                            start_time_ms=start_ms,
                            end_time_ms=end_ms,
                            date=date
                        )
                    )

        wait(fetch_futures)
        # Step 2: Collect results
        for f in fetch_futures:
            try:
                jobs = f.result()
                all_export_jobs_env.extend(jobs)
            except Exception as e:
                logger.warning(f"Failed to fetch jobs for env={env}: {e}")

        logger.info(f"Fetched total {len(all_export_jobs_env)} jobs for env={env}")
        _set_global_rows(all_export_jobs_env)

        # Step 2: Filter commercial jobs (mutates global dataset)
        filter_commercial_export_jobs(env)

        # Step 3-6: Run enrichment stages under subflows that use their own ThreadPoolTaskRunner
        enrich_default_merge_policy_flow()

_GLOBAL_ROWS: List[Dict] = []


def _set_global_rows(rows: List[Dict]):
    global _GLOBAL_ROWS
    _GLOBAL_ROWS = rows


def enrich_default_merge_policy(row: Dict) -> Dict:
    logger = get_logger(__name__)
    row = row.copy()

    env = row.get("env")
    region = row.get("region")
    ims_org_id = row.get("ims_org_id")
    sandbox_name = row.get("sandbox_name")
    scan_job_id = row.get("scan_job_id")
    model = row.get("model")

    row.update(MERGE_POLICY_FIELDS)

    try:
        ims_api_client = get_ims_api_client()
        logos_api_client = get_logos_api_client(ims_api_client)

        resp = logos_api_client.get_merge_policy(environment=env, region=region, ims_org_id=ims_org_id,
                                                 sandbox_name=sandbox_name,
                                                 schema=model, default="true",
                                                 client_id="acp_core_unifiedProfile_feeds",
                                                 limit=1)
        if resp.status_code != 200:
            logger.warning(f"Job {scan_job_id}: got non-200 ({resp.status_code}) while fetching default merge policy")
            return row

        default_merge_policy = resp.json()
        default_merge_policy_id = safe_get(default_merge_policy["children"][0], ["id"])

        row["default_merge_policy_id"] = default_merge_policy_id

        return row

    except Exception as e:
        logger.error(f"Error enriching default merge policy id for job: {scan_job_id}: {e}")
        return row

@flow(name="enrich_default_merge_policy_flow")
def enrich_default_merge_policy_flow(max_workers: int = 30) -> None:
    global _GLOBAL_ROWS
    total = len(_GLOBAL_ROWS)
    if total == 0:
        return
    _set_global_rows(populate_data_concurrently(enrich_default_merge_policy, _GLOBAL_ROWS, max_workers=max_workers))
@Marvin This is my flow code (updated). Do you see any improvements from a Prefect framework point of view?
m
thought for 149.8 seconds
Thanks for sharing the code. Here are the biggest improvements I’d suggest from a Prefect 3.x point of view, focusing on stability, memory usage, observability, and rate control. 1) Add an explicit task runner and bound concurrency - Set a ThreadPoolTaskRunner on the parent flow so your DB/API calls run concurrently but safely. - Avoid creating too many futures at once by batching submissions and/or streaming results. Example for Step 1 (fetch) with bounded in-flight futures:
Copy code
from itertools import islice, product
from prefect import flow, task, get_run_logger
from prefect.futures import as_completed
from prefect.task_runners import ThreadPoolTaskRunner

@task(tags=["psql"], persist_result=False, cache_result_in_memory=False, timeout_seconds=60)
def query_export_jobs_from_psql(...) -> list[dict]:
    ...

def chunked(iterable, size):
    # Yield lists of at most `size` items. islice stops cleanly when the
    # iterator is exhausted; calling next() inside a comprehension would
    # raise StopIteration, which becomes a RuntimeError inside a generator.
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            break
        yield batch

@flow(name="get-export-job-stats", log_prints=True, task_runner=ThreadPoolTaskRunner(max_workers=20), persist_result=False)
def get_export_job_stats(envs, regions, models, sources, target_date=None) -> None:
    logger = get_run_logger()
    date, start_ms, end_ms = query_date_window_ms(target_date)
    logger.info(f"Time window: start_ms={start_ms}, end_ms={end_ms}")

    for env in envs:  # envs outermost, as in your original flow
        all_export_jobs_env = []

        # Submit in batches to keep futures bounded (e.g., 200 at a time)
        for batch in chunked(product(regions, models, sources), 200):
            futures = [
                query_export_jobs_from_psql.submit(
                    env=env, region=r, model=m, source=s,
                    start_time_ms=start_ms, end_time_ms=end_ms, date=date
                )
                for (r, m, s) in batch
            ]
            # Drain results as they finish
            for fut in as_completed(futures):
                try:
                    all_export_jobs_env.extend(fut.result())
                except Exception as e:
                    logger.warning(f"Failed to fetch jobs for env={env}: {e}")

        logger.info(f"Fetched total {len(all_export_jobs_env)} jobs for env={env}")
        ...
Notes: - as_completed lets you drain results as they finish and reduces the chance of “hangs” from too many pending futures. - Tune max_workers and batch size based on DB latency and machine size. 2) Rate limit with tags (DB and API) - Tag your tasks and set server-side concurrency limits so you don’t overload Postgres or the external API:
Copy code
prefect concurrency-limit create psql 10
prefect concurrency-limit create merge-policy 10
prefect concurrency-limit ls
prefect concurrency-limit inspect merge-policy
Then add tags to tasks, e.g.,
@task(tags=["merge-policy"])
on your enrichment task. 3) Avoid globals or at least constrain their use - Using a module-level
_GLOBAL_ROWS
only works reliably if everything runs in a single Python process (ThreadPoolTaskRunner). It will break with process pools, distributed workers, or if you ever submit subflows elsewhere. - Safer alternatives: - Keep enrichment in the same flow and pass indices to a task that reads from a module-level cache loaded once (same-process only). - Or write the fetched rows to external storage (S3/DB/temp file) and pass a pointer (URI + index range) into subflows/tasks. - If you keep the global pattern, strongly prefer ThreadPoolTaskRunner everywhere these rows are accessed. 4) Choose one of these enrichment patterns - Minimal Prefect overhead (recommended if you had “hangs” with many tasks): - Use a single batch task that runs local threads for per-row enrichment using your plain Python
enrich_default_merge_policy
function. This keeps Prefect task count small while still parallelizing I/O.
```
from concurrent.futures import ThreadPoolExecutor

def enrich_default_merge_policy(row: dict) -> dict:
    # your existing logic; let client retries handle backoff
    ...

@task(tags=["merge-policy"], persist_result=False, cache_result_in_memory=False, timeout_seconds=120)
def enrich_batch(rows: list[dict], workers: int = 20) -> list[dict]:
    with ThreadPoolExecutor(max_workers=workers) as ex:
        return list(ex.map(enrich_default_merge_policy, rows))

# In your flow:
enriched = []
for batch in chunked(all_export_jobs_env, 200):
    enriched.extend(enrich_batch.submit(batch).result())
```
- Per-row Prefect tasks (more visibility, more metadata): - If you want per-row states/retries, decorate
enrich_default_merge_policy
as a Prefect task and submit in batches to keep futures bounded. Also set
persist_result=False
and
cache_result_in_memory=False
. 5) Keep storage light - Since you’re careful about not passing large rows as parameters, also explicitly set: -
@flow(persist_result=False)
on flows -
@task(persist_result=False, cache_result_in_memory=False)
on tasks - Return only small payloads or write large results to your own storage and return a small reference. 6) Timeouts and logging - Add
timeout_seconds
to tasks (slightly above the worst-case client retry time) so stuck calls don’t block threads forever. - Use Prefect’s
get_run_logger()
inside tasks/subflows for consistent logs in the UI. If you keep plain functions, pass the message up or log at the task wrapper level. 7) Batching everywhere - You already batch by env. Also batch fetch submissions and enrichment submissions to avoid thousands of in-flight futures and UI overwhelm. If you can share: - Typical DB/API latencies and rate limits - Where this runs (local vs worker; CPU/RAM) I can suggest concrete values for max_workers, batch sizes, and concurrency limits, and help refactor the enrichment step into the batch pattern that keeps Prefect metadata lean while preserving throughput.
t
@Marvin can you share some github projects, code refs which heavily use prefect framework so that I can learn what works and what does not
m
thought for 217.9 seconds
Absolutely. Here are solid codebases and references you can study. I’ve prioritized resources that demonstrate real patterns with Prefect 3.x; where versions are mixed, I’ve noted it so you can filter accordingly.
Official patterns and curated examples
- Prefect 3 docs (flows, deployments, runners, concurrency, patterns): Prefect 3 Docs
- Deployments: Deployments
- Task runners and concurrency: Task Runners and Concurrency limits
- Patterns (batching, futures, subflows): Patterns
Prefect integration repos with working examples
- AWS: prefect-aws (look for an examples directory and README snippets)
- GCP: prefect-gcp
- Azure: prefect-azure
- Kubernetes: prefect-kubernetes
- Docker: prefect-docker
- Snowflake: prefect-snowflake
- dbt: prefect-dbt
- Great Expectations: prefect-great-expectations
- Slack notifications: prefect-slack
Community patterns and templates
- Prefect Recipes (mixed 2.x/3.x; great for ideas, but verify the version in each recipe): prefect-recipes
- Community project template (not official, but useful project scaffolding): prefect-pack
Targeted GitHub searches to find active 3.x usage
- Search for code using the 3.x API: Search: from prefect import flow
- Search for ThreadPoolTaskRunner usage (common in 3.x patterns): Search: ThreadPoolTaskRunner
- Search for deployments (3.x uses
prefect deploy
and
flow.from_source(...).deploy(...)
): Search: prefect deploy + from_source What to watch out for (version gotchas) - Prefect 2.x-only patterns you should avoid in 3.x: -
Deployment.build_from_flow()
(removed in 3.x) -
prefect deployment build
CLI (replaced by
prefect deploy
) - GitHub storage blocks (use
flow.from_source('https://github.com/...')
instead) - When studying a repo, check its README and
pyproject.toml/requirements.txt
for
prefect>=3
to confirm applicability. If you tell me your stack (cloud provider, DB/warehouse, container/orchestrator) and preferred patterns (per-row tasks vs batches, workers vs local), I can point you to the most relevant example repos and specific files/directories to study.
t
@Marvin why does my Prefect workflow fail every day?
Building wheels for collected packages: splunk-sdk
  Building wheel for splunk-sdk (pyproject.toml): started
  Building wheel for splunk-sdk (pyproject.toml): finished with status 'done'
  Created wheel for splunk-sdk: filename=splunk_sdk-2.1.1-py3-none-any.whl size=125895 sha256=18d4d73046dc6b0b6fca5515dbd39e5b979d374cbbf575d5043abb001a852e25
  Stored in directory: /home/prefect/.cache/pip/wheels/5a/f9/da/0182ab0ad203cf0b337607cf3b7fa0533f7d70f6b2befda415
Successfully built splunk-sdk
Installing collected packages: pyyaml, psycopg2-binary, deprecation, splunk-sdk, hvac, prefect
  WARNING: The script prefect is installed in '/home/prefect/.local/bin' which is not on PATH.
  Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
Successfully installed deprecation-2.1.0 hvac-2.3.0 prefect-3.4.23 psycopg2-binary-2.9.11 pyyaml-6.0.3 splunk-sdk-2.1.1
[notice] A new release of pip is available: 24.0 -> 25.2
[notice] To update, run: pip install --upgrade pip
05:30:01.496 | INFO | Flow run 'unbiased-fox' - Beginning flow run 'unbiased-fox' for flow 'enrich-ims-org-info-table'
05:30:01.498 | INFO | Flow run 'unbiased-fox' - View at http://prefect-server:4200/runs/flow-run/0199d9c3-0af9-76d6-aeb1-4f33dfaaa430
05:30:01.543 | INFO | Task run 'fetch_org_info_from_splunk-930' - Task run failed with exception: ModuleNotFoundError("No module named 'splunklib'") - Retry 1/2 will start 10 second(s) from now
05:30:11.558 | INFO | Task run 'fetch_org_info_from_splunk-930' - Task run failed with exception: ModuleNotFoundError("No module named 'splunklib'") - Retry 2/2 will start 10 second(s) from now
05:30:21.574 | ERROR | Task run 'fetch_org_info_from_splunk-930' - Task run failed with exception: ModuleNotFoundError("No module named 'splunklib'") - Retries are exhausted
Traceback (most recent call last):
When I retry it, the flow passes, but why is this happening in the first place?
m
thought for 319.5 seconds
Short answer: your flow’s first run is starting in a Python environment that does not actually see the package you just pip-installed. The “splunk-sdk” install is happening, but the runtime process can’t import
splunklib
because pip installed it into a location that the interpreter executing your task is not reading. A retried flow run starts a fresh process after the install has already landed on disk, so it can find the package. One plausible mechanism: Python adds the user site-packages directory to sys.path only if that directory already exists when the interpreter starts, so a process that started before pip created it will not see it.
Key clues from your logs:
- WARNING: The script prefect is installed in '/home/prefect/.local/bin' which is not on PATH
- You are installing packages at runtime (pull step), including prefect itself
- ModuleNotFoundError("No module named 'splunklib'")
Common root causes
- Mismatch between the pip used to install and the Python interpreter used to run the flow (e.g., pip targets one Python, the flow runs with another).
- Pip fell back to user installs (
~/.local
) and your runtime interpreter is not reading user site-packages (common if running in a virtualenv or different user).
- Installing Prefect at runtime can cause version drift and environment inconsistencies between worker and flow subprocess.
What to do
Pick one of these stable patterns:
1) Process work pool: pre-provision a virtual environment and stop installing at runtime
- Create and activate a venv for your worker host.
- pip install your requirements (including splunk-sdk) into that venv.
- Start your Prefect worker from that venv.
- Remove any runtime pip-install pull steps. This guarantees the flow subprocess inherits the same interpreter with all deps available.
2) If you must install at runtime, ensure pip uses the exact interpreter running your flow
- Replace any “pip install …” with “python -m pip install …” so it installs into the same interpreter.
- Prefer a venv you control over user-site installs.
For example, in your pull steps:
Copy code
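pull:
  # Steps are referenced by their fully qualified names
  - prefect.deployments.steps.run_shell_script:
      id: setup_venv
      # Each line of `script` runs as its own process, so activating the venv
      # would not carry over; call the venv's interpreter directly instead.
      script: |
        python -m venv .venv
        .venv/bin/python -m pip install --upgrade pip
        .venv/bin/python -m pip install -r requirements.txt
  - prefect.deployments.steps.set_working_directory:
      directory: .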
Also make sure the process worker uses this venv to execute the flow (e.g., start the worker from within the venv).
3) Docker/Kubernetes work pools: bake deps into an image
- Build a Docker image with splunk-sdk and your other packages.
- Reference that image in your deployment so every run has a consistent environment.
- This is the most reliable approach for scheduled production runs.
Important tips
- Don’t install “prefect” during pull steps. Pin Prefect in the worker/image instead so worker and flow match. Installing Prefect at runtime can break the worker process or lead to subtle import/path issues.
- Pin your package versions in requirements to avoid daily drift.
- If you see the “.local/bin is not on PATH” warning, it means pip used user-site. Prefer a venv to avoid this, or set up PATH and PYTHONPATH appropriately (but venv is cleaner).
Quick diagnostic task you can add to confirm what Python and site-packages your flow is using:
Copy code
from prefect import task, flow
import sys, site, subprocess

@task
def debug_env():
    print("sys.executable:", sys.executable)
    print("sys.version:", sys.version)
    print("site.getsitepackages:", getattr(site, "getsitepackages", lambda: [])())
    print("site.getusersitepackages:", site.getusersitepackages())
    subprocess.run([sys.executable, "-m", "pip", "show", "splunk-sdk"])

@flow
def debug_flow():
    debug_env()
If you can share: - Your work pool type (process, docker, kubernetes) - The pull section of your prefect.yaml (or how you’re installing deps at runtime) - The Python version used by the worker …I can suggest an exact config to make this rock solid. References - Deployments and pull steps overview: Deployments
- prefect.yaml structure and steps (git clone, pip install, shell scripts): prefect.yaml - Running flows in local processes (process work pools): Process work pools - Docker/Kubernetes deployment patterns: Docker, Kubernetes
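One more thing worth checking, since your log shows a user-site install: Python only adds the user site-packages directory to `sys.path` if it exists when the interpreter starts. The sketch below reports whether that directory is on the path and whether a given top-level module can be found. `describe_import_env` is a hypothetical helper name; it uses only the standard library, so you can call it from the debug task above or from a plain script.

```python
import importlib.util
import site
import sys
from typing import Dict


def describe_import_env(module_name: str) -> Dict[str, object]:
    """Report where this interpreter looks for a top-level module."""
    user_site = site.getusersitepackages()
    spec = importlib.util.find_spec(module_name)  # None if not importable
    return {
        "executable": sys.executable,
        "user_site": user_site,
        "user_site_on_path": user_site in sys.path,
        "module_found": spec is not None,
        "module_origin": getattr(spec, "origin", None),
    }
```

If `describe_import_env("splunklib")` shows `user_site_on_path` as False on the failing run and True on the retry, that would support the user-site explanation. I can't confirm that from the logs alone.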
t
@Marvin my deployment file and work pool is of type process
Copy code
name: common
prefect-version: 3.4.21

build:

deployments:
  - name: "enrich_ims_org_info_table"
    entrypoint: "flows/common/enrich_ims_org_info_table/flow.py:enrich_ims_org_info_table"
    pull:
      - prefect.deployments.steps.git_clone:
          id: clone-step
          repository: https://github.com/OneAdobe/minerva-prefect.git
          branch: main
          access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
      - prefect.deployments.steps.pip_install_requirements:
          directory: "{{ clone-step.directory }}/flows/common/enrich_ims_org_info_table"
          requirements_file: requirements.txt
    work_pool:
      name: "Local Pod Process"
    schedules:
      - cron: "0 11 * * *"
        slug: "enrich_ims_org_info_table-daily-at-11am-ist"
        timezone: "Asia/Kolkata"
        active: true


  - name: "get_export_job_stats"
    entrypoint: "flows/ups_profile_export/get_export_job_stats/flow.py:get_export_job_stats"
    pull:
      - prefect.deployments.steps.git_clone:
          id: clone-step
          repository: https://github.com/OneAdobe/minerva-prefect.git
          branch: main
          access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
      - prefect.deployments.steps.pip_install_requirements:
          directory: "{{ clone-step.directory }}/flows/ups_profile_export/get_export_job_stats"
          requirements_file: requirements.txt
    work_pool:
      name: "Local Pod Process"
    schedules:
      - cron: "30 11 * * *"
        slug: "get_export_job_stats-daily-at-1130am-ist"
        timezone: "Asia/Kolkata"
        active: true
        parameters:
          envs:
            - prod
          regions:
            - va7
            - nld2
            - aus5
            - can2
            - gbr9
            - ind2
          models:
            - "_xdm.context.profile"
          sources:
            - "scheduler"

  - name: "sandbox_capacity_anomaly_detection"
    entrypoint: "flows/axia/sandbox_capacity_anomaly_detection/flow.py:sandbox_capacity_anomaly_report"
    pull:
      - prefect.deployments.steps.git_clone:
          id: clone-step
          repository: https://github.com/OneAdobe/minerva-prefect.git
          branch: main
          access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
      - prefect.deployments.steps.pip_install_requirements:
          directory: "{{ clone-step.directory }}/flows/axia/sandbox_capacity_anomaly_detection"
          requirements_file: requirements.txt
    work_pool:
      name: "Local Pod Process"
    schedules:
      - cron: "00 12 * * *"
        slug: "sandbox_capacity_anomaly_detection-daily-at-1200pm-ist"
        timezone: "Asia/Kolkata"
        active: true
        parameters:
          envs:
            - prod
          regions:
            - va7
            - nld2
            - aus5
            - can2
            - gbr9
            - ind2
          capacity_names:
            - STREAMING_THROUGHPUT
          slack_channel: C09LZBQ1K1V
m
thought for 127.1 seconds
Thanks for sharing the deployment. Given you’re using a Process work pool and
pip_install_requirements
, the daily failure with ModuleNotFoundError("No module named 'splunklib'") is almost certainly an environment mismatch during runtime installs. What’s happening - Your pull step installs packages right before the flow runs. From your logs, pip is installing to the user site (
/home/prefect/.local/...
) and warnings show scripts are not on PATH.
- In a Process pool, the flow runs with the Python interpreter/environment that the worker is using. If pip installs to a different location (user site vs the interpreter’s site‑packages) or upgrades/downgrades packages (including Prefect) at runtime, the flow subprocess may not see the newly installed module on the first attempt.
- In your logs, both task retries (10 seconds apart) fail the same way, which suggests the running process never picks up the new install. Re-running the flow starts a fresh process after the packages are already on disk, so the import succeeds. This is brittle and symptomatic of installing into the wrong place.
Why this is fragile
- Installing Prefect and other deps at runtime into the same environment the worker is using can cause path and version drift. The
.local
user install is a strong hint that pip isn’t writing to the interpreter’s site‑packages the flow uses.
- Process pools do not isolate environments per run; they inherit the worker’s environment. There’s no container or venv boundary unless you create one.
How to fix it (pick one)
Option A (recommended for Process pools): pre-provision the worker’s venv and stop installing at runtime
- Create/activate a venv on the worker host.
- Install all runtime deps there (including splunk-sdk), and DO NOT install Prefect during pull steps.
- Start the Prefect worker from that venv.
- Remove the
pip_install_requirements
step from your deployments. Keep
git_clone
if you need code. Option B — If you must install at runtime, ensure you install into the exact interpreter the flow will use - Replace
pip_install_requirements
with a
run_shell_script
that uses `python -m pip`:
Copy code
pull:
  - prefect.deployments.steps.git_clone:
      id: clone-step
      repository: https://github.com/OneAdobe/minerva-prefect.git
      branch: main
      access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
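  - prefect.deployments.steps.run_shell_script:
      # Each script line runs as its own command, so a `cd` line would not carry over;
      # set the working directory with `directory` instead.
      directory: "{{ clone-step.directory }}/flows/common/enrich_ims_org_info_table"
      script: |
        python -m pip install --upgrade pip
        python -m pip install -r requirements.txt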
- Remove `prefect` from the requirements.txt used at runtime. Pin Prefect in the worker/image instead to avoid upgrading the SDK mid-run.
- Prefer a venv you control rather than relying on user-site installs.

Quick checks to confirm the diagnosis
- Make sure `flows/common/enrich_ims_org_info_table/requirements.txt` actually includes `splunk-sdk` (the distribution that provides the `splunklib` import).
- Add a small debug task to print the interpreter and whether `splunk-sdk` is installed at runtime:
Copy code
from prefect import task, flow
import sys, subprocess

@task
def debug_env():
    print("sys.executable:", sys.executable)
    subprocess.run([sys.executable, "-m", "pip", "show", "splunk-sdk"])

@flow
def debug_flow():
    debug_env()
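
A complementary check, as a minimal sketch using only the standard library: it reports whether the running interpreter is inside a venv, whether the user site directory is on `sys.path`, and whether a module is importable. The helper name `describe_environment` is hypothetical (not part of Prefect); you could call it from the debug task above.

```python
import importlib.util
import site
import sys


def describe_environment(module_name: str) -> dict:
    """Collect facts about the running interpreter and one top-level module."""
    user_site = site.getusersitepackages()
    return {
        "executable": sys.executable,
        # True when the interpreter runs inside an environment created by `python -m venv`
        "in_venv": sys.prefix != sys.base_prefix,
        "user_site": user_site,
        # False means packages installed with `pip install --user` are not visible
        "user_site_on_path": user_site in sys.path,
        # find_spec returns None when a top-level module cannot be found
        "importable": importlib.util.find_spec(module_name) is not None,
    }


if __name__ == "__main__":
    for key, value in describe_environment("splunklib").items():
        print(f"{key}: {value}")
```

If `importable` is False while pip reports the package as installed, the install most likely went to a different interpreter or to a site directory this process does not search.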
If you share:
- The contents of that requirements.txt
- How you start the worker (command and whether it’s in a venv)
- Output of sys.executable from a run

…I can give you an exact, minimal change to make this rock solid.

Docs for reference
- Deployments and pull steps: Deployments
- prefect.yaml and steps (git clone, pip install, shell): prefect.yaml
- Process work pools: Run flows in local processes
- Docker/Kubernetes pattern (if you decide to containerize): Docker
t
@Marvin I have deployed the app using Kubernetes. Can you help me with a deployment YAML that builds an image, pulls it at runtime, and executes every flow in its own venv?
m
thought for 874.0 seconds
Great, moving to a Kubernetes work pool is the right call for stable, repeatable environments. Here’s a pattern in Prefect 3.x that:
- Builds a Docker image (during deploy)
- Pushes it to your registry
- Pulls the image at runtime on K8s
- Creates a per-run virtualenv in the pod
- Installs your flow’s requirements into that venv via pull steps
- Executes the flow inside that venv

High-level approach
- Build/push: Use prefect-docker steps in prefect.yaml to build a base image (Prefect + OS deps, no app code).
- Pull: At run time, clone your repo and install requirements with pip. Crucially, the container first creates and activates a per-run venv and then runs Prefect’s “execute” command inside that venv, so both the pull steps and the flow use it.
- Command override: In your Kubernetes work pool, configure the container command to create/activate the venv, then exec Prefect’s flow-run execute command.

1) Minimal Dockerfile (base image only)
Put this in your project root as Dockerfile:
Copy code
FROM prefecthq/prefect:3.4.23-python3.11

# Optional: git for cloning during pull steps; add any system packages you need
RUN apt-get update && apt-get install -y --no-install-recommends git && \
    rm -rf /var/lib/apt/lists/*

WORKDIR /opt/prefect
2) prefect.yaml that builds/pushes the image and configures K8s runs with a per-run venv
Adjust the registry, work pool name, repo URL, and requirements paths for each flow.
```
name: minerva-k8s
prefect-version: 3.4.23

build:
  - prefect_docker.deployments.steps.build_docker_image:
      id: build-image
      image_name: your-registry.example.com/your-team/prefect-flows
      tag: "latest"
      dockerfile: "Dockerfile"

push:
  - prefect_docker.deployments.steps.push_docker_image:
      image_name: your-registry.example.com/your-team/prefect-flows
      tag: "latest"

deployments:
  - name: "enrich_ims_org_info_table"
    entrypoint: "flows/common/enrich_ims_org_info_table/flow.py:enrich_ims_org_info_table"
    pull:
      - prefect.deployments.steps.git_clone:
          id: clone-step
          repository: https://github.com/OneAdobe/minerva-prefect.git
          branch: main
          access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
      # This will run after we switch the container into the per-run venv (see command below)
      - prefect.deployments.steps.pip_install_requirements:
          directory: "{{ clone-step.directory }}/flows/common/enrich_ims_org_info_table"
          requirements_file: requirements.txt
    work_pool:
      name: "Your K8s Work Pool"
      job_variables:
        # Ensure the pod pulls the image built above
        image: "your-registry.example.com/your-team/prefect-flows:latest"
        image_pull_policy: "Always"
        # Run each flow in its own venv by creating it before Prefect executes the run
        # NOTE: This requires your work pool's base job template to allow overriding 'command' and 'args'
        command: ["/bin/bash", "-lc"]
        args:
          - |
            set -euo pipefail
            # The worker sets PREFECT__FLOW_RUN_ID (double underscore) for each run
            VENV_DIR=/opt/prefect/venvs/${PREFECT__FLOW_RUN_ID}
            # --system-site-packages keeps the image's Prefect install importable from the venv
            python -m venv --system-site-packages "$VENV_DIR"
            source "$VENV_DIR/bin/activate"
            python -m pip install --upgrade pip
            # Execute the Prefect flow-run inside this venv; pull steps will run using this interpreter
            exec python -m prefect flow-run execute
        # Optional: add env vars (examples)
        env:
          PIP_NO_CACHE_DIR: "1"
        # If your registry requires auth, add imagePullSecrets in the work pool base job template or here
    schedules:
      - cron: "0 11 * * *"
        slug: "enrich_ims_org_info_table-daily-at-11am-ist"
        timezone: "Asia/Kolkata"
        active: true

  # Repeat for other deployments; change the requirements path per flow
  - name: "get_export_job_stats"
    entrypoint: "flows/ups_profile_export/get_export_job_stats/flow.py:get_export_job_stats"
    pull:
      - prefect.deployments.steps.git_clone:
          id: clone-step
          repository: https://github.com/OneAdobe/minerva-prefect.git
          branch: main
          access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
      - prefect.deployments.steps.pip_install_requirements:
          directory: "{{ clone-step.directory }}/flows/ups_profile_export/get_export_job_stats"
          requirements_file: requirements.txt
    work_pool:
      name: "Your K8s Work Pool"
      job_variables:
        image: "your-registry.example.com/your-team/prefect-flows:latest"
        image_pull_policy: "Always"
        command: ["/bin/bash", "-lc"]
        args:
          - |
            set -euo pipefail
            VENV_DIR=/opt/prefect/venvs/${PREFECT__FLOW_RUN_ID}
            python -m venv --system-site-packages "$VENV_DIR"
            source "$VENV_DIR/bin/activate"
            python -m pip install --upgrade pip
            exec python -m prefect flow-run execute
    schedules:
      - cron: "30 11 * * *"
        slug: "get_export_job_stats-daily-at-1130am-ist"
        timezone: "Asia/Kolkata"
        active: true
    parameters:
      envs: [prod]
      regions: [va7, nld2, aus5, can2, gbr9, ind2]
      models: ["_xdm.context.profile"]
      sources: ["scheduler"]
```
Important notes
- Command/args override: Your Kubernetes work pool’s base job template must expose command and args as overridable variables for job_variables to take effect. If command/args aren’t currently overridable in your pool, edit the work pool in the UI (Work Pools > your pool > Edit base job template) to add templated fields for command/args and map them into the container’s spec. Docs: Customize job variables and Kubernetes work pools.
- Why the venv works: The command creates the venv with --system-site-packages, so the Prefect install baked into the image stays importable, and then execs `python -m prefect flow-run execute` with the venv’s interpreter. Calling the image’s `prefect` executable directly would run under the image’s interpreter instead, because the fresh venv has no `prefect` script of its own. Prefect’s pull steps (git clone + pip install) run inside that same process, so pip installs into the venv and the flow executes with the venv’s interpreter.
- Keep Prefect out of your runtime requirements files: Don’t install or upgrade prefect in those requirements.txt files; pin Prefect in the image (Dockerfile) so the job runner and runtime match.
- Image auth: If your registry requires credentials, configure imagePullSecrets in the work pool base job template or via job_variables.
- Build/push prerequisites: The build and push steps need the Docker integration, so run the deploy from an environment that has `prefect[docker]` installed.

References
- Prefect deployment YAML and steps: prefect.yaml
- Kubernetes worker setup and job variables: Kubernetes
- Docker build/push steps: Docker

If you share:
- Your Kubernetes work pool’s base job template
- The name of your registry and whether it needs imagePullSecrets
- One flow’s exact requirements.txt path

…I can tailor the job_variables (and base job template snippet) to your cluster and finalize the YAML.
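
If you want to try the per-run venv idea locally before touching the work pool, here is a rough Python sketch of what the shell command above does, using the standard-library `venv` module. The helper names `venv_python` and `create_run_venv` are hypothetical and not part of Prefect; reading the run ID from `PREFECT__FLOW_RUN_ID` assumes the worker sets that variable, and the sketch falls back to a placeholder when it is absent.

```python
import os
import venv
from pathlib import Path


def venv_python(venv_dir: Path) -> Path:
    """Return the interpreter path inside a venv (the layout differs on Windows)."""
    if os.name == "nt":
        return venv_dir / "Scripts" / "python.exe"
    return venv_dir / "bin" / "python"


def create_run_venv(
    base_dir: Path,
    run_id: str,
    with_pip: bool = True,
    system_site_packages: bool = False,
) -> Path:
    """Create an isolated venv for one run and return its interpreter path."""
    venv_dir = base_dir / run_id
    builder = venv.EnvBuilder(
        with_pip=with_pip,
        system_site_packages=system_site_packages,
        clear=True,  # start from an empty directory if one already exists
    )
    builder.create(venv_dir)
    return venv_python(venv_dir)


if __name__ == "__main__":
    import tempfile

    # Assumption: the worker exports the run ID; use a placeholder otherwise
    run_id = os.environ.get("PREFECT__FLOW_RUN_ID", "local-test")
    with tempfile.TemporaryDirectory() as tmp:
        print(create_run_venv(Path(tmp), run_id, with_pip=False))
```

Keying the directory on the run ID means two runs never share installed packages, at the cost of a fresh install per run.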