Tushar Mahajan
10/03/2025, 5:53 PM
from prefect_github.repository import GitHubRepository
github_repository_block = GitHubRepository.load("minerva-prefect-scripts")
hello_flow.deploy(
name="hello-k8s",
work_pool_name="default", # your Kubernetes work pool
version="1.0",
entrypoint="example_flow:hello_flow", # format: module:function
parameters={"name": "Prefect"},
storage=github_repository_block # GitHub repo block # run as Kubernetes job
)
But I am getting the error ->
ValueError: Either an image or remote storage location must be provided when deploying a deployment.
Can you please help me figure out the correct method to register a deployment for flow code that lives in a GitHub repo?
Kevin Grismore
10/03/2025, 5:57 PM
hello_flow.from_source(
...
).deploy(
...
)
where `from_source` is where you define the repository and `deploy` is where you define everything else.
Marvin
10/03/2025, 5:59 PM
`Flow.deploy` requires either:
- a remote source (i.e., the flow code can be fetched at run time), or
- a Docker image that already contains your code.
Your code is using a 2.x-style `GitHubRepository` block via `storage=...`, which 3.x no longer supports. That’s why you’re seeing:
ValueError: Either an image or remote storage location must be provided when deploying a deployment.
Here are the supported ways to deploy flows that live in GitHub.
Option A: Deploy directly from your GitHub repo (no image)
- Use `Flow.from_source(...)` and pass an entrypoint in the “file.py:flow_func” format (path is relative to the repo root).
- Note: don’t use module notation like `example_flow:hello_flow`; it should be the file path, e.g., `example_flow.py:hello_flow`.
Example (public repo):
from prefect.flows import Flow
remote_flow = Flow.from_source(
"https://github.com/<owner>/<repo>",
entrypoint="example_flow.py:hello_flow", # file.py:flow_func
)
remote_flow.deploy(
name="hello-k8s",
work_pool_name="default",
version="1.0",
parameters={"name": "Prefect"},
)
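To make the entrypoint rule concrete, here is a minimal stdlib-only pre-deploy check. `parse_entrypoint` is a hypothetical helper, not part of Prefect (Prefect does its own parsing); it only encodes the “file.py:flow_func” convention described above.

```python
from typing import Tuple


def parse_entrypoint(entrypoint: str) -> Tuple[str, str]:
    """Split 'path/to/file.py:flow_function' into (path, function).

    Hypothetical lint helper (not a Prefect API) for the file-path convention.
    """
    path, sep, func = entrypoint.rpartition(":")
    if not sep or not path or not func:
        raise ValueError(f"expected 'file.py:function', got {entrypoint!r}")
    if not path.endswith(".py"):
        # Looks like module notation, e.g. 'example_flow:hello_flow'
        suggestion = f"{path.replace('.', '/')}.py:{func}"
        raise ValueError(
            f"{path!r} looks like a module name; use a file path relative "
            f"to the repo root, e.g. {suggestion}"
        )
    if not func.isidentifier():
        raise ValueError(f"{func!r} is not a valid Python function name")
    return path, func


if __name__ == "__main__":
    print(parse_entrypoint("flows/example_flow.py:hello_flow"))
```

Passing `example_flow:hello_flow` raises a `ValueError` that points at the `example_flow.py:hello_flow` form.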
Private repo tips:
- The runner that executes your deployment must be able to clone your repo. Common patterns:
- Provide a GitHub token via environment variables on your Kubernetes work pool/worker (e.g., `GITHUB_TOKEN`) and use HTTPS with a token in the URL, or configure SSH deploy keys via your pod/job template.
- If you prefer the CLI, the `prefect.yaml` “pull” section supports a `git_clone` step with an `access_token` (see CLI example below).
- Docs: Store flow code for deployments (Prefect 3)
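For the “HTTPS with a token in the URL” pattern, a minimal stdlib sketch of building such a URL from an environment variable. The function names are hypothetical, and `https://<token>@github.com/...` is one common convention for GitHub personal access tokens, so confirm what your Git host expects. Only ever log the redacted form.

```python
import os
from urllib.parse import quote, urlsplit, urlunsplit


def authenticated_clone_url(repo_url: str, token: str) -> str:
    """Hypothetical helper: embed a token as the userinfo of an https clone URL."""
    parts = urlsplit(repo_url)
    if parts.scheme != "https" or not parts.hostname:
        raise ValueError(f"expected an https:// clone URL, got {repo_url!r}")
    netloc = f"{quote(token, safe='')}@{parts.hostname}"
    if parts.port:
        netloc += f":{parts.port}"
    return urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))


def redacted(url: str) -> str:
    """Replace any userinfo (the token) with *** so the URL is safe to log."""
    parts = urlsplit(url)
    if "@" not in parts.netloc:
        return url
    host = parts.netloc.rsplit("@", 1)[1]
    return urlunsplit((parts.scheme, f"***@{host}", parts.path, parts.query, parts.fragment))


if __name__ == "__main__":
    token = os.environ.get("GITHUB_TOKEN")
    if token:
        url = authenticated_clone_url("https://github.com/<owner>/<repo>.git", token)
        print(redacted(url))  # never print the unredacted URL
    else:
        print("GITHUB_TOKEN is not set")
```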
Option B: Build and use a Docker image
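- Build an image that includes your dependencies (and your code, if you deploy without `from_source`), then pass `image=` to `deploy`. In the example below the flow code is still pulled from GitHub via `from_source`.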
Example:
from prefect.flows import Flow
from prefect.docker import DockerImage
# Image to build and push; pass build=False (and push=False) to deploy() if it already exists
image = DockerImage(
name="ghcr.io/<owner>/<image>:1.0.0",
)
flow = Flow.from_source(
"https://github.com/<owner>/<repo>",
entrypoint="example_flow.py:hello_flow",
)
flow.deploy(
name="hello-k8s",
work_pool_name="default",
image=image, # satisfies the "image or remote source" requirement
version="1.0",
parameters={"name": "Prefect"},
)
CLI alternative (recommended for private repos)
- Prefect 3.x uses `prefect deploy` (not `prefect deployment build`).
- In `prefect.yaml`, add a `pull` step to clone your repo. You can reference a token from an environment variable or a Prefect Secret block.
Example `prefect.yaml`:
name: my-project
pull:
- prefect.deployments.steps.git_clone:
repository: https://github.com/<owner>/<repo>.git
branch: main
access_token: "{{ $GITHUB_TOKEN }}" # Prefect env-var template syntax (${...} is not expanded); set GITHUB_TOKEN where you run prefect deploy and on the worker
deployments:
- name: hello-k8s
entrypoint: example_flow.py:hello_flow
work_pool:
name: default
parameters:
name: Prefect
Then run:
prefect deploy
Useful docs:
- Store flow code for deployments: Prefect 3 docs: Store flow code
- Work pools (Kubernetes): Manage work pools
- CLI help verified: `prefect deploy --help` and `prefect work-pool create --help`
A couple quick checks:
- Is your repo public or private?
- If private, how do you prefer to authenticate (HTTPS token vs SSH key)? I can tailor the exact config for your Kubernetes work pool/job template.
Tushar Mahajan
10/03/2025, 6:23 PM
from prefect.flows import Flow
from prefect_github.repository import GitHubRepository
github_repository_block = GitHubRepository.load("minerva-prefect-scripts")
remote_flow = Flow.from_source(
source=github_repository_block,
entrypoint="example_flow.py:hello_flow", # file.py:flow_func
)
remote_flow.deploy(
name="hello-k8s",
work_pool_name="Test",
version="1.0",
parameters={"name": "Prefect"},
)
got the error:
No block class found for slug 'github-repository'. Please ensure that the block class is available in the current environment.
Marvin
10/03/2025, 6:25 PM
`GitHubRepository` is a 2.x-style block and it isn’t available/registered in Prefect 3.x. In 3.x, repository storage blocks are removed. Instead, you must either:
- point the deployment at a remote source via `Flow.from_source(...)`, or
- provide a Docker image containing your code.
Since you switched to a local (process) work pool, here’s the minimal 3.x approach that deploys straight from your GitHub repo without blocks:
Public GitHub repo
from prefect.flows import Flow
remote_flow = Flow.from_source(
"https://github.com/<owner>/<repo>",
entrypoint="example_flow.py:hello_flow", # file.py:flow_function
)
remote_flow.deploy(
name="hello-local",
work_pool_name="Test",
version="1.0",
parameters={"name": "Prefect"},
)
Notes:
- The entrypoint must be a file path from the repo root, not a module path. For example, `example_flow.py:hello_flow`, not `example_flow:hello_flow`.
- Make sure a worker is running for your process pool:
prefect worker start --pool Test
Private GitHub repo
For private repos, the worker needs credentials to clone. Easiest is to use the CLI with a `prefect.yaml` pull step and provide a token via env var:
prefect.yaml
name: my-project
pull:
- prefect.deployments.steps.git_clone:
repository: https://github.com/<owner>/<repo>.git
branch: main
access_token: "{{ $GITHUB_TOKEN }}" # Prefect env-var template syntax; ${...} is not expanded
deployments:
- name: hello-local
entrypoint: example_flow.py:hello_flow
work_pool:
name: Test
parameters:
name: Prefect
Then:
export GITHUB_TOKEN=<your_token> # make sure the worker inherits this
prefect deploy
prefect worker start --pool Test
Why your current code fails
- `GitHubRepository.load(...)` refers to a 2.x block (slug `github-repository`). In 3.x, that block doesn’t exist, hence “No block class found for slug ‘github-repository’.”
- Use `Flow.from_source("https://github.com/...")` instead of `storage=...`.
Docs for reference:
- Storing flow code for deployments: Store flow code (Prefect 3)
- Managing work pools: Manage work pools
If your repo is private, do you prefer HTTPS with a token or SSH deploy keys? I can share the exact configuration for your worker environment and job template.
Tushar Mahajan
10/03/2025, 6:51 PM
from prefect import flow
@flow
def hello_flow(name: str = "world"):
    print(f"👋 Hello, {name}!")
    return f"👋 Hello, {name}!"
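Where can I see this output in the UI?
Marvin
10/03/2025, 6:53 PM
By default, `print()` output isn’t captured as a Prefect log, so it won’t show up in the UI. To see it in the flow run’s Logs tab, either:
- enable print capture with `@flow(log_prints=True)`, or
- use the Prefect logger (`get_run_logger().info(...)`), which always goes to logs.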
- Make sure the code runs inside a Prefect flow run, either via a deployment with a running worker or by calling `hello_flow()` directly while `PREFECT_API_URL` points at the same server/workspace. Output from code that runs outside any flow won’t appear in the UI.
- Ensure your worker is configured to send logs to the API (generally on by default). If needed:
export PREFECT_LOGGING_TO_API_ENABLED=true
prefect worker start --pool Test
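To see why `print()` has to be opted in, here is a toy, stdlib-only illustration of print capture: a context manager that temporarily swaps `builtins.print` for a function that emits log records. It is a hypothetical sketch of the idea, not Prefect’s implementation of `log_prints`.

```python
import builtins
import contextlib
import logging
from typing import Iterator


@contextlib.contextmanager
def prints_to_logger(logger: logging.Logger) -> Iterator[None]:
    """Toy print capture (hypothetical): route stdout-bound print() to logger.info.

    Calls that pass an explicit file= fall through to the real print().
    """
    original_print = builtins.print

    def logging_print(*args, sep=" ", end="\n", file=None, flush=False):
        if file is None:
            logger.info((" " if sep is None else sep).join(str(a) for a in args))
        else:
            original_print(*args, sep=sep, end=end, file=file, flush=flush)

    builtins.print = logging_print
    try:
        yield
    finally:
        builtins.print = original_print  # always restore the real print


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s")
    with prints_to_logger(logging.getLogger("flow_run")):
        print("👋 Hello, world!")  # emitted as an INFO log record, not plain stdout
    print("back to normal stdout")
```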
Updated flow example:
from prefect import flow, get_run_logger
@flow(log_prints=True)
def hello_flow(name: str = "world"):
    # This will appear in the UI Logs when log_prints=True
    print(f"👋 Hello, {name}!")
    # This will appear in the UI Logs regardless of log_prints
    logger = get_run_logger()
    logger.info(f"👋 Hello, {name}!")
    return f"👋 Hello, {name}!"
UI navigation:
- Go to your Deployment → open a specific Flow Run → Logs tab.
- Or go to Flow Runs → select your run → Logs tab.
If you want a visual record besides logs, you can also create an artifact:
from prefect.artifacts import create_markdown_artifact
# Call this inside hello_flow, where `name` is defined
create_markdown_artifact(markdown=f"Result: 👋 Hello, {name}!")
Then view it on the run’s Artifacts tab.
Tushar Mahajan
10/03/2025, 10:20 PM
kubernetes_asyncio.config.kube_config - [Errno 2] No such file or directory: 'kubelogin'
Marvin
10/03/2025, 10:23 PM
This error means the `kubelogin` binary is not present in the worker’s environment.
Why this happens
- Your kubeconfig likely has a user that uses `exec` with `kubelogin` (common for AKS AAD auth).
- The Prefect Kubernetes worker uses that kubeconfig to talk to the API server; without the `kubelogin` binary in PATH, you get `[Errno 2] No such file or directory: 'kubelogin'`.
Ways to fix it
Option A — Install kubelogin in the worker image (recommended if using kubeconfig/AAD)
- Add kubelogin to the Docker image your worker runs with. Example Dockerfile snippet (multi-arch):
FROM prefecthq/prefect:3-python3.11
ARG DEBIAN_FRONTEND=noninteractive
ARG KUBELOGIN_VERSION=v0.0.35
ARG TARGETARCH
RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl unzip \
&& curl -fsSL -o /tmp/kubelogin.zip https://github.com/Azure/kubelogin/releases/download/${KUBELOGIN_VERSION}/kubelogin-linux-${TARGETARCH}.zip \
&& unzip /tmp/kubelogin.zip -d /tmp/kubelogin \
&& install -m 0755 /tmp/kubelogin/bin/linux_${TARGETARCH}/kubelogin /usr/local/bin/kubelogin \
&& rm -rf /var/lib/apt/lists/* /tmp/*
- Run your Prefect worker with this image (e.g., the image in your worker’s Kubernetes Deployment or Helm values). The error is raised by the worker’s Kubernetes client, so flow-run job images don’t need kubelogin.
Option B — Use in-cluster auth (no kubelogin needed)
- If your worker runs inside the same cluster, you can avoid kubeconfig/AAD entirely and use the pod’s service account.
- Ensure you are NOT mounting/pointing `KUBECONFIG` in the worker pod.
- Give the worker’s service account RBAC to create Jobs/Pods in your target namespace. Minimal example:
apiVersion: v1
kind: ServiceAccount
metadata:
name: prefect-worker
namespace: <ns>
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: prefect-worker
namespace: <ns>
rules:
- apiGroups: ["", "batch"]
resources: ["pods", "pods/log", "pods/exec", "jobs"]
verbs: ["create", "get", "list", "watch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: prefect-worker
namespace: <ns>
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: prefect-worker
subjects:
- kind: ServiceAccount
name: prefect-worker
namespace: <ns>
Option C — If running the worker outside Kubernetes (on a VM/laptop)
- Install kubelogin on that machine so it’s available in PATH for the worker process:
- macOS: brew install Azure/kubelogin/kubelogin
- Windows: choco install kubelogin
- Linux: download the release zip for your arch, unzip it, and move `bin/linux_<arch>/kubelogin` into /usr/local/bin
- Kubelogin releases: Azure kubelogin releases
Quick checks
- Confirm your kubeconfig is using kubelogin:
- `kubectl config view --minify -o jsonpath='{.users[0].user.exec.command}'` → should show kubelogin
- Inside the worker environment:
- `which kubelogin` and `kubelogin --version`
- Ensure `KUBECONFIG` points to a file the worker can read (or is unset if using in-cluster auth)
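The first two checks can be combined into a small stdlib script that lists the exec credential plugins your kubeconfig references and flags any that aren’t on `PATH` for the current process. It assumes you export the kubeconfig as JSON first (`kubectl config view --minify -o json > kubeconfig.json`); the script and file names are hypothetical. Run it in the same environment as the worker.

```python
import json
import shutil
import sys
from typing import Any, Dict, List


def exec_plugins(kubeconfig: Dict[str, Any]) -> List[str]:
    """Commands that kubeconfig users invoke via the `exec` credential plugin."""
    commands = []
    for entry in kubeconfig.get("users") or []:
        exec_cfg = (entry.get("user") or {}).get("exec") or {}
        command = exec_cfg.get("command")
        if command:
            commands.append(command)
    return commands


def missing_plugins(kubeconfig: Dict[str, Any]) -> List[str]:
    """Plugin commands that cannot be found on PATH for this process."""
    return [cmd for cmd in exec_plugins(kubeconfig) if shutil.which(cmd) is None]


if __name__ == "__main__":
    # Hypothetical usage: python check_exec_plugins.py kubeconfig.json
    if len(sys.argv) == 2:
        with open(sys.argv[1]) as f:
            config = json.load(f)
        print("exec plugins:   ", exec_plugins(config))
        print("missing on PATH:", missing_plugins(config))
    else:
        print("usage: python check_exec_plugins.py kubeconfig.json")
```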
Docs
- Prefect 3.x Kubernetes work pools and workers: Prefect 3.x: Kubernetes deployments and workers
A couple clarifying questions to make sure we point you to the best fix:
- Is your worker running inside the AKS cluster or outside it?
- Are you mounting/setting a kubeconfig for the worker that uses AAD exec with kubelogin?
- Which image is your worker using right now? If you share it, I can suggest the smallest change to add kubelogin.
Tushar Mahajan
10/04/2025, 3:18 PM
name: axia
prefect-version: 3.4.21
build:
- prefect_docker.deployments.steps.build_docker_image:
id: build-image
requires: prefect-docker>=0.4.0
image_name: "{{ $PREFECT_IMAGE_NAME }}"
tag: latest-1
dockerfile: auto
platform: "linux/amd64"
push:
- prefect_docker.deployments.steps.push_docker_image:
requires: prefect-docker>=0.4.0
image_name: "{{ build-image.image_name }}"
tag: "{{ build-image.tag }}"
deployments
definitions:
work_pool:
name: "default"
job_variables:
image: "{{ build-image.image }}"
deployments:
- name: "default"
schedule: null
entrypoint: "flows/example_flow.py:hello_flow"
The image is working fine on local but on k8s, I am getting the following error ->
[Errno 2] No such file or directory: '/opt/prefect/axia'
08:46:07 PM
prefect.flow_runs
Unexpected exception encountered when trying to load flow
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/deployments/steps/core.py", line 166, in run_steps
step_output = await run_step(step, upstream_outputs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/deployments/steps/core.py", line 137, in run_step
result = await from_async.call_soon_in_new_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 374, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 399, in _run_sync
result = self.fn(*self.args, **self.kwargs)
Marvin
10/04/2025, 3:25 PM
[Errno 2] No such file or directory: '/opt/prefect/axia'
What that means
- In Prefect 3.x, when you deploy a project (using prefect.yaml), the runtime expects your project to be available inside the container at `/opt/prefect/<project_name>`. Here, `<project_name>` is the `name` in your prefect.yaml, so `/opt/prefect/axia`.
- With `dockerfile: auto`, Prefect generates a Dockerfile that copies your project into the image so that path exists at runtime.
Why it works locally but fails on Kubernetes
The most common reasons the project directory is missing in K8s are:
1) The K8s pod is running an old image due to imagePullPolicy/caching
- You’re using a static tag `latest-1`. When `imagePullPolicy` is `IfNotPresent` (Kubernetes’ default for any tag other than `:latest`), a node that already has `latest-1` cached won’t pull your newer build that contains `/opt/prefect/axia`.
- Fix: use unique tags per build and force pulls in your work pool/job variables.
2) A volume mount is shadowing `/opt/prefect`
- If your K8s job template mounts a volume at `/opt/prefect` (or at `/opt/prefect/axia`), it will hide the project files that were baked into the image.
- Fix: do not mount at `/opt/prefect`. If you need a workspace, mount elsewhere (e.g., `/opt/prefect/data`).
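3) Build context or `.dockerignore` excluded files
- If your `.dockerignore` excludes `flows/` (or the entire project), the generated image will not contain your code even though it “built fine.” Note that `COPY . /opt/prefect/axia/` still creates the directory itself, so this usually shows up as a missing flow file rather than a missing `/opt/prefect/axia`.
- Fix: check `.dockerignore` and confirm your `flows/example_flow.py` is copied into the image.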
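For cause 1, a sketch of Kubernetes’ defaulting rule when a container leaves `imagePullPolicy` unset: `:latest` (or no tag and no digest) defaults to `Always`, any other tag or a digest to `IfNotPresent`. The reference parsing here is deliberately simplified (no validation), the function name is hypothetical, and the rule doesn’t apply at all if your job template sets `imagePullPolicy` explicitly.

```python
def default_pull_policy(image: str) -> str:
    """Kubernetes' default imagePullPolicy for a container that leaves it unset.

    Simplified parsing (hypothetical helper): the tag is whatever follows ':'
    in the last path component; a digest is whatever follows '@'.
    """
    name, _, digest = image.partition("@")
    last_component = name.rsplit("/", 1)[-1]  # ignore registry host:port
    tag = last_component.partition(":")[2]
    if not tag and not digest:
        tag = "latest"  # no tag and no digest means ':latest'
    return "Always" if tag == "latest" else "IfNotPresent"


if __name__ == "__main__":
    for ref in ("hello:latest-1", "hello:latest", "hello", "hello@sha256:abc"):
        print(f"{ref:<20} -> {default_pull_policy(ref)}")
```

With the static `latest-1` tag, the default is `IfNotPresent`, which is why a cached node can keep running a stale image.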
What to change/check
- Use a unique tag for each build and force K8s to pull:
- Set `tag` to something unique (timestamp, commit SHA).
- Set `image_pull_policy: Always` in your work pool job variables.
- Optionally, persist the generated Dockerfile so you can see exactly what’s being copied:
- Add `persist_dockerfile: true` and inspect `Dockerfile.generated`.
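If you’d rather generate the unique tag yourself (for example in CI before running `prefect deploy`), here is a minimal sketch that turns a UTC timestamp into a Docker-safe tag. The slug shape matches the tags that show up later in this thread (e.g. `2025-10-04t15-55-04-523025-00-00`); `timestamp_tag` is a hypothetical helper, not a Prefect API.

```python
import re
from datetime import datetime, timezone
from typing import Optional


def timestamp_tag(now: Optional[datetime] = None) -> str:
    """Hypothetical helper: unique, Docker-safe image tag from a UTC timestamp.

    Lowercases the ISO-8601 string and collapses every run of characters
    outside [a-z0-9] into one hyphen, e.g.
    2025-10-04T15:55:04.523025+00:00 -> 2025-10-04t15-55-04-523025-00-00
    """
    now = now or datetime.now(timezone.utc)
    return re.sub(r"[^a-z0-9]+", "-", now.isoformat().lower()).strip("-")


if __name__ == "__main__":
    print(timestamp_tag())
```

You could export its output as a (hypothetical) `IMAGE_TAG` variable and reference it as `tag: "{{ $IMAGE_TAG }}"` in the build step.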
Here’s a tightened-up prefect.yaml you can adapt:
name: axia
prefect-version: 3.4.21
build:
- prefect_docker.deployments.steps.build_docker_image:
id: build-image
requires: prefect-docker>=0.4.0
image_name: "{{ $PREFECT_IMAGE_NAME }}"
tag: "{{ $.timestamp }}" # unique tag per build
dockerfile: auto
persist_dockerfile: true # optional: inspect generated Dockerfile
platform: "linux/amd64"
push:
- prefect_docker.deployments.steps.push_docker_image:
requires: prefect-docker>=0.4.0
image_name: "{{ build-image.image_name }}"
tag: "{{ build-image.tag }}"
definitions:
work_pool: &default_work_pool # YAML anchor, reused below
name: "default"
job_variables:
image: "{{ build-image.image }}"
image_pull_policy: "Always" # ensure fresh pull in K8s
deployments:
- name: "default"
schedule: null
entrypoint: "flows/example_flow.py:hello_flow"
work_pool: *default_work_pool # YAML alias; a "{{ definitions.work_pool }}" template is not resolved
Quick diagnostics you can run:
- In the running pod, check the path:
- kubectl exec -it <pod> -- ls -la /opt/prefect
- kubectl exec -it <pod> -- ls -la /opt/prefect/axia
If it’s missing, it’s almost always due to image caching or a volume mount shadowing the directory.
- Verify what image and pull policy the worker is using (work pool job template). Make sure it matches your freshly pushed tag and has `image_pull_policy: Always`.
- Check `.dockerignore` for anything that might exclude `flows/` or your project files.
If you can share:
- The work pool type (Kubernetes?) and its job variables/template (especially volumes and image_pull_policy)
- Whether you have any volume mounts at /opt/prefect
- The output of `ls /opt/prefect` inside the pod
…I can pinpoint the exact cause.
References:
- prefect.yaml reference
- Kubernetes work pools and job variables
Tushar Mahajan
10/04/2025, 3:47 PM
FROM prefecthq/prefect:3.4.21-python3.12
COPY . /opt/prefect/axia/
WORKDIR /opt/prefect/axia/
This is the generated dockerfile
and following is deployment inspect
tmahajan@Tushars-MacBook-Pro-9 axia % prefect deployment inspect hello-flow/default
{
'id': '86662f35-5d44-4f8c-9077-88f24af1eb95',
'created': '2025-10-04T15:09:10.393567Z',
'updated': '2025-10-04T15:47:10.127434Z',
'name': 'default',
'version': '9148bb5a',
'version_id': None,
'version_info': None,
'branch': None,
'base': None,
'root': None,
'description': None,
'flow_id': 'ed0a0a50-dfba-4c52-aadb-cb3a87f64748',
'concurrency_limit': None,
'global_concurrency_limit': None,
'concurrency_options': None,
'paused': False,
'schedules': [],
'job_variables': {},
'parameters': {},
'pull_steps': [{'prefect.deployments.steps.set_working_directory': {'directory': '/opt/prefect/axia'}}],
'tags': [],
'labels': {'prefect.flow.id': 'ed0a0a50-dfba-4c52-aadb-cb3a87f64748'},
'work_queue_name': 'default',
'last_polled': '2025-10-04T15:47:10.123610Z',
'parameter_openapi_schema': {
'type': 'object',
'title': 'Parameters',
'properties': {'name': {'type': 'string', 'title': 'name', 'default': 'world', 'position': 0}}
},
'path': None,
'entrypoint': 'flows/example_flow.py:hello_flow',
'storage_document_id': None,
'infrastructure_document_id': None,
'created_by': None,
'updated_by': None,
'work_queue_id': None,
'enforce_parameter_schema': True,
'work_pool_name': 'default',
'status': 'READY',
'automations': []
}
Marvin
10/04/2025, 3:49 PM
Thanks, that confirms the build side looks right:
- The generated Dockerfile copies your project into `/opt/prefect/axia` and sets the workdir there:
FROM prefecthq/prefect:3.4.21-python3.12
COPY . /opt/prefect/axia/
WORKDIR /opt/prefect/axia/
- `prefect deployment inspect` shows Prefect will set the working directory to `/opt/prefect/axia` at runtime:
'pull_steps': [
{'prefect.deployments.steps.set_working_directory': {'directory': '/opt/prefect/axia'}}
]
Given that, the K8s error `[Errno 2] No such file or directory: '/opt/prefect/axia'` almost certainly means the container that ran in Kubernetes did not actually have your project files at that path. The two most common causes:
1) The K8s job is not using your built image
- Your deployment’s `job_variables` are empty (`'job_variables': {}`). Unless your work pool’s base job template has been updated to use your built image, the K8s job likely ran a stock Prefect image (such as `prefecthq/prefect:3.4.21-python3.12`), which does not include `/opt/prefect/axia`.
- Action:
- Inspect the work pool to see what image it’s using:
prefect work-pool inspect default
- Also confirm the pod’s image:
kubectl describe pod <podname> | grep -i image
- If it’s not your built image, set the image in your deployment or work pool.
2) A volume mount is shadowing `/opt/prefect` (less common, but worth checking)
- If your K8s job template mounts a volume at `/opt/prefect` or `/opt/prefect/axia`, it will hide the files baked into the image.
- Action:
- Check your pool’s base job template for volume mounts at `/opt/prefect`.
- If needed, mount volumes elsewhere (e.g., `/opt/prefect/data`).
How to fix in your prefect.yaml
Make sure your deployment attaches the image to the work pool job variables and force pulls in K8s. For example:
name: axia
prefect-version: 3.4.21
build:
- prefect_docker.deployments.steps.build_docker_image:
id: build-image
requires: prefect-docker>=0.4.0
image_name: "{{ $PREFECT_IMAGE_NAME }}"
tag: "{{ $.timestamp }}" # unique tag per build to avoid cache
dockerfile: auto
persist_dockerfile: true # optional, for visibility
platform: "linux/amd64"
push:
- prefect_docker.deployments.steps.push_docker_image:
requires: prefect-docker>=0.4.0
image_name: "{{ build-image.image_name }}"
tag: "{{ build-image.tag }}"
deployments:
- name: "default"
entrypoint: "flows/example_flow.py:hello_flow"
work_pool:
name: "default"
job_variables:
image: "{{ build-image.image }}"
image_pull_policy: "Always"
Quick diagnostics
- Inside a running pod:
kubectl exec -it <pod> -- ls -la /opt/prefect
kubectl exec -it <pod> -- ls -la /opt/prefect/axia
If the directory is missing, you’re not running the correct image or it’s being shadowed.
- Verify you’re actually pulling the latest image (unique tag + `image_pull_policy: Always`).
- Double-check `.dockerignore` isn’t excluding your `flows/` or other project files.
If you can share:
- Output of `prefect work-pool inspect default` (especially the image and any volume mounts)
- The image printed by `kubectl describe pod` for the flow-run pod
…I can pinpoint which of the two causes it is.
Docs:
- prefect.yaml reference
- Kubernetes work pools and job variables
Tushar Mahajan
10/04/2025, 3:56 PM
deployments:
- name: "default"
schedule: null
entrypoint: "flows/example_flow.py:hello_flow"
work_pool:
name: "default"
job_variables:
image: "{{ build-image.image }}"
image_pull_policy: "Always"
imagePullSecrets:
- dxminerva_cloud_artifactory
but got error
message: 'initializing source docker://docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t15-55-04-523025-00-00:
unable to retrieve auth token: invalid username/password: unknown: Authentication
Marvin
10/04/2025, 4:00 PM
`unable to retrieve auth token: invalid username/password: unknown: Authentication`
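- In your deployment, you set `imagePullSecrets` in `job_variables`. Job variables only reach the pod if the work pool’s base job template declares a variable with that exact name and references it in the job manifest; a key the template never references has no effect. So the pod likely isn’t getting any imagePullSecrets, and the pull fails.
How to fix
1) Use a variable your base job template actually consumes, with the correct structure:
- In the pod spec, `imagePullSecrets` must be a list of objects with a `name` field.
- Prefect’s default Kubernetes base job template may not declare an `image_pull_secrets` variable at all. Check the `variables` section of `prefect work-pool inspect default`; if it isn’t there, add it to the base job template (mapped to `imagePullSecrets` in the pod spec) or use the ServiceAccount alternative below.
- With such a variable in place, update your deployment block like this: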
deployments:
- name: "default"
schedule: null
entrypoint: "flows/example_flow.py:hello_flow"
work_pool:
name: "default"
job_variables:
image: "{{ build-image.image }}"
image_pull_policy: "Always"
image_pull_secrets:
- name: dxminerva-cloud-artifactory
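Notes:
- The key must match the variable name declared in the base job template (e.g., `image_pull_secrets`); camelCase `imagePullSecrets` is the pod-spec field, not a Prefect job variable.
- The secret name must be a valid Kubernetes DNS-1123 subdomain name (lowercase letters, digits, `-` and `.`; no underscores). If your actual secret uses underscores, recreate it with hyphens.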
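A quick stdlib check for that naming rule, using the DNS-1123 subdomain pattern Kubernetes applies to Secret names (lowercase alphanumerics, `-` and `.`, starting and ending with an alphanumeric, at most 253 characters). Both helpers are hypothetical, and `suggest_secret_name` is only a naive cleanup.

```python
import re

# DNS-1123 subdomain: dot-separated labels of lowercase alphanumerics and '-',
# each starting and ending with an alphanumeric; 253 characters max overall.
_DNS1123_SUBDOMAIN = re.compile(
    r"[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*"
)


def is_valid_secret_name(name: str) -> bool:
    """Hypothetical helper: True if name is a valid Kubernetes Secret name."""
    return len(name) <= 253 and _DNS1123_SUBDOMAIN.fullmatch(name) is not None


def suggest_secret_name(name: str) -> str:
    """Naive cleanup: lowercase, replace disallowed runs with '-', trim the ends."""
    return re.sub(r"[^a-z0-9.-]+", "-", name.lower()).strip("-.")


if __name__ == "__main__":
    for candidate in ("dxminerva_cloud_artifactory", "dxminerva-cloud-artifactory"):
        print(candidate, is_valid_secret_name(candidate), suggest_secret_name(candidate))
```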
2) Ensure the secret exists in the same namespace as the flow-run pods and is the right type
- Create (or recreate) your secret as a docker registry secret:
kubectl create secret docker-registry dxminerva-cloud-artifactory \
--docker-server=docker-minerva-release.dr-uw2.adobeitc.com \
--docker-username=<username> \
--docker-password=<password-or-api-key> \
--docker-email=<email> \
-n <namespace>
- Make sure the `--docker-server` exactly matches the registry in your image URL.
- For Artifactory, confirm the correct username/API key format (some setups use a robot account or API key).
Alternative: bind the secret to a ServiceAccount and reference it
- Create a ServiceAccount with imagePullSecrets:
apiVersion: v1
kind: ServiceAccount
metadata:
name: prefect-worker
imagePullSecrets:
- name: dxminerva-cloud-artifactory
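- Then set in job_variables (`service_account_name` is one of the variables in Prefect’s default Kubernetes base job template, so no template edit is needed):
service_account_name: prefect-worker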
Verify it’s applied
- Check the effective work pool/job variables:
prefect work-pool inspect default
- Check the running pod has the secret:
kubectl get pod <pod> -o yaml | grep -A3 imagePullSecrets
- Confirm the image used by the pod and that the pull policy is Always:
kubectl describe pod <pod> | grep -i image
Also keep the unique image tag + Always pull
- Keep unique tags (e.g., timestamp) and `image_pull_policy: "Always"` to avoid stale cached images.
Docs
- Kubernetes work pools and job variables: Kubernetes work pools guide
- prefect.yaml reference: prefect.yaml reference
If you share:
- The namespace you’re running in
- The output of prefect work-pool inspect default
- `kubectl get secret dxminerva-cloud-artifactory -o yaml -n <namespace>` (redact data)
…I can confirm the mapping and whether K8s is seeing the secret correctly.
Tushar Mahajan
10/04/2025, 5:19 PM
apiVersion: v1
data:
.dockerconfigjson: ****
kind: Secret
metadata:
annotations:
kubectl.kubernetes.io/last-applied-configuration: |
{"apiVersion":"v1","data":{".dockerconfigjson":"***"},"kind":"Secret","metadata":{"annotations":{},"name":"dxminerva-cloud-artifactory","namespace":"ns-team-dx-minerva-prod-va7"},"type":"kubernetes.io/dockerconfigjson"}
creationTimestamp: "2025-10-02T10:28:02Z"
name: dxminerva-cloud-artifactory
namespace: ns-team-dx-minerva-prod-va7
resourceVersion: "5340990357"
uid: a2523252-a63d-41b2-8c52-7aa93fc9529f
type: kubernetes.io/dockerconfigjson
Prefect.yaml
name: axia
prefect-version: 3.4.21
build:
- prefect_docker.deployments.steps.build_docker_image:
id: build-image
requires: prefect-docker>=0.4.0
image_name: "{{ $PREFECT_IMAGE_NAME }}"
tag: "{{ $.timestamp }}" # unique tag per build
dockerfile: auto
persist_dockerfile: true # optional: inspect generated Dockerfile
platform: "linux/amd64"
push:
- prefect_docker.deployments.steps.push_docker_image:
requires: prefect-docker>=0.4.0
image_name: "{{ build-image.image_name }}"
tag: "{{ build-image.tag }}"
definitions:
work_pool:
name: "default"
job_variables:
image: "{{ build-image.image }}"
image_pull_policy: "Always" # ensure fresh pull in K8s
deployments:
- name: "default"
schedule: null
entrypoint: "flows/example_flow.py:hello_flow"
work_pool:
name: "default"
job_variables:
image: "{{ build-image.image }}"
image_pull_policy: "Always"
image_pull_secrets:
- name: dxminerva-cloud-artifactory
Error ->
state:
waiting:
message: 'Back-off pulling image "docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00":
ErrImagePull: initializing source docker://docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00:
unable to retrieve auth token: invalid username/password: unknown: Authentication
Marvin
10/04/2025, 5:20 PM
Your deployment now sets `image_pull_secrets`. Since you’re still getting “invalid username/password”, here are the most common causes and how to pinpoint the exact issue.
What to verify first
1) The flow-run pod is in the same namespace as the secret
- Your secret is in `ns-team-dx-minerva-prod-va7`. If your flow-run pod is in a different namespace, it cannot use that secret.
- Ensure your deployment sets the namespace:
work_pool:
name: "default"
job_variables:
namespace: ns-team-dx-minerva-prod-va7
image: "{{ build-image.image }}"
image_pull_policy: "Always"
image_pull_secrets:
- name: dxminerva-cloud-artifactory
- Confirm the namespace of the flow-run pod:
kubectl get pods -A | grep <flow-run-id-or-deployment-name>
2) The pod actually has imagePullSecrets attached
- Inspect the pod YAML:
kubectl get pod <pod-name> -n ns-team-dx-minerva-prod-va7 -o yaml | grep -A3 imagePullSecrets
- You should see:
imagePullSecrets:
- name: dxminerva-cloud-artifactory
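If it’s missing, your job_variables aren’t reaching the pod spec (for example, because the base job template has no `image_pull_secrets` variable wired into the job manifest). In that case: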
- Confirm your work pool type is Kubernetes:
prefect work-pool inspect default
Look for something like `"type": "kubernetes"`. If it isn’t, your deployment won’t create K8s Jobs.
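- If it is Kubernetes but the secret still isn’t applied, add `imagePullSecrets` directly to the pod spec in the work pool’s base job template (`job_configuration.job_manifest.spec.template.spec`), or add an `image_pull_secrets` variable that maps to it.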
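A minimal sketch of that template edit, assuming you have the pool’s base job template saved as JSON and that it follows the layout of Prefect’s default Kubernetes template (`job_configuration` → `job_manifest` → `spec` → `template` → `spec`). The script and file names are hypothetical; after editing, re-apply the template (for example with `prefect work-pool update default --base-job-template template.json`, if your CLI version supports that flag) and check the result with `prefect work-pool inspect default`.

```python
import json
import sys
from typing import Any, Dict


def add_image_pull_secret(template: Dict[str, Any], secret_name: str) -> Dict[str, Any]:
    """Add {"name": secret_name} to the Job's pod-spec imagePullSecrets.

    Assumes Prefect's default Kubernetes layout (an assumption, check yours):
    job_configuration -> job_manifest -> spec -> template -> spec.
    Idempotent: an existing entry with the same name is not duplicated.
    """
    pod_spec = template["job_configuration"]["job_manifest"]["spec"]["template"]["spec"]
    secrets = pod_spec.setdefault("imagePullSecrets", [])
    if not any(entry.get("name") == secret_name for entry in secrets):
        secrets.append({"name": secret_name})
    return template


if __name__ == "__main__":
    # Hypothetical usage: python add_pull_secret.py template.json <secret-name>
    if len(sys.argv) == 3:
        path, secret = sys.argv[1], sys.argv[2]
        with open(path) as f:
            data = json.load(f)
        with open(path, "w") as f:
            json.dump(add_image_pull_secret(data, secret), f, indent=2)
    else:
        print("usage: python add_pull_secret.py template.json <secret-name>")
```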
3) The dockerconfigjson matches the image registry host exactly
- Decode and inspect the secret:
kubectl get secret dxminerva-cloud-artifactory -n ns-team-dx-minerva-prod-va7 \
-o jsonpath='{.data.\.dockerconfigjson}' | base64 -d
- In the output, under `auths`, the key must match the host of your image exactly:
- Your image: `docker-minerva-release.dr-uw2.adobeitc.com/hello:...`
- Required `auths` key: `"docker-minerva-release.dr-uw2.adobeitc.com"` (no scheme, no path)
- If it doesn’t match, recreate the secret with the exact server:
kubectl delete secret dxminerva-cloud-artifactory -n ns-team-dx-minerva-prod-va7
kubectl create secret docker-registry dxminerva-cloud-artifactory \
--docker-server=docker-minerva-release.dr-uw2.adobeitc.com \
--docker-username='<username>' \
--docker-password='<password-or-api-key>' \
--docker-email='<email>' \
-n ns-team-dx-minerva-prod-va7
Notes for Artifactory:
- Use your Artifactory username (or a service account) as the username.
- Identity tokens, access tokens, or API keys are commonly used as the password.
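To compare the decoded secret against the image without eyeballing it, here is a stdlib sketch that extracts the registry host from the image reference, normalises the `auths` keys (dropping any scheme or path), and decodes only the username. It assumes the JSON layout written by `kubectl create secret docker-registry` (`auths` → host → `username`/`password`/`auth`); the helper names are hypothetical and the host detection is simplified (Docker Hub aliases such as `index.docker.io` aren’t handled).

```python
import base64
import json
import sys
from typing import Dict, List, Optional
from urllib.parse import urlsplit


def registry_host(image: str) -> str:
    """Registry host of an image reference; docker.io when none is given."""
    first, sep, _ = image.partition("/")
    if sep and ("." in first or ":" in first or first == "localhost"):
        return first
    return "docker.io"


def _host_of(key: str) -> str:
    """Normalise an auths key: drop any scheme and path, keep host[:port]."""
    return urlsplit(key if "://" in key else "//" + key).netloc


def auths_hosts(dockerconfigjson: str) -> List[str]:
    auths: Dict[str, dict] = json.loads(dockerconfigjson).get("auths", {})
    return [_host_of(key) for key in auths]


def auth_username(dockerconfigjson: str, host: str) -> Optional[str]:
    """Username stored for host; the password is never returned or printed."""
    auths: Dict[str, dict] = json.loads(dockerconfigjson).get("auths", {})
    for key, entry in auths.items():
        if _host_of(key) != host:
            continue
        if entry.get("username"):
            return entry["username"]
        if entry.get("auth"):
            return base64.b64decode(entry["auth"]).decode().split(":", 1)[0]
    return None


if __name__ == "__main__":
    # Hypothetical usage: python check_auths.py decoded.json <image>
    if len(sys.argv) == 3:
        with open(sys.argv[1]) as f:
            config_text = f.read()
        host = registry_host(sys.argv[2])
        print("image registry host:", host)
        print("auths hosts:        ", auths_hosts(config_text))
        print("username for host:  ", auth_username(config_text, host))
    else:
        print("usage: python check_auths.py decoded.json <image>")
```

Save the output of the `base64 -d` command above to a file and pass it along with your image; the image host should appear in the `auths` hosts list.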
4) Credentials are actually valid for that repo
- Try a simple test pull outside Prefect using the same secret:
- Create a test Pod in the same namespace that uses the same image and `imagePullSecrets`:
apiVersion: v1
kind: Pod
metadata:
name: pull-test
namespace: ns-team-dx-minerva-prod-va7
spec:
imagePullSecrets:
- name: dxminerva-cloud-artifactory
containers:
- name: test
image: docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00
command: ["sleep","3600"]
restartPolicy: Never
Then:
kubectl apply -f pull-test.yaml
kubectl describe pod pull-test -n ns-team-dx-minerva-prod-va7 | grep -i image
kubectl get pod pull-test -n ns-team-dx-minerva-prod-va7
- If this test pod also fails with invalid username/password, the credentials/registry mapping are the issue (not Prefect). Recreate the secret as above and/or verify the account can pull that repository.
5) Ensure you’re actually using a Kubernetes work pool and the right image
- Verify:
prefect work-pool inspect default
- Confirm `"type": "kubernetes"`.
- Confirm the pod’s image:
Marvin
10/04/2025, 5:20 PM
It should be the unique tag you built and pushed. Keep using unique tags and `image_pull_policy: "Always"`.
If you prefer attaching the secret via ServiceAccount
- Create a ServiceAccount with the secret:
apiVersion: v1
kind: ServiceAccount
metadata:
name: prefect-worker
namespace: ns-team-dx-minerva-prod-va7
imagePullSecrets:
- name: dxminerva-cloud-artifactory
- Then set in job_variables:
service_account_name: prefect-worker
namespace: ns-team-dx-minerva-prod-va7
If it still fails, please share:
- prefect work-pool inspect default
- kubectl get pod <pod> -n ns-team-dx-minerva-prod-va7 -o yaml | grep -A3 imagePullSecrets
- The decoded `.dockerconfigjson` “auths” keys (redact credentials)
That will let me pinpoint whether it’s a namespace mismatch, missing imagePullSecrets on the pod, or a registry credential/server mismatch.
References:
- Kubernetes work pools and job variables
- prefect.yaml reference
Tushar Mahajan
10/04/2025, 5:24 PM
(sshuttle) tmahajan@Tushars-MacBook-Pro-9 Documents % kubectl get pod lavender-vole-z6f8c-b2vkx --context ethos44-prod-va7 -n ns-team-dx-minerva-prod-va7 -o yaml | grep -A3 imagePullSecrets
(sshuttle) tmahajan@Tushars-MacBook-Pro-9 Documents %
No image pull secrets
inspect output
tmahajan@Tushars-MacBook-Pro-9 axia % prefect work-pool inspect default
WorkPool(
id='5c7f6f71-6f86-4f85-a15a-e1546bc7bf87',
created=DateTime(2025, 10, 2, 17, 31, 19, 185796, tzinfo=Timezone('UTC')),
updated=DateTime(2025, 10, 4, 17, 16, 27, 43205, tzinfo=Timezone('UTC')),
name='default',
type='kubernetes',
base_job_template={
'variables': {
'type': 'object',
'properties': {
'env': {
'type': 'object',
'title': 'Environment Variables',
'description': 'Environment variables to set when starting a flow run.',
'additionalProperties': {'anyOf': [{'type': 'string'}, {'type': 'null'}]}
},
'name': {
'anyOf': [{'type': 'string'}, {'type': 'null'}],
'title': 'Name',
'description': 'Name given to infrastructure created by a worker.'
},
'image': {
'anyOf': [{'type': 'string'}, {'type': 'null'}],
'title': 'Image',
'examples': ['docker.io/prefecthq/prefect:3-latest'],
'description': 'The image reference of a container image to use for created jobs. If not set, the latest Prefect image will be used.'
},
'labels': {
'type': 'object',
'title': 'Labels',
'description': 'Labels applied to infrastructure created by a worker.',
'additionalProperties': {'type': 'string'}
},
'command': {
'anyOf': [{'type': 'string'}, {'type': 'null'}],
'title': 'Command',
'description': 'The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker.'
},
'namespace': {
'type': 'string',
'title': 'Namespace',
'default': 'ns-team-dx-minerva-prod-va7',
'description': 'The Kubernetes namespace to create jobs within.'
},
'backoff_limit': {
'type': 'integer',
'title': 'Backoff Limit',
'default': 0,
'minimum': 0,
'description': 'The number of times Kubernetes will retry a job after pod eviction. If set to 0, Prefect will reschedule the flow run when the pod is evicted.'
},
'stream_output': {
'type': 'boolean',
'title': 'Stream Output',
'default': True,
'description': 'If set, output will be streamed from the job to local standard output.'
},
'cluster_config': {
'anyOf': [{'$ref': '#/definitions/KubernetesClusterConfig'}, {'type': 'null'}],
'description': 'The Kubernetes cluster config to use for job creation.'
},
'finished_job_ttl': {
'anyOf': [{'type': 'integer'}, {'type': 'null'}],
'title': 'Finished Job TTL',
'default': 60,
'description': 'The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely.'
},
'image_pull_policy': {
'enum': ['IfNotPresent', 'Always', 'Never'],
'type': 'string',
'title': 'Image Pull Policy',
'default': 'IfNotPresent',
'description': 'The Kubernetes image pull policy to use for job containers.'
},
'service_account_name': {
'anyOf': [{'type': 'string'}, {'type': 'null'}],
'title': 'Service Account Name',
'description': 'The Kubernetes service account to use for job creation.'
},
'job_watch_timeout_seconds': {
'anyOf': [{'type': 'integer'}, {'type': 'null'}],
'title': 'Job Watch Timeout Seconds',
'description': 'Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely.'
},
'pod_watch_timeout_seconds': {
'type': 'integer',
'title': 'Pod Watch Timeout Seconds',
'default': 60,
'description': 'Number of seconds to watch for pod creation before timing out.'
}
},
'definitions': {
'KubernetesClusterConfig': {
'type': 'object',
'title': 'KubernetesClusterConfig',
'required': ['config', 'context_name'],
'properties': {
'config': {
'type': 'object',
'title': 'Config',
'description': 'The entire contents of a kubectl config file.',
'additionalProperties': True
},
'context_name': {'type': 'string', 'title': 'Context Name', 'description': 'The name of the kubectl context to use.'}
},
'description': 'Stores configuration for interaction with Kubernetes clusters.\n\nSee `from_file` for creation.',
'secret_fields': [],
'block_type_slug': 'kubernetes-cluster-config',
'block_schema_references': {}
}
},
'description': 'Default variables for the Kubernetes worker.\n\nThe schema for this class is used to populate the `variables` section of the default\nbase job template.'
},
'job_configuration': {
'env': '{{ env }}',
'name': '{{ name }}',
'labels': '{{ labels }}',
'command': '{{ command }}',
'namespace': '{{ namespace }}',
'job_manifest': {
'kind': 'Job',
'spec': {
'template': {
'spec': {
'containers': [
{
'env': '{{ env }}',
'args': '{{ command }}',
'name': 'prefect-job',
'image': '{{ image }}',
'imagePullPolicy': '{{ image_pull_policy }}'
}
],
'completions': 1,
'parallelism': 1,
'restartPolicy': 'Never',
'serviceAccountName': '{{ service_account_name }}'
}
},
'backoffLimit': '{{ backoff_limit }}',
'ttlSecondsAfterFinished': '{{ finished_job_ttl }}'
},
'metadata': {'labels': '{{ labels }}', 'namespace': '{{ namespace }}', 'generateName': '{{ name }}-'},
'apiVersion': 'batch/v1'
},
'stream_output': '{{ stream_output }}',
'cluster_config': '{{ cluster_config }}',
'job_watch_timeout_seconds': '{{ job_watch_timeout_seconds }}',
'pod_watch_timeout_seconds': '{{ pod_watch_timeout_seconds }}'
}
},
status=WorkPoolStatus.READY,
storage_configuration=WorkPoolStorageConfiguration(),
default_queue_id='6007965a-e625-42fd-8284-c311e607c5cb'
)
Image grep:
(sshuttle) tmahajan@Tushars-MacBook-Pro-9 Documents % kubectl get pod lavender-vole-z6f8c-b2vkx --context ethos44-prod-va7 -n ns-team-dx-minerva-prod-va7 -o yaml | grep -A3 image
image: docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00
imagePullPolicy: Always
name: prefect-job
resources:
limits:
--
- image: docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00
imageID: ""
lastState: {}
name: prefect-job
ready: false
--
message: 'Back-off pulling image "docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00":
ErrImagePull: initializing source docker://docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00:
unable to retrieve auth token: invalid username/password: unknown: Authentication
is required'
(sshuttle) tmahajan@Tushars-MacBook-Pro-9 Documents %
@Marvin
Tushar Mahajan
10/04/2025, 5:25 PM
(sshuttle) tmahajan@Tushars-MacBook-Pro-9 Documents % kubectl get pod lavender-vole-z6f8c-b2vkx --context ethos44-prod-va7 -n ns-team-dx-minerva-prod-va7 -o yaml | grep -A3 imagePullSecrets
(sshuttle) tmahajan@Tushars-MacBook-Pro-9 Documents %
No image pull secrets
Image grep:
(sshuttle) tmahajan@Tushars-MacBook-Pro-9 Documents % kubectl get pod lavender-vole-z6f8c-b2vkx --context ethos44-prod-va7 -n ns-team-dx-minerva-prod-va7 -o yaml | grep -A3 image
image: docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00
imagePullPolicy: Always
name: prefect-job
resources:
limits:
--
- image: docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00
imageID: ""
lastState: {}
name: prefect-job
ready: false
--
message: 'Back-off pulling image "docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00":
ErrImagePull: initializing source docker://docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t16-55-59-936069-00-00:
unable to retrieve auth token: invalid username/password: unknown: Authentication
is required'
(sshuttle) tmahajan@Tushars-MacBook-Pro-9 Documents %
Marvin
10/04/2025, 5:25 PM
Marvin
10/04/2025, 5:26 PM
image_pull_secrets set, but the pod YAML does not include imagePullSecrets. That means the job variables didn’t make it through to the Kubernetes Job/Pod spec used by the worker.
- Common causes:
- The work pool is not a Kubernetes pool, so K8s-only fields are ignored.
- The key/structure doesn’t match what the Kubernetes worker expects.
- The deployment wasn’t re-created after editing prefect.yaml.
- The field is set at deployment, but the pool’s base job template is overriding/ignoring it.
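One quick way to test the last cause is to list which job variables the pool's base job template actually consumes. In the inspect output above, the default pool's pod spec has no imagePullSecrets entry at all. The sketch below is a minimal check, assuming you have the `base_job_template` and the deployment's `job_variables` as Python dicts. It relies on the `{{ name }}` placeholder syntax visible in that template. `referenced_placeholders` and `unreferenced_job_variables` are hypothetical helper names. A key that never appears as a placeholder in `job_configuration` has no path into the Job or Pod spec.

```python
import re
from typing import Any

# Matches "{{ variable_name }}" placeholders, as used in the base job template.
PLACEHOLDER = re.compile(r"\{\{\s*([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")


def referenced_placeholders(node: Any) -> set[str]:
    """Recursively collect every placeholder name used anywhere in a template."""
    if isinstance(node, str):
        return set(PLACEHOLDER.findall(node))
    if isinstance(node, dict):
        children = node.values()
    elif isinstance(node, list):
        children = node
    else:
        return set()
    found: set[str] = set()
    for child in children:
        found |= referenced_placeholders(child)
    return found


def unreferenced_job_variables(job_variables: dict, base_job_template: dict) -> set[str]:
    """Return job variable keys that no placeholder in job_configuration consumes."""
    used = referenced_placeholders(base_job_template.get("job_configuration", {}))
    return set(job_variables) - used


if __name__ == "__main__":
    # Trimmed copy of the "default" pool's job_configuration from the inspect output.
    template = {
        "job_configuration": {
            "namespace": "{{ namespace }}",
            "job_manifest": {"spec": {"template": {"spec": {
                "containers": [{"image": "{{ image }}",
                                "imagePullPolicy": "{{ image_pull_policy }}"}],
                "serviceAccountName": "{{ service_account_name }}",
            }}}},
        }
    }
    job_vars = {
        "image": "docker-minerva-release.dr-uw2.adobeitc.com/hello:tag",
        "namespace": "ns-team-dx-minerva-prod-va7",
        "image_pull_policy": "Always",
        "image_pull_secrets": ["dxminerva-cloud-artifactory"],
    }
    print(unreferenced_job_variables(job_vars, template))  # {'image_pull_secrets'}
```

If `image_pull_secrets` comes back as unreferenced, changing its shape in prefect.yaml cannot help. The template itself has to reference it, or you can use the ServiceAccount route.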
Do this to fix and verify
1) Confirm your work pool is Kubernetes and see its template
prefect work-pool inspect default
- Ensure "type": "kubernetes".
- If it’s not Kubernetes, create a new Kubernetes pool and target that pool in your deployment.
2) Set namespace explicitly and try image_pull_secrets as strings
Some versions expect a list of strings (names), not objects. Also set the namespace.
deployments:
- name: "default"
entrypoint: "flows/example_flow.py:hello_flow"
work_pool:
name: "default"
job_variables:
namespace: ns-team-dx-minerva-prod-va7
image: "{{ build-image.image }}"
image_pull_policy: "Always"
image_pull_secrets:
- dxminerva-cloud-artifactory
Then redeploy:
prefect deploy
Verify the deployment has job variables:
prefect deployment inspect hello-flow/default
You should see those keys under job_variables.
3) Verify the pod includes imagePullSecrets
After a new run starts:
kubectl get pod <pod> -n ns-team-dx-minerva-prod-va7 -o yaml | grep -A3 imagePullSecrets
You should see:
imagePullSecrets:
- name: dxminerva-cloud-artifactory
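If you'd rather check this from a script than with grep, the sketch below is a minimal version. It reads a saved `kubectl get pod <pod> -o json` dump and summarizes the fields that matter for image pulls. It reads only standard Pod fields: `spec.serviceAccountName`, `spec.imagePullSecrets`, and the container images. `pull_config` is a hypothetical helper name. If you attach the secret through a ServiceAccount instead, the ServiceAccount admission controller normally copies it into the pod's imagePullSecrets when the pod is created, so the same check still applies.

```python
import json
import sys


def pull_config(pod: dict) -> dict:
    """Summarize the image-pull settings of a Pod object from `kubectl get pod -o json`."""
    spec = pod.get("spec", {})
    return {
        "serviceAccountName": spec.get("serviceAccountName"),
        # spec.imagePullSecrets is a list of {"name": ...} objects and is absent when unset.
        "imagePullSecrets": [s.get("name") for s in spec.get("imagePullSecrets") or []],
        "images": [c.get("image") for c in spec.get("containers") or []],
    }


if __name__ == "__main__" and len(sys.argv) == 2:
    # Usage: kubectl get pod <pod> -n ns-team-dx-minerva-prod-va7 -o json > pod.json
    #        python check_pull.py pod.json
    with open(sys.argv[1]) as fh:
        summary = pull_config(json.load(fh))
    print(json.dumps(summary, indent=2))
    if not summary["imagePullSecrets"]:
        print("No imagePullSecrets on this pod: the registry credentials never reached it.")
```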
4) If it still doesn’t surface, bind the secret via a ServiceAccount (robust approach)
Create a ServiceAccount in the same namespace:
apiVersion: v1
kind: ServiceAccount
metadata:
name: prefect-worker
namespace: ns-team-dx-minerva-prod-va7
imagePullSecrets:
- name: dxminerva-cloud-artifactory
Then set in job variables:
service_account_name: prefect-worker
namespace: ns-team-dx-minerva-prod-va7
Redeploy and confirm the pod’s spec.serviceAccountName is prefect-worker.
5) Double-check the registry host in the secret
Decode and ensure the .dockerconfigjson “auths” key exactly matches your image registry host:
- Image host: docker-minerva-release.dr-uw2.adobeitc.com
- Secret should contain:
"auths": {
"docker-minerva-release.dr-uw2.adobeitc.com": { ... }
}
If not, recreate:
kubectl delete secret dxminerva-cloud-artifactory -n ns-team-dx-minerva-prod-va7
kubectl create secret docker-registry dxminerva-cloud-artifactory \
--docker-server=docker-minerva-release.dr-uw2.adobeitc.com \
--docker-username='<username>' \
--docker-password='<password-or-api-key>' \
--docker-email='<email>' \
-n ns-team-dx-minerva-prod-va7
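You can also script the host comparison in step 5. The sketch below is a minimal version. It assumes you pass the base64 `.dockerconfigjson` value, for example from `kubectl get secret dxminerva-cloud-artifactory -n ns-team-dx-minerva-prod-va7 -o jsonpath='{.data.\.dockerconfigjson}'`, together with the image reference. `registry_host`, `normalize`, and `auth_keys_for` are hypothetical helper names. The host rule follows the usual Docker image-reference convention: the first path component counts as a registry only if it contains "." or ":" or is "localhost", and anything else means Docker Hub. The stripping of `https://` and path suffixes is an assumption about how entries may have been written.

```python
import base64
import json
import sys
from urllib.parse import urlparse


def registry_host(image: str) -> str:
    """Registry host of an image reference; references without one resolve to Docker Hub."""
    first, sep, _ = image.partition("/")
    # Docker convention: the first component is a registry only if it looks like a host.
    if sep and ("." in first or ":" in first or first == "localhost"):
        return first
    return "docker.io"


def normalize(key: str) -> str:
    """Reduce an 'auths' key such as 'https://host/v1/' to its bare host."""
    if "://" in key:
        return urlparse(key).netloc
    return key.split("/", 1)[0]


def auth_keys_for(dockerconfigjson_b64: str, image: str) -> list[str]:
    """Return the 'auths' keys in a .dockerconfigjson whose host matches the image's registry."""
    config = json.loads(base64.b64decode(dockerconfigjson_b64))
    host = registry_host(image)
    # Docker Hub credentials usually live under index.docker.io; that case is not special-cased.
    return [key for key in config.get("auths", {}) if normalize(key) == host]


if __name__ == "__main__" and len(sys.argv) == 3:
    # Usage: python check_auths.py "<base64 .dockerconfigjson>" <image>
    matches = auth_keys_for(sys.argv[1], sys.argv[2])
    print(matches or "No 'auths' entry matches the image's registry host.")
```

An empty result for the hello image means the secret holds credentials for a different host, even if the username and password are correct.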
6) Optional: bake into the work pool base job template
If you prefer it as the pool default, edit the work pool’s base job template in the UI to include imagePullSecrets and/or serviceAccountName, or update it via the CLI with --base-job-template. This ensures all deployments on the pool inherit the pull secret.
Once imagePullSecrets is present on the pod and the auth matches the registry host, the pull should succeed.
Refs:
- Kubernetes work pools and job variables
- prefect.yaml reference
If it still doesn’t attach, please share:
- prefect work-pool inspect default
Marvin
10/04/2025, 5:26 PM
- prefect deployment inspect hello-flow/default (confirm job_variables present)
- kubectl get pod <pod> -n ns-team-dx-minerva-prod-va7 -o yaml | grep -A3 -i imagePullSecrets
Tushar Mahajan
10/04/2025, 5:34 PM
'job_variables': {
'image': 'docker-minerva-release.dr-uw2.adobeitc.com/hello:2025-10-04t17-30-02-640215-00-00',
'namespace': 'ns-team-dx-minerva-prod-va7',
'image_pull_policy': 'Always',
'image_pull_secrets': ['dxminerva-cloud-artifactory']
},
Still same error
Marvin
10/04/2025, 5:34 PM
Marvin
10/04/2025, 5:35 PM
apiVersion: v1
kind: ServiceAccount
metadata:
name: prefect-worker
namespace: ns-team-dx-minerva-prod-va7
imagePullSecrets:
- name: dxminerva-cloud-artifactory
2) Point your deployment at this SA (and namespace):
work_pool:
name: "default"
job_variables:
namespace: ns-team-dx-minerva-prod-va7
service_account_name: prefect-worker
image: "{{ build-image.image }}"
image_pull_policy: "Always"
3) Redeploy (prefect deploy) and confirm the pod has serviceAccountName: prefect-worker and can pull the image.
Option B: Add imagePullSecrets to the work pool’s base job template
1) Export the default job template:
prefect work-pool get-default-base-job-template --type kubernetes --file k8s-template.json
2) Edit k8s-template.json, adding imagePullSecrets to the pod spec:
- Find: job_manifest.spec.template.spec
- Add this line (comma separate appropriately):
"imagePullSecrets": "{{ image_pull_secrets }}",
Full shape (context):
"spec": {
"completions": 1,
"containers": [
{
"args": "{{ command }}",
"env": "{{ env }}",
"image": "{{ image }}",
"imagePullPolicy": "{{ image_pull_policy }}",
"name": "prefect-job"
}
],
"parallelism": 1,
"restartPolicy": "Never",
"serviceAccountName": "{{ service_account_name }}",
"imagePullSecrets": "{{ image_pull_secrets }}"
}
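You can apply the same change with a small script instead of hand-editing the exported file. The sketch below assumes `k8s-template.json` has the shape of the template shown in the inspect output (`job_configuration.job_manifest.spec.template.spec`). `add_pull_secrets` is a hypothetical helper name. It only adds the `imagePullSecrets` placeholder to the pod spec and leaves the `variables` schema untouched.

```python
import json
import sys


def add_pull_secrets(template: dict) -> dict:
    """Reference the image_pull_secrets job variable from the pod spec of a base job template."""
    pod_spec = template["job_configuration"]["job_manifest"]["spec"]["template"]["spec"]
    # Placeholder only: the worker substitutes the deployment's job_variables value.
    # setdefault keeps any imagePullSecrets entry that is already present.
    pod_spec.setdefault("imagePullSecrets", "{{ image_pull_secrets }}")
    return template


if __name__ == "__main__" and len(sys.argv) == 2:
    # Usage: python add_pull_secrets.py k8s-template.json
    path = sys.argv[1]
    with open(path) as fh:
        data = json.load(fh)
    with open(path, "w") as fh:
        json.dump(add_pull_secrets(data), fh, indent=2)
```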
3) Update the work pool to use this template:
prefect work-pool update default --base-job-template k8s-template.json
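4) Keep the rest of your deployment job_variables, but give image_pull_secrets as a list of objects. Kubernetes expects imagePullSecrets entries of the form name: <secret>, not bare strings: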
job_variables:
namespace: ns-team-dx-minerva-prod-va7
image: "{{ build-image.image }}"
image_pull_policy: "Always"
image_pull_secrets:
- name: dxminerva-cloud-artifactory
5) Redeploy and verify the pod YAML now shows:
spec:
imagePullSecrets:
- name: dxminerva-cloud-artifactory
Also verify
- Work pool type is Kubernetes:
prefect work-pool inspect default
- The flow-run pod is in the ns-team-dx-minerva-prod-va7 namespace.
- The secret’s dockerconfigjson “auths” key matches the exact registry host:
- Image host: docker-minerva-release.dr-uw2.adobeitc.com
- Secret auths must include: "docker-minerva-release.dr-uw2.adobeitc.com"
Once the pod has imagePullSecrets (or the SA approach), the pull error should resolve. If it persists, it’s almost always a registry host mismatch in the secret or bad credentials.
References:
- Kubernetes work pools and job variables
- prefect.yaml reference
Tushar Mahajan
10/04/2025, 6:10 PM
name: axia
prefect-version: 3.4.21
pull:
- prefect.deployments.steps.git_clone:
id: clone-step # needed to be referenced in subsequent steps
repository: https://github.com/OneAdobe/minerva.git
branch: main
access_token: "{{ prefect.blocks.secret.deployment-default-hello-flow-repo-token }}"
- prefect.deployments.steps.pip_install_requirements:
directory: "{{ clone-step.directory }}" # `clone-step` is a user-provided `id` field
requirements_file: requirements.txt
deployments:
- name: "default"
schedule: null
entrypoint: "example_flow.py:hello_flow"
work_pool:
name: "default"
Marvin
10/04/2025, 6:10 PM
Marvin
10/04/2025, 6:11 PM
git_clone + pip_install_requirements. The key is to put the pull steps on each deployment and set the working directory to the flow’s subdirectory, so you can target that flow’s own requirements.txt.
Here’s a pattern you can use for multiple flows in the same repo, each with its own requirements:
name: axia
prefect-version: 3.4.21
deployments:
- name: hello-flow
entrypoint: "example_flow.py:hello_flow" # relative to the working directory below
pull:
- prefect.deployments.steps.git_clone:
id: clone
repository: https://github.com/OneAdobe/minerva.git
branch: main
access_token: "{{ prefect.blocks.secret.deployment-default-hello-flow-repo-token }}"
# Optional: limit checkout size
# directories:
# - flows/hello-flow
- prefect.deployments.steps.set_working_directory:
directory: "{{ clone.directory }}/flows/hello-flow"
- prefect.deployments.steps.pip_install_requirements:
directory: "{{ clone.directory }}/flows/hello-flow"
requirements_file: requirements.txt
work_pool:
name: default
job_variables:
namespace: ns-team-dx-minerva-prod-va7
image: "<your-runtime-image>"
image_pull_policy: "Always"
# If using a ServiceAccount for pulls, also:
# service_account_name: prefect-worker
- name: other-flow
entrypoint: "example_flow.py:other_flow"
pull:
- prefect.deployments.steps.git_clone:
id: clone
repository: https://github.com/OneAdobe/minerva.git
branch: main
access_token: "{{ prefect.blocks.secret.deployment-default-hello-flow-repo-token }}"
- prefect.deployments.steps.set_working_directory:
directory: "{{ clone.directory }}/flows/other-flow"
- prefect.deployments.steps.pip_install_requirements:
directory: "{{ clone.directory }}/flows/other-flow"
requirements_file: requirements.txt
work_pool:
name: default
job_variables:
namespace: ns-team-dx-minerva-prod-va7
image: "<your-runtime-image>"
image_pull_policy: "Always"
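Each deployment above repeats the same clone step, and only the subdirectory changes, so you can also generate the entries. The sketch below takes a hypothetical list of (deployment name, subdirectory, entrypoint) tuples. It reuses the repository, secret block, namespace, and pool names from the YAML above. `deployment_entry` and `build_config` are hypothetical helpers. The output is JSON, which is a subset of YAML 1.2, so you could save it as prefect.yaml or convert it. YAML anchors and aliases are another way to cut the repetition if you prefer a hand-written file.

```python
import json

REPO = "https://github.com/OneAdobe/minerva.git"
TOKEN = "{{ prefect.blocks.secret.deployment-default-hello-flow-repo-token }}"


def deployment_entry(name: str, subdir: str, entrypoint: str, image: str) -> dict:
    """One prefect.yaml deployment with its own clone, working directory and requirements."""
    flow_dir = "{{ clone.directory }}/" + subdir
    return {
        "name": name,
        "entrypoint": entrypoint,  # relative to the working directory set below
        "pull": [
            {"prefect.deployments.steps.git_clone": {
                "id": "clone", "repository": REPO, "branch": "main", "access_token": TOKEN}},
            {"prefect.deployments.steps.set_working_directory": {"directory": flow_dir}},
            {"prefect.deployments.steps.pip_install_requirements": {
                "directory": flow_dir, "requirements_file": "requirements.txt"}},
        ],
        "work_pool": {
            "name": "default",
            "job_variables": {
                "namespace": "ns-team-dx-minerva-prod-va7",
                "image": image,
                "image_pull_policy": "Always",
            },
        },
    }


def build_config(flows: list[tuple[str, str, str]], image: str) -> dict:
    """Top-level prefect.yaml content for a list of (name, subdir, entrypoint) flows."""
    return {
        "name": "axia",
        "prefect-version": "3.4.21",
        "deployments": [deployment_entry(n, d, e, image) for n, d, e in flows],
    }


if __name__ == "__main__":
    flows = [
        ("hello-flow", "flows/hello-flow", "example_flow.py:hello_flow"),
        ("other-flow", "flows/other-flow", "example_flow.py:other_flow"),
    ]
    print(json.dumps(build_config(flows, "<your-runtime-image>"), indent=2))
```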
Notes and tips:
- git_clone returns directory (the clone path). Use set_working_directory to point Prefect to the specific subdir for the flow, so your entrypoint and requirements.txt are relative and simple.
- You can keep requirements_file: requirements.txt if you set directory to the flow’s folder; otherwise you can point requirements_file to a subpath from the repo root.
- Make sure your runtime image has pip (and git if you’re cloning at runtime). If you’re using a custom image, set it via job_variables.image.
- If your repo is private, referencing the access_token from a Secret block works. Alternatively, you can set credentials to a block that holds git creds.
- This installs at run time (per flow run). If the dependencies are stable, consider baking them into the image for faster, reproducible runs.
Docs:
- prefect.yaml reference
- Kubernetes work pools and job variables
Tushar Mahajan
10/04/2025, 6:17 PM
Marvin
10/04/2025, 6:17 PM
Marvin
10/04/2025, 6:18 PM
prefect work-pool get-default-base-job-template --type kubernetes --file k8s-template.json
- Edit k8s-template.json: add activeDeadlineSeconds under job_manifest.spec. The example uses 600 seconds (10 minutes); JSON does not allow comments, so keep the value bare:
"spec": {
"backoffLimit": "{{ backoff_limit }}",
"activeDeadlineSeconds": 600,
"template": {
"spec": {
...
- Update the work pool:
prefect work-pool update <your-pool-name> --base-job-template k8s-template.json
- Keep your existing finished_job_ttl (Prefect variable) so once the Job is marked Failed by the deadline, TTL removes it.
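You can also script the edit in this step. The sketch below assumes `k8s-template.json` has the same shape as the pool's base job template shown earlier (`job_configuration.job_manifest.spec`). `set_active_deadline` is a hypothetical helper name. You then apply the edited file with the `prefect work-pool update` command above.

```python
import json
import sys


def set_active_deadline(template: dict, seconds: int) -> dict:
    """Set activeDeadlineSeconds on the Job spec of a Kubernetes base job template."""
    if seconds <= 0:
        raise ValueError("activeDeadlineSeconds must be a positive number of seconds")
    job_spec = template["job_configuration"]["job_manifest"]["spec"]
    # Job level (job.spec), not the pod template: once the deadline passes,
    # Kubernetes marks the Job Failed, and ttlSecondsAfterFinished can then delete it.
    job_spec["activeDeadlineSeconds"] = seconds
    return template


if __name__ == "__main__" and len(sys.argv) == 3:
    # Usage: python set_deadline.py k8s-template.json 600
    path, seconds = sys.argv[1], int(sys.argv[2])
    with open(path) as fh:
        data = json.load(fh)
    with open(path, "w") as fh:
        json.dump(set_active_deadline(data, seconds), fh, indent=2)
```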
2) Clean up stuck Jobs periodically (CronJob or script)
- If you just need a one-off cleanup for ErrImagePull/ImagePullBackOff, you can delete Jobs whose Pods are in those states. Example using jq:
NS=ns-team-dx-minerva-prod-va7
kubectl get pods -n "$NS" -o json \
| jq -r '.items[]
| select(.status.phase=="Pending")
| select(.status.containerStatuses[0].state.waiting.reason=="ImagePullBackOff"
or .status.containerStatuses[0].state.waiting.reason=="ErrImagePull")
| .metadata.ownerReferences[]?
| select(.kind=="Job")
| .name' \
| sort -u \
| xargs -r -n1 kubectl delete job -n "$NS"
- To make future cleanup easier, add a label via job_variables and target that label:
job_variables:
labels:
app: prefect
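Then you can scope your cleanup with -l app=prefect. The default template shown above applies these labels to the Job's metadata, so filtering Jobs (kubectl get jobs -l app=prefect) is the safe choice if the pods don't carry the label.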
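If you'd rather avoid jq, the same selection can be done in Python. The sketch below reads a saved `kubectl get pods -n "$NS" -o json` dump. Unlike the jq one-liner, it checks every container status rather than only the first. It also skips pods that have no ownerReferences, such as the standalone pull-test pod. `stuck_job_names` is a hypothetical helper. It only prints Job names and leaves the actual deletion to `kubectl delete job`.

```python
import json
import sys

STUCK_REASONS = {"ImagePullBackOff", "ErrImagePull"}


def stuck_job_names(pod_list: dict) -> list[str]:
    """Names of Jobs that own Pending pods whose containers are waiting on an image pull."""
    jobs = set()
    for pod in pod_list.get("items", []):
        status = pod.get("status", {})
        if status.get("phase") != "Pending":
            continue
        reasons = {
            (cs.get("state", {}).get("waiting") or {}).get("reason")
            for cs in status.get("containerStatuses") or []
        }
        if not reasons & STUCK_REASONS:
            continue
        for owner in pod.get("metadata", {}).get("ownerReferences") or []:
            if owner.get("kind") == "Job":
                jobs.add(owner["name"])
    return sorted(jobs)


if __name__ == "__main__" and len(sys.argv) == 2:
    # Usage: kubectl get pods -n "$NS" -o json > pods.json && python stuck_jobs.py pods.json
    with open(sys.argv[1]) as fh:
        for name in stuck_job_names(json.load(fh)):
            print(name)  # e.g. pipe into: xargs -r -n1 kubectl delete job -n "$NS"
```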
3) Fix the root cause so these don’t accumulate
- Ensure image pull secrets are applied (serviceAccountName, or image_pull_secrets in a base job template that actually wires them into pod.spec.imagePullSecrets).
- Consider setting service_account_name in job_variables and attaching the pull secret to that ServiceAccount; it’s robust and avoids per-deployment wiring.
Key notes
- finished_job_ttl only applies after the Job is Complete or Failed; pending ImagePullBackOff Jobs won’t be deleted by TTL.
- activeDeadlineSeconds lives at job.spec (not the pod template) and will force the Job to Failed after the deadline.
- backoffLimit=0 prevents retries but does not mark a pending ImagePullBackOff as Failed; you need the active deadline or manual cleanup.
References
- Prefect Kubernetes work pools and job variables
- Kubernetes Job termination and TTL
Tushar Mahajan
10/04/2025, 7:39 PM
Marvin
10/04/2025, 7:39 PM
Marvin
10/04/2025, 7:40 PM
git_clone + set_working_directory + pip_install_requirements
- One shared runtime image for many flows; code fetched at run time
Pros
- Fast iteration: deploy code changes without rebuilding images
- Fewer images to manage for many flows (good for monorepos)
- Smaller registry footprint; avoids duplicate deps across many images
- Easy to parameterize per-flow directories and per-flow requirements
Cons
- Cold start latency: clone + pip install during each run (can be minutes for heavy deps)
- Requires cluster egress to Git and valid tokens; more secret management at runtime
- Reproducibility varies: pip resolve can drift unless pinned and cached
- Pod image must include git and build tools (C compilers for native deps)
- Harder to “airgap” (no network) clusters
Mitigations
- Pin versions and use constraints files; consider wheels cache
- Pre-bake heavy deps in a base image; pip_install_requirements then only adds small flow-specific deps
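- Mount a persistent pip cache (e.g., a PersistentVolumeClaim) and set PIP_CACHE_DIR; an emptyDir is deleted along with its pod, so it won't carry the cache from one flow run to the next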
- Use a ServiceAccount for git (if using Git over K8s-private infra) or store token in a Secret block
- Ensure your runtime image has git and build tooling
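For the "pin versions" mitigation, a quick pre-deploy check can flag requirement lines that don't pin an exact version. The sketch below assumes a plain requirements file. It treats only `==`/`===` as pinned and skips comments and pip option lines such as `-r` or `-c`. It would also flag direct URL requirements (`pkg @ https://...`), which you may want to allow. `unpinned_requirements` is a hypothetical helper.

```python
import re
import sys

# "name[extras] == version" (or "==="), optionally followed by an environment marker.
PINNED = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*(\[[^\]]*\])?\s*===?\s*[^\s;]+")


def unpinned_requirements(text: str) -> list[str]:
    """Return requirement lines that do not pin an exact version."""
    unpinned = []
    for raw in text.splitlines():
        line = raw.split(" #", 1)[0].strip()  # drop inline comments
        if not line or line.startswith(("#", "-")):  # blanks, comments, pip options
            continue
        if not PINNED.match(line):
            unpinned.append(line)
    return unpinned


if __name__ == "__main__" and len(sys.argv) == 2:
    # Usage: python check_pins.py flows/hello-flow/requirements.txt
    with open(sys.argv[1]) as fh:
        problems = unpinned_requirements(fh.read())
    for line in problems:
        print(f"not pinned: {line}")
    sys.exit(1 if problems else 0)
```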
Approach B: Build/push Docker images (per flow or per group)
What it looks like
- Use build_docker_image + push_docker_image steps in prefect.yaml
- Bake code and deps into immutable images; worker only pulls and runs
Pros
- Fast, predictable startup; no runtime install
- Strong reproducibility/auditability (tagged images, SBOM/scanning)
- Works without cluster egress to Git/PyPI
- Avoids runtime git tokens; less secret sprawl in pods
- Easier to meet regulated environment requirements
Cons
- More CI/CD complexity; need build pipeline and registry auth/imagePullSecrets
- More images to build/manage for many flows unless you share an image
- Registry storage and image churn if you tag per commit
- Fixes require an image rebuild/push cycle
Mitigations
- Use a shared base image for common deps; build slim per-flow layers (copy code only)
- Multi-stage builds and caching (buildx) to speed CI
- Consolidate flows that share deps into shared images
- Enforce unique tags and image_pull_policy: Always to avoid stale pods
- Attach image pull secrets via ServiceAccount or base job template
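The tags seen earlier in this thread, such as `2025-10-04t16-55-59-936069-00-00`, look like a UTC ISO-8601 timestamp that has been lowercased, with punctuation replaced by hyphens. If you tag images yourself in CI, the sketch below produces the same shape. `timestamp_tag` is a hypothetical helper, and this is not claimed to be how the Prefect build step generates its tags.

```python
import re
from datetime import datetime, timezone
from typing import Optional


def timestamp_tag(now: Optional[datetime] = None) -> str:
    """Build a unique, Docker-safe image tag from a UTC ISO-8601 timestamp."""
    now = now or datetime.now(timezone.utc)
    # Lowercase, then collapse every run of non-alphanumerics (":", ".", "+") into "-".
    return re.sub(r"[^a-z0-9]+", "-", now.isoformat().lower()).strip("-")
```

For example, `timestamp_tag(datetime(2025, 10, 4, 16, 55, 59, 936069, tzinfo=timezone.utc))` returns `2025-10-04t16-55-59-936069-00-00`. Microsecond precision keeps tags distinct across quick rebuilds. When the microsecond value is zero, `isoformat()` omits it, and the tag is simply shorter.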
When to choose which
- Choose Runtime git clone + pip when:
- You have many flows in a monorepo and want minimal image sprawl
- You push frequent code changes and want ultra-fast deploys without builds
- Dependencies are light or you can pre-bake heavy ones into a base image
- Choose Docker images when:
- You need reproducibility, fast cold-starts, offline/egress-restricted clusters
- Dependencies are heavy/compiled; you want predictable build-time failures
- Security/compliance requires image scanning and immutable artifacts
Hybrid pattern (often best for many teams)
- Build a “platform” base image with Python + git + system deps + heavy Python libs (pandas, numpy, db drivers)
- For flows:
- Option 1: Runtime git_clone + pip_install_requirements for small per-flow deps only
- Option 2: Build slim per-flow images that copy just code on top of the base
- This gives fast startup, fewer images than fully-per-flow, and still flexible iteration
Operational tips for K8s work pools
- If using image pulls: bind a ServiceAccount with imagePullSecrets (more reliable than per-deployment injection)
- Add activeDeadlineSeconds in the pool’s base job template so “stuck” Jobs fail and TTL can clean them
- Use labels in job_variables for easy cleanup/debugging
- Ensure namespace, service_account_name, and image_pull_policy are set in job_variables
Docs
Marvin
10/04/2025, 7:40 PM
Tushar Mahajan
10/05/2025, 10:44 AM
Marvin
10/05/2025, 10:44 AM
Marvin
10/05/2025, 10:46 AM
pip_install_requirements only accepts a pip requirements file; it does not read pyproject.toml or poetry.lock. But you can use Poetry in three ways:
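Option 1: Export from Poetry and use pip_install_requirements (simple; no Poetry needed at runtime if the export happens ahead of time)
- Export a requirements file from your pyproject/lock, then let Prefect’s pip step install it.
- Works even if your runtime image doesn’t have Poetry preinstalled, provided you export in CI (see the tip after the example). The example below exports at runtime and therefore does need Poetry.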
Example:
deployments:
- name: hello-flow
entrypoint: "example_flow.py:hello_flow"
pull:
- prefect.deployments.steps.git_clone:
id: clone
repository: https://github.com/your-org/your-repo.git
branch: main
access_token: "{{ prefect.blocks.secret.repo-token }}"
- prefect.deployments.steps.set_working_directory:
directory: "{{ clone.directory }}/flows/hello-flow"
# Requires Poetry on PATH in the runtime image; if it is missing, see the tip below, Option 2, or bake it into the image.
# Keep comments out of `script`: each script line is run as its own command, not through a shell.
- prefect.deployments.steps.run_shell_script:
directory: "{{ clone.directory }}/flows/hello-flow"
script: |
poetry export -f requirements.txt --without-hashes -o .prefect-reqs.txt
- prefect.deployments.steps.pip_install_requirements:
directory: "{{ clone.directory }}/flows/hello-flow"
requirements_file: .prefect-reqs.txt
work_pool:
name: default
Tip: If your runtime image doesn’t have Poetry, you can export in CI and commit or attach the exported file (e.g., .prefect-reqs.txt) to your repo, then skip the export step at runtime.
Option 2: Run Poetry install at runtime (requires Poetry in the image or install it first)
- Install Poetry (if needed) and install deps into the container’s environment. Disable virtualenv creation so packages land in the container’s Python.
deployments:
- name: hello-flow
entrypoint: "example_flow.py:hello_flow"
pull:
- prefect.deployments.steps.git_clone:
id: clone
repository: https://github.com/your-org/your-repo.git
branch: main
access_token: "{{ prefect.blocks.secret.repo-token }}"
- prefect.deployments.steps.set_working_directory:
directory: "{{ clone.directory }}/flows/hello-flow"
# Installs Poetry on every run; drop the pip line if the image already has Poetry.
# Keep comments out of `script`: each script line is run as its own command, not through a shell.
- prefect.deployments.steps.run_shell_script:
directory: "{{ clone.directory }}/flows/hello-flow"
script: |
python -m pip install --no-cache-dir poetry==1.8.3
poetry config virtualenvs.create false
poetry install --no-interaction --no-ansi --only main
work_pool:
name: default
Notes:
- The image must have build tools for any native deps (gcc, musl-dev, etc.) if needed.
- For reproducibility, always commit and use a lock file (poetry.lock).
Option 3: Bake Poetry deps into the image (fastest and most reproducible)
- Use a custom Dockerfile in your build step that installs from pyproject/poetry.lock.
build:
- prefect_docker.deployments.steps.build_docker_image:
id: build-image
image_name: "{{ $PREFECT_IMAGE_NAME }}"
# tag omitted: build_docker_image then tags the image with a timestamp, like the tags earlier in this thread
dockerfile: Dockerfile # use your custom Dockerfile below
# Dockerfile
# ---
# FROM prefecthq/prefect:3.4.21-python3.12
# WORKDIR /opt/prefect/axia
# RUN python -m pip install --no-cache-dir poetry==1.8.3
# COPY pyproject.toml poetry.lock ./
# # --no-root: install dependencies only; the project source is copied in the next step
# RUN poetry config virtualenvs.create false \
# && poetry install --no-interaction --no-ansi --only main --no-root
# COPY . .
# ENV PYTHONPATH=/opt/prefect/axia
- Pro: very fast startup, deterministic, no runtime network required to PyPI.
- Con: requires CI/CD to build/push images per change.
Which to use?
- Many flows, frequent code-only updates, light deps: Option 1 (export + pip) is a nice balance.
- Locked-down/air-gapped clusters or heavy deps: Option 3 (bake into image).
- If you truly want Poetry at runtime, Option 2 works; just ensure Poetry is installed in the image or install it at run time.
References:
Marvin
10/05/2025, 10:46 AM
Tushar Mahajan
10/05/2025, 1:37 PM
name: common
prefect-version: 3.4.21
build:
pull:
- prefect.deployments.steps.git_clone:
id: clone-step
repository: https://github.com/OneAdobe/minerva-prefect.git
branch: main
access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
- prefect.deployments.steps.pip_install_requirements:
directory: "{{ clone-step.directory }}/flows/common/enrich_ims_org_info_table"
requirements_file: requirements.txt
deployments:
- name: "enrich_ims_org_info_table"
schedule: null
entrypoint: "flows/common/enrich_ims_org_info_table/flow.py:enrich_ims_org_info_table"
work_pool:
name: "Local Worker"
I have written the entrypoint relative to the code cloned from git, but when I try to deploy, it searches for the flow code in local directories and the deployment fails.
Marvin
10/05/2025, 1:37 PM
Marvin
10/05/2025, 1:40 PM
entrypoint: "flows/common/enrich_ims_org_info_table/flow.py:enrich_ims_org_info_table"
- Keep your runtime pull steps to clone/install in the pod:
deployments:
  - name: "enrich_ims_org_info_table"
    entrypoint: "flows/common/enrich_ims_org_info_table/flow.py:enrich_ims_org_info_table"
    pull:
      - prefect.deployments.steps.git_clone:
          id: clone
          repository: https://github.com/OneAdobe/minerva-prefect.git
          branch: main
          access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
      # No set_working_directory into the flow folder: the run keeps the clone (repo root)
      # as its working directory, so the repo-root-relative entrypoint above resolves.
      - prefect.deployments.steps.pip_install_requirements:
          directory: "{{ clone.directory }}/flows/common/enrich_ims_org_info_table"
          requirements_file: requirements.txt
Option B — Clone during “build” for deploy-time, and also clone during “pull” for run-time
- Use a build step to clone the repo for the deploy command, then reference its output in entrypoint. Keep pull steps for runtime.
- Example:
name: common
prefect-version: 3.4.21
build:
  - prefect.deployments.steps.git_clone:
      id: clone_for_deploy
      repository: https://github.com/OneAdobe/minerva-prefect.git
      branch: main
      access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
deployments:
  - name: "enrich_ims_org_info_table"
    entrypoint: "{{ clone_for_deploy.directory }}/flows/common/enrich_ims_org_info_table/flow.py:enrich_ims_org_info_table"
    pull:
      - prefect.deployments.steps.git_clone:
          id: clone
          repository: https://github.com/OneAdobe/minerva-prefect.git
          branch: main
          access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
      - prefect.deployments.steps.pip_install_requirements:
          directory: "{{ clone.directory }}/flows/common/enrich_ims_org_info_table"
          requirements_file: requirements.txt
    work_pool:
      name: "Local Worker"
- This ensures the flow exists at deploy time (so the CLI can import it) and at run time.
Option C — Recommended programmatic deployment: flow.from_source(...)
- Skip YAML for this case and deploy directly from Git without having the code locally:
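from prefect import flow
from prefect.blocks.system import Secret
from prefect.runner.storage import GitRepository
deployment = flow.from_source(
    # from_source has no access_token argument; private-repo credentials go on GitRepository
    source=GitRepository(
        url="https://github.com/OneAdobe/minerva-prefect.git",
        branch="main",
        # Passing the Secret block (not its value) keeps the token out of the deployment in plain text
        credentials={"access_token": Secret.load("dxminerva-git-pat")},
    ),
    entrypoint="flows/common/enrich_ims_org_info_table/flow.py:enrich_ims_org_info_table",
).deploy(
    name="enrich_ims_org_info_table",
    work_pool_name="Local Worker",
)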
- This is the cleanest for “remote source” deployments in Prefect 3.x.
Why this is necessary
- prefect deploy imports your flow to build parameter schema and metadata. Since pull steps don’t run at deploy time, the CLI must find the entrypoint locally (Options A/B) or you must deploy programmatically from the source (Option C).
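A quick local pre-check can tell you whether prefect deploy will find the entrypoint from the directory you run it in. The sketch below is a hypothetical helper (check_entrypoint is not part of Prefect). It parses the file with ast instead of importing it, so it only approximates what the CLI does; missing dependencies won't show up here.

```python
import ast
from pathlib import Path


def check_entrypoint(root: str, entrypoint: str) -> bool:
    """Return True if entrypoint ("path/to/file.py:func") resolves under root.

    Approximates the local lookup prefect deploy needs: the file must exist
    relative to root and define a top-level function with that name.
    """
    # Split on the last colon so Windows drive letters (C:\...) are not misparsed.
    path_part, sep, func_name = entrypoint.rpartition(":")
    if not sep or not path_part.endswith(".py") or not func_name:
        raise ValueError(f"expected 'path/to/file.py:function', got {entrypoint!r}")
    file_path = Path(root) / path_part
    if not file_path.is_file():
        return False
    # Parse without importing, so this works even if the flow's deps are not installed.
    tree = ast.parse(file_path.read_text(encoding="utf-8"))
    return any(
        isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == func_name
        for node in tree.body
    )
```

For example, check_entrypoint(".", "flows/common/enrich_ims_org_info_table/flow.py:enrich_ims_org_info_table") run from the directory where you call prefect deploy; False there suggests the CLI won't find the flow either, which points you to Option A, B, or C.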
Docs
- Prefect YAML reference: prefect.yaml reference
Marvin
10/05/2025, 1:40 PM
Tushar Mahajan
10/05/2025, 8:25 PM
Marvin
10/05/2025, 8:25 PM
Marvin
10/05/2025, 8:26 PM
# auth.py
import requests
def get_token(client_id, client_secret):
    r = requests.post("https://auth.example.com/oauth/token", data={
        "client_id": client_id,
        "client_secret": client_secret,
        "grant_type": "client_credentials",
    })
    r.raise_for_status()
    data = r.json()
    return data["access_token"], int(data.get("expires_in", 3600))
# flows.py
from prefect import flow, task
from prefect.blocks.system import Secret
from auth import get_token
@task
def fetch_job(job_id: str, token: str):
    import requests
    r = requests.get(f"https://api.example.com/jobs/{job_id}",
                     headers={"Authorization": f"Bearer {token}"})
    r.raise_for_status()
    return r.json()
@flow
def my_flow(job_ids: list[str]):
    creds = Secret.load("api-client-credentials").get()
    token, _ = get_token(creds["client_id"], creds["client_secret"])
    results = [fetch_job.submit(j, token) for j in job_ids]
    return [r.result() for r in results]
2) Use a module-level singleton with TTL (shared within the flow-run process)
- Prefect’s default task runner is thread-based; a module-level cache is shared by tasks in the same flow run process. This avoids passing tokens to every task, while still generating once and refreshing on expiry. Add a lock for thread-safety.
# auth.py
import time, threading, requests
from typing import Optional, Tuple
from prefect.blocks.system import Secret
_lock = threading.Lock()
_token: Optional[str] = None
_expiry_ts: float = 0.0
def _fetch_token() -> Tuple[str, float]:
    creds = Secret.load("api-client-credentials").get()
    r = requests.post("https://auth.example.com/oauth/token", data={
        "client_id": creds["client_id"],
        "client_secret": creds["client_secret"],
        "grant_type": "client_credentials",
    })
    r.raise_for_status()
    data = r.json()
    token = data["access_token"]
    ttl = int(data.get("expires_in", 3600))
    return token, time.monotonic() + max(0, ttl - 30)  # refresh 30s early
def get_token() -> str:
    global _token, _expiry_ts
    now = time.monotonic()
    with _lock:
        if _token is None or now >= _expiry_ts:
            _token, _expiry_ts = _fetch_token()
        return _token
Use from any module:
# tasks.py
from prefect import task
from auth import get_token
import requests
@task
def fetch_job(job_id: str):
    token = get_token()
    r = requests.get(f"https://api.example.com/jobs/{job_id}",
                     headers={"Authorization": f"Bearer {token}"})
    r.raise_for_status()
    return r.json()
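If you want to unit-test the refresh logic without hitting the auth server, one option is to inject the fetch function and the clock. The sketch below is a hypothetical TokenCache class (not a Prefect API) that keeps the same behavior as auth.py: one token per process, guarded by a lock, refreshed a margin before expiry.

```python
import threading
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Thread-safe, per-process token cache that refreshes shortly before expiry.

    fetch() must return (token, ttl_seconds); clock defaults to time.monotonic.
    """

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, int]],
        refresh_margin: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._margin = refresh_margin
        self._clock = clock
        self._lock = threading.Lock()
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        with self._lock:
            now = self._clock()
            if self._token is None or now >= self._expires_at:
                token, ttl = self._fetch()
                self._token = token
                # Treat the token as expired refresh_margin seconds early (never before now).
                self._expires_at = now + max(0.0, ttl - self._margin)
            return self._token
```

In auth.py this could look like _cache = TokenCache(lambda: get_token(client_id, client_secret)), reusing the pattern 1 get_token, which already returns (token, expires_in). Injecting clock is what makes the expiry path testable without sleeping.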
Notes and caveats
- Thread vs process runners: The module-level cache works great with the default thread-based runner. If you switch to a process-based runner (or distributed executors), each process will have its own cache and may fetch its own token. Passing tokens explicitly (pattern 1) is most robust across execution backends.
- Token security: Avoid storing tokens in Prefect result caches. Use blocks (e.g., Secret) for credentials and keep tokens in memory only.
- Expiry: Implement a TTL/refresh window to avoid using an expired token mid-flow.
- Multiple concurrent flows: Each flow run is typically a separate process/pod; a module-level cache is per-process, so flows don’t share tokens with each other (usually desired).
Marvin
10/05/2025, 8:26 PM
Tushar Mahajan
10/12/2025, 6:44 AM
@task(name="enrich_default_merge_policy")
def enrich_default_merge_policy(row: Dict) -> Dict:
    logger = get_run_logger()
    row = row.copy()
    env = row.get("env")
    region = row.get("region")
    ims_org_id = row.get("ims_org_id")
    sandbox_name = row.get("sandbox_name")
    scan_job_id = row.get("scan_job_id")
    model = row.get("model")
    row.update(MERGE_POLICY_FIELDS)
    try:
        ims_api_client = get_ims_api_client()
        logos_api_client = get_logos_api_client(ims_api_client)
        resp = logos_api_client.get_merge_policy(environment=env, region=region, ims_org_id=ims_org_id,
                                                 sandbox_name=sandbox_name,
                                                 schema=model, default=True, client_id="acp_core_unifiedProfile_feeds",
                                                 limit=1)
        if resp.status_code != 200:
            logger.warning(f"Job {scan_job_id}: got non-200 ({resp.status_code}) while fetching default merge policy")
            return row
        default_merge_policy = resp.json()
        default_merge_policy_id = safe_get(default_merge_policy["children"][0], ["id"])
        row["default_merge_policy_id"] = default_merge_policy_id
        return row
    except Exception as e:
        logger.error(f"Error enriching default merge policy id for job: {scan_job_id}: {e}")
        return row
Marvin
10/12/2025, 6:44 AM
Marvin
10/12/2025, 6:50 AM
from typing import Dict, List
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
# Tag this task so we can use a concurrency limit
@task(
    name="enrich_default_merge_policy",
    retries=3,
    retry_delay_seconds=5,
    timeout_seconds=30,
    tags=["merge-policy"]  # used for rate limiting
)
def enrich_default_merge_policy(row: Dict) -> Dict:
    logger = get_run_logger()
    r = row.copy()
    env = r.get("env")
    region = r.get("region")
    ims_org_id = r.get("ims_org_id")
    sandbox_name = r.get("sandbox_name")
    scan_job_id = r.get("scan_job_id")
    model = r.get("model")
    r.update(MERGE_POLICY_FIELDS)
    # Let errors raise so retries happen
    ims_api_client = get_ims_api_client()
    logos_api_client = get_logos_api_client(ims_api_client)
    resp = logos_api_client.get_merge_policy(
        environment=env,
        region=region,
        ims_org_id=ims_org_id,
        sandbox_name=sandbox_name,
        schema=model,
        default=True,
        client_id="acp_core_unifiedProfile_feeds",
        limit=1,
    )
    if resp.status_code != 200:
        logger.warning(f"Job {scan_job_id}: got non-200 ({resp.status_code}) while fetching default merge policy")
        return r  # consider raising if you want retries on non-200 too
    default_merge_policy = resp.json()
    default_merge_policy_id = safe_get(default_merge_policy["children"][0], ["id"])
    r["default_merge_policy_id"] = default_merge_policy_id
    return r
@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def enrich_all(rows: List[Dict]) -> List[Dict]:
    futures = [enrich_default_merge_policy.submit(row) for row in rows]
    return [f.result() for f in futures]
Enforce a safe concurrency limit on the tag
- Create a limit (e.g., only 10 tasks with tag “merge-policy” can run at once):
prefect concurrency-limit create merge-policy 10
- Inspect/update as needed:
prefect concurrency-limit ls
prefect concurrency-limit inspect merge-policy
prefect concurrency-limit reset merge-policy
When to batch/chunk
- 3k tasks is typically fine. If your orchestration overhead or memory footprint becomes a concern, chunk into batches (e.g., 100–300 rows per batch) and submit one task per batch. You’ll lose per-row retries/visibility but reduce orchestration overhead.
- Example batch pattern:
def chunk(seq, size):
    for i in range(0, len(seq), size):
        yield seq[i:i+size]
@task(tags=["merge-policy"])
def enrich_batch(batch: List[Dict]) -> List[Dict]:
    # If APIs are strict, keep this sequential inside the batch
    return [enrich_default_merge_policy.fn(r) for r in batch]
@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
def enrich_all_batched(rows: List[Dict], batch_size: int = 200) -> List[Dict]:
    futures = [enrich_batch.submit(b) for b in chunk(rows, batch_size)]
    batches = [f.result() for f in futures]
    return [r for batch in batches for r in batch]
A few practical tips
- Let exceptions raise to trigger retries (log and re-raise). If you catch and return, Prefect will mark the task Completed.
Marvin
10/12/2025, 6:50 AM
Tushar Mahajan
10/12/2025, 7:05 AM
Marvin
10/12/2025, 7:05 AM
Marvin
10/12/2025, 7:06 AM
retries=0 and rely entirely on the client’s retry logic, or
- Use a very small number of Prefect retries (e.g., 1) with a retry_condition_fn that retries only on transient network/infrastructure errors, not on HTTP status failures.
- Set a task timeout slightly above the worst-case total time of the client’s internal retries, so Prefect doesn’t cancel valid long-running retries.
- Use tag-based concurrency limits to respect the upstream API’s rate. Keep a reasonable max_workers on the task runner.
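To pick that timeout, it helps to add up the client's worst case explicitly. A minimal sketch, assuming your client retries with exponential backoff (no jitter) and a fixed per-call timeout; retry_envelope_seconds and its parameter names are hypothetical, so map them to whatever your client's retry settings are actually called.

```python
def retry_envelope_seconds(
    attempts: int,
    per_call_timeout: float,
    base_delay: float,
    factor: float = 2.0,
    max_delay: float = float("inf"),
) -> float:
    """Worst-case wall time for `attempts` calls with exponential backoff between them."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    # Every attempt may run until its own timeout.
    total = attempts * per_call_timeout
    # One backoff sleep between consecutive attempts, capped at max_delay.
    for i in range(attempts - 1):
        total += min(base_delay * factor**i, max_delay)
    return total
```

For 3 attempts with a 10s per-call timeout and 1s base delay, the envelope is 30 + 1 + 2 = 33s, so a timeout_seconds around 45 to 60 leaves headroom for scheduling and logging overhead.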
Example (minimal Prefect retry, rely on client)
from typing import Dict, List
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
@task(
    name="enrich_default_merge_policy",
    retries=0,
    timeout_seconds=60,  # set > max total time your client can spend retrying
    tags=["merge-policy"]
)
def enrich_default_merge_policy(row: Dict) -> Dict:
    logger = get_run_logger()
    r = row.copy()
    env = r.get("env")
    region = r.get("region")
    ims_org_id = r.get("ims_org_id")
    sandbox_name = r.get("sandbox_name")
    scan_job_id = r.get("scan_job_id")
    model = r.get("model")
    r.update(MERGE_POLICY_FIELDS)
    # Let the client handle retry/backoff; let exceptions bubble if it ultimately fails
    ims_api_client = get_ims_api_client()
    logos_api_client = get_logos_api_client(ims_api_client)
    resp = logos_api_client.get_merge_policy(
        environment=env,
        region=region,
        ims_org_id=ims_org_id,
        sandbox_name=sandbox_name,
        schema=model,
        default=True,
        client_id="acp_core_unifiedProfile_feeds",
        limit=1,
    )
    if resp.status_code != 200:
        logger.warning(f"Job {scan_job_id}: non-200 ({resp.status_code}) fetching default merge policy")
        # Option A: return best-effort
        return r
        # Option B (preferred for data quality): raise to mark failure and review/re-run
        # raise RuntimeError(f"Non-200 from API: {resp.status_code}")
    default_merge_policy = resp.json()
    default_merge_policy_id = safe_get(default_merge_policy["children"][0], ["id"])
    r["default_merge_policy_id"] = default_merge_policy_id
    return r
@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def enrich_all(rows: List[Dict]) -> List[Dict]:
    futures = [enrich_default_merge_policy.submit(row) for row in rows]
    return [f.result() for f in futures]
Optional: conditional Prefect retries (guardrails only)
- Use this if you want a single “second chance” for infra/transient errors that your client might not cover (e.g., worker hiccups).
def should_retry(task, task_run, state) -> bool:
    # Prefect calls retry_condition_fn with (task, task_run, state);
    # state.result() re-raises the exception the task failed with
    try:
        state.result()
    except (TimeoutError, ConnectionError):  # extend with your client's transient exceptions
        return True
    except Exception:
        # Don't retry HTTP status-based or other non-transient failures
        return False
    return False
@task(
    name="enrich_default_merge_policy",
    retries=1,
    retry_delay_seconds=30,
    retry_condition_fn=should_retry,
    timeout_seconds=60,
    tags=["merge-policy"],
)
def enrich_default_merge_policy(row: Dict) -> Dict:
    ...
Rate limiting via tags
- Cap concurrent tasks hitting the API so you don’t exceed limits (independent of your client’s retry/backoff).
prefect concurrency-limit create merge-policy 10
prefect concurrency-limit ls
prefect concurrency-limit inspect merge-policy
A couple of tweaks to your current code
- Avoid broad try/except that returns a “successful” row on failure; it hides real errors and blocks retries. If you want to keep the log, catch, log, then raise to let the task fail and be visible/retryable.
Marvin
10/12/2025, 7:06 AM
timeout_seconds with your client’s retry envelope. For example, if the client can take up to ~45s on worst-case backoff, set timeout_seconds to ~60–90s.
If you can share your API’s rate limits and your client’s max retry/backoff profile, I can suggest concrete values for max_workers, the tag concurrency limit, and timeout_seconds.
Tushar Mahajan
10/12/2025, 3:17 PM
Marvin
10/12/2025, 3:17 PM
Marvin
10/12/2025, 3:18 PM
from typing import Dict, List, Iterable
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
def chunk(seq: List[Dict], size: int) -> Iterable[List[Dict]]:
    for i in range(0, len(seq), size):
        yield seq[i:i+size]
@task(name="enrich_default_merge_policy", tags=["merge-policy"], retries=0, timeout_seconds=90)
def enrich_default_merge_policy(row: Dict) -> Dict:
    # your existing logic (let client handle retries; let final exceptions raise if needed)
    ...
@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def enrich_all(rows: List[Dict], batch_size: int = 200) -> List[Dict]:
    results: List[Dict] = []
    for batch in chunk(rows, batch_size):
        futures = [enrich_default_merge_policy.submit(r) for r in batch]
        results.extend(f.result() for f in futures)  # drain this batch before moving on
    return results
Notes
- Start with batch_size=100–300 and max_workers=10–30. Tune based on memory and API latency.
- Tag concurrency limit keeps you within API limits, but batching is what prevents the local process from holding 8k futures. Create/adjust if needed:
- Create: prefect concurrency-limit create merge-policy 10
- Inspect/list: prefect concurrency-limit inspect merge-policy / prefect concurrency-limit ls
- If results are large, consider returning only needed fields and/or persist_result=False on the task to reduce storage overhead.
- If you still see pressure, move batches into subflows so each batch is its own flow run. Put the task runner on the subflow, since that is where the tasks are submitted:
@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def process_batch(rows: List[Dict]) -> List[Dict]:
    futures = [enrich_default_merge_policy.submit(r) for r in rows]
    return [f.result() for f in futures]
@flow
def enrich_all(rows: List[Dict], batch_size: int = 200) -> List[Dict]:
    results = []
    for b in chunk(rows, batch_size):
        results.extend(process_batch(b))
    return results
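The drain-per-batch idea behind these enrich_all variants doesn't depend on Prefect. Here it is with only the standard library, as a sketch you can run and tune locally. map_in_batches is a hypothetical helper; Prefect futures add state tracking on top, but the bound on in-flight work comes from the same structure.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def map_in_batches(
    fn: Callable[[T], R], items: List[T], batch_size: int, max_workers: int
) -> List[R]:
    """Apply fn to every item, keeping at most batch_size futures in flight."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    results: List[R] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for start in range(0, len(items), batch_size):
            batch = items[start:start + batch_size]
            futures = [pool.submit(fn, item) for item in batch]
            # Drain this batch (in submission order) before submitting the next one.
            results.extend(f.result() for f in futures)
    return results
```

The trade-off is a barrier per batch: one slow item holds up the next batch. That is usually acceptable here because it is exactly what keeps memory and open futures bounded.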
If you can share where this is running (local machine vs worker, CPU/RAM) and typical API latency, I can suggest concrete values for batch_size, max_workers, and the concurrency limit.
Tushar Mahajan
10/12/2025, 3:32 PM
Marvin
10/12/2025, 3:32 PM
Marvin
10/12/2025, 3:33 PM
@task(persist_result=False)
def enrich_default_merge_policy(row: dict) -> dict: ...
- You can also set it at the flow level to apply to all tasks:
@flow(persist_result=False)
def enrich_all(rows: list[dict]) -> list[dict]: ...
- Avoid keeping large results in memory:
- If a task’s return value isn’t used downstream, turn off in-memory caching:
@task(persist_result=False, cache_result_in_memory=False)
def enrich_default_merge_policy(row: dict) -> dict: ...
- Return small payloads:
- Only return the fields you need (e.g., a small dict with row id and the new value).
- If you need to store full rows, write them to your own storage (S3, DB) inside the task and just return a small reference (path/id).
- Keep logs concise:
- Logs are stored per task; avoid logging full rows or large payloads.
- Use warnings/errors sparingly; consider summary logs at batch boundaries.
- Persist only the final aggregated result if needed:
- Keep per-row tasks ephemeral; write/return a compact list and persist just once at the end (or skip persisting entirely).
Example pattern
@task(persist_result=False, cache_result_in_memory=False, tags=["merge-policy"])
def enrich_default_merge_policy(row: dict) -> dict:
    # compute small result: only id + enriched field
    ...
@flow(persist_result=False)
def enrich_all(rows: list[dict], batch_size: int = 200) -> list[dict]:
    enriched = []
    for batch in chunk(rows, batch_size):
        futures = [enrich_default_merge_policy.submit(r) for r in batch]
        # collect small results then optionally write once at the end
        enriched.extend(f.result() for f in futures)
    # optionally persist final output (to S3/DB) here
    return enriched  # or return None to avoid storing any large result
If you’re still concerned about storage or UI overhead with many tasks:
- Use batching (100–300 per batch) to limit in-flight futures.
- Consider subflows per batch to split orchestration records across runs.
- Tune logging down to essentials.
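To illustrate "write it yourself and return a reference," here is a sketch that uses a local directory as a stand-in for S3/DB. write_results_jsonl is hypothetical, and it assumes the worker has a writable directory. Swap the file I/O for your storage client; the return shape (path + count) is the part that keeps flow results small.

```python
import json
import os
import tempfile
from typing import Dict, Iterable


def write_results_jsonl(rows: Iterable[Dict], directory: str, name: str) -> Dict[str, object]:
    """Write rows as JSON Lines and return only a small reference (path + row count)."""
    os.makedirs(directory, exist_ok=True)
    path = os.path.join(directory, f"{name}.jsonl")
    count = 0
    # Write to a temp file in the same directory, then rename, so readers never see a partial file.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        for row in rows:
            # default=str turns values json can't encode (e.g. datetimes) into strings.
            fh.write(json.dumps(row, default=str) + "\n")
            count += 1
    os.replace(tmp_path, path)
    return {"path": path, "count": count}
```

A task could call this once per batch and return the small dict, so the flow only ever holds paths and counts.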
If you share how large each row/result is and whether you need to persist final outputs, I can suggest exact task/flow settings and an output-writing pattern that keeps storage usage minimal.
Tushar Mahajan
10/12/2025, 3:34 PM
Marvin
10/12/2025, 3:34 PM
Marvin
10/12/2025, 3:35 PM
from typing import List, Dict
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
_DATASET: List[Dict] = []  # module-level cache; same process only
@task(persist_result=False, cache_result_in_memory=False, tags=["merge-policy"])
def enrich_row(i: int) -> Dict:
    row = _DATASET[i]  # small param: just the index
    # ... your existing enrichment logic ...
    # return a small result or write to external storage and return a reference
    return {"id": row["id"], "default_merge_policy_id": "..."}  # keep it small
@flow(task_runner=ThreadPoolTaskRunner(max_workers=20), persist_result=False)
def enrich_all(dataset_uri: str, batch_size: int = 200) -> List[Dict]:
    global _DATASET
    _DATASET = load_rows(dataset_uri)  # load once; do not pass rows as a param
    results: List[Dict] = []
    for start in range(0, len(_DATASET), batch_size):
        end = min(start + batch_size, len(_DATASET))
        futures = [enrich_row.submit(i) for i in range(start, end)]
        results.extend(f.result() for f in futures)
    return results  # or write to storage and return nothing
Example: pointer-per-batch (works with any runner/worker)
@task(persist_result=False, cache_result_in_memory=False, tags=["merge-policy"])
def process_batch(dataset_uri: str, start: int, end: int) -> int:
    # Load just the slice from external storage
    rows = stream_rows(dataset_uri, start, end)  # implement streaming/partial load
    # Option A: process sequentially inside the task to avoid per-row params
    # Option B: use plain threads here (not Prefect tasks) for small local parallelism
    count = 0
    for row in rows:
        # ... enrichment logic ...
        # write output to S3/DB; avoid returning large data
        count += 1
    return count  # small summary
@flow(persist_result=False)
def enrich_all(dataset_uri: str, batch_size: int = 500) -> int:
    # Only small params are passed: uri + indices
    totals = []
    size = get_dataset_size(dataset_uri)
    for start in range(0, size, batch_size):
        end = min(start + batch_size, size)
        totals.append(process_batch.submit(dataset_uri, start, end))
    return sum(t.result() for t in totals)
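stream_rows and get_dataset_size are left for you to implement above. Here is one possible sketch, assuming dataset_uri is a local JSON Lines file (one JSON object per line). For S3 or a database you would replace the file reads with your client, or with a LIMIT/OFFSET or key-range query.

```python
import json
from itertools import islice
from typing import Dict, Iterator


def get_dataset_size(dataset_uri: str) -> int:
    """Count non-empty rows in a local JSON Lines file."""
    with open(dataset_uri, encoding="utf-8") as fh:
        return sum(1 for line in fh if line.strip())


def stream_rows(dataset_uri: str, start: int, end: int) -> Iterator[Dict]:
    """Yield rows [start, end) without loading the whole file into memory."""
    with open(dataset_uri, encoding="utf-8") as fh:
        non_empty = (line for line in fh if line.strip())
        # islice still reads (and discards) the lines before `start`, so each batch costs O(end) reads.
        for line in islice(non_empty, start, end):
            yield json.loads(line)
```

Because each batch rescans the file from the top, this suits moderate sizes. For very large inputs, pre-splitting into one file per batch and passing those URIs is usually cheaper.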
Other tips to keep storage light
- Results
- Use persist_result=False and cache_result_in_memory=False on tasks; @flow(persist_result=False) at the flow level.
- Return only small payloads (IDs, counts). Write large outputs to your own storage and return a reference.
- Logging
- Don’t log full rows. Keep logs concise.
- Concurrency
- Keep batching to avoid thousands of in-flight futures.
- Use tag-based concurrency limits to respect API limits, but remember they don’t reduce parameter size—batching and reference-passing do.
If you can share where the data lives today (file/DB/S3) and how you trigger the flow, I can tailor a pattern (pointer-per-row vs indices-only vs batch references) that fits your runtime and avoids parameter bloat.
Tushar Mahajan
10/12/2025, 5:30 PM
Marvin
10/12/2025, 5:30 PM
Marvin
10/12/2025, 5:31 PM
from concurrent.futures import ThreadPoolExecutor
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
def enrich_fn(row: dict) -> dict:
    # your non-task function
    ...
@task(persist_result=False, cache_result_in_memory=False)
def enrich_batch(rows: list[dict], workers: int = 20) -> list[dict]:
    with ThreadPoolExecutor(max_workers=workers) as ex:
        return list(ex.map(enrich_fn, rows))
@flow(task_runner=ThreadPoolTaskRunner(max_workers=20), persist_result=False)
def enrich_all(rows_uri: str) -> int:
    rows = load_rows(rows_uri)  # load by reference to avoid big params
    results = enrich_batch(rows)  # single Prefect task
    write_results(results)  # or return a small summary
    return len(results)
Example: Prefect-level concurrency (per-row tasks)
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
@task(persist_result=False, cache_result_in_memory=False, tags=["merge-policy"])
def enrich_row(row: dict) -> dict:
    return enrich_fn(row)
@flow(task_runner=ThreadPoolTaskRunner(max_workers=20), persist_result=False)
def enrich_all(rows: list[dict], batch_size: int = 200) -> list[dict]:
    out = []
    for i in range(0, len(rows), batch_size):
        batch = rows[i:i+batch_size]
        futures = [enrich_row.submit(r) for r in batch]
        out.extend(f.result() for f in futures)
    return out
Rule of thumb:
- Need Prefect retries/visibility/rate limits per item? Use @task + .submit.
- Want minimal Prefect metadata/storage and are okay with less visibility? Do local threads inside a single @task.
Tushar Mahajan
10/12/2025, 8:04 PM
Marvin
10/12/2025, 8:04 PM
Marvin
10/12/2025, 8:05 PM
Tushar Mahajan
10/12/2025, 8:08 PM
@flow(name="get-export-job-stats", log_prints=True)
def get_export_job_stats(
    envs: List[str],
    regions: List[str],
    models: List[str],
    sources: List[str],
    target_date: Optional[datetime] = None
) -> None:
    logger = get_run_logger()
    date, start_ms, end_ms = query_date_window_ms(target_date)
    logger.info(f"Time window: start_ms={start_ms}, end_ms={end_ms}")
    for env in envs:
        all_export_jobs_env: List[Dict] = []
        # Step 1: Fetch all export jobs in parallel
        fetch_futures: List = []
        for region in regions:
            for model in models:
                for source in sources:
                    fetch_futures.append(
                        query_export_jobs_from_psql.submit(
                            env=env,
                            region=region,
                            model=model,
                            source=source,
                            start_time_ms=start_ms,
                            end_time_ms=end_ms,
                            date=date
                        )
                    )
        wait(fetch_futures)
        # Step 2: Collect results
        for f in fetch_futures:
            try:
                jobs = f.result()
                all_export_jobs_env.extend(jobs)
            except Exception as e:
                logger.warning(f"Failed to fetch jobs for env={env}: {e}")
        logger.info(f"Fetched total {len(all_export_jobs_env)} jobs for env={env}")
        _set_global_rows(all_export_jobs_env)
        # Step 2: Filter commercial jobs (mutates global dataset)
        filter_commercial_export_jobs(env)
        # Step 3-6: Run enrichment stages under subflows that use their own ThreadPoolTaskRunner
        enrich_default_merge_policy_flow()
_GLOBAL_ROWS: List[Dict] = []
def _set_global_rows(rows: List[Dict]):
    global _GLOBAL_ROWS
    _GLOBAL_ROWS = rows
def enrich_default_merge_policy(row: Dict) -> Dict:
    logger = get_logger(__name__)
    row = row.copy()
    env = row.get("env")
    region = row.get("region")
    ims_org_id = row.get("ims_org_id")
    sandbox_name = row.get("sandbox_name")
    scan_job_id = row.get("scan_job_id")
    model = row.get("model")
    row.update(MERGE_POLICY_FIELDS)
    try:
        ims_api_client = get_ims_api_client()
        logos_api_client = get_logos_api_client(ims_api_client)
        resp = logos_api_client.get_merge_policy(environment=env, region=region, ims_org_id=ims_org_id,
                                                 sandbox_name=sandbox_name,
                                                 schema=model, default="true",
                                                 client_id="acp_core_unifiedProfile_feeds",
                                                 limit=1)
        if resp.status_code != 200:
            logger.warning(f"Job {scan_job_id}: got non-200 ({resp.status_code}) while fetching default merge policy")
            return row
        default_merge_policy = resp.json()
        default_merge_policy_id = safe_get(default_merge_policy["children"][0], ["id"])
        row["default_merge_policy_id"] = default_merge_policy_id
        return row
    except Exception as e:
        logger.error(f"Error enriching default merge policy id for job: {scan_job_id}: {e}")
        return row
@flow(name="enrich_default_merge_policy_flow")
def enrich_default_merge_policy_flow(max_workers: int = 30) -> None:
    global _GLOBAL_ROWS
    total = len(_GLOBAL_ROWS)
    if total == 0:
        return
    _set_global_rows(populate_data_concurrently(enrich_default_merge_policy, _GLOBAL_ROWS, max_workers=max_workers))
@Marvin This is my flow code, do you see any improvements from prefect framework pov
Updated the code
Marvin
10/12/2025, 8:09 PM
Marvin
10/12/2025, 8:11 PM
from itertools import islice, product
from prefect import flow, task, get_run_logger
from prefect.futures import as_completed
from prefect.task_runners import ThreadPoolTaskRunner
@task(tags=["psql"], persist_result=False, cache_result_in_memory=False, timeout_seconds=60)
def query_export_jobs_from_psql(env, region, model, source, start_time_ms, end_time_ms, date) -> list[dict]:
    ...
def chunked(iterable, size):
    # islice stops cleanly at the end; calling next() in a comprehension here would raise
    # StopIteration (turned into RuntimeError inside a generator) and lose the last partial batch
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            break
        yield batch
@flow(name="get-export-job-stats", log_prints=True, task_runner=ThreadPoolTaskRunner(max_workers=20), persist_result=False)
def get_export_job_stats(envs, regions, models, sources, target_date=None) -> None:
    logger = get_run_logger()
    date, start_ms, end_ms = query_date_window_ms(target_date)
    logger.info(f"Time window: start_ms={start_ms}, end_ms={end_ms}")
    for env in envs:  # loop envs outermost, as you do
        all_export_jobs_env = []
        # Submit in batches to keep futures bounded (e.g., 200 at a time)
        for batch in chunked(product(regions, models, sources), 200):
            futures = [
                query_export_jobs_from_psql.submit(
                    env=env, region=r, model=m, source=s,
                    start_time_ms=start_ms, end_time_ms=end_ms, date=date
                )
                for (r, m, s) in batch
            ]
            for fut in as_completed(futures):
                try:
                    all_export_jobs_env.extend(fut.result())
                except Exception as e:
                    logger.warning(f"Failed to fetch jobs for env={env}: {e}")
        logger.info(f"Fetched total {len(all_export_jobs_env)} jobs for env={env}")
        ...
Notes:
- as_completed lets you drain results as they finish and reduces the chance of “hangs” from too many pending futures.
- Tune max_workers and batch size based on DB latency and machine size.
2) Rate limit with tags (DB and API)
- Tag your tasks and set server-side concurrency limits so you don’t overload Postgres or the external API:
prefect concurrency-limit create psql 10
prefect concurrency-limit create merge-policy 10
prefect concurrency-limit ls
prefect concurrency-limit inspect merge-policy
Then add tags to tasks, e.g., @task(tags=["merge-policy"]) on your enrichment task.
3) Avoid globals or at least constrain their use
- Using a module-level _GLOBAL_ROWS only works reliably if everything runs in a single Python process (ThreadPoolTaskRunner). It will break with process pools, distributed workers, or if you ever submit subflows elsewhere.
- Safer alternatives:
- Keep enrichment in the same flow and pass indices to a task that reads from a module-level cache loaded once (same-process only).
- Or write the fetched rows to external storage (S3/DB/temp file) and pass a pointer (URI + index range) into subflows/tasks.
- If you keep the global pattern, strongly prefer ThreadPoolTaskRunner everywhere these rows are accessed.
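The "pointer (URI + index range)" alternative can be as small as a frozen dataclass that turns into a dict of primitives. A sketch; BatchRef and make_batch_refs are hypothetical names, and the URI in the usage note is only an example.

```python
from dataclasses import asdict, dataclass
from typing import List


@dataclass(frozen=True)
class BatchRef:
    """Small, serializable pointer to a slice of rows stored elsewhere."""

    uri: str
    start: int  # inclusive
    end: int  # exclusive


def make_batch_refs(uri: str, total: int, batch_size: int) -> List[BatchRef]:
    """Split rows [0, total) at `uri` into half-open ranges of at most batch_size."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    return [BatchRef(uri, s, min(s + batch_size, total)) for s in range(0, total, batch_size)]
```

Passing asdict(ref) (three primitives) into a subflow or task keeps parameters tiny regardless of how many rows the slice covers, and it works across processes because nothing depends on module-level state.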
4) Choose one of these enrichment patterns
- Minimal Prefect overhead (recommended if you had “hangs” with many tasks):
- Use a single batch task that runs local threads for per-row enrichment using your plain Python enrich_default_merge_policy function. This keeps Prefect task count small while still parallelizing I/O.
Marvin
10/12/2025, 8:11 PM
enrich_default_merge_policy as a Prefect task and submit in batches to keep futures bounded. Also set persist_result=False and cache_result_in_memory=False.
5) Keep storage light
- Since you’re careful about not passing large rows as parameters, also explicitly set:
- @flow(persist_result=False) on flows
- @task(persist_result=False, cache_result_in_memory=False) on tasks
- Return only small payloads or write large results to your own storage and return a small reference.
6) Timeouts and logging
- Add timeout_seconds to tasks (slightly above the worst-case client retry time) so stuck calls don’t block threads forever.
- Use Prefect’s get_run_logger() inside tasks/subflows for consistent logs in the UI. If you keep plain functions, pass the message up or log at the task wrapper level.
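One way to "pass the message up" from plain functions is a small runner that catches per-row errors and returns them alongside the results, so a single task wrapper can log or raise once. This is similar in spirit to your populate_data_concurrently (whose implementation I haven't seen). run_rows is a hypothetical name, and it uses a standard-library thread pool.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Tuple


def run_rows(
    fn: Callable[[Dict], Dict], rows: List[Dict], max_workers: int = 20
) -> Tuple[List[Dict], List[Tuple[int, str]]]:
    """Run a plain per-row function in threads; return results plus (index, error) pairs.

    Failed rows are returned unchanged so downstream code still sees every row.
    """

    def safe(indexed: Tuple[int, Dict]) -> Tuple[int, Dict, str]:
        i, row = indexed
        try:
            return i, fn(row), ""
        except Exception as exc:  # collected here, reported once by the caller
            return i, row, f"{type(exc).__name__}: {exc}"

    results: List[Dict] = []
    errors: List[Tuple[int, str]] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in input order.
        for i, out, err in pool.map(safe, enumerate(rows)):
            results.append(out)
            if err:
                errors.append((i, err))
    return results, errors
```

Inside the Prefect task wrapper you could then call get_run_logger().warning(...) with len(errors) and a few samples. You could also raise when the error rate crosses a threshold so the run is marked Failed instead of silently Completed.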
7) Batching everywhere
- You already batch by env. Also batch fetch submissions and enrichment submissions to avoid thousands of in-flight futures and UI overwhelm.
If you can share:
- Typical DB/API latencies and rate limits
- Where this runs (local vs worker; CPU/RAM)
I can suggest concrete values for max_workers, batch sizes, and concurrency limits, and help refactor the enrichment step into the batch pattern that keeps Prefect metadata lean while preserving throughput.
Tushar Mahajan
10/12/2025, 8:19 PM
Marvin
10/12/2025, 8:19 PM
Marvin
10/12/2025, 8:22 PM
prefect deploy and flow.from_source(...).deploy(...)): Search: prefect deploy + from_source
What to watch out for (version gotchas)
- Prefect 2.x-only patterns you should avoid in 3.x:
- Deployment.build_from_flow() (removed in 3.x)
- prefect deployment build CLI (replaced by prefect deploy)
- GitHub storage blocks (use flow.from_source('https://github.com/...') instead)
- When studying a repo, check its README and pyproject.toml/requirements.txt for prefect>=3 to confirm applicability.
If you tell me your stack (cloud provider, DB/warehouse, container/orchestrator) and preferred patterns (per-row tasks vs batches, workers vs local), I can point you to the most relevant example repos and specific files/directories to study.
Tushar Mahajan
10/14/2025, 12:05 PM
Marvin
10/20/2025, 10:48 AM
Marvin
10/20/2025, 10:53 AM
splunklib because pip installed it into a different location than the interpreter that executes your task. When you retry, the environment/path resolution ends up finding the package (or the first failure completes installation into a location your interpreter later sees).
Key clues from your logs:
- WARNING: The script prefect is installed in '/home/prefect/.local/bin' which is not on PATH
- You are installing packages at runtime (pull step), including prefect itself
- ModuleNotFoundError("No module named 'splunklib'")
Common root causes
- Mismatch between the pip used to install and the Python interpreter used to run the flow (e.g., pip targets one Python, the flow runs with another).
- Pip fell back to user installs (~/.local) and your runtime interpreter is not reading user site-packages (common if running in a virtualenv or different user).
- Installing Prefect at runtime can cause version drift and environment inconsistencies between worker and flow subprocess.
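To tell the first two causes apart, a quick stdlib-only sketch can report, for the interpreter that actually runs it, whether user site-packages is enabled, whether that directory is on sys.path, and where a given module would be imported from. The helper name describe_import_env is hypothetical (not a Prefect API); call it from inside the failing task, or run the file with the same python the worker uses, so the answer reflects that interpreter.

```python
import importlib.util
import site
import sys


def describe_import_env(module_name: str) -> dict:
    """Report where `module_name` would be imported from in this interpreter."""
    user_site = site.getusersitepackages()
    spec = importlib.util.find_spec(module_name)
    return {
        "executable": sys.executable,
        # True if user site-packages is enabled for this interpreter;
        # venvs without include-system-site-packages disable it
        "user_site_enabled": bool(site.ENABLE_USER_SITE),
        "user_site": user_site,
        # pip --user writes here; imports only see it if it is on sys.path
        "user_site_on_path": user_site in sys.path,
        # None means the import would fail with ModuleNotFoundError
        "module_origin": spec.origin if spec else None,
    }


if __name__ == "__main__":
    for key, value in describe_import_env("splunklib").items():
        print(f"{key}: {value}")
```

If module_origin is None while user_site_on_path is False and pip reported a ~/.local install, that points at the user-site cause rather than a missing requirement.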
What to do
Pick one of these stable patterns:
1) Process work pool: pre-provision a virtual environment and stop installing at runtime
- Create and activate a venv for your worker host.
- pip install your requirements (including splunk-sdk) into that venv.
- Start your Prefect worker from that venv.
- Remove any runtime pip-install pull steps. This guarantees the flow subprocess inherits the same interpreter with all deps available.
2) If you must install at runtime, ensure pip uses the exact interpreter running your flow
- Replace any “pip install …” with “python -m pip install …” so it installs into the same interpreter.
- Prefer a venv you control over user-site installs. For example, in your pull steps:
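pull:
- prefect.deployments.steps.run_shell_script:
    id: setup_venv
    # run_shell_script runs each line as a separate process without a shell,
    # so `. .venv/bin/activate` cannot work; call the venv's python directly
    script: |
      python -m venv .venv
      .venv/bin/python -m pip install --upgrade pip
      .venv/bin/python -m pip install -r requirements.txt
- prefect.deployments.steps.set_working_directory:
    directory: .
Also make sure the process worker executes the flow with this venv's interpreter (e.g., start the worker from within the venv). A pull step cannot switch the interpreter of the run that is already executing it, so packages installed into .venv are otherwise invisible to the flow.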
3) Docker/Kubernetes work pools: bake deps into an image
- Build a Docker image with splunk-sdk and your other packages.
- Reference that image in your deployment so every run has a consistent environment.
- This is the most reliable approach for scheduled production runs.
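Options 1 and 2 both rest on one rule: install with the same interpreter that runs the flow. A minimal sketch of that rule using only the standard library, assuming it runs in the process that will execute flows (for example a startup script on the worker host): running_in_venv applies the usual sys.prefix != sys.base_prefix test, and install_requirements builds, and optionally runs, a pip command bound to sys.executable rather than whatever pip is on PATH. Both function names are hypothetical.

```python
import subprocess
import sys


def running_in_venv() -> bool:
    """True when this interpreter was started from a virtual environment."""
    # a venv sets sys.prefix to the venv directory; sys.base_prefix keeps the base install
    return sys.prefix != sys.base_prefix


def pip_install_command(requirements_file: str) -> list[str]:
    """pip command that installs into *this* interpreter, not whichever pip is first on PATH."""
    return [sys.executable, "-m", "pip", "install", "-r", requirements_file]


def install_requirements(requirements_file: str, dry_run: bool = True) -> list[str]:
    """Return the pip command; actually run it only when dry_run is False."""
    cmd = pip_install_command(requirements_file)
    if not dry_run:
        # check=True raises CalledProcessError on failure instead of continuing silently
        subprocess.run(cmd, check=True)
    return cmd


if __name__ == "__main__":
    print("running in venv:", running_in_venv())
    print("would run:", " ".join(install_requirements("requirements.txt")))
```

With dry_run=True it only returns the command, which makes it easy to log what would be installed, and into which interpreter, before switching it on.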
Important tips
- Don’t install “prefect” during pull steps. Pin Prefect in the worker/image instead so the worker and flow versions match. Installing Prefect at runtime can break the worker process or lead to subtle import/path issues.
- Pin your package versions in requirements to avoid daily drift.
- If you see the “.local/bin is not on PATH” warning, it means pip used user-site. Prefer a venv to avoid this, or set up PATH and PYTHONPATH appropriately (but venv is cleaner).
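For the pinning tip, a rough line-based check can flag requirements that are not pinned with ==. This is an illustrative sketch, not a PEP 508 parser: it ignores comments, blank lines, pip options such as -r or --index-url, and environment markers. The helper name unpinned_requirements and the sample lines are made up.

```python
def unpinned_requirements(lines):
    """Return requirement lines that lack an exact '==' (or '===') version pin."""
    unpinned = []
    for raw in lines:
        # strip inline comments and surrounding whitespace
        line = raw.split("#", 1)[0].strip()
        if not line or line.startswith("-"):
            # blank/comment-only lines and pip options like -r / --index-url
            continue
        # drop environment markers such as '; python_version < "3.12"'
        spec = line.split(";", 1)[0]
        if "==" not in spec:
            unpinned.append(line)
    return unpinned


if __name__ == "__main__":
    sample = [  # made-up lines for illustration
        "some-lib==1.4.0",
        "requests>=2.31",
        "pandas",
        "# tooling",
        "-r common.txt",
    ]
    print(unpinned_requirements(sample))  # prints ['requests>=2.31', 'pandas']
```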
Quick diagnostic task you can add to confirm what Python and site-packages your flow is using:
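from prefect import task, flow
import sys, site, subprocess
@task
def debug_env():
    print("sys.executable:", sys.executable)
    print("sys.version:", sys.version)
    print("site.getsitepackages:", getattr(site, "getsitepackages", lambda: [])())
    print("site.getusersitepackages:", site.getusersitepackages())
    # capture pip's output so it goes through print() (and therefore log_prints)
    result = subprocess.run(
        [sys.executable, "-m", "pip", "show", "splunk-sdk"],
        capture_output=True, text=True,
    )
    print(result.stdout or result.stderr)
# log_prints=True sends print() output from the flow and its tasks to the run logs
@flow(log_prints=True)
def debug_flow():
    debug_env()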
If you can share:
- Your work pool type (process, docker, kubernetes)
- The pull section of your prefect.yaml (or how you’re installing deps at runtime)
- The Python version used by the worker
…I can suggest an exact config to make this rock solid.
References
- Deployments and pull steps overview: Deployments
Marvin
10/20/2025, 10:53 AM
Tushar Mahajan
10/20/2025, 10:55 AM
name: common
prefect-version: 3.4.21
build:
deployments:
- name: "enrich_ims_org_info_table"
  entrypoint: "flows/common/enrich_ims_org_info_table/flow.py:enrich_ims_org_info_table"
  pull:
  - prefect.deployments.steps.git_clone:
      id: clone-step
      repository: https://github.com/OneAdobe/minerva-prefect.git
      branch: main
      access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
  - prefect.deployments.steps.pip_install_requirements:
      directory: "{{ clone-step.directory }}/flows/common/enrich_ims_org_info_table"
      requirements_file: requirements.txt
  work_pool:
    name: "Local Pod Process"
  schedules:
  - cron: "0 11 * * *"
    slug: "enrich_ims_org_info_table-daily-at-11am-ist"
    timezone: "Asia/Kolkata"
    active: true
- name: "get_export_job_stats"
  entrypoint: "flows/ups_profile_export/get_export_job_stats/flow.py:get_export_job_stats"
  pull:
  - prefect.deployments.steps.git_clone:
      id: clone-step
      repository: https://github.com/OneAdobe/minerva-prefect.git
      branch: main
      access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
  - prefect.deployments.steps.pip_install_requirements:
      directory: "{{ clone-step.directory }}/flows/ups_profile_export/get_export_job_stats"
      requirements_file: requirements.txt
  work_pool:
    name: "Local Pod Process"
  schedules:
  - cron: "30 11 * * *"
    slug: "get_export_job_stats-daily-at-1130am-ist"
    timezone: "Asia/Kolkata"
    active: true
  parameters:
    envs:
    - prod
    regions:
    - va7
    - nld2
    - aus5
    - can2
    - gbr9
    - ind2
    models:
    - "_xdm.context.profile"
    sources:
    - "scheduler"
- name: "sandbox_capacity_anomaly_detection"
  entrypoint: "flows/axia/sandbox_capacity_anomaly_detection/flow.py:sandbox_capacity_anomaly_report"
  pull:
  - prefect.deployments.steps.git_clone:
      id: clone-step
      repository: https://github.com/OneAdobe/minerva-prefect.git
      branch: main
      access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
  - prefect.deployments.steps.pip_install_requirements:
      directory: "{{ clone-step.directory }}/flows/axia/sandbox_capacity_anomaly_detection"
      requirements_file: requirements.txt
  work_pool:
    name: "Local Pod Process"
  schedules:
  - cron: "00 12 * * *"
    slug: "sandbox_capacity_anomaly_detection-daily-at-1200pm-ist"
    timezone: "Asia/Kolkata"
    active: true
  parameters:
    envs:
    - prod
    regions:
    - va7
    - nld2
    - aus5
    - can2
    - gbr9
    - ind2
    capacity_names:
    - STREAMING_THROUGHPUT
    slack_channel: C09LZBQ1K1V
Marvin
10/20/2025, 10:55 AM
Marvin
10/20/2025, 10:57 AM
pip_install_requirements, the daily failure with ModuleNotFoundError("No module named 'splunklib'") is almost certainly an environment mismatch during runtime installs.
What’s happening
- Your pull step installs packages right before the flow runs. From your logs, pip is installing to the user site (/home/prefect/.local/...) and warnings show scripts are not on PATH.
- In a Process pool, the flow runs with the Python interpreter/environment that the worker is using. If pip installs to a different location (user site vs the interpreter’s site-packages) or upgrades/downgrades packages (including Prefect) at runtime, the flow subprocess may not see the newly installed module on the first attempt.
- Retrying a task 10 seconds later may succeed because the interpreter state/path resolution ends up seeing the package on the next import attempt, but this is brittle and symptomatic of installing into the wrong place.
Why this is fragile
- Installing Prefect and other deps at runtime into the same environment the worker is using can cause path and version drift. The .local user install is a strong hint that pip isn’t writing to the interpreter’s site-packages the flow uses.
- Process pools do not isolate environments per run; they inherit the worker’s environment. There’s no container or venv boundary unless you create one.
How to fix it (pick one)
Option A — Recommended for Process pools: pre-provision the worker’s venv and stop installing at runtime
- Create/activate a venv on the worker host.
- Install all runtime deps there (including splunk-sdk), and DO NOT install Prefect during pull steps.
- Start the Prefect worker from that venv.
- Remove the pip_install_requirements step from your deployments. Keep git_clone if you need code.
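Since Option A removes the runtime install, a missing package would otherwise only surface when a scheduled run fails. A small fail-fast sketch, assuming you run it with the venv's python right before starting the worker; missing_modules and preflight are hypothetical helpers, and the names passed in are top-level import names (splunklib), not pip distribution names (splunk-sdk).

```python
import importlib.util
import sys


def missing_modules(import_names):
    """Return the top-level import names this interpreter cannot locate."""
    return [name for name in import_names if importlib.util.find_spec(name) is None]


def preflight(import_names) -> int:
    """Return 0 if every module is importable, else report what is missing and return 1."""
    missing = missing_modules(import_names)
    if missing:
        print(f"{sys.executable} cannot import: {', '.join(missing)}", file=sys.stderr)
        return 1
    print(f"all required modules importable from {sys.executable}")
    return 0
```

A startup script could call raise SystemExit(preflight(["splunklib", "prefect"])) and only start the worker when that exits with status 0.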
Option B — If you must install at runtime, ensure you install into the exact interpreter the flow will use
- Replace pip_install_requirements with a run_shell_script that uses `python -m pip`:
pull:
- prefect.deployments.steps.git_clone:
    id: clone-step
    repository: https://github.com/OneAdobe/minerva-prefect.git
    branch: main
    access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
- prefect.deployments.steps.run_shell_script:
    # each script line runs as its own process (no shell), so a `cd` line would not
    # carry over; set the working directory with `directory` instead
    directory: "{{ clone-step.directory }}/flows/common/enrich_ims_org_info_table"
    script: |
      python -m pip install --upgrade pip
      python -m pip install -r requirements.txt
- Remove prefect from your requirements.txt used at runtime. Pin Prefect in the worker/image instead to avoid upgrading the SDK mid-run.
- Prefer a venv you control rather than relying on user-site installs.
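One caveat for Option B: a shell step that calls python -m pip resolves python from PATH, which is not guaranteed to be the interpreter executing the flow. The sketch below (hypothetical helper names, stdlib only) asks the PATH python for its sys.prefix and compares it with the current one; comparing environment prefixes rather than executable paths avoids being fooled by venv symlinks that point back at the base interpreter.

```python
import os
import shutil
import subprocess
import sys
from typing import Optional


def path_python_prefix(command: str = "python") -> Optional[str]:
    """Return sys.prefix of the interpreter `command` resolves to on PATH, or None."""
    resolved = shutil.which(command)
    if resolved is None:
        return None
    result = subprocess.run(
        [resolved, "-c", "import sys; print(sys.prefix)"],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        return None
    return result.stdout.strip()


def same_environment_as_path_python(command: str = "python") -> bool:
    """True if `command` on PATH would install into the same environment as this process."""
    prefix = path_python_prefix(command)
    return prefix is not None and os.path.realpath(prefix) == os.path.realpath(sys.prefix)


if __name__ == "__main__":
    print("current interpreter:", sys.executable)
    print("PATH python prefix:", path_python_prefix())
    print("same environment:", same_environment_as_path_python())
```

Calling same_environment_as_path_python() from inside a flow run tells you whether a shell step's python -m pip would install where the flow can import from.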
Quick checks to confirm the diagnosis
- Make sure flows/common/enrich_ims_org_info_table/requirements.txt actually includes splunk-sdk (the distribution that provides the splunklib import).
- Add a small debug task to print the interpreter and whether splunk-sdk is installed at runtime:
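from prefect import task, flow
import sys, subprocess
@task
def debug_env():
    print("sys.executable:", sys.executable)
    # capture pip's output so it is printed (and logged via log_prints) instead of going straight to stdout
    result = subprocess.run(
        [sys.executable, "-m", "pip", "show", "splunk-sdk"],
        capture_output=True, text=True,
    )
    print(result.stdout or result.stderr)
@flow(log_prints=True)
def debug_flow():
    debug_env()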
If you share:
- The contents of that requirements.txt
- How you start the worker (command and whether it’s in a venv)
- Output of sys.executable from a run
…I can give you an exact, minimal change to make this rock solid.
Docs for reference
- Deployments and pull steps: Deployments
- prefect.yaml and steps (git clone, pip install, shell): prefect.yaml
- Process work pools: Run flows in local processes
- Docker/Kubernetes pattern (if you decide to containerize): Docker
Tushar Mahajan
10/20/2025, 6:00 PM
Marvin
10/20/2025, 6:00 PM
Marvin
10/20/2025, 6:15 PM
FROM prefecthq/prefect:3.4.23-python3.11
# Optional: git for cloning during pull steps; add any system packages you need
RUN apt-get update && apt-get install -y --no-install-recommends git && \
    rm -rf /var/lib/apt/lists/*
WORKDIR /opt/prefect
2) prefect.yaml that builds/pushes the image and configures K8s runs with a per-run venv
Adjust the registry, work pool name, repo URL, and requirements paths for each flow.
```
name: minerva-k8s
prefect-version: 3.4.23
build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build-image
    image_name: your-registry.example.com/your-team/prefect-flows
    tag: "latest"
    dockerfile: "Dockerfile"
push:
- prefect_docker.deployments.steps.push_docker_image:
    image_name: your-registry.example.com/your-team/prefect-flows
    tag: "latest"
deployments:
- name: "enrich_ims_org_info_table"
  entrypoint: "flows/common/enrich_ims_org_info_table/flow.py:enrich_ims_org_info_table"
  pull:
  - prefect.deployments.steps.git_clone:
      id: clone-step
      repository: https://github.com/OneAdobe/minerva-prefect.git
      branch: main
      access_token: "{{ prefect.blocks.secret.dxminerva-git-pat }}"
  # This will run after we switch the container into the per-run venv (see command below)
  - prefect.deployments.steps.pip_install_requirements:
      directory: "{{ clone-step.directory }}/flows/common/enrich_ims_org_info_table"
      requirements_file: requirements.txt
  work_pool:
    name: "Your K8s Work Pool"
    job_variables:
      # Ensure the pod pulls the image built above
      image: "your-registry.example.com/your-team/prefect-flows:latest"
      image_pull_policy: "Always"
      # Run each flow in its own venv by creating it before Prefect executes the run
      # NOTE: This requires your work pool's base job template to allow overriding 'command' and 'args'
      command: ["/bin/bash", "-lc"]
      args:
      - |
        set -euo pipefail
        # the worker passes the flow run ID to the pod as PREFECT__FLOW_RUN_ID (double underscore)
        VENV_DIR=/opt/prefect/venvs/${PREFECT__FLOW_RUN_ID}
        # --system-site-packages lets the venv import the Prefect already baked into the image
        python -m venv --system-site-packages "$VENV_DIR"
        source "$VENV_DIR/bin/activate"
        python -m pip install --upgrade pip
        # Execute the flow run with the venv's python so pull steps install into this venv;
        # the `prefect` console script would run on the image's base interpreter instead
        exec python -m prefect.engine
      # Optional: add env vars (examples)
      env:
        PIP_NO_CACHE_DIR: "1"
      # If your registry requires auth, add imagePullSecrets in the work pool base job template or here
  schedules:
  - cron: "0 11 * * *"
    slug: "enrich_ims_org_info_table-daily-at-11am-ist"
    timezone: "Asia/Kolkata"
    active: true
# Repeat for other deployments; change the requirements path per flow
- name: "get_export_job_stats"
  entrypoint: "flows/ups_profile_export/get_export_job_stats/flow.py:get_export_job_stats"
  pull:
Marvin
10/20/2025, 6:15 PM
prefect[docker].
References
- Prefect deployment YAML and steps: prefect.yaml
- Kubernetes worker setup and job variables: Kubernetes
- Docker build/push steps: Docker
If you share:
- Your Kubernetes work pool’s base job template
- The name of your registry and whether it needs imagePullSecrets
- One flow’s exact requirements.txt path
…I can tailor the job_variables (and base job template snippet) to your cluster and finalize the YAML.