<@ULVA73B9P> I have prefect v1 deployment via helm...
# ask-marvin
m
@Marvin I have a Prefect v1 deployment via Helm chart and I need to add a nodeSelector to the default job template for running the flows. Do I need to create a custom job template, or is there an option to add it directly via the manifest?
m
thought for 131.2 seconds
Short answer: in Prefect 1, there isn’t a Helm “toggle” to inject a nodeSelector into the agent’s default Job template. You’ll need to provide a custom Job template, either agent-wide (via the Helm release) or per-flow (via KubernetesRun). Pick whichever suits your setup. Two workable approaches: 1) Agent-wide via Helm (recommended if you want it for all flows) - Create a Job template that adds the nodeSelector under pod spec:
Copy code
apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      nodeSelector:
        nodepool: high-mem
      # optionally also:
      # tolerations:
      # affinity:
- Mount it into the agent pod and point the agent to it. The exact values keys depend on your chart, but the pattern is: - Mount a ConfigMap with the template file into the agent container - Pass the agent the flag to use it (e.g.
--job-template /path/to/job_template.yaml
) Example values.yaml sketch (adapt to your chart’s schema):
Copy code
agent:
  extraVolumes:
    - name: job-template
      configMap:
        name: prefect-job-template
  extraVolumeMounts:
    - name: job-template
      mountPath: /opt/prefect/job-template
      readOnly: true
  extraArgs:
    - --job-template
    - /opt/prefect/job-template/job_template.yaml
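If your chart takes the ConfigMap route shown above, one way to create that ConfigMap from the template file (a sketch; the names prefect-job-template and job_template.yaml are the ones assumed in the example):
Copy code
kubectl -n <ns> create configmap prefect-job-template --from-file=job_template.yaml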
If your chart doesn’t expose extraArgs/volume mounts, you can still do this by editing the agent Deployment manifest in your Helm values. 2) Per-flow via KubernetesRun (good if you only want it on some flows) - Set
run_config
on your flow and provide a partial Job template to merge:
Copy code
from prefect import Flow, task
from prefect.run_configs import KubernetesRun

job_template = {
  "apiVersion": "batch/v1",
  "kind": "Job",
  "spec": {
    "template": {
      "spec": {
        "nodeSelector": {"nodepool": "high-mem"}
      }
    }
  }
}

@task
def do_work():
    pass

with Flow("my-flow", run_config=KubernetesRun(job_template=job_template)) as flow:
    do_work()
You can also use
KubernetesRun(job_template_path="job_template.yaml")
. Docs for reference: - Kubernetes Agent (v1) and custom Job templates: Prefect v1: Kubernetes Agent - Custom Job templates - KubernetesRun (v1): Prefect v1 API: KubernetesRun If you share which Helm chart and values you’re using, I can sketch the exact values.yaml you’ll need. I know, thrilling.
m
@Marvin I am getting an error that the template is not recognized. Run config looks like this: { "env": null, "type": "KubernetesRun", "image": "392865356492.dkr.ecr.us-east-1.amazonaws.com/eashared-prefectv1-dev:pp-9775c87", "labels": ["ea-data-manufacturing"], "cpu_limit": null, "__version__": "1.4.1", "cpu_request": "250m", "job_template": { "kind": "Job", "spec": { "template": { "spec": { "nodeSelector": { "kubernetes.io/arch": "amd64" } } } }, "apiVersion": "batch/v1" }, "memory_limit": "1Gi", "memory_request": "512Mi", "image_pull_policy": null, "job_template_path": null, "image_pull_secrets": null, "service_account_name": null } What is missing?
m
thought for 122.4 seconds
A few likely culprits here: 1) Slack auto-linking mangled your keys - In your paste I see: - `image`:
<http://3928...|3928...>
-
nodeSelector
key:
"<http://kubernetes.io/arch|kubernetes.io/arch>": "amd64"
- Kubernetes expects plain strings and a valid label key:
kubernetes.io/arch
- If that linked text actually made it into your run config or job template file, the API will reject it with confusing “not recognized” errors. - Make sure the actual values are exactly: - `image`:
392865356492.dkr.ecr.us-east-1.amazonaws.com/eashared-prefectv1-dev:pp-9775c87
- `nodeSelector`:
{ "<http://kubernetes.io/arch|kubernetes.io/arch>": "amd64" }
2) Provide a minimal, known-good job template shape - Your structure is close, but some clusters are picky if key shells are missing. This template is safe and merges cleanly with Prefect’s default:
Copy code
{
  "apiVersion": "batch/v1",
  "kind": "Job",
  "metadata": {},
  "spec": {
    "template": {
      "metadata": {},
      "spec": {
        "restartPolicy": "Never",
        "nodeSelector": {
          "<http://kubernetes.io/arch|kubernetes.io/arch>": "amd64"
        }
      }
    }
  }
}
3) Make sure the agent is actually using Kubernetes jobs - This only works with the Prefect v1 Kubernetes Agent. If you’re using a different agent/executor by accident, the template won’t apply. 4) If you’re setting this via a file, prefer job_template_path - Sometimes serializing large dicts through the UI is brittle. Mount a YAML and point to it:
Copy code
run_config=KubernetesRun(job_template_path="/opt/prefect/job-template/job_template.yaml")
- YAML content:
Copy code
apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      restartPolicy: Never
      nodeSelector:
        kubernetes.io/arch: amd64
If this still errors, please paste the exact error text and source (Kubernetes event, agent logs, or Prefect UI) so we can target it precisely. The exact phrase matters; “template not recognized” usually means the API didn’t like the object shape or a field name. My money is still on the Slack-mangled label key.
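To grab the Kubernetes events mentioned above, a quick sketch (adjust the namespace to wherever the flow jobs run):
Copy code
kubectl -n <ns> get events --sort-by=.lastTimestamp | tail -20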
m
@Marvin I am getting this error after fixing the mangled yaml: (422) Reason: Unprocessable Entity HTTP response headers: HTTPHeaderDict({'Audit-Id': '4533ab6d-acf3-4560-85ad-566c39ea60ca', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '1a77bbfb-323f-4d6d-95ee-08fdd09c6d64', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'b22ea373-c26e-44c2-b475-ed4bdd306a09', 'Date': 'Wed, 13 Aug 2025 112905 GMT', 'Content-Length': '401'}) HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Job.batch \"prefect-job-62e1da94\" is invalid: spec.template.spec.containers[0].name: Required value","reason":"Invalid","details":{"name":"prefect-job-62e1da94","group":"batch","kind":"Job","causes":[{"reason":"FieldValueRequired","message":"Required value","field":"spec.template.spec.containers[0].name"}]},"code":422} How do I fix that?
m
thought for 45.2 seconds
Kubernetes is telling you the final Job manifest it received has a container without a name. That happens when your job_template introduces a containers list that overwrites Prefect’s default container spec. What to fix - EITHER: Don’t define containers in your job_template at all if you only need nodeSelector/tolerations/affinity. Let Prefect supply the container. - OR: If you must touch container-level fields (resources, env, volumeMounts), include a complete container entry with a name. Prefect uses the first container to run your flow; give it a name (e.g., flow). Lists are replaced, not merged. Known-good examples Simplest (no containers; just nodeSelector):
Copy code
apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      restartPolicy: Never
      nodeSelector:
        kubernetes.io/arch: amd64
If you need to set container-specific fields, include a named container:
Copy code
apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      restartPolicy: Never
      nodeSelector:
        kubernetes.io/arch: amd64
      containers:
        - name: flow
          resources:
            requests:
              cpu: "250m"
              memory: "512Mi"
            limits:
              memory: "1Gi"
          # add env, volumeMounts, etc. here if needed
Why this happens - Prefect v1 merges your job_template into its base Job. Dicts are merged, but lists (like spec.template.spec.containers) are replaced outright, not merged. If your template adds containers without a name, it replaces Prefect’s container (which has a name) and Kubernetes rejects it with “containers[0].name: Required value.” If you still hit 422, post the exact job_template you’re using now (the part under spec.template.spec) and I’ll point at the offending bit. Docs for reference: Prefect v1: Kubernetes Agent - Custom Job templates.
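To picture that merge behavior, here is a simplified sketch (illustrative only, not Prefect's actual merge implementation): dicts merge key by key, while anything else, including a containers list, replaces what was there.
Copy code
def merge(base, overlay):
    # Dicts are merged key by key; anything else (including lists) replaces the base value outright.
    if isinstance(base, dict) and isinstance(overlay, dict):
        merged = dict(base)
        for key, value in overlay.items():
            merged[key] = merge(base[key], value) if key in base else value
        return merged
    return overlay

default_job = {"spec": {"template": {"spec": {"containers": [{"name": "flow", "image": "prefecthq/prefect"}]}}}}
custom = {"spec": {"template": {"spec": {"containers": [{"resources": {"requests": {"cpu": "250m"}}}]}}}}

# The nameless container from `custom` replaces the named default one, which is
# exactly what Kubernetes then rejects with "containers[0].name: Required value".
print(merge(default_job, custom)["spec"]["template"]["spec"]["containers"])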
m
@Marvin I am using it like this:
job_template = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {
        "template": {
            "spec": {
                "nodeSelector": {"kubernetes.io/arch": "amd64"}
            }
        }
    }
}

# Configure Kubernetes run config with the EA agent image
run_config = KubernetesRun(
    image=image,
    labels=list(labels) if labels else [],
    memory_request="512Mi",
    memory_limit="1Gi",
    cpu_request="250m",
    job_template=job_template
)
Is that correct?
m
thought for 34.7 seconds
Close, but not quite. Two fixes: 1) Use the real label key (Slack keeps mangling it) - Your pasted dict shows
"<http://kubernetes.io/arch|kubernetes.io/arch>": "amd64"
. Kubernetes needs the plain key
kubernetes.io/arch
. - Corrected:
Copy code
job_template = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {
        "template": {
            "spec": {
                "nodeSelector": {
                    "<http://kubernetes.io/arch|kubernetes.io/arch>": "amd64"
                },
                "restartPolicy": "Never"
            }
        }
    }
}
2) Don’t introduce a containers list unless you fully define it - Your 422 “containers[0].name: Required value” happens when a job_template adds
spec.template.spec.containers
and overwrites Prefect’s default container (which has a name). The snippet you showed doesn’t add containers, so it’s fine. Just make sure nowhere else in your template or code you add an empty
containers: []
or a partial container. If you really need to touch container-level fields, include a name:
Copy code
spec:
  template:
    spec:
      nodeSelector:
        kubernetes.io/arch: amd64
      containers:
        - name: flow
          resources:
            requests:
              cpu: "250m"
              memory: "512Mi"
            limits:
              memory: "1Gi"
Sanity checks - Ensure the actual Python string key is exactly "kubernetes.io/arch" (Slack links are just a display artifact, but if that leaked into your source it will fail). - Prefer YAML via job_template_path to avoid JSON serialization quirks: - YAML:
Copy code
apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      restartPolicy: Never
      nodeSelector:
        kubernetes.io/arch: amd64
- Python:
Copy code
run_config = KubernetesRun(
        image=image,
        labels=list(labels) if labels else [],
        memory_request="512Mi",
        memory_limit="1Gi",
        cpu_request="250m",
        job_template_path="/opt/prefect/job-template/job_template.yaml",
    )
If it still 422s, crank agent logging to DEBUG (env var
PREFECT__LOGGING__LEVEL=DEBUG
) and share the rendered Job spec the agent is submitting. That will show if any
containers
list slipped in and replaced Prefect’s default container. Docs for context: Prefect v1: Kubernetes Agent - Custom Job templates.
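For the job_template_path route above, a quick local check that the YAML file parses into the structure you expect (a sketch; assumes PyYAML is available wherever you run it):
Copy code
import yaml

with open("job_template.yaml") as f:
    template = yaml.safe_load(f)

# Should print {'kubernetes.io/arch': 'amd64'} for the YAML shown above
print(template["spec"]["template"]["spec"]["nodeSelector"])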
m
@Marvin I've done that. The render config is now this: { "env": { "PREFECT__LOGGING__LEVEL": "DEBUG" }, "type": "KubernetesRun", "image": "392865356492.dkr.ecr.us-east-1.amazonaws.com/eashared-prefectv1-dev:pp-9775c87", "labels": ["ea-data-manufacturing"], "cpu_limit": null, "__version__": "1.4.1", "cpu_request": "250m", "job_template": { "kind": "Job", "spec": { "template": { "spec": { "nodeSelector": { "kubernetes.io/arch": "amd64" }, "restartPolicy": "Never" } } }, "apiVersion": "batch/v1" }, "memory_limit": "1Gi", "memory_request": "512Mi", "image_pull_policy": null, "job_template_path": null, "image_pull_secrets": null, "service_account_name": null } and the error remains: (422) Reason: Unprocessable Entity HTTP response headers: HTTPHeaderDict({'Audit-Id': '79363676-5f4c-47a1-813c-fb06d55c864f', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '1a77bbfb-323f-4d6d-95ee-08fdd09c6d64', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'b22ea373-c26e-44c2-b475-ed4bdd306a09', 'Date': 'Wed, 13 Aug 2025 115521 GMT', 'Content-Length': '401'}) HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Job.batch \"prefect-job-9ded4120\" is invalid: spec.template.spec.containers[0].name: Required value","reason":"Invalid","details":{"name":"prefect-job-9ded4120","group":"batch","kind":"Job","causes":[{"reason":"FieldValueRequired","message":"Required value","field":"spec.template.spec.containers[0].name"}]},"code":422} how to fix that?
m
thought for 76.6 seconds
The 422 is coming from Kubernetes because the final Job the agent submitted has a containers[0] without a name. That only happens if a containers list is being introduced somewhere and overwriting Prefect’s default container (which does have a name). Your per-flow job_template (with only nodeSelector + restartPolicy) is fine. So the most likely culprit is an agent-level Job template being used by your Helm release. What to check and fix - Check if your agent is started with a custom job template - Look for a flag like --job-template /path/to/job_template.yaml on the agent container args, or an env like PREFECT__CLOUD__AGENT__KUBERNETES__JOB_TEMPLATE_PATH (or similar) in your Helm values. - If present, open that file/ConfigMap. If it defines spec.template.spec.containers, make sure the first entry has a name and the rest of the required fields. Better: remove the containers block entirely unless you really need to change it. - Use a minimal agent job template (or none at all) - A safe agent-level template that only adds nodeSelector:
Copy code
apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      restartPolicy: Never
      nodeSelector:
        kubernetes.io/arch: amd64
- Do not include containers: in the agent template. Prefect will provide the correct container (with name, image, command, args). - Keep your per-flow job_template minimal - The snippet you’re using is correct once you remove Slack’s link mangling:
Copy code
job_template = {
      "apiVersion": "batch/v1",
      "kind": "Job",
      "spec": {
        "template": {
          "spec": {
            "restartPolicy": "Never",
            "nodeSelector": {"<http://kubernetes.io/arch|kubernetes.io/arch>": "amd64"}
          }
        }
      }
    }
- Do not add containers here either unless you fully define it (with name, image/command/args). Lists replace, they don’t merge. How to prove it quickly - Turn on DEBUG logging on the agent pod (not just the flow). Then watch the agent logs; Prefect v1 logs the rendered Job it’s submitting. You’ll see a containers block without a name in there if my suspicion is right. - If you paste that rendered Job (spec.template.spec portion) here, I’ll point to the offending field. References - Prefect v1: Kubernetes Agent custom job templates and merge behavior: https://docs-v1.prefect.io/orchestration/agents/kubernetes.html#custom-job-templates Side note: your Slack paste still shows linked strings like <…|…>. Make sure your actual source has plain strings: - image: 392865356492.dkr.ecr.us-east-1.amazonaws.com/eashared-prefectv1-dev:pp-9775c87 - nodeSelector key: kubernetes.io/arch If you do find an agent-level template is in use and you actually need to customize containers, you’ll need a complete container spec (including name) that also preserves Prefect’s command/args. Easiest path is to copy Prefect’s default template and only add what you need. But if you don’t need container-level changes, removing containers from the agent template is the clean fix.
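A quick way to dump just the agent container's args and env for that check (a sketch; namespace and deployment name are placeholders):
Copy code
kubectl -n <ns> get deploy <agent-deployment-name> \
  -o jsonpath='{.spec.template.spec.containers[0].args}{"\n"}{.spec.template.spec.containers[0].env}'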
m
@Marvin where can I see the agent logs?
m
thought for 34.4 seconds
In your Helm/Kubernetes setup, the Prefect v1 Kubernetes Agent is just a Deployment/Pod. Its logs go to stdout/stderr, so you read them with kubectl. Quick path: - Find the namespace and pod - If you know the namespace:
Copy code
kubectl -n <ns> get pods | grep agent
- If you don’t:
Copy code
kubectl get pods -A | grep agent
- Tail the logs - From the Deployment (handles restarts nicely):
Copy code
kubectl -n <ns> logs -f deployment/<agent-deployment-name>
- Or directly from the pod:
Copy code
kubectl -n <ns> logs -f <agent-pod-name>
Useful tricks: - If you installed via Helm and know the release name:
Copy code
kubectl -n <ns> get deploy -l app.kubernetes.io/instance=<release-name>
Look for something with “agent” in the name. - To inspect the agent’s args/env (to see if a job template is set):
Copy code
kubectl -n <ns> get deploy <agent-deployment-name> -o yaml
Check: - args for something like
--job-template /path/to/job_template.yaml
- env for
PREFECT__LOGGING__LEVEL
,
PREFECT__CLOUD__AGENT__KUBERNETES__JOB_TEMPLATE_PATH
, etc. Enable DEBUG on the agent (not just the flow): - Set it via your Helm values and redeploy. The exact keys vary by chart, but common patterns are:
Copy code
agent:
    extraEnv:
      - name: PREFECT__LOGGING__LEVEL
        value: DEBUG
    extraArgs:
      - --log-level
      - DEBUG
If your chart doesn’t have extraEnv/extraArgs, add the env/args directly under the agent container spec in values. Once DEBUG is on, the agent will log the rendered Job it submits. That’s where you’ll see the offending containers list if one is being injected somewhere. Docs (v1): Kubernetes Agent and custom Job templates
m
@Marvin These are full agent logs, how do I fix the error?
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/agent/agent.py", line 388, in _deploy_flow_run
deployment_info = self.deploy_flow(flow_run)
File "/usr/local/lib/python3.7/site-packages/prefect/agent/kubernetes/agent.py", line 438, in deploy_flow
namespace=self.namespace, body=job_spec
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api/batch_v1_api.py", line 210, in create_namespaced_job
return self.create_namespaced_job_with_http_info(namespace, body, **kwargs)  # noqa: E501
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api/batch_v1_api.py", line 323, in create_namespaced_job_with_http_info
collection_formats=collection_formats)
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 353, in call_api
_preload_content, _request_timeout, _host)
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 184, in __call_api
_request_timeout=_request_timeout)
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 397, in request
body=body)
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 282, in POST
body=body)
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 235, in request
raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (422)
Reason: Unprocessable Entity
HTTP response headers: HTTPHeaderDict({'Audit-Id': '91c3e8b8-49db-4f73-8de1-6f723adad018', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '1a77bbfb-323f-4d6d-95ee-08fdd09c6d64', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'b22ea373-c26e-44c2-b475-ed4bdd306a09', 'Date': 'Wed, 13 Aug 2025 12:37:54 GMT', 'Content-Length': '401'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Job.batch \"prefect-job-6c3c740c\" is invalid: spec.template.spec.containers[0].name: Required value","reason":"Invalid","details":{"name":"prefect-job-6c3c740c","group":"batch","kind":"Job","causes":[{"reason":"FieldValueRequired","message":"Required value","field":"spec.template.spec.containers[0].name"}]},"code":422}
m
thought for 67.0 seconds
Your agent logs confirm it: the Job the agent submits has a container without a name. Kubernetes rejects it with containers[0].name: Required value. That only happens if a containers list is being injected somewhere and replacing Prefect’s default container (which does have a name). Your per-flow job_template that only sets nodeSelector/restartPolicy is fine, so the most likely culprit is an agent-level Job template from your Helm release. Two ways to fix it: Option A — Fix/remove the agent-level job template (recommended) - Inspect the agent Deployment to see if a job template is set: - Check args/env:
Copy code
kubectl -n <ns> get deploy <agent-deployment-name> -o yaml
Look for: - args containing
--job-template /path/to/job_template.yaml
- env like
PREFECT__CLOUD__AGENT__KUBERNETES__JOB_TEMPLATE_PATH
(or
PREFECT__AGENT__KUBERNETES__JOB_TEMPLATE_PATH
for Server) - If a template is configured, open that file/ConfigMap. If it defines spec.template.spec.containers, either: - Remove the containers block entirely, or - Ensure the first container has a name (and any other required fields you’ve overridden). - A safe agent template that won’t clobber Prefect’s container:
Copy code
apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      restartPolicy: Never
      nodeSelector:
        kubernetes.io/arch: amd64
- Redeploy the Helm release so the agent picks up the change. Option B — Quick per-flow patch to overpower the bad agent template - Add a minimal containers list with a name in your flow’s job_template. This forces the merged manifest to have a valid name, while Prefect fills in image/command/args.
Copy code
job_template = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {
      "template": {
        "spec": {
          "restartPolicy": "Never",
          "nodeSelector": {
            "<http://kubernetes.io/arch|kubernetes.io/arch>": "amd64"
          },
          "containers": [
            {"name": "flow"}
          ]
        }
      }
    }
  }
- Keep everything else out of containers unless you fully define it. Lists replace, they don’t merge. Sanity checks - Make sure your actual source uses plain strings (Slack links are cosmetic in chat but fatal in code): - image: 392865356492.dkr.ecr.us-east-1.amazonaws.com/eashared-prefectv1-dev:pp-9775c87 - nodeSelector key: kubernetes.io/arch - Turn on DEBUG on the agent (not just the flow) to see the rendered Job the agent submits: - Add to the agent container env/args via Helm: - env: PREFECT__LOGGING__LEVEL=DEBUG - or args: --log-level DEBUG - Then tail logs:
Copy code
kubectl -n <ns> logs -f deployment/<agent-deployment-name>
You should see the final spec.template.spec, including containers. If a nameless container is still present, we’ll know exactly where it’s coming from. Docs for context: Prefect v1 Kubernetes Agent custom job templates - https://docs-v1.prefect.io/orchestration/agents/kubernetes.html#custom-job-templates If you paste the agent Deployment args/env and any agent job template you find, I’ll point out the exact line to change. Yes, excavation through YAML—my favorite pastime.
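If Option A turns up a ConfigMap-mounted agent template, you can dump it to check for a containers block (a sketch; the ConfigMap name is a placeholder):
Copy code
kubectl -n <ns> get configmap <job-template-configmap> -o yaml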
m
@Marvin now I am getting this in agent logs:
[2025-08-13 12:37:54,741] ERROR - agent | Updating flow run 20827321-5301-4f23-9c51-7532ed1828d2 state to Failed...
[2025-08-13 12:37:54,782] ERROR - agent | Deployment of 20827321-5301-4f23-9c51-7532ed1828d2 aborted!
[2025-08-13 12:49:42,629] ERROR - agent | Failed to query for ready flow runs
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/agent/agent.py", line 320, in _submit_deploy_flow_run_jobs
flow_run_ids = self._get_ready_flow_runs()
File "/usr/local/lib/python3.7/site-packages/prefect/agent/agent.py", line 577, in _get_ready_flow_runs
"tenant_id": self.client.tenant_id,
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 465, in graphql
raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'message': 'request to <http://prefectdev-graphql.ea-prefect-dev:4201/graphql/> failed, reason: connect ECONNREFUSED 172.20.218.241:4201', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_runs_in_queue'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'request to <http://prefectdev-graphql.ea-prefect-dev:4201/graphql/> failed, reason: connect ECONNREFUSED 172.20.218.241:4201', 'type': 'system', 'errno': 'ECONNREFUSED', 'code': 'ECONNREFUSED'}}}]
[2025-08-13 12:49:52,638] ERROR - agent | Failed to query for ready flow runs
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/agent/agent.py", line 320, in _submit_deploy_flow_run_jobs
flow_run_ids = self._get_ready_flow_runs()
File "/usr/local/lib/python3.7/site-packages/prefect/agent/agent.py", line 577, in _get_ready_flow_runs
"tenant_id": self.client.tenant_id,
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 465, in graphql
raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'message': 'request to <http://prefectdev-graphql.ea-prefect-dev:4201/graphql/> failed, reason: connect ECONNREFUSED 172.20.218.241:4201', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_runs_in_queue'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'request to <http://prefectdev-graphql.ea-prefect-dev:4201/graphql/> failed, reason: connect ECONNREFUSED 172.20.218.241:4201', 'type': 'system', 'errno': 'ECONNREFUSED', 'code': 'ECONNREFUSED'}}}]
[2025-08-13 12:59:04,381] INFO - agent | Deploying flow run 62e99500-5f8b-48f4-bc73-7a41b6485fd3 to execution environment...
[2025-08-13 12:59:04,461] INFO - agent | Completed deployment of flow run 62e99500-5f8b-48f4-bc73-7a41b6485fd3
But I see this in the Prefect UI: Failed to load and execute flow run: NotGitRepository()
What is the real error here?
m
thought for 63.9 seconds
Short version: you’ve got two separate issues. 1) The ECONNREFUSED messages are your agent briefly losing connectivity to your Prefect Server GraphQL service. That affects scheduling but not your flow code. Check the Server pods/services/endpoints and network policies, but it later recovered. 2) The flow run failed because of NotGitRepository(). That’s the “real” cause of the failed run you see in the UI. It means the flow job couldn’t load your code from storage. Why NotGitRepository happens in Prefect 1 - You’re using a storage option that expects a Git repo (Git/GitHub storage) but: - git isn’t installed in the runtime image, or - credentials/URL/branch/path are wrong, or - you’re not actually running inside a repo and some code or storage tried to treat the working dir as one. - Or you used Local storage while running on a remote Kubernetes agent — not supported. What to do Pick one of these patterns and stick to it: A) Bake the flow into the image (Docker storage) — simplest for k8s - Build an image that contains your flow code. - Set Docker storage on the flow at registration time and run with the same image. Example:
Copy code
from prefect import Flow
from prefect.storage import Docker

flow = Flow("my-flow")
flow.storage = Docker(
  registry_url="392865356492.dkr.ecr.us-east-1.amazonaws.com",
  image_name="eashared-prefectv1-dev",
  image_tag="pp-9775c87",
  python_dependencies=["prefect==1.4.1"]  # and your deps
)
# flow.register(project_name="...")  # when you register
Then your KubernetesRun can reference that image. No Git needed; NotGitRepository goes away. B) Use Git/GitHub storage correctly - Ensure your runtime image has git (and often openssh) installed. - Debian/Ubuntu base:
Copy code
apt-get update && apt-get install -y git && rm -rf /var/lib/apt/lists/*
- Alpine:
Copy code
apk add --no-cache git openssh
- Configure storage correctly:
Copy code
from prefect.storage import Git  # or GitHub
flow.storage = Git(
  repo="<https://github.com/owner/repo.git>",
  flow_path="flows/my_flow.py",
  branch="main"  # or tag/commit
  # If private, put the PAT in the URL or use env/SSH key
)
- If private repos, provide credentials: - HTTPS with PAT: https://<token>@github.com/owner/repo.git - Or mount SSH keys and set
GIT_SSH_COMMAND=ssh -i /path/key -o StrictHostKeyChecking=no
C) Avoid Local storage on remote agents - If you didn’t set storage at all, Prefect defaults to Local storage. That won’t work on a Kubernetes agent unless the exact code path is mounted into the job pod. Get the exact failure details Pull the flow pod logs. That shows the full traceback causing NotGitRepository:
Copy code
# Find the pod for this run
kubectl -n <ns> get pods -l "prefect.io/flow-run-id=62e99500-5f8b-48f4-bc73-7a41b6485fd3" -o name
# Then:
kubectl -n <ns> logs <pod-name>
If the logs say “git: not found”, install git in the image. If they show permission/auth errors, fix the repo URL/credentials. If the traceback points to your own code calling GitPython (e.g., git.Repo(".")), either install git and ensure a real repo is present or remove that dependency. Meanwhile, about the agent’s ECONNREFUSED - Verify the Server services are reachable from the agent: - Ensure the service name/namespace/port are correct: prefectdev-graphql.ea-prefect-dev:4201 - Check service endpoints:
Copy code
kubectl -n ea-prefect-dev get svc,pods,endpoints | grep -E 'prefectdev-graphql|4201'
- Confirm the agent env points at the right URL (PREFECT__SERVER__ENDPOINT or PREFECT__CLOUD__API) and there aren’t network policies blocking it. If you share: - Your flow’s storage configuration, - The Dockerfile for your runtime image, and - The flow pod logs for the failed run, I’ll point out the precise fix. I know, thrilling detective work.
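If you want to test reachability of that GraphQL service from inside the cluster, a throwaway curl pod works (a sketch; it reuses the service host and port from the error above, and __typename is just a generic GraphQL probe):
Copy code
kubectl -n ea-prefect-dev run graphql-check --rm -it --restart=Never \
  --image=curlimages/curl --command -- \
  curl -sS -X POST -H 'Content-Type: application/json' \
  --data '{"query":"{ __typename }"}' \
  http://prefectdev-graphql.ea-prefect-dev:4201/graphql/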
m
@Marvin I am getting this error - Repo 'https://github.com/enverus-ea/ea.data.mfg.prefectV1.git' not found. Check that it exists (and is spelled correctly), and that you have configured the proper credentials for accessing it. But I have this setup: if storage_type == 'git': storage = GitHub( repo=git_repo, path=flow_file, ref=git_ref, access_token_secret=os.getenv("GITHUB_ACCESS_TOKEN") ) with repo, path and ref being definitely correct and GITHUB_ACCESS_TOKEN set up as an environment variable under agent and agent.job. Why is GitHub not accessible?
m
thought for 120.5 seconds
You’re hitting two classic Prefect 1 “gotchas” with GitHub storage: 1) repo value is wrong for GitHub storage - For prefect.storage.GitHub, repo must be owner/repo, not a full HTTPS URL. - Use: repo="enverus-ea/ea.data.mfg.prefectV1" - Don’t use: repo="https://github.com/enverus-ea/ea.data.mfg.prefectV1.git" 2) access_token_secret expects a Prefect Secret name, not the token itself - In Prefect 1, GitHub(access_token_secret="...") will look up a Prefect Secret with that name. - If you pass os.getenv("GITHUB_ACCESS_TOKEN"), Prefect will try to look up a Secret literally named your token string, fail, and fall back to anonymous access → “Repo not found” for private repos. Fix it like this: - Storage config
Copy code
from prefect.storage import GitHub

storage = GitHub(
    repo="enverus-ea/ea.data.mfg.prefectV1",  # owner/repo, no .git, no https
    path=flow_file,                           # e.g., "flows/my_flow.py"
    ref=git_ref,                              # branch/tag/commit
    access_token_secret="GITHUB_ACCESS_TOKEN" # name of the secret, not the value
)
flow.storage = storage
- Provide the token to the flow run pod (not just the agent) Option A: Prefect Secret (recommended) - Add a Secret named GITHUB_ACCESS_TOKEN in your Prefect Server/Cloud UI with the PAT value. Nothing else to do. Option B: Environment variable in the flow pod - Prefect 1’s Secret("X").get() reads env var PREFECT__CONTEXT__SECRETS__X. - Set this in the flow run container: - Quick test via KubernetesRun:
Copy code
run_config = KubernetesRun(
      image=image,
      env={"PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN": os.getenv("GITHUB_ACCESS_TOKEN")}
    )
- Production: mount a K8s Secret and reference it in the env of the flow container. If you edit the job_template for this, be sure to include a named container to avoid list-merge clobbering:
Copy code
spec:
  template:
    spec:
      containers:
        - name: flow
          env:
            - name: PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
              valueFrom:
                secretKeyRef:
                  name: github-token
                  key: token
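The Kubernetes Secret referenced above can be created like this (a sketch; github-token and token match the names assumed in that snippet, and the PAT placeholder is yours to fill in):
Copy code
kubectl -n <ns> create secret generic github-token --from-literal=token=<your-github-pat>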
Checklist - Token scope: for private repos, PAT must have repo scope (classic) or be a fine‑grained token with read access to that repo. - Verify in the flow pod:
Copy code
kubectl -n <ns> logs <flow-pod>
  kubectl -n <ns> exec -it <flow-pod> -- env | grep GITHUB
You should see PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN set. - Ensure no Slack link-mangling leaked into your code. Use plain strings. Docs - Prefect v1 GitHub storage: https://docs-v1.prefect.io/api/latest/storage.html#github - Prefect v1 Secrets and env var naming: https://docs-v1.prefect.io/core/concepts/secrets.html Once you switch repo to owner/repo and provide the token via a Prefect Secret (or the PREFECT__CONTEXT__SECRETS__… env var) inside the flow pod, the “Repo not found” goes away. I’d love to say GitHub is being capricious. It’s just picky.
m
@Marvin I set up GITHUB_ACCESS_TOKEN as an environment variable under agent and agent.job and I am getting "Failed to load and execute flow run: ValueError('Local Secret "GITHUB_ACCESS_TOKEN" was not found.')" How should I set the secret in values.yaml so it is picked up?
m
thought for 47.6 seconds
That error message is from Prefect 1.x. In Prefect 1,
Secret("GITHUB_ACCESS_TOKEN")
does not read a plain
GITHUB_ACCESS_TOKEN
env var — it looks for
PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
(or a Cloud/Server secret with that name). Set the namespaced env var for both the agent and the jobs in your Helm values:
Copy code
agent:
  env:
    - name: PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
      valueFrom:
        secretKeyRef:
          name: github-token
          key: token
  job:
    env:
      - name: PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
        valueFrom:
          secretKeyRef:
            name: github-token
            key: token
Notes: - You can use
value: "..."
instead of
valueFrom
if you really enjoy hardcoding secrets into Helm values. I wouldn’t. - Alternatively, create a Secret named exactly
GITHUB_ACCESS_TOKEN
in Prefect Cloud/Server and drop the local env var entirely. Refs: - Prefect 1.x Secrets docs - Kubernetes Agent docs If you’re actually on Prefect 2/3, stop using
Secret(...)
— it’s gone. Use a Secret block or read
os.environ["GITHUB_ACCESS_TOKEN"]
directly.
m
@Marvin I've created PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN under both agent and agent.job but I am still getting "Failed to load and execute flow run: ValueError('Local Secret "GITHUB_ACCESS_TOKEN" was not found.')"
m
thought for 172.7 seconds
You’re on Prefect 1.x, so
Secret("GITHUB_ACCESS_TOKEN")
only reads from the env var
PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
inside the flow run pod. Setting it on the agent alone won’t help. Likely issue: in the Helm chart, the job (flow-run) env is a top-level
job:
section, not
agent.job
. If you put it under
agent.job
, it’s ignored. Try this in your values.yaml:
Copy code
agent:
  env:
    - name: PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
      valueFrom:
        secretKeyRef:
          name: github-token
          key: token

job:
  env:
    - name: PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
      valueFrom:
        secretKeyRef:
          name: github-token
          key: token
Then upgrade your release and verify the flow-run pod actually has the env var: - Inspect:
kubectl get pod <flow-run-pod> -o jsonpath='{.spec.containers[0].env}'
- Or:
kubectl exec <flow-run-pod> -- printenv | grep PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
If that env var isn’t present in the flow pod, Prefect will throw exactly the “Local Secret … not found” error. Notes and gotchas: - Prefect Server doesn’t manage secrets; Cloud does. If you’re on Cloud, you can create a Secret named
GITHUB_ACCESS_TOKEN
in the UI and stop using local envs. - If you’re launching work on another layer (e.g., Dask workers), you must propagate this env to those pods too. - Chart keys vary by chart version; if this still doesn’t work, please share your chart name/version and your values.yaml snippet so I can spot the mismatch. Refs: - Prefect 1.x Secrets - Kubernetes Agent (Helm) docs
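For the "upgrade your release" step above, the command looks roughly like this (a sketch; it assumes the official prefecthq/prefect-server chart, so substitute your own release name, namespace, and chart reference):
Copy code
helm upgrade <release-name> prefecthq/prefect-server -n <ns> -f values.yaml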
m
@Marvin This is values.yaml snippet:
agent:
  # enabled determines if the Prefect Kubernetes agent is deployed
  enabled: true
  # prefectLabels defines what scheduling labels (not K8s labels) should
  # be associated with the agent
  prefectLabels: ["ea-data-manufacturing", "kubernetes"]
  # jobTemplateFilePath defines which template to use for the agent's jobs. Defaults
  # to an empty string, which will use the default template.
  # reference: https://docs.prefect.io/orchestration/agents/kubernetes.html#custom-job-template
  jobTemplateFilePath: ""
  # mount job template file from Kubernetes configmap. Filename in configmap must match the filename in jobTemplateFilePath.
  jobTemplateFileConfigMap: ""
  # image configures the container image for the agent deployment
  image:
    name: 392865356492.dkr.ecr.us-east-1.amazonaws.com/eashared-prefectv1-dev
    tag: pp-9775c87
    pullPolicy: IfNotPresent
    pullSecrets: []
    # - name: "secret_name"
  labels: {}
  annotations: {}
  podAnnotations: {}
  replicas: 1
  strategy: {}
  podSecurityContext: {}
  securityContext: {}
  env:
    - name: PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
      valueFrom:
        secretKeyRef:
          name: github-credentials
          key: github-access-token
    - name: PREFECT__LOGGING__LEVEL
      value: "DEBUG"
  nodeSelector:
    kubernetes.io/arch: amd64
  tolerations: []
  affinity: {}
  # resources defines the computational resources for the Prefect Agent
  resources:
    limits:
      cpu: null
      memory: 512Mi
    requests:
      memory: 512Mi
      cpu: 200m
  # job configures values associated with jobs the agent launches
  job:
    # resources defines the computational resources for flow jobs
    resources:
      limits:
        memory: 512Mi
      requests:
        memory: 512Mi
        cpu: 200m
    # imagePullPolicy configures the image pull policy for the flow job
    imagePullPolicy: IfNotPresent
    # imagePullSecrets defines image pull secrets for the flow job
    # NOTE: These secrets are not merged with the global imagePullSecrets
    imagePullSecrets: []
    # - "secret_name"
    # Environment variables for flow jobs
    env:
      - name: PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
        valueFrom:
          secretKeyRef:
            name: github-credentials
            key: github-access-token
      - name: PREFECT__LOGGING__LEVEL
        value: "DEBUG"
    # Add nodeSelector to satisfy Kyverno policy
    nodeSelector:
      kubernetes.io/arch: amd64
  serviceAccount:
    # create specifies whether a service account should be created to be
    # associated wi
And version is 1.41
m
thought for 65.4 seconds
Thanks for the snippet — that helps. Good news: for the Prefect 1 Helm chart you’re using (v1.41), your keys are in the right place.
agent.env
and
agent.job.env
are the correct spots for the agent pod and the flow-run pods respectively. So why the “Local Secret ... was not found”? Common culprits and fixes: 1) You’re on Prefect Cloud but trying to use local env secrets - In Prefect 1, Cloud defaults to fetching secrets from Cloud, not local env. - Add this to both agent and job env:
Copy code
- name: PREFECT__CLOUD__USE_LOCAL_SECRETS
  value: "true"
If you’d rather not force local secrets, create a Cloud Secret named exactly
GITHUB_ACCESS_TOKEN
in the UI and drop the env var entirely. 2) The env var never made it into the flow-run pod - Verify it’s there:
Copy code
kubectl get pod <flow-run-pod> -o jsonpath='{.spec.containers[0].env}'
# or
kubectl exec <flow-run-pod> -- printenv | grep PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
If it’s missing, double-check: - The k8s Secret
github-credentials
exists in the same namespace and key
github-access-token
is correct. - You didn’t switch to a custom job template (you have
jobTemplateFilePath: ""
, so you’re on the default — good). - Your release was upgraded after editing values. 3) Case/name mismatch or early import - The Secret name must match exactly:
Secret("GITHUB_ACCESS_TOKEN")
=>
PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
. - If you import and call
Secret(...).get()
at module import time, that still runs inside the flow-run container, so it’s fine — but make sure you’re not trying to access it earlier on the agent. Minimal patch to your values.yaml (add use_local_secrets in both places):
Copy code
agent:
  env:
    - name: PREFECT__CLOUD__USE_LOCAL_SECRETS
      value: "true"
    - name: PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
      valueFrom:
        secretKeyRef:
          name: github-credentials
          key: github-access-token
  job:
    env:
      - name: PREFECT__CLOUD__USE_LOCAL_SECRETS
        value: "true"
      - name: PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
        valueFrom:
          secretKeyRef:
            name: github-credentials
            key: github-access-token
If this still errors after you confirm the env var is present in the flow pod, tell me: - Cloud or Server backend? - A snippet of where
Secret("GITHUB_ACCESS_TOKEN")
is used (flow code vs storage) - The flow-run pod’s env output References (exciting, I know): - Prefect 1.x Secrets - Kubernetes Agent (Helm) docs
m
@Marvin I am not on prefect cloud, I am on backend. The secrets are greyed out in my UI. And I am calling it with: storage = GitHub( repo=git_repo, path=flow_file, ref=git_ref, access_token_secret="GITHUB_ACCESS_TOKEN" )
m
thought for 79.3 seconds
Got it — you’re on Prefect Server (UI secrets greyed out is expected), and you’re using GitHub storage with
access_token_secret="GITHUB_ACCESS_TOKEN"
. In Prefect 1, that call resolves via
Secret("GITHUB_ACCESS_TOKEN")
, which reads
PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
inside the flow-run pod. Your Helm values look correct for chart v1.41 (
agent.env
and
agent.job.env
are the right places). So this boils down to: that env var is not actually present in the flow-run container at runtime. Please verify in the flow-run pod: - Check the env is there
Copy code
kubectl get pod <flow-run-pod> -o jsonpath='{.spec.containers[0].env}'
kubectl exec <flow-run-pod> -- printenv | grep PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
If it’s missing: - Confirm the K8s Secret exists in the same namespace and the key is correct:
Copy code
kubectl get secret github-credentials -o yaml
- Make sure you upgraded the Helm release after changing values. - Check agent logs (you set DEBUG — good). The Kubernetes agent logs print the assembled job env; confirm your secret is listed there. - Ensure you’re not using a custom job template that drops env (you’re on the default). Pragmatic workaround (avoids Prefect Secrets entirely): - Set a plain
GITHUB_ACCESS_TOKEN
env var on the job (in addition to or instead of the Prefect-namespaced one) - Pass it directly to GitHub storage:
Copy code
# values.yaml (flow-run pod)
agent:
  job:
    env:
      - name: GITHUB_ACCESS_TOKEN
        valueFrom:
          secretKeyRef:
            name: github-credentials
            key: github-access-token

# flow code
import os
from prefect.storage import GitHub

storage = GitHub(
    repo=git_repo,
    path=flow_file,
    ref=git_ref,
    access_token=os.environ["GITHUB_ACCESS_TOKEN"],
)
That sidesteps the
Secret(...)
lookup entirely. If the env var is present in the pod but you still see “Local Secret not found,” please share: - The output of
printenv | grep GITHUB
from the flow pod - Your flow’s run config (e.g.,
KubernetesRun
) and the image it uses References (the thrilling bits): - Prefect 1.x Secrets - GitHub Storage
m
@Marvin I set plain GITHUB_ACCESS_TOKEN but now I can't even run the register flow. I get this:
File "utilities/register-flow.py", line 245, in <module>
cli()
File "/opt/actions-runner/_work/_tool/Python/3.8.18/x64/lib/python3.8/site-packages/click/core.py", line 1161, in __call__
return self.main(*args, **kwargs)
File "/opt/actions-runner/_work/_tool/Python/3.8.18/x64/lib/python3.8/site-packages/click/core.py", line 1082, in main
rv = self.invoke(ctx)
File "/opt/actions-runner/_work/_tool/Python/3.8.18/x64/lib/python3.8/site-packages/click/core.py", line 1697, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/opt/actions-runner/_work/_tool/Python/3.8.18/x64/lib/python3.8/site-packages/click/core.py", line 1443, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/opt/actions-runner/_work/_tool/Python/3.8.18/x64/lib/python3.8/site-packages/click/core.py", line 788, in invoke
return __callback(*args, **kwargs)
File "utilities/register-flow.py", line 112, in register_flow
access_token=os.environ["GITHUB_ACCESS_TOKEN"]
File "/opt/actions-runner/_work/_tool/Python/3.8.18/x64/lib/python3.8/os.py", line 675, in __getitem__
raise KeyError(key) from None
KeyError: 'GITHUB_ACCESS_TOKEN'
How to fix that?
m
thought for 49.2 seconds
That KeyError just means the process running register-flow doesn’t have GITHUB_ACCESS_TOKEN set. Your CI runner can’t read minds. Two clean fixes — pick one: 1) Set the env var where you run registration - GitHub Actions step/job:
Copy code
jobs:
  register:
    runs-on: ubuntu-latest
    env:
      GITHUB_ACCESS_TOKEN: ${{ secrets.GH_PAT }}  # or your secret name
    steps:
      - uses: actions/checkout@v4
      - run: python utilities/register-flow.py register-flow
- Local shell:
Copy code
export GITHUB_ACCESS_TOKEN=ghp_...
python utilities/register-flow.py register-flow
2) Revert to Prefect Secret resolution at runtime (avoids needing the token during registration) - In code, use the secret name again:
Copy code
from prefect.storage import GitHub

storage = GitHub(
    repo=git_repo,
    path=flow_file,
    ref=git_ref,
    access_token_secret="GITHUB_ACCESS_TOKEN",
)
- Ensure the flow-run pod has the Prefect-namespaced env set (you already added this in values.yaml):
Copy code
agent:
  job:
    env:
      - name: PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
        valueFrom:
          secretKeyRef:
            name: github-credentials
            key: github-access-token
On Prefect Server, that’s all you need; no Cloud flags required. Quick sanity checks - After a run starts, verify the env is in the flow pod:
Copy code
kubectl exec <flow-run-pod> -- printenv | grep PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
- If you stick with option 1, verify the CI step sees it:
Copy code
python -c "import os; print('set' if 'GITHUB_ACCESS_TOKEN' in os.environ else 'missing')"
Extra note: GitHub Actions exposes a
GITHUB_TOKEN
by default; you can map it:
Copy code
env:
  GITHUB_ACCESS_TOKEN: ${{ secrets.GITHUB_TOKEN }}
…but ensure it has the scopes you need for your repo. Refs: - Prefect 1.x Secrets - GitHub Storage (Prefect 1)
m
@Marvin For the first solution - GITHUB_ACCESS_TOKEN is set in values.yaml as an env var, as you advised before. For the second solution, I already have that and the run still can't see it.
m
thought for 99.8 seconds
Two separate contexts are getting mixed: A) Registration time (your CI/runner running register-flow.py) - The Helm values only affect pods in your cluster. They do nothing for your CI machine. - If your code does
access_token=os.environ["GITHUB_ACCESS_TOKEN"]
, you must set that env var in the process that runs registration (e.g., GitHub Actions job env, local shell, etc.). Otherwise you’ll get the KeyError you saw. B) Runtime (the flow-run pod in Kubernetes) - Using
access_token_secret="GITHUB_ACCESS_TOKEN"
is correct for Prefect 1. It resolves
PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
inside the flow-run pod. - Your Helm values look right for chart 1.41 —
agent.job.env
is the correct place. Do this to get unblocked: 1) Fix registration - Either set the env var in the CI job that runs registration:
Copy code
# GitHub Actions example
env:
  GITHUB_ACCESS_TOKEN: ${{ secrets.GH_PAT }}
- Or don’t require it during registration; let runtime resolve the secret:
Copy code
from prefect.storage import GitHub

storage = GitHub(
    repo=git_repo,
    path=flow_file,
    ref=git_ref,
    access_token_secret="GITHUB_ACCESS_TOKEN",  # not os.environ[…]
)
2) Confirm the secret is in the flow-run pod - After a run starts:
Copy code
kubectl exec <flow-run-pod> -- printenv | grep PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
If it’s missing: - Ensure the k8s Secret exists and key matches:
Copy code
kubectl get secret github-credentials -n <ns> -o yaml
- Ensure the run is picked up by the agent you configured (labels match). - Upgrade the Helm release after editing values. 3) Belt-and-suspenders for Server - Add this to both agent and job env (harmless on Server, required on Cloud):
Copy code
- name: PREFECT__CLOUD__USE_LOCAL_SECRETS
  value: "true"
If you still see “Local Secret not found”, please share: - Output of
printenv | grep PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
from the flow pod - The agent log lines when it creates the job (you set DEBUG — perfect) - Your flow’s run configuration (e.g., KubernetesRun) and labels Docs for the bedtime reading you didn’t ask for: - Prefect 1.x Secrets - GitHub Storage
m
@Marvin I set up GITHUB_ACCESS_TOKEN locally in GitHub and now when I run storage = GitHub( repo=git_repo, path=flow_file, ref=git_ref, access_token=os.environ["GITHUB_ACCESS_TOKEN"] ) as you suggested, I get this error: TypeError: __init__() got an unexpected keyword argument 'access_token'. How do I fix that?
m
thought for 45.2 seconds
You’re on Prefect 1.x. The
GitHub
storage there does not accept `access_token`; it only accepts
access_token_secret
(the name of a Prefect Secret). The
access_token
parameter is from Prefect 2/3, which you’re not using. Fix your code to:
Copy code
from prefect.storage import GitHub

storage = GitHub(
    repo=git_repo,
    path=flow_file,
    ref=git_ref,
    access_token_secret="GITHUB_ACCESS_TOKEN",
)
Then ensure the flow-run pod has this env var set so
Secret("GITHUB_ACCESS_TOKEN")
can resolve it: - Helm values (you already have this, just reiterating the critical bit):
Copy code
agent:
  env:
    - name: PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
      valueFrom:
        secretKeyRef:
          name: github-credentials
          key: github-access-token
  job:
    env:
      - name: PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
        valueFrom:
          secretKeyRef:
            name: github-credentials
            key: github-access-token
Sanity checks: - After a run starts, verify it’s actually in the flow pod:
Copy code
kubectl exec <flow-run-pod> -- printenv | grep PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
- If it’s missing, check the k8s Secret name/key and that the agent creating the job is the one you updated. Docs, for the inevitable insomnia: - Prefect 1 GitHub Storage - Prefect 1 Secrets If you still get “Local Secret not found” after confirming the env is present in the flow pod, send me: - The flow-run pod’s env snippet - The agent logs around job creation (DEBUG) - Your run config and agent/flow labels Yes, it’s fussy. It’s also Prefect 1.
m
@Marvin if the PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN is not propagated properly, how do I ensure it is? Can the merge into the job template cause problems? job_template = { "apiVersion": "batch/v1", "kind": "Job", "spec": { "template": { "spec": { "nodeSelector": { "kubernetes.io/arch": "amd64" }, "restartPolicy": "Never", "containers": [ {"name": "flow"} ] } } } }
m
thought for 52.7 seconds
Short version: yes, a custom job template can clobber the env injection. The K8s agent only injects env into the container named “flow”. If your template replaces the
containers
list or its
env
, you can lose
agent.job.env
. How to ensure propagation - Easiest sanity check: remove your custom
job_template
and run a flow. If the env appears in the pod, your template was overwriting it. - Keep the container name as “flow”. The agent only injects into that container. - Don’t define an empty
env: []
in your template; list merges aren’t additive and can replace what the agent would add. - If you must keep a custom template, explicitly include the env var in the template so you’re not relying on the agent’s merge:
Copy code
job_template = {
  "apiVersion": "batch/v1",
  "kind": "Job",
  "spec": {
    "template": {
      "spec": {
        "restartPolicy": "Never",
        "nodeSelector": {"<http://kubernetes.io/arch|kubernetes.io/arch>": "amd64"},
        "containers": [{
          "name": "flow",
          "env": [
            {
              "name": "PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN",
              "valueFrom": {
                "secretKeyRef": {
                  "name": "github-credentials",
                  "key": "github-access-token",
                }
              }
            }
          ]
        }]
      }
    }
  }
}
- Alternatively, omit
env
entirely from the template and rely on Helm
agent.job.env
. But confirm it’s actually there: - After a run starts: -
kubectl get pod <flow-pod> -o jsonpath='{.spec.containers[0].env}'
- or
kubectl exec <flow-pod> -- printenv | grep PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
Other gotchas - Don’t mix a Helm
jobTemplateFilePath
and a Python
job_template
unless you need both; each merge layer is another chance to replace
containers/env
. - Make sure the agent that picks up the flow uses the Helm release you updated (labels match, same namespace, etc.). - If you still don’t see the var in the pod, please share: - The flow-run pod’s env output (jsonpath/printenv) - Whether you’re using a Helm jobTemplateFile and/or a Python
KubernetesRun(job_template=...)
- The agent logs around job creation (DEBUG often prints the assembled job spec) If you want to be belt-and-suspenders, add the env var both in Helm
agent.job.env
and directly in your
job_template
as above. Yes, duplicate, but at least it won’t mysteriously vanish in a merge.
m
@Marvin I was able to resolve it, but now when I run I get this error: Failed to load and execute flow run: ModuleNotFoundError("No module named 'tasks'") The pod sees the python file, but does not see the rest of the repo. How do I adjust my scripts to add the full repo to path so it sees other modules?
m
thought for 205.0 seconds
Right now you’re using Prefect 1’s GitHub storage. It fetches a single file via the GitHub API — not the whole repo. So
import tasks
will fail because there’s nothing else to import. Pick one of these patterns: Option A: Use Git storage (clone the whole repo at runtime) - This pulls your entire repo into the flow pod so intra-repo imports work.
Copy code
from prefect.storage import Git

storage = Git(
    repo="<https://github.com/your-org/your-repo.git>",
    flow_path=flow_file,          # path to the flow file within the repo
    ref=git_ref,                  # branch/tag/sha
    git_token_secret="GITHUB_ACCESS_TOKEN",  # same Prefect secret name
)
Ensure your Helm values still provide
PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
to the flow pod. That’s what
git_token_secret
resolves. Option B: Build your code into the image (recommended for K8s) - Put your repo code in the container (pip install your package or copy the source in your Dockerfile), then use Module or Local storage. Example: - Dockerfile installs your project:
pip install .
- Storage points at your module:
Copy code
from prefect.storage import Module

storage = Module("your_package.flows.my_flow")
Option C: Clone via initContainer and add to PYTHONPATH - Keep your current GitHub storage if you must, and add an initContainer that clones the repo into a shared volume, then mount it and set PYTHONPATH for the main container. Sketch of a job_template addition:
Copy code
job_template = {
  "apiVersion": "batch/v1",
  "kind": "Job",
  "spec": {
    "template": {
      "spec": {
        "restartPolicy": "Never",
        "volumes": [{"name": "repo", "emptyDir": {}}],
        "initContainers": [{
          "name": "git-clone",
          "image": "alpine/git:2.45.2",
          "env": [{
            "name": "GITHUB_TOKEN",
            "valueFrom": {"secretKeyRef": {"name": "github-credentials", "key": "github-access-token"}}
          }],
          "command": ["sh","-c"],
          "args": ["git clone --branch ${GIT_REF:-main} https://$GITHUB_TOKEN@github.com/your-org/your-repo.git /repo"],
          "volumeMounts": [{"name": "repo", "mountPath": "/repo"}]
        }],
        "containers": [{
          "name": "flow",
          "env": [{"name": "PYTHONPATH", "value": "/repo:${PYTHONPATH}"}],
          "volumeMounts": [{"name": "repo", "mountPath": "/repo"}]
        }]
      }
    }
  }
}
Keep the container name “flow” so the agent still injects its env. Notes - If you go with A or C, make sure your package layout is importable (has __init__.py where needed) and use absolute imports. - With Git storage, prefer a full repo URL; with GitHub storage, you’ll never see sibling modules because they’re simply not there. Docs, for posterity: - Prefect 1 Git storage - Prefect 1 GitHub storage
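For reference, the kind of layout that keeps
import tasks
working once the whole repo is present at runtime (the names are illustrative, not your actual repo):
Copy code
your-repo/
├── flows/
│   └── my_flow.py        # the file flow_path / path points at
└── tasks/
    ├── __init__.py       # makes `tasks` an importable package
    └── helpers.py        # illustrative sibling module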
m
@Marvin GitHub method fetches only single file while Git fetches the whole repo?
m
thought for 23.8 seconds
Yes. In Prefect 1.x: - GitHub storage: fetches a single file via the GitHub API at runtime. It does not clone the repo, so sibling modules like
import tasks
won’t exist. Uses
access_token_secret="..."
. - Docs: GitHub storage - Git storage: performs a real
git clone
of the repo (at the specified ref), so the whole repo is present and imports work. Uses
git_token_secret="..."
. - Docs: Git storage So if you need intra-repo imports, switch to Git storage or bake your code into the image. Example switch:
Copy code
from prefect.storage import Git

storage = Git(
    repo="<https://github.com/your-org/your-repo.git>",
    flow_path="path/to/flow.py",
    ref="main",
    git_token_secret="GITHUB_ACCESS_TOKEN",  # resolves PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
)
Alternatively, keep GitHub storage but clone the repo yourself (initContainer + PYTHONPATH) or package your project into the image. But yes—the GitHub storage class is a single-file fetcher; Git clones the lot.
m
@Marvin Does the Git method have a ref parameter? I am getting TypeError: __init__() got an unexpected keyword argument 'ref'
m
thought for 130.7 seconds
Short answer: In Prefect 1.x,
Git
storage doesn’t take
ref
. Use the branch parameter. Try this:
Copy code
from prefect.storage import Git

storage = Git(
    repo="<https://github.com/your-org/your-repo.git>",
    flow_path=flow_file,              # path to the flow file in the repo
    branch="main",                    # not 'ref'
    git_token_secret="GITHUB_ACCESS_TOKEN",  # resolves PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
)
Notes: - In some older Prefect 1 versions the arg was
branch_name
instead of
branch
. If
branch
errors, try:
Copy code
storage = Git(..., branch_name="main", git_token_secret="GITHUB_ACCESS_TOKEN")
- Pinning a specific commit isn’t supported by the
Git
storage API; use a tag/branch that points to it, or bake the code into the image, or do a custom checkout via an initContainer. Docs, such as they are: - Prefect 1.x Git storage
m
@Marvin I moved to Git storage method and I am getting - Failed to load and execute flow run: NotGitRepository() error. How to fix that?
m
thought for 55.6 seconds
Yes, that error is classic GitPython complaining it can’t find a real repo. With Prefect 1’s Git storage, here’s what to check: 1) Use a remote URL, not a local path - If you pass a local filesystem path to
repo=...
, the flow pod will try to open it as a repo and you’ll get
NotGitRepository()
. - Use an HTTPS (or SSH) URL. Prefer HTTPS with a token:
Copy code
from prefect.storage import Git

storage = Git(
    repo="<https://github.com/your-org/your-repo.git>",
    flow_path="path/to/flow.py",     # required; path to your flow in the repo
    branch="main",                   # or your branch/tag
    git_token_secret="GITHUB_ACCESS_TOKEN",
)
If
branch
errors on your Prefect version, try
branch_name="main"
. 2) Ensure git and GitPython are in your flow-run image - The runtime container needs both the git CLI and the GitPython library (Prefect 1 uses GitPython which shells out to git). - Dockerfile snippet:
Copy code
RUN apt-get update && apt-get install -y git && rm -rf /var/lib/apt/lists/*
RUN pip install gitpython
Quick check in a flow pod:
Copy code
git --version
python -c "import git, sys; print('gitpython ok', git.__version__)"
3) Token and env are present in the pod - You already set
PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
via Helm. Verify at runtime:
Copy code
kubectl exec <flow-pod> -- printenv | grep PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
4)
flow_path
must be correct - It’s the path to the flow file within the repo at that branch/tag. If it doesn’t exist at that ref, loading fails (different error, but worth verifying). 5) URL formatting - Use a full URL with scheme. This will fail:
Copy code
repo="<http://github.com/your-org/your-repo|github.com/your-org/your-repo>"   # looks like a local path to GitPython
- Use:
Copy code
repo="<https://github.com/your-org/your-repo.git>"
SSH works too, but then you need keys/known_hosts instead of the token. If you still get NotGitRepository after these fixes, please share: - Your exact
Git(...)
snippet (repo, branch/branch_name, flow_path) -
git --version
and GitPython import check from inside a flow pod - The repo URL you’re passing (sanitized is fine) Docs, for light reading: - Prefect 1.x Git storage - Prefect 1.x GitHub storage
m
@Marvin I have: storage = Git( repo=git_repo, flow_path=flow_file, branch_name=git_ref, git_token_secret_name="GITHUB_ACCESS_TOKEN" ) job_template = { "apiVersion": "batch/v1", "kind": "Job", "spec": { "template": { "spec": { "nodeSelector": { "kubernetes.io/arch": "amd64" }, "restartPolicy": "Never", "containers": [{ "name": "flow", "env": [ { "name": "PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN", "valueFrom": { "secretKeyRef": { "name": "github-credentials", "key": "github-access-token", } } } ] }] } } } } project_name: description: 'Prefect Project name' required: true default: '' git_ref: description: 'Git reference (branch or commit) for the flow' required: true default: 'main' labels: description: 'Labels for the flow (comma-separated)' required: false default: 'ea-data-manufacturing' jobs: register-flows: runs-on: enverus-ubuntu steps: - name: Checkout code uses: actions/checkout@v4 with: fetch-depth: 0 token: ${{ secrets.PAT }} - name: Set up Python uses: actions/setup-python@v4 with: python-version: '3.8' - name: Install dependencies run: | pip install -r requirements.txt - name: Register specific flow env: PREFECT_SERVER_URL: https://ea-prefectv1-apollo-dev.int.enverus.com/graphql PREFECT__CLOUD__API: https://ea-prefectv1-apollo-dev.int.enverus.com/graphql PREFECT__BACKEND: server PREFECT__CLOUD__AUTH_TOKEN: "" PROJECT_NAME: ${{ github.event.inputs.project_name }} GIT_REF: ${{ github.event.inputs.git_ref }} FLOW_FILE: ${{ github.event.inputs.flow_file }} LABELS: ${{ github.event.inputs.labels }} Can you see problem there?
m
thought for 30.6 seconds
Yep — a few gotchas in there. What to fix - Use a full Git URL for Git storage - Git storage expects a cloneable URL, not "owner/repo". - Example:
https://github.com/your-org/your-repo.git
(or SSH if you’ve set up keys) - Parameter names: the documented Prefect 1.x Git storage signature uses
branch_name
and
git_token_secret_name
, which is exactly what you already have, so those kwargs are not the problem here. - Example:
Copy code
from prefect.storage import Git

storage = Git(
    repo="<https://github.com/your-org/your-repo.git>",
    flow_path=flow_file,                 # path to the flow file within the repo
    branch=git_ref,                      # branch/tag; SHA may not be supported in all versions
    git_token_secret="GITHUB_ACCESS_TOKEN",
)
- Ensure the flow-run image has git and GitPython - NotGitRepository() usually means the runtime doesn’t have a proper git environment. - In your Docker image:
Copy code
RUN apt-get update && apt-get install -y git && rm -rf /var/lib/apt/lists/*
RUN pip install gitpython
- Sanity check in a flow pod:
Copy code
git --version
python -c "import git; print('gitpython ok')"
- Keep the container name “flow” in your job template - You did — good. The agent injects env into the container named
flow
. - Don’t add an
env: []
that wipes out the agent’s merged env. If you include
env
, make sure your secret var is present (you did). Quick validation steps - After a run starts: - Verify the secret env is present: -
kubectl exec <flow-pod> -- printenv | grep PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
- Confirm repo URL is correct and clone succeeds: -
kubectl exec <flow-pod> -- sh -c 'git --version && python -c "import git; print(1)"'
If you still hit NotGitRepository after these changes, send: - The exact
repo=
value you’re using (sanitized is fine) - The output of the git/GitPython checks from the flow pod - Your flow-run image name Docs, for completeness: - Prefect 1.x Git storage
m
@Marvin I will be using: storage = GitHub( How can I make the deployment see the rest of the repo?
m
thought for 54.4 seconds
Short answer: Prefect 1’s GitHub storage only fetches the single flow file. If you want imports like
from tasks import …
to work, you must make the rest of the repo available on PYTHONPATH at runtime. You’ve got three sane options. Pick one: 1) Bake your code into the image (recommended) - Build an image that already contains your repo (installed as a package or copied into the image). Then GitHub storage just loads the flow file, and your imports resolve from site-packages or PYTHONPATH. - Dockerfile sketch:
Copy code
FROM prefecthq/prefect:1.4-python3.8  # or your base
WORKDIR /app
COPY . /app
RUN pip install -U pip && pip install .
# or: RUN pip install -e .
# If you didn’t package it, at least expose it
ENV PYTHONPATH=/app:${PYTHONPATH}
- Keep using GitHub storage in your flow; no changes needed besides using absolute imports. 2) Clone the repo with an initContainer and set PYTHONPATH - Keep GitHub storage. Add a K8s initContainer that clones the repo into a shared volume; mount it and set PYTHONPATH for the “flow” container. Example job_template (keep container name “flow”):
Copy code
job_template = {
  "apiVersion": "batch/v1",
  "kind": "Job",
  "spec": {
    "template": {
      "spec": {
        "restartPolicy": "Never",
        "volumes": [{"name": "repo", "emptyDir": {}}],
        "initContainers": [{
          "name": "git-clone",
          "image": "alpine/git:2.45.2",
          "env": [
            {
              "name": "GITHUB_TOKEN",
              "valueFrom": {
                "secretKeyRef": {"name": "github-credentials", "key": "github-access-token"}
              }
            },
            {"name": "GIT_REF", "value": "main"}  # or pass your ref via env
          ],
          "command": ["sh","-c"],
          "args": [
            "git clone --depth 1 --branch \"$GIT_REF\" "
            "https://$GITHUB_TOKEN@github.com/your-org/your-repo.git /repo"
          ],
          "volumeMounts": [{"name": "repo", "mountPath": "/repo"}]
        }],
        "containers": [{
          "name": "flow",
          "env": [{"name": "PYTHONPATH", "value": "/repo:${PYTHONPATH}"}],
          "volumeMounts": [{"name": "repo", "mountPath": "/repo"}]
        }]
      }
    }
  }
}
- Your existing secret env for Prefect (
PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
) can stay; the initContainer uses the k8s Secret directly. 3) Bootstrap in the flow file (no initContainer, but needs git or tar extraction) - At the very top of your flow file (fetched by GitHub storage), before importing your modules, clone or download the repo and add it to sys.path. - Git option (requires git in the image):
Copy code
import os, sys, subprocess

def _ensure_repo():
    repo_dir = "/tmp/repo"
    if not os.path.exists(os.path.join(repo_dir, ".git")):
        from prefect.client import Secret
        token = Secret("GITHUB_ACCESS_TOKEN").get()
        ref = os.getenv("GIT_REF", "main")
        url = f"https://{token}:x-oauth-basic@github.com/your-org/your-repo.git"
        subprocess.check_call(["git", "clone", "--depth", "1", "--branch", ref, url, repo_dir])
    if repo_dir not in sys.path:
        sys.path.insert(0, repo_dir)

_ensure_repo()

from tasks import something
- No-git option: download and extract a tarball from the GitHub API using requests/tarfile, then add the extracted folder to sys.path (see the sketch below). Gotchas to avoid - Don’t expect GitHub storage to ever include sibling files; it won’t. - If you use a custom job template, keep the main container named “flow” so Prefect still injects its env. - Make sure your repo is importable (package layout and __init__.py where needed) and use absolute imports in your flow. If you want, I can plug your org/repo names and secret keys into the initContainer template so you can paste it in and be on your way. Refs you didn’t want but might need: - Prefect 1.x GitHub storage
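For the no-git tarball option, a rough sketch; the org/repo names are placeholders and it assumes the requests package is available in the flow image:
Copy code
import io
import os
import sys
import tarfile

import requests
from prefect.client.secrets import Secret


def _ensure_repo_from_tarball(repo_dir: str = "/tmp/repo") -> None:
    # Download the repo tarball via the GitHub API and put it on sys.path
    if not os.path.isdir(repo_dir):
        token = Secret("GITHUB_ACCESS_TOKEN").get()
        ref = os.getenv("GIT_REF", "main")
        url = f"https://api.github.com/repos/your-org/your-repo/tarball/{ref}"
        resp = requests.get(url, headers={"Authorization": f"token {token}"}, timeout=60)
        resp.raise_for_status()
        with tarfile.open(fileobj=io.BytesIO(resp.content), mode="r:gz") as tar:
            tar.extractall("/tmp/_tarball")
        # the tarball unpacks to a single top-level "<org>-<repo>-<sha>" directory
        extracted = os.path.join("/tmp/_tarball", os.listdir("/tmp/_tarball")[0])
        os.rename(extracted, repo_dir)
    if repo_dir not in sys.path:
        sys.path.insert(0, repo_dir)


_ensure_repo_from_tarball()

from tasks import something  # noqa: E402  (your repo's sibling module)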
m
@Marvin I have baked git into the flow image but I am still getting Failed to load and execute flow run: NotGitRepository() with the Git( storage method. What can I do to fix that?
m
thought for 198.5 seconds
Short version: installing git in the image doesn’t make your code a Git repository. NotGitRepository() means the directory Prefect is looking at does not contain a
.git
folder. A few ways to fix it, depending on what you’re trying to do: - If you’re on Prefect 3.x (recommended): - Don’t use a Git storage block. It’s removed in 3.x. Deploy from source instead:
Copy code
from prefect import flow

@flow
def my_flow(): ...

if __name__ == "__main__":
    (
        my_flow
        .from_source(
            source="https://github.com/OWNER/REPO",
            entrypoint="path/to/flow.py:my_flow"
        )
        .deploy(name="prod", work_pool_name="your-pool")
    )
- This clones your repo at run time. No need to bake
.git
into the image. Docs: Store flow code in deployments - If you’re on Prefect 2.x and intentionally using a Git storage block: - Let Prefect clone the repo at runtime; don’t bake the repo into the image. - Ensure the
git
binary is installed in the runtime image and your Git credentials are set on the block. - If you insist on baking the repo into the image, you must include the
.git
directory: - Remove
.git
from
.dockerignore
if it’s there. - Or clone during the build:
RUN git clone --branch <branch> <url> /opt/flows
- Verify inside the container that
/opt/flows/.git
exists and your deployment’s working directory points there. - The NotGitRepository error occurs when GitPython is asked to operate on a path that doesn’t contain
.git
(e.g., you copied only the working tree or used `git archive`/GitHub ZIP). Quick checklist: - Confirm your Prefect version (2.x vs 3.x). - If 3.x: switch to
flow.from_source(...)
and drop the Git storage block. - If 2.x with Git storage: don’t combine it with “baked” code; let Prefect clone. Make sure
git
is installed and credentials are configured. - If you must bake code: ensure
.git
is present in the path Prefect uses, and that you’re not accidentally pointing Prefect at a directory without
.git
. If you share: - Prefect version, - how you’re creating the deployment (code/CLI snippet), - and the relevant Dockerfile bits, I can point at the exact line causing the sulk.
m
@Marvin I am self-hosting Prefect v1
m
thought for 125.5 seconds
Got it — Prefect v1. The NotGitRepository() tells us Prefect tried to treat a directory as a git repo but there’s no
.git
folder there. Installing the git binary in your image doesn’t create a repo; the metadata must exist where Prefect expects it. Your options (pick one, don’t mix them): 1) Use Git storage correctly (clone from a remote at runtime) - Configure
prefect.storage.Git
with a remote URL (
https://...git
or
git@...
), the flow file path, and a `branch`/`ref`. - Ensure credentials are available (token/secret for HTTPS or SSH key). - Make sure git is installed in the runtime image (you did), and the agent/container has network access. - This avoids needing a
.git
directory baked into the image. - Docs: prefect.storage.Git 2) Don’t use Git storage; bake your code into the image - Switch to
prefect.storage.Docker
and copy your flow code into the image. No git interaction at runtime, no
.git
needed. - Docs: prefect.storage.Docker 3) If you insist on a local git repo inside the image with Git storage - You must include the
.git
directory in the image: - Remove
.git
from
.dockerignore
, or - Clone during the build:
Copy code
RUN git clone --branch main --depth 1 https://github.com/OWNER/REPO.git /opt/flow
WORKDIR /opt/flow
- Point
Git(repo="/opt/flow", flow_path="path/to/flow.py", branch="main")
at that path. - Verify inside the running container:
Copy code
pwd && ls -a
  # Expect to see .git in the directory Prefect is using
Common pitfalls causing NotGitRepository(): - Using Git storage but copying only the working tree into the image (no
.git
due to .dockerignore). - Pointing
repo
at a local path that isn’t actually a git repo. - Private repo without credentials (clone fails or falls back weirdly). If you share your
flow.storage = Git(...)
snippet and the relevant Dockerfile bits, I’ll point at the exact line making Git unhappy. In the meantime, if you just want it to run, switching to Docker storage is the lowest-friction path in v1.
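If you do go the Docker storage route, a minimal sketch; the registry/image names and paths are placeholders, and the exact kwargs should be checked against your 1.x release:
Copy code
from prefect.storage import Docker

# sketch only: build an image that already contains the repo so intra-repo
# imports resolve with no git interaction at runtime
flow.storage = Docker(
    registry_url="registry.example.com/team",   # placeholder registry
    image_name="my-flow",
    image_tag="2022-09-23",                     # placeholder tag
    files={"/abs/path/to/repo": "/repo"},       # copy the repo into the image
    env_vars={"PYTHONPATH": "/repo"},           # make sibling modules importable
)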
m
@Marvin I managed to make the storage work, but now when I run I get this error: Failed to load and execute flow run: TypeError("__init__() got an unexpected keyword argument 'api_token'") What might be the issue?
m
thought for 75.0 seconds
That error means you’re passing an
api_token
kwarg to something in Prefect v1 that doesn’t accept it. Since you just switched to Git storage, the usual culprits are: - Using
api_token=
on a storage class that expects a different kwarg - For GitHub storage, use
access_token_secret
, not `api_token`:
Copy code
from prefect.storage import GitHub

flow.storage = GitHub(
    repo="OWNER/REPO",
    path="path/to/flow.py",
    ref="main",
    access_token_secret="GITHUB_ACCESS_TOKEN"  # name of a Prefect Secret
)
Docs: prefect.storage.GitHub - For generic Git storage, use
git_token_secret_name
(or SSH):
Copy code
from prefect.storage import Git

flow.storage = Git(
    repo="https://github.com/OWNER/REPO.git",
    flow_path="path/to/flow.py",
    branch_name="main",
    git_token_secret_name="GIT_TOKEN",  # name of a Prefect Secret
    # or: use_ssh=True and mount an SSH key instead
)
Docs: prefect.storage.Git - Passing
api_token
to a Client/Agent while you’re on self-hosted Server - Server doesn’t use Cloud API tokens. Make sure you’re not doing
Client(api_token=...)
or exporting Cloud env vars. Set the backend to Server:
Copy code
prefect backend server
Then remove any
PREFECT__CLOUD__*
env vars. - Version mismatch between registration and runtime - If the flow was registered on one Prefect v1 version and the agent runs another, a serialized field like
api_token
can cause this TypeError at load time. Re-register the flow with the same Prefect version your agent/executor uses. If you paste: - your
flow.storage = ...
snippet, - any `Client(...)`/Agent config, - and the stack trace head, I’ll point to the exact kwarg that needs renaming or removing.
m
@Marvin The problem will be in this code: import prefect from prefect.engine import result from prefect.utilities.graphql import GraphQLResult from datetime import datetime logger = prefect.context.logger CLOUD_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z" class GraphQLCloudAPI(object): """ Wrapper around
prefect.Client
""" client: prefect.Client def __init__(self) -> None: cloud = prefect.config.cloud self.client = prefect.Client(api_server=cloud.api, api_token=cloud.auth_token) def get_project_by_run_id(self, flow_run_id: str) -> GraphQLResult: query = """ query($flow_run_id: uuid) { flow_run (where: {id: {_eq: $flow_run_id}}){ flow{ project { name } } } } """ variables = {"flow_run_id": flow_run_id} result = self.client.graphql(query, variables=variables) project = result.data.flow_run[0].flow.project logger.info(f"project='{project}'") return project def get_task_by_run_id(self, task_run_id: str) -> GraphQLResult: query = """ query($task_run_id: uuid) { task_run (where: {id: {_eq: $task_run_id}}){ task { name } } } """ result = self.client.graphql(query, variables={"task_run_id": task_run_id}) logger.info(f"_get_task_by_id graphQL result: {result}") task = result.data.task_run[0].task logger.info(f"task='{task}'") return task def get_flow_run_start(self, flow_run_id: str) -> datetime: """ Returns: - float: timestamp (flow_run.start_time) """ query = """ query($flow_run_id: uuid){ flow_run(where{id{_eq: $flow_run_id}}){ start_time, } } """ variables = {"flow_run_id": flow_run_id} result = self.client.graphql(query, variables=variables) start_time = result.data.flow_run[0].start_time start_time = datetime.strptime(start_time, CLOUD_DATETIME_FORMAT) return start_time DO you see a problem somewhere with prefect.client ?
m
thought for 69.0 seconds
Yes — two problems, both around Client and config: 1) You’re self‑hosting Prefect v1 (Server), but you’re reading cloud config and passing an auth token: -
cloud = prefect.config.cloud
is for Prefect Cloud, not Server. - Server doesn’t use an API token for GraphQL. 2) Your Prefect v1 Client doesn’t accept an
api_token
kwarg, hence the TypeError. Fix for self‑hosted Server: - Don’t pass
api_token
. - Either let Client read the configured endpoint, or pass the Server GraphQL URL explicitly. Example:
Copy code
from prefect.client import Client
import prefect

class GraphQLCloudAPI:
    def __init__(self) -> None:
        # Option A: rely on PREFECT__BACKEND=server and PREFECT__SERVER__ENDPOINT env vars
        self.client = Client()

        # Option B: set explicitly if you prefer
        # self.client = Client(api_server=prefect.config.server.endpoint)
If you were actually targeting Prefect Cloud (you said self‑hosted, but just in case): - Use the Cloud backend and either set the token via config/env (recommended) and do
Client()
, or pass the kwarg that your version supports (some v1 releases used
api_key
, not
api_token
). Quick sanity checks: - Ensure the process has
PREFECT__BACKEND=server
and
PREFECT__SERVER__ENDPOINT=http://<host>:4200/graphql
. - Avoid reading
prefect.config.cloud.*
when using Server. - If you must support both, branch on
prefect.config.backend
and only include a token when
backend == "cloud"
, using the parameter name your version supports. If you share your Prefect v1 exact version (
prefect.__version__
), I can give the precise Client signature for it.
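If you do end up needing to support both backends from one codebase, a small sketch of that branching; the Cloud kwarg (api_key vs api_token) and config keys vary between 1.x releases, so treat those names as assumptions to verify against your version:
Copy code
import prefect
from prefect.client import Client


def make_client() -> Client:
    # self-hosted Server: endpoint only, no token
    if prefect.config.backend == "server":
        return Client(api_server=prefect.config.server.endpoint)
    # Cloud: include a credential, using whichever kwarg your release supports
    return Client(api_server=prefect.config.cloud.api, api_key=prefect.config.cloud.api_key)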
m
@Marvin I am targeting self-hosted. If I set it explicitly with self.client = Client(api_server=prefect.config.server.endpoint), do I need to configure any env variables specifically, or is this the final code?
m
thought for 35.0 seconds
Short answer: if you pass a correct, reachable GraphQL URL to Client, you don’t need any extra env vars. Just don’t pass an API token on self-hosted. Examples: - Hardcode the server endpoint (no env needed):
Copy code
from prefect.client import Client

self.client = Client(api_server="http://<your-server-host>:4200/graphql")
- Or read it from Prefect’s server config (only works if that value is correct in your environment):
Copy code
from prefect.client import Client
import prefect

self.client = Client(api_server=prefect.config.server.endpoint)
Note: the default is often http://localhost:4200/graphql. If your code runs in a container, “localhost” will point to the container itself, not the Prefect Server. Use a hostname/IP that’s reachable from where the flow runs (e.g., a docker-compose service name, k8s service DNS, or host.docker.internal on Mac/Windows). Optional, if you prefer zero code config: - Set env vars and let Client() pick them up:
Copy code
PREFECT__BACKEND=server
PREFECT__SERVER__ENDPOINT=http://<your-server-host>:4200/graphql
Then:
Copy code
self.client = Client()
Also make sure you’ve not set any PREFECT__CLOUD__* env vars, and remove any api_token arguments entirely—Server doesn’t use them. Client docs for v1, if you need them: https://docs.prefect.io/api/latest/client/client/#client
m
@Marvin What causes this error? Failed to retrieve task state with error: ClientError([{'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: '}}}]) Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 157, in initialize_run map_index=context.get("map_index"), File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 1498, in get_task_run_info result = self.graphql(mutation) # type: Any File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 465, in graphql raise ClientError(result["errors"]) prefect.exceptions.ClientError: [{'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: '}}}]
m
thought for 55.5 seconds
That GraphQL error means Prefect sent an empty string where a required UUID (flow_run_id or task_id) was expected. In plain terms: the runtime didn’t have a proper flow/task identity when it tried to create/fetch the task run. Common causes in Prefect v1 (self-hosted): - Running the flow outside a flow run context (e.g., calling
flow.run()
directly) while using the Cloud/Server engine. The Cloud task runner expects a valid
flow_run_id
. - Manually invoking the execution entrypoint without providing a flow run ID (e.g.,
prefect execute flow-run
without
--flow-run-id
). - Flow registered under one graph, but the code running on the agent is different (tasks added/removed/renamed) and not re-registered, so the backend can’t match tasks → empty
task_id
. - Version mismatch between registration and runtime (serialized metadata doesn’t match). How to fix: - Ensure runs are started via the Server and picked up by an agent: 1) Register the flow (with the exact code the agent will run). 2) Start a flow run from the UI/API. 3) Let the agent execute it (the agent injects the proper
flow_run_id
). - If you’re invoking the runner manually, pass the flow run id:
Copy code
prefect execute flow-run --flow-run-id <UUID>
- Re-register the flow after any code changes that affect the task graph. - Keep the Prefect version consistent between “registration” and “agent/runtime” containers. Quick diagnostics: - Inside a running task, print:
Copy code
import prefect
logger = prefect.context.logger
logger.info(f"flow_run_id={prefect.context.get('flow_run_id')}")
If that’s None/empty, you’re not executing inside a proper flow run. - Verify the agent logs show it picked up the run you started. - Confirm your storage points to the same commit/branch that you registered. If you share how you’re launching runs (agent type/command) and whether you’re calling
flow.run()
anywhere, I can pinpoint the exact step that’s dropping the UUID on the floor.
m
@Marvin This is the log error:
Expected type UUID!, found ""; Could not parse UUID:
GraphQL request:2:101
2 |   get_or_create_task_run_info(input: {flow_run_id: "f4274061-3915-4f7e-bea2-713d
| 61a2f387", task_id: "", map_index: -1}) {
|                     ^
Traceback (most recent call last):
File "/prefect-server/src/prefect_server/graphql/scalars.py", line 77, in uuid_value_parser
return str(uuid.UUID(value))
File "/usr/local/lib/python3.7/uuid.py", line 160, in __init__
raise ValueError('badly formed hexadecimal UUID string')
ValueError: badly formed hexadecimal UUID string
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/graphql/validation/rules/values_of_correct_type.py", line 185, in is_valid_scalar
parse_result = type_.parse_literal(node)
File "/prefect-server/src/prefect_server/graphql/scalars.py", line 86, in uuid_literal_parser
return uuid_value_parser(ast.value)
File "/prefect-server/src/prefect_server/graphql/scalars.py", line 79, in uuid_value_parser
raise ValueError("Could not parse UUID: {}".format(value))
ValueError: Could not parse UUID:
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/graphql/validation/rules/values_of_correct_type.py", line 185, in is_valid_scalar
parse_result = type_.parse_literal(node)
File "/prefect-server/src/prefect_server/graphql/scalars.py", line 86, in uuid_literal_parser
return uuid_value_parser(ast.value)
File "/prefect-server/src/prefect_server/graphql/scalars.py", line 79, in uuid_value_parser
raise ValueError("Could not parse UUID: {}".format(value))
graphql.error.graphql_error.GraphQLError: Expected type UUID!, found ""; Could not parse UUID:
GraphQL request:2:101
2 |   get_or_create_task_run_info(input: {flow_run_id: "f4274061-3915-4f7e-bea2-713d
| 61a2f387", task_id: "", map_index: -1}) {
Does it help find where it is dropping the IDs?
m
thought for 76.8 seconds
Yes — that log nails it. The server is receiving a valid flow_run_id but an empty task_id:
task_id: ""
. That means the engine is trying to start a task that the backend doesn’t know about. Most common causes in Prefect v1 Server: - Code drift between registration and runtime - You registered the flow with one version of the code, but the agent is executing a different commit/branch. The registered flow doesn’t contain this task, so there’s no backend task UUID to send → empty string. - Fix: pin your storage to a commit SHA (not a branch), re-register, and make sure the agent pulls the same ref. Check agent logs for “Loading flow from … ref=…”. - Dynamic task creation at runtime - Creating tasks inside the flow function or conditionally based on params means they weren’t part of the graph at registration time, so they have no backend IDs. - Fix: define all @task functions and task instances at import time; don’t create new Task objects during a run. Re-register. - Running outside a proper Server-run context - Calling
flow.run()
or invoking the engine manually while still using the Cloud/Server task runner can cause missing IDs. - Fix: start runs via the Server UI/API and let an agent pick them up. - Version mismatch - Different Prefect versions between registration and the agent can lead to serialization mismatches. - Fix: align versions and re-register. Quick checks you can do now: - Re-register the flow with the exact code the agent will run, pinning Git storage
ref
to a commit SHA. Trigger a new run. - In agent logs, confirm it’s loading the same storage ref/commit you registered. - Verify your flow doesn’t create tasks dynamically inside the flow body. - Optional GraphQL sanity check (against your Server):
Copy code
query($fr: uuid!) {
    flow_run(where: {id: {_eq: $fr}}) {
      flow {
        id
        name
        tasks { id name slug }
      }
    }
  }
If the task you expect isn’t listed (or lacks an id), it wasn’t part of the registered graph. If you share: - how your storage is configured (URL + branch/ref), - your registration step, - and where tasks are defined/created in the flow, I’ll point to the exact mismatch.
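If it's easier to run that sanity check from Python than from the GraphQL playground, a small sketch using the same Client the flows use (the endpoint and flow-run ID are placeholders):
Copy code
from prefect.client import Client

client = Client(api_server="http://<your-server-host>:4200/graphql")

query = """
query($fr: uuid!) {
  flow_run(where: {id: {_eq: $fr}}) {
    flow { id name tasks { id name slug } }
  }
}
"""
# placeholder flow-run ID: use one from a recent run
result = client.graphql(query, variables={"fr": "<flow-run-uuid>"})
for t in result.data.flow_run[0].flow.tasks:
    print(t.id, t.slug, t.name)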
m
@Marvin These 3 files have all the code that is doing everything. Can you analyze it and check for errors?
m
thought for 22.5 seconds
I can, but I’ll need the actual files. Please paste the contents (or share a repo/gist link). Use code fences and include filenames so I can follow imports, e.g.:
Copy code
# flows/my_flow.py
<code>

# storage.py
<code>

# register.py
<code>
Before you paste, please redact any tokens/URLs. Since your error shows
task_id: ""
, I’ll be looking for: - Tasks created dynamically inside the flow body (must be defined at import time in Prefect v1). - Mismatch between registered code and runtime code (storage pointing at a branch vs a pinned commit). - Calling
flow.run()
anywhere (should run via Server + agent). - Any custom
Task(...)
constructed inside functions or conditionals. - Prefect v1 version mismatches between the machine that registers and the agent. Helpful context to include: - Your Prefect v1 version (
python -c "import prefect; print(prefect.__version__)"
). - How you register the flow (code or command). - Your storage config (Git/Docker; repo URL + branch/commit). - Agent type/command and logs showing the ref/commit it pulled. If you prefer a quick sanity check while you prep files: re-register the flow pinned to a commit SHA, then trigger a run and confirm the agent logs show it loaded that exact SHA. That alone fixes most empty
task_id
issues.
m
@Marvin this is the code of the flow import os import sys flow_dir = os.path.dirname(os.path.abspath(file)) repo_root = os.path.abspath(os.path.join(flow_dir, "..", "..")) if repo_root not in sys.path: sys.path.insert(0, repo_root) import click import prefect from prefect import task, Flow, Parameter, unmapped from prefect.executors import LocalDaskExecutor from prefect.triggers import all_finished, any_failed from tasks.state_handlers.trigger_fail import trigger_failed_to_skipped # Artifacts from prefect.artifacts import create_markdown # DSC Tasks from tasks.dsc import GetDSCDatasetSpec, PutDSCDatasetInstance, PollDSCCaptureState # DSM Tasks from tasks.dsm import ( GenerateBaseDSMEntity, GetDSMDatasetCurrentState, CaptureCurrentCacheState, PutDSMDatasetInstance, PollDSMMatState, PrepublishDSMDataset, PollDSMPrePublishState, PublishDSMDataset, CleanDSMDatasetInstances, VerifyLoadCounts, StopDSMDatasetInstanceMaterialization, ) from uuid import uuid4 # Get Prefect logger from context logger = prefect.context.get("logger") from tasks.create_artifact import create_artifact # Jinja from prefect.tasks.templates import JinjaTemplate jinja_template = """ # Capture Capture Type | Schema | Table | Capture SQL --- | --- | --- | --- {% for spec in dsc_specs %}{{spec.cdcType}} | {{spec.captureSource.dbSchema}} | {{spec.captureSource.dbTableName}} | {{spec.captureSource.sql}} {% endfor %} {% for state in dsc_states %} Capture State: {{state.state}}, Capture Count: {{state.capturedRecords}} {% endfor %} # Materialization dataSetId | instanceId | state --- | --- | --- {% for instance in dsm_published_instances %}{{instance.dataSetId}} | {{instance.instanceId}} | {{instance.state}} {% endfor %} # Published caches {% for dataset in dsm_published_caches %} specId | type | entity name | entity count --- | --- | --- | --- {% for cache in dataset.caches %}{{cache.specId}} | {{cache.type}} | {{cache.cache_entity_name}} | {{cache.count}} {% endfor %} {% endfor %} """ from tasks.email import NotifyFailureEmail from tasks.prepare_flow import CheckIfAlreadyRunnning check_if_running = CheckIfAlreadyRunnning() notify_failure = NotifyFailureEmail( trigger=any_failed, state_handlers=[trigger_failed_to_skipped], smtp_server="smtp.office365.com", smtp_port=587, smtp_type="STARTTLS", ) # Instantiate Jinja tasks jinja_task = JinjaTemplate(template=jinja_template, name="CreateJinjaMarkdown") # DSC tasks get_dsc_spec = GetDSCDatasetSpec(task_run_name="get_dsc_dataset_spec") put_dsc_dataset_instance = PutDSCDatasetInstance( task_run_name="put_dsc_dataset_instance" ) poll_dsc_capture_state = PollDSCCaptureState(task_run_name="poll_dsc_capture_state") # DSM tasks create_dsm_entity = GenerateBaseDSMEntity(task_run_name="get_dsm_dataset_spec") get_dsm_current_state = GetDSMDatasetCurrentState(task_run_name="get_dsm_current_state") capture_current_cache_state = CaptureCurrentCacheState( task_run_name="capture_current_state" ) put_dsm_dataset_instance = PutDSMDatasetInstance( task_run_name="put_dsm_dataset_instance" ) poll_dsm_mat_state = PollDSMMatState(task_run_name="poll_dsm_mat_state") prepublish_dsm_dataset = PrepublishDSMDataset(task_run_name="prepublish_dsm_dataset") poll_dsm_prepublish_state = PollDSMPrePublishState( task_run_name="poll_dsm_prepublish_state" ) publish_dsm_instance = PublishDSMDataset(task_run_name="publish_dsm_instance") cleanup_old_dsm_instances = CleanDSMDatasetInstances( task_run_name="cleanup_old_dsm_instances", trigger=all_finished, ) validate_caches = VerifyLoadCounts(task_run_name="verify load 
counts") stop_dsm = StopDSMDatasetInstanceMaterialization( task_run_name="stop_dsm", trigger=any_failed, state_handlers=[trigger_failed_to_skipped], ) @task(skip_on_upstream_skip=False) def generate_load_id() -> str: load_id = uuid4().__str__() return load_id # Parameters one_dataset = [{"name": "additivesummaryheader", "version": "v1.0.0"}] both_datasets = [ {"name": "additivesummaryheader", "version": "v1.0.0"}, {"name": "additivesummarydetail", "version": "v1.0.0"}, ] with Flow("additive-summary_v1") as flow: datasets = Parameter("datasets", default=both_datasets) variance = Parameter("verification_variance", default=10) max_active_runs = Parameter("max_active_runs", default=0) is_running = check_if_running(runs_limit=max_active_runs) load_id = generate_load_id(upstream_tasks=[is_running]) # DSC mapped_dsc_specs = get_dsc_spec.map(dataset=datasets) dsc_instance_uris = put_dsc_dataset_instance.map( spec=mapped_dsc_specs, load_id=unmapped(load_id) ) # DSM: We need to capture the current state of the caches base_entity = create_dsm_entity.map( dataset=datasets, upstream_tasks=[dsc_instance_uris] ) submited_entities = put_dsm_dataset_instance.map( base_dsm_entity=base_entity, load_id=unmapped(load_id), ) materialized_entities = poll_dsm_mat_state.map(base_dsm_entity=submited_entities) # DSC Poll here because prefect won't run two parallel map tasks mapped_dsc_states = poll_dsc_capture_state.map( dsc_instance_uri=dsc_instance_uris, upstream_tasks=[submited_entities] ) # DSM: We need to do pre-publish steps submited_prepublished_entities = prepublish_dsm_dataset.map( base_dsm_entity=materialized_entities, upstream_tasks=[mapped_dsc_states] ) prepublished_entities = poll_dsm_prepublish_state.map( base_dsm_entity=submited_prepublished_entities ) stop_dsm = stop_dsm.map( base_dsm_entity=materialized_entities, upstream_tasks=[mapped_dsc_states], ) # DSM: We need to do validation steps prepublished_entities = capture_current_cache_state.map( base_dsm_entity=prepublished_entities ) validated_prepublished_entities = validate_caches.map( base_dsm_entity=prepublished_entities, vefiry_records_thresholds_enabled=unmapped(True), variance=unmapped(variance), ) # DSM: We need to do the publish steps published_entities = publish_dsm_instance( base_dsm_entities=validated_prepublished_entities, ) # DSM: Clean up old instances cleaned_entity = cleanup_old_dsm_instances.map( upstream_tasks=[published_entities], base_dsm_entity=base_entity, cleanup_days=unmapped(1), ) # # Create markdown artifact markdown = jinja_task( dsc_specs=mapped_dsc_specs, dsc_states=mapped_dsc_states, dsm_published_instances=published_entities, dsm_published_caches=prepublished_entities, ) create_artifact(markdown) send_email = notify_failure(upstream_tasks=[markdown]) # End # End # Executors flow.executor = LocalDaskExecutor() @click.command() @click.option( "--visualize", "-v", required=False, help="Visualize flow", flag_value="visualize" ) @click.option("--run", "-r", required=False, help="Run flow", flag_value="run") def main(visualize, run): if visualize: flow.visualize() elif run: flow.run() if name == "__main__": main() can you see anyhting there?
m
Your message was too long, here's your message at the allowed limit: this is the code of the flow import os import sys flow_dir = os path dirname(os path abspath(file)) repo_root = os path abspath(os path join(flow_dir )) if repo_root not in sys path sys path insert(0 repo_root) import click import prefect from prefect import task Flow Parameter unmapped from prefect executors import LocalDaskExecutor from prefect triggers import all_finished any_failed from tasks state_handlers trigger_fail import trigger_failed_to_skipped # Artifacts from prefect artifacts import create_markdown # DSC Tasks from tasks dsc import GetDSCDatasetSpec PutDSCDatasetInstance PollDSCCaptureState # DSM Tasks from tasks dsm import ( GenerateBaseDSMEntity GetDSMDatasetCurrentState CaptureCurrentCacheState PutDSMDatasetInstance PollDSMMatState PrepublishDSMDataset PollDSMPrePublishState PublishDSMDataset CleanDSMDatasetInstances VerifyLoadCounts StopDSMDatasetInstanceMaterialization ) from uuid import uuid4 # Get Prefect logger from context logger = prefect context get( logger ) from tasks create_artifact import create_artifact # Jinja from prefect tasks templates import JinjaTemplate jinja_template = # Capture Capture Type | Schema | Table | Capture SQL --- | --- | --- | --- {% for spec in dsc_specs %}{{spec cdcType}} | {{spec captureSource dbSchema}} | {{spec captureSource dbTableName}} | {{spec captureSource sql}} {% endfor %} {% for state in dsc_states %} Capture State {{state state}} Capture Count {{state capturedRecords}} {% endfor %} # Materialization dataSetId | instanceId | state --- | --- | --- {% for instance in dsm_published_instances %}{{instance dataSetId}} | {{instance instanceId}} | {{instance state}} {% endfor %} # Published caches {% for dataset in dsm_published_caches %} specId | type | entity name | entity count --- | --- | --- | --- {% for cache in dataset caches %}{{cache specId}} | {{cache type}} | {{cache cache_entity_name}} | {{cache count}} {% endfor %} {% endfor %} from tasks email import NotifyFailureEmail from tasks prepare_flow import CheckIfAlreadyRunnning check_if_running = CheckIfAlreadyRunnning() notify_failure = NotifyFailureEmail( trigger=any_failed state_handlers=[trigger_failed_to_skipped] smtp_server= <http //smtp office365 com|smtp office365 com> smtp_port=587 smtp_type= STARTTLS ) # Instantiate Jinja tasks jinja_task = JinjaTemplate(template=jinja_template name= CreateJinjaMarkdown ) # DSC tasks get_dsc_spec = GetDSCDatasetSpec(task_run_name= get_dsc_dataset_spec ) put_dsc_dataset_instance = PutDSCDatasetInstance( task_run_name= put_dsc_dataset_instance ) poll_dsc_capture_state = PollDSCCaptureState(task_run_name= poll_dsc_capture_state ) # DSM tasks create_dsm_entity = GenerateBaseDSMEntity(task_run_name= get_dsm_dataset_spec ) get_dsm_current_state = GetDSMDatasetCurrentState(task_run_name= get_dsm_current_state ) capture_current_cache_state = CaptureCurrentCacheState( task_run_name= capture_current_state ) put_dsm_dataset_instance = PutDSMDatasetInstance( task_run_name= put_dsm_dataset_instance ) poll_dsm_mat_state = PollDSMMatState(task_run_name= poll_dsm_mat_state ) prepublish_dsm_dataset = PrepublishDSMDataset(task_run_name= prepublish_dsm_dataset ) poll_dsm_prepublish_state = PollDSMPrePublishState( task_run_name= poll_dsm_prepublish_state ) publish_dsm_instance = PublishDSMDataset(task_run_name= publish_dsm_instance ) cleanup_old_dsm_instances = CleanDSMDatasetInstances( task_run_name= cleanup_old_dsm_instances trigger=all_finished ) validate_caches = 
VerifyLoadCounts(task_run_name= verify load counts ) stop_dsm = StopDSMDatasetInstanceMaterialization( task_run_name= stop_dsm trigger=any_failed state_handlers=[trigger_failed_to_skipped] ) @task(skip_on_upstream_skip=False) def generate_load_id() -> str load_id = uuid4() __str__() return load_id # Parameters one_dataset = [{ name additivesummaryheader version v1 0 0 }] both_datasets = [ { name additivesummaryheader version v1 0
0 } { name additivesummarydetail version v1 0 0 } ] with Flow( additive-summary_v1 ) as flow datasets = Parameter( datasets default=both_datasets) variance = Parameter( verification_variance default=10) max_active_runs = Parameter( max_active_runs default=0) is_running = check_if_running(runs_limit=max_active_runs) load_id = generate_load_id(upstream_tasks=[is_running]) # DSC mapped_dsc_specs = get_dsc_spec map(dataset=datasets) dsc_instance_uris = put_dsc_dataset_instance map( spec=mapped_dsc_specs load_id=unmapped(load_id) ) # DSM We need to capture the current state of the caches base_entity = create_dsm_entity map( dataset=datasets upstream_tasks=[dsc_instance_uris] ) submited_entities = put_dsm_dataset_instance map( base_dsm_entity=base_entity load_id=unmapped(load_id)
m
@Marvin I am loading tasks as modules in the flow, but the UUIDs are obviously somehow broken. The code was running fine on the cloud-hosted version but does not work on the self-hosted one. What can be the problem?
m
thought for 94.2 seconds
Short answer: the backend is getting a real flow_run_id but an empty task_id. That only happens when the task you’re trying to run does not exist (or cannot be matched) in the flow that was registered with the Server. Why it worked on Cloud but not Server - Version/schema mismatch: Your flow/agent code and your Prefect Server image are likely on different Prefect v1 versions. The client then sends a payload the Server can’t satisfy (empty ID instead of a resolved task UUID). - Code drift at runtime: You registered one DAG, but the agent is executing different code (branch moved, unpinned ref, different environment toggling parts of the DAG). - Dynamic/conditional task creation: Tasks created inside the flow function or under conditionals weren’t present at registration time, so the Server has no UUID for them at run time. - Unstable task identity: Reusing shared tasks from modules without stable names/slugs can lead to auto-renamed slugs at registration vs runtime, so the lookup fails. What to do (in order) 1) Align versions everywhere - Use the same Prefect v1 version for: - the machine/container that registers flows, - the agent/runtime image, - the Prefect Server stack (Docker image tag). - After aligning, re-register the flow. 2) Pin your flow code - If using Git storage, pin to a commit SHA, not a moving branch. Re-register with that SHA and confirm the agent logs show it pulled the same SHA for the run. 3) Make the DAG deterministic at import time - Define all tasks at module import, not inside the flow body or conditionals. - Avoid environment-driven DAG shape differences between registration and runtime. If you must vary behavior, keep the set of tasks the same and branch inside the tasks. 4) Stabilize task identity - Give tasks explicit names/slugs to avoid auto-renaming:
Copy code
from prefect import task, Flow

@task(name="extract", slug="extract")
def extract(): ...

@task(name="transform", slug="transform")
def transform(x): ...

with Flow("my-flow") as flow:
    t = transform(extract())
- When reusing a shared task multiple times, copy it with unique name/slug per use:
Copy code
from my_lib.tasks import shared_task

step1 = shared_task.copy(name="shared_step1", slug="shared_step1")
step2 = shared_task.copy(name="shared_step2", slug="shared_step2")
Ensure slugs are unique within the flow. 5) Run via Server + agent, not flow.run() - Start runs from the UI/API and let the agent pick them up. Don’t call flow.run() when using the Cloud/Server task runner. Quick diagnostics - In a task, log context to verify identity resolution:
Copy code
import prefect
from prefect.utilities.logging import get_logger
logger = get_logger()

@task
def debug():
    ctx = prefect.context
    logger.info(f"flow_run_id={ctx.get('flow_run_id')}, task_slug={ctx.get('task_slug')}, task_full_name={ctx.get('task_full_name')}")
- Query the Server to confirm the registered flow has the task you expect (tasks list should include the slug you see in logs). If you share: - prefect.__version__ for registration, agent, and your Server image tag, - how you register (code) and your storage config (branch vs SHA), - where and how you import/wire those module tasks, I'll point at the exact mismatch. Otherwise, the standard cure is "align versions, pin code, re-register." It's not glamorous, but it works.
m
@Marvin I have this in my values yaml serverVersionTag: "2022.09.23" prefectVersionTag: "2022.09.23" uiVersionTag: "2022.09.23" But in each part of the values yaml (ui, hasura, apollo, etc.) I have a specialised docker image build that has version 1.4.1. Can this be the source of a mismatch?
m
thought for 39.8 seconds
Yes — that can absolutely be the source of the mismatch. In Prefect v1, the Python client/agent version and the Prefect Server stack (apollo, hasura, graphql, UI) must be kept in lockstep. Your values.yaml shows server/ui tags pinned to 2022.09.23, but you’ve built custom images for the server components that bundle Prefect 1.4.1. That’s mixing two release lines: - Prefect Server components are versioned by date tags like 2022.09.23. - The Python library (what your flows and agents import) is versioned as 1.x.y. Running a 1.4.1 client/agent against a 2022.09.23 server can introduce GraphQL/serialization differences. One symptom is exactly what you’re seeing: the client cannot resolve a task_id from the registered flow/tasks on the server, so it sends task_id: "" in get_or_create_task_run_info. What to do - Pick one coherent version set and align everything: - Option A (recommended): upgrade your Server stack to a release compatible with your Python client (1.4.1). Use a single server release tag for apollo/hasura/ui/graphql (don’t mix). Then re-register your flows. - Option B: downgrade your agents/flow runtime to the Prefect Python version that matches your existing server tag (likely ~1.2.x for 2022.09.23). Then re-register. - Do not override individual server sub-images with different tags/builds unless they’re built from the exact same server release. apollo/hasura/ui must all use the same 2022.xx.xx tag. - After aligning versions: - Re-register your flows with the exact code the agent will run (pin Git storage to a commit SHA, not a moving branch). - Ensure all tasks are defined at import time (no dynamic Task creation inside the flow body). - Give reused tasks explicit names/slugs if necessary. Quick checks - In your agent and flow runtime containers:
Copy code
python -c "import prefect; print(prefect.__version__)"
Confirm they match each other. - Confirm your server stack is one consistent tag (e.g., all 2022.11.xx). Mixing 2022.09.23 with anything else is asking for existential dread. - Verify the registered flow actually contains your tasks: - Start a fresh run after re-registering and check the agent logs; they should show the exact repo ref/commit and no task mapping errors. If you tell me which server tag you want to standardize on, I’ll suggest the matching Python version strategy (upgrade server to match 1.4.1, or downgrade client to match 2022.09.23) and the steps to roll it out cleanly.
m
@Marvin How do I move everything to 1.4.1?
m
thought for 52.8 seconds
You can’t set “the server” to 1.4.1 — Prefect v1 has two version schemes: - Python package (your agents/flow code): 1.x.y (you want 1.4.1) - Server stack (apollo/hasura/graphql/ui): date tags like 2022.09.23 To “move everything to 1.4.1”, align the Python clients to 1.4.1 and run a Server release that matches it (the 2022.09.23 server tag is the right vintage for 1.4.1). Most importantly: do not mix and match different server sub-image versions. Practical checklist 1) Standardize the Server stack - Use a single, consistent server date tag for all components (e.g., 2022.09.23). - Remove any custom server images you built that bundle a different Prefect/Python version; use the official images from the chart for apollo/hasura/graphql/ui with the same date tag. - Backup your Postgres DB, then upgrade via your Helm chart: - Update values.yaml so serverVersionTag/uiVersionTag/prefectVersionTag all equal "2022.09.23" (or another single, consistent date tag). - Do not override per-component images with different tags. - helm upgrade --install … with the updated values. - After rollout, open the UI and confirm it shows the matching server build (bottom-left usually shows the date/version). 2) Standardize agents and flow runtime to Prefect 1.4.1 - Update your agent and flow images to install Prefect 1.4.1:
Copy code
RUN pip install "prefect==1.4.1"
- Restart agents and verify:
Copy code
python -c "import prefect; print(prefect.__version__)"  # should print 1.4.1
- Ensure you’re targeting Server, not Cloud:
Copy code
prefect backend server
  # Or ensure PREFECT__BACKEND=server and PREFECT__SERVER__ENDPOINT is set appropriately
3) Re-register flows with pinned code - Make sure the code the agent will execute matches what you register, and pin to a commit SHA (not a moving branch): - GitHub storage: set
ref
to the commit SHA (see the sketch at the end of this answer). - Git storage: it does not accept a ref kwarg; use a tag, or the commit argument if your release exposes it. - Re-register the flow(s), then trigger new runs via the UI/API. 4) Make the DAG deterministic and task IDs stable - Define tasks at import time, not inside the flow body or conditionals. - If you reuse task objects from modules, give them explicit names/slugs, or
.copy()
them with unique names/slugs per use. 5) Verify the fix - Agent logs should show it loaded the same repo ref/commit you registered. - The “task_id: ""” GraphQL errors should disappear. If not, query the flow’s tasks from the Server and confirm your expected task slugs are present. Common pitfalls to avoid - Mixing server sub-images (apollo/hasura/ui/graphql) with different tags or custom builds. - Registering with one code version but running another (moving branches). - Dynamic task creation changing the graph between registration and runtime. If you share your Helm chart reference and how you deploy agents (K8s/Docker/local), I can provide a concrete values.yaml example and agent rollout steps.
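For the "pin to a commit SHA and re-register" step, a minimal sketch using GitHub storage; the repo, path, and SHA are placeholders (for Git storage, check your release's signature for an equivalent pin, since it does not accept ref):
Copy code
from prefect.storage import GitHub

# register against an exact commit so the agent runs the same code you registered
flow.storage = GitHub(
    repo="your-org/your-repo",              # placeholder
    path="flows/my_flow.py",                # placeholder path within the repo
    ref="<commit-sha>",                     # full commit SHA, not a branch name
    access_token_secret="GITHUB_ACCESS_TOKEN",
)

flow.register(project_name="your-project")  # re-register after pinning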
thought for 61.1 seconds
“prefecthq/server:core-1.4.1” is not the image you want for the v1 Server stack. That tag scheme is for the Python “core” package version, not the server components. In v1, the Server images (apollo, graphql, ui, hasura) use date-based tags and must all match. What to do: - Server components: pick a single server release tag and use it for every component. For Prefect Core 1.4.1, use the 2022.09.23 server release across the board. - If you need custom images, base each component off the matching upstream image for that date tag, e.g.:
Copy code
# apollo
FROM prefecthq/apollo:2022.09.23
# graphql service (the prefecthq/server image)
FROM prefecthq/server:2022.09.23
# ui
FROM prefecthq/ui:2022.09.23
# hasura (use the tag the 2022.09.23 chart expects)
FROM hasura/graphql-engine:v1.3.3
- Do not mix these with any “core-1.4.1” tags, and don’t use different tags per component. - Agents and flow runtime: pin the Python package to 1.4.1.
Copy code
RUN pip install "prefect==1.4.1"
Verify in both the agent and runtime containers:
Copy code
python -c "import prefect; print(prefect.__version__)"  # should be 1.4.1
- Config: ensure you’re targeting Server, not Cloud. - Either set env vars
PREFECT__BACKEND=server
and
PREFECT__SERVER__ENDPOINT=http://<server>:4200/graphql
, or pass
Client(api_server=...)
in code (no token). - Re-register your flows after aligning versions and pin code to a commit SHA in Git storage. Start fresh runs via the UI/API and let the agent pick them up. Your values.yaml should not override per-component images with different tags. Keep serverVersionTag/uiVersionTag/etc all at the same date tag (e.g., "2022.09.23") and drop any custom “core-1.4.1” image overrides for those services. If you share: - the full values.yaml image overrides, and - the Dockerfiles for any customized server components, I’ll point out the exact lines to fix.
m
@Marvin I have serverVersionTag: "2022.09.23" prefectVersionTag: "2022.09.23" uiVersionTag: "2022.09.23" But the dockerfiles are loaded like this: FROM prefecthq/server:core-1.4.1 FROM prefecthq/ui:core-1.4.1 FROM prefecthq/apollo:core-1.4.1 etc., all components 1.4.1. Is that a mismatch? How do I match it? Do I need to do FROM prefecthq/server:core-2022.09.23? Is :2022.09.23 Prefect v1? Can you tell me what versions of ui, apollo, hasura, server, prefect, graphql-engine I should use to be consistent?
m
thought for 59.5 seconds
Yes, that’s a mismatch. In Prefect v1 there are two unrelated tag schemes:
- Python client/agent (your flow runtime): 1.x.y (you want 1.4.1)
- Server component images (ui, apollo, graphql/server): date-based tags like 2022.09.23
You cannot mix core-1.4.1 images with a values.yaml that pins serverVersionTag/uiVersionTag/prefectVersionTag to 2022.09.23. Pick one scheme and use it consistently. The Helm chart and official guidance use the date-based tags. So:
- No: prefecthq/server:core-2022.09.23 (that tag doesn’t exist)
- Yes: use the date tag on all server components
Consistent set for the 1.4.1 era (recommended):
- Server components (all the same date tag):
  - prefecthq/server:2022.09.23 (GraphQL service)
  - prefecthq/apollo:2022.09.23
  - prefecthq/ui:2022.09.23
  - hasura/graphql-engine:v1.3.3
- Agents and flow runtime (Python):
  - pip install "prefect==1.4.1"
What to change:
- In your Dockerfiles for server components, stop using core-1.4.1. Use:
Copy code
FROM prefecthq/server:2022.09.23      # for the GraphQL service
FROM prefecthq/apollo:2022.09.23
FROM prefecthq/ui:2022.09.23
FROM hasura/graphql-engine:v1.3.3     # Hasura
- In Helm values, keep:
Copy code
serverVersionTag: "2022.09.23"
prefectVersionTag: "2022.09.23"
uiVersionTag: "2022.09.23"
And do not override individual component images with different tags. Then:
- Upgrade/redeploy the Server stack (all pods should show the same 2022.09.23 tag).
- Ensure agents/flow images run prefect==1.4.1 (a quick runtime check is sketched below).
- Re-register your flows (pin Git storage to a commit SHA), then trigger new runs.
This alignment removes the task_id: "" GraphQL mess. If it doesn’t, we’ll look next at dynamic task creation or slug mismatches, but version/tag drift is the usual culprit.
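A small sanity check you can run inside the agent or flow-run container to confirm the client side is aligned (the config keys follow the standard v1 config layout; pod image tags still need to be checked in Kubernetes separately):
Copy code
import prefect

# Expect 1.4.1 in agent and flow-run images.
print("prefect version:", prefect.__version__)

# Expect "server" (not "cloud") and your GraphQL endpoint.
print("backend:", prefect.config.backend)
print("endpoint:", prefect.config.server.endpoint)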
m
@Marvin FROM prefecthq/prefect:1.4.1 should be FROM prefecthq/prefect:2022.09.23 as well? And for hasura I have FROM hasura/graphql-engine:v2.0.1.cli-migrations-v2. I need one that supports migrations as well; should I use FROM hasura/graphql-engine:v1.3.3.cli-migrations-v2? Can I leave the hasura one as it is?
m
thought for 34.0 seconds
Short answer:
- Keep your agents/flow runtime on prefecthq/prefect:1.4.1.
- Set ALL Prefect Server components (server/GraphQL, apollo, ui) to the same date tag, e.g. 2022.09.23.
- Use Hasura v1.3.3 for that server release (the cli-migrations-v2 variant is fine).
- Do not mix core-1.4.1 server images with 2022.09.23 tags.
Concretely, a consistent set for the 1.4.1 era:
- Server stack (all must match):
  - FROM prefecthq/server:2022.09.23  # GraphQL service
  - FROM prefecthq/apollo:2022.09.23
  - FROM prefecthq/ui:2022.09.23
  - FROM hasura/graphql-engine:v1.3.3.cli-migrations-v2
- Agents and flow runtime:
  - FROM prefecthq/prefect:1.4.1 (or any base + pip install prefect==1.4.1)
Answers to your specific questions:
- Should prefecthq/prefect:1.4.1 be :2022.09.23? No. The prefect image (Python client/agent) uses 1.x.y tags. Keep 1.4.1 for agents/flows.
- Hasura v2.0.1.cli-migrations-v2 OK? Not with Prefect Server 2022.09.23. Use v1.3.3.cli-migrations-v2 to match that release. Leaving v2.0.1 will likely cause schema/metadata incompatibilities.
After aligning:
- Redeploy the server stack (verify all pods show 2022.09.23).
- Ensure agents/flows report prefect.__version__ == 1.4.1.
- Re-register flows (pin Git storage to a commit SHA).
- Run again; the empty task_id error should vanish.
Tags for reference:
- Server images: prefecthq/server tags, prefecthq/apollo tags, prefecthq/ui tags
- Hasura tags: hasura/graphql-engine tags
m
@Marvin so the prefecthq/prefect should stay prefecthq/prefect:1.4.1, while the rest of the images should change to 2022.09.23?
m
thought for 23.8 seconds
Yes. Keep your agents/flow runtime on prefecthq/prefect:1.4.1, and set all Prefect Server components to the same date tag (e.g., 2022.09.23). Do not mix core-1.4.1 tags for server components. Use this set for the 1.4.1 era:
Copy code
# Server stack (all must match)
FROM prefecthq/server:2022.09.23      # GraphQL service
FROM prefecthq/apollo:2022.09.23
FROM prefecthq/ui:2022.09.23
FROM hasura/graphql-engine:v1.3.3.cli-migrations-v2

# Agents / flow runtime
FROM prefecthq/prefect:1.4.1
Then:
- Ensure PREFECT__BACKEND=server and the endpoint points to your GraphQL URL (or pass Client(api_server=...)). No tokens on self-hosted.
- Redeploy the server stack (all pods on 2022.09.23).
- Restart agents with Prefect 1.4.1.
- Re-register flows, pinning Git storage to a commit SHA.
- Trigger a fresh run. The empty task_id errors should stop.
References: prefecthq/server tags, prefecthq/apollo tags, prefecthq/ui tags, hasura/graphql-engine tags.
If you want to target a different server date tag, switch all three server components to that tag and keep agents on a matching 1.x client version. Mixing tag schemes is how we get existential GraphQL angst.
m
@Marvin I got #2 ERROR: docker.io/prefecthq/ui:2022.09.23: not found. What image should I use?
m
thought for 103.9 seconds
That tag doesn’t exist for the UI image. Prefect v1’s server images are… delightfully inconsistent across repos: some use date tags, and not every date is published for every repo. Hence your 404. You have two sane paths; pick one and make everything match.
Option A (fastest): use the “core” tags everywhere for the server stack
- Server components (all the same):
  - prefecthq/server:core-1.4.1
  - prefecthq/apollo:core-1.4.1
  - prefecthq/ui:core-1.4.1
  - hasura/graphql-engine:v2.0.1.cli-migrations-v2 (this pairs with the core-1.4.1 stack)
- Agents/flow runtime:
  - prefecthq/prefect:1.4.1 (or pip install prefect==1.4.1)
Values override example:
Copy code
serverVersionTag: "core-1.4.1"
prefectVersionTag: "core-1.4.1"
uiVersionTag: "core-1.4.1"
hasura:
  image:
    tag: "v2.0.1.cli-migrations-v2"
Option B (date-tag scheme): use a single server release tag for all components
- This requires using the exact UI tag that actually exists for that release; 2022.09.23 may not be published for ui.
- Components:
  - prefecthq/server:<date>
  - prefecthq/apollo:<same date>
  - prefecthq/ui:<matching tag for that release> ← check Docker Hub
  - hasura/graphql-engine:v1.3.3.cli-migrations-v2 (pairs with older date releases)
- Agents/flow runtime:
  - prefecthq/prefect:1.4.1
Given you hit “not found” for ui:2022.09.23, I recommend Option A: switch your values and Dockerfiles to core-1.4.1 for server, apollo, and ui, and keep agents on prefect 1.4.1. That gives you a consistent, known-good set without playing Docker tag bingo.
After you align:
- Redeploy the server stack (verify all pods report core-1.4.1).
- Restart agents on 1.4.1.
- Re-register flows (pin Git storage to a commit SHA).
- Trigger a fresh run.
If you really want the date-tag set, verify available tags before pinning: UI (prefecthq/ui tags), Server (prefecthq/server tags), Apollo (prefecthq/apollo tags), Hasura (hasura/graphql-engine tags).
Consistency is the cure; the exact scheme is less important than using the same one everywhere.
m
@Marvin changing the hasura version down causes this on the deployment: {"path":"$","error":"Cannot use database previously used with a newer version of graphql-engine (expected a catalog version <=40, but the current version is 46).","code":"not-supported"} Will the version tag "v2.0.1.cli-migrations-v2" really cause problems?
m
thought for 43.5 seconds
Short answer: keep Hasura at v2.0.1.cli-migrations-v2. Downgrading it is what broke your deployment.
Why: Hasura stores a “catalog” in Postgres. Your DB is at catalog version 46 (from Hasura v2.x); older Hasura (e.g., v1.3.3) expects <= 40 and will refuse to start. Hasura catalog downgrades aren’t supported unless you reset the DB or perform manual downgrade migrations (not fun).
So, a consistent, working set for your setup:
- Server stack (all must match):
  - prefecthq/server:core-1.4.1
  - prefecthq/apollo:core-1.4.1
  - prefecthq/ui:core-1.4.1
  - hasura/graphql-engine:v2.0.1.cli-migrations-v2 ← keep this to match your existing catalog
- Agents / flow runtime:
  - prefecthq/prefect:1.4.1
Example values.yaml snippet:
Copy code
serverVersionTag: "core-1.4.1"
prefectVersionTag: "core-1.4.1"
uiVersionTag: "core-1.4.1"

hasura:
  image:
    repository: hasura/graphql-engine
    tag: "v2.0.1.cli-migrations-v2"
Then:
- Helm upgrade the stack so all Prefect server pods use core-1.4.1.
- Restart agents on prefect==1.4.1.
- Re-register flows (pin Git storage to a commit SHA).
- Trigger fresh runs.
Only switch Hasura versions if you also reset/migrate the Postgres catalog accordingly. Otherwise, keep v2.0.1 and everyone will get along fine. (If you want to confirm what catalog version your database is at, see the sketch below.)
Useful tags: prefecthq/ui, prefecthq/server, prefecthq/apollo, hasura/graphql-engine.
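If you want to see the catalog version Hasura has recorded for yourself, a rough sketch (the connection string is a placeholder; requires psycopg2 and assumes the standard hdb_catalog.hdb_version table):
Copy code
import psycopg2

# Placeholder DSN - point at the Postgres database used by Prefect Server / Hasura.
conn = psycopg2.connect("postgresql://prefect:prefect@prefect-postgresql:5432/prefect")
with conn, conn.cursor() as cur:
    # Hasura records its catalog version here; v2.x writes versions > 40.
    cur.execute("SELECT version FROM hdb_catalog.hdb_version;")
    print("Hasura catalog version:", cur.fetchone()[0])
conn.close()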
m
@Marvin if I purge the Postgres database will it work?
m
thought for 62.4 seconds
Short answer: yes, purging the Postgres database will “work” in the sense that an older Hasura can start and create its own (older) catalog. It will also nuke all your Prefect data. If that’s your idea of a good time, I won’t stop you, but you don’t need to do it to fix your current issue.
Recommended path (no DB purge):
- Keep Hasura at v2.0.1.cli-migrations-v2 (your DB is already at catalog 46 from Hasura v2).
- Align all Prefect server components to the same “core” tag: prefecthq/server:core-1.4.1, prefecthq/apollo:core-1.4.1, prefecthq/ui:core-1.4.1.
- Keep agents/flow runtime on prefecthq/prefect:1.4.1 (or pip install prefect==1.4.1).
- Redeploy, then re-register flows (pin Git storage to a commit SHA) and trigger fresh runs.
Only purge the DB if you intentionally want to downgrade Hasura to v1.3.3. What happens if you purge:
- Stop the stack.
- Drop the Postgres database (or point to a brand-new empty DB).
- Start the stack with your desired versions (e.g., server/apollo/ui date-tag set + hasura v1.3.3.cli-migrations-v2).
- Hasura will initialize its older catalog; Prefect will apply its migrations.
- You lose all flows, runs, states, projects, schedules, etc. You must re-create and re-register everything.
Bottom line: to fix your current mismatch and the “empty task_id” saga, don’t purge. Use core-1.4.1 for all Prefect server components and keep Hasura at v2.0.1.cli-migrations-v2, then re-register flows. That’s the least-destructive path.
Tag references, if you want to double-check availability: prefecthq/ui, prefecthq/server (GraphQL), prefecthq/apollo, hasura/graphql-engine.
m
@Marvin the problem is I cannot change the server stack to core-1.4.1, because in the chart yaml I am not using dependencies. I had to copy all templates and they are at version 2022.09.23. The only thing that is movable is the docker images of the individual components.
m
thought for 81.4 seconds
You’re not blocked. The Helm chart “version” in your copied templates doesn’t matter; the running behavior is defined by the container images you set. If you can change image tags, you can align the stack. Do this:
- Prefect Server components (all the same tag): prefecthq/server:core-1.4.1, prefecthq/apollo:core-1.4.1, prefecthq/ui:core-1.4.1.
- Hasura: keep the catalog-compatible version you already use, hasura/graphql-engine:v2.0.1.cli-migrations-v2.
- Agents/flow runtime: prefecthq/prefect:1.4.1 (or pip install prefect==1.4.1).
Update your manifests/values to set those tags, redeploy, and ensure every Prefect server pod shows core-1.4.1 and Hasura shows v2.0.1.cli-migrations-v2. The fact your YAML templates came from a “2022.09.23” chart is fine; just don’t hardcode those image tags. Example snippet you can adapt:
Copy code
server:
  image:
    repository: prefecthq/server
    tag: core-1.4.1

apollo:
  image:
    repository: prefecthq/apollo
    tag: core-1.4.1

ui:
  image:
    repository: prefecthq/ui
    tag: core-1.4.1

hasura:
  image:
    repository: hasura/graphql-engine
    tag: v2.0.1.cli-migrations-v2
Then:
- Restart agents on prefect==1.4.1.
- Re-register flows, pinning Git storage to a commit SHA.
- Trigger fresh runs (from the UI, or via the API as sketched below).
If you hit “image not found”, double-check the tags exist: prefecthq/ui, prefecthq/server, prefecthq/apollo, hasura/graphql-engine.
Purging Postgres just to downgrade Hasura is unnecessary and destructive. Keeping Hasura at v2.0.1.cli-migrations-v2 avoids the catalog downgrade error. The key is consistency: all Prefect server components on the same tag (core-1.4.1) and your Python clients on 1.4.1.
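If you'd rather kick off the fresh runs from code instead of the UI, a minimal sketch using the v1 Client (the flow_id is a placeholder; grab it from the UI or the GraphQL API):
Copy code
from prefect.client import Client

client = Client()  # assumes backend/endpoint are already configured for your Server

# flow_id is a placeholder - use the id of the flow version you just registered.
flow_run_id = client.create_flow_run(flow_id="<flow-id>", run_name="post-upgrade-smoke-test")
print("created flow run:", flow_run_id)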
m
@Marvin I serialized the version but am still getting:
Failed to retrieve task state with error: ClientError([{'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: '}}}])
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 157, in initialize_run
    map_index=context.get("map_index"),
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 1498, in get_task_run_info
    result = self.graphql(mutation)  # type: Any
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 465, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: '}}}]
What to do?
m
thought for 84.7 seconds
You’ve aligned versions; good. An empty task_id at get_or_create_task_run_info still means the server can’t match the task you’re trying to run to a task that was registered for that flow. Do these in order:
1) Prove the flow and tasks match at the server
- Re-register the flow using the exact code your agent will run. If you use Git storage, pin to a commit SHA (not a branch) and confirm the agent logs show that SHA.
- Delete the existing flow in the UI first (or change the flow name) to force a clean task set.
2) Make the DAG deterministic at import time
- Define all tasks at module import, not inside the flow function or under conditionals.
- If you reuse the same task object multiple times, copy it with stable names/slugs:
Copy code
step1 = shared_task.copy(name="shared_step1", slug="shared_step1")
step2 = shared_task.copy(name="shared_step2", slug="shared_step2")
- Give every task an explicit name and slug. Don’t rely on auto-generated ones.
3) Sanity-check what’s registered vs what you run
- On the exact runtime image, load the flow and print slugs:
Copy code
from my_pkg.flow import flow
print(sorted(t.slug for t in flow.tasks))
- In the UI, open the flow and confirm those slugs exist under Tasks. If a slug is missing in the UI, that task wasn’t part of the registered graph.
4) Ensure you’re actually running via Server + agent
- Do not call flow.run(). Start runs from the UI/API and let the agent pick them up.
- In a tiny debug task at the top of your flow, log context to confirm identity is present:
Copy code
import prefect
from prefect import task

@task(name="debug_ctx", slug="debug_ctx")
def debug_ctx():
    ctx = prefect.context
    print("flow_run_id:", ctx.get("flow_run_id"))
    print("task_slug:", ctx.get("task_slug"))
If flow_run_id is None, you’re not in a proper run.
5) Keep versions truly aligned
- Server components: prefecthq/server, apollo, ui: core-1.4.1
- Hasura: v2.0.1.cli-migrations-v2 (don’t downgrade; your catalog is v46)
- Agents/flows: prefect==1.4.1
If it still fails, the task IDs were never attached during registration. Share:
- your storage config (Git/Docker; include ref/commit),
- the exact registration code,
- how you import/wire tasks into the flow,
- a snippet of agent logs where it loads the flow (repo/ref/path).
I’ll pinpoint where the IDs fall off. The usual fix is “clean re-register + pinned code + explicit slugs.” Thrilling, I know. (A sketch for querying the Server’s registered task slugs follows below.)
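To compare what the Server actually has registered against your local flow, a rough query sketch via the v1 Client (the flow name is a placeholder; assumes the standard Server schema and an already-configured backend/endpoint):
Copy code
from prefect.client import Client

client = Client()
query = """
query($name: String!) {
  flow(where: {name: {_eq: $name}}, order_by: {version: desc}, limit: 1) {
    id
    version
    tasks { name slug }
  }
}
"""
# "my-flow" is a placeholder - use your registered flow name.
result = client.graphql(query, variables={"name": "my-flow"})
print(result)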
m
@Marvin I cannot see tasks in the UI at all. How do I make sure they are registered?
m
thought for 73.6 seconds
If you can’t see any tasks in the UI, your flow was registered without a task graph. That’s why the server later gets task_id: "". What to verify and fix:
1) Build the DAG at import time (not at run time)
- In Prefect v1, tasks must be bound to a Flow when you create the Flow, not inside some later function call.
- Good pattern:
Copy code
from prefect import task, Flow

@task(name="extract", slug="extract")
def extract(): ...

@task(name="transform", slug="transform")
def transform(x): ...

with Flow("my-flow") as flow:
    t1 = extract()
    t2 = transform(t1)
- If your tasks live in modules, you still need to call them (or add them) inside the with Flow(...) block:
Copy code
from my_tasks import extract, transform, load
with Flow("etl") as flow:
    data = extract()
    clean = transform(data)
    load(clean)
- Alternatively, explicitly wire them:
Copy code
# add_task only adds nodes; set_dependencies wires the edge between them
flow.add_task(extract)
flow.add_task(transform)
flow.set_dependencies(task=transform, upstream_tasks=[extract])
2) Sanity check before registering
Copy code
print("tasks:", [t.slug for t in flow.tasks])
print("task count:", len(flow.tasks))
If this prints 0, the UI will also show 0. Fix your wiring.
3) Register against Server and a real project
- Make sure you’re on the Server backend:
Copy code
prefect backend server
- Ensure the project exists:
Copy code
prefect create project "my-project"
- Register from the exact code the agent will run (pin Git storage to a commit SHA):
Copy code
flow.register(project_name="my-project")
- Consider deleting the old flow in the UI (or change the flow name) to force a clean registration.
4) Give tasks stable identities
- If you reuse the same task object multiple times, copy it with unique name/slug:
Copy code
from my_lib.tasks import shared_task
a = shared_task.copy(name="step_a", slug="step_a")
b = shared_task.copy(name="step_b", slug="step_b")
- Avoid creating new Task objects conditionally at run time. The set of tasks must exist at registration.
5) Keep versions aligned (you’ve done this, but for completeness)
- Server stack: prefecthq/server, apollo, ui = core-1.4.1
- Hasura: v2.0.1.cli-migrations-v2 (to match your catalog)
- Agents/flows: prefect==1.4.1
Quick diagnostic if it still misbehaves:
- On the same runtime image the agent uses, import your flow and print slugs:
Copy code
from my_pkg.flow import flow
print(sorted(t.slug for t in flow.tasks))
- In the UI, open the flow and confirm those slugs appear under Tasks. If not, you registered a different graph than you’re running.
Docs for v1 (archived but useful):
- Flows and registration: https://docs.prefect.io/core/concepts/flows.html
- Tasks: https://docs.prefect.io/core/concepts/tasks.html
- Storage (Git/GitHub): https://docs.prefect.io/api/latest/storage/#git
Do the “task count” print before registering. If it’s zero, the fix is in your flow wiring, not the server. Once the UI shows tasks, the “empty task_id” error should disappear. (A combined pre-registration check is sketched below.)
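Putting the above together, a minimal pre-registration sanity check (the module path, project name, and flow variable are placeholders for your own code):
Copy code
# Placeholder import - point this at the module that builds your Flow object.
from my_pkg.flow import flow

slugs = sorted(t.slug for t in flow.tasks)
print(f"{len(slugs)} tasks:", slugs)

# Refuse to register an empty graph - that is exactly what produces task_id: "".
if not slugs:
    raise SystemExit("Flow has no tasks bound; fix the wiring before registering.")

flow.register(project_name="my-project")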