Blake Stefansen
06/27/2023, 4:03 PM
pod_watch_timeout_seconds
https://docs.prefect.io/2.10.17/api-ref/prefect/infrastructure/?h=pod+watch+timeout#prefect.infrastructure.KubernetesJob
The attribute is described as "Number of seconds to watch for pod creation before timing out (default 60)."
My team has a concurrency limit of 10 jobs on our queue, and most of these jobs take around 30 seconds. Notice in the image below how job number 11 is flagged late, which eventually causes the agent to flag it as crashed. However, the crashed job will eventually start running and complete, even though the agent stops logging the job.
15:25:20.062 | ERROR | prefect.infrastructure.kubernetes-job - Job 'file5-sf-fx-locations-foobar-maxdown-csv-rs4bz': Pod never started.
15:25:20.213 | INFO | prefect.agent - Reported flow run '18f20756-0731-4f2a-8395-61e9ab755dfd' as crashed: Flow run infrastructure exited with non-zero status code -1.
QUESTIONS
1. What triggers the timer countdown? Does the 60-second timer start counting down once the job leaves the queue and is picked up by the agent?
2. What happens in a scenario where 1000 jobs are added to the queue? Will I get a bunch of crashes? (I'm assuming not, because the agent wouldn't pick up more than 10 jobs due to the concurrency limit.)
3. If "job 11" is picked up by the agent, that means it took the place of a previously completed job, so I would think that the pod would get created almost immediately (at least within 60 seconds). I guess I'm not sure why the job pod is not getting created within 60 seconds if the agent is picking it up.
chara
06/30/2023, 12:45 PM
John Mizerany
06/30/2023, 3:08 PM
Deceivious
07/05/2023, 2:31 PM
John Horn
07/05/2023, 9:55 PM
21:16:34.430 | ERROR | Flow run 'tricky-alligator' - Finished in state Failed('Flow run encountered an exception. MissingResult: The result was not persisted and is no longer available.\n')
My guess is that a new pod or a new start causes all persisted data to go away.
Is there something I should be doing to address this? Thanks k8 gang
Jose Ignacio Gascon Conde
07/06/2023, 10:13 AM
nicholasnet
07/09/2023, 11:34 PM
from prefect.infrastructure.kubernetes import KubernetesJob, KubernetesImagePullPolicy


def create_kubernetes_job():
    # Build the KubernetesJob infrastructure block on top of a base job manifest.
    block = KubernetesJob(
        image="xxx.amazonaws.com/prefect-multiple-job-test:dev-xxx",
        namespace="dev-bidw",
        image_pull_policy=KubernetesImagePullPolicy.ALWAYS,
        finished_job_ttl=120,
        job_watch_timeout_seconds=6000,
        pod_watch_timeout_seconds=6000,
        env={"ENVIRONMENT": "dev", "ENV": "dev"},
        job=KubernetesJob.job_from_file("templates/common.yml"),
    )
    # Save the block, replacing any existing block with the same name.
    block.save("prefect-multiple-job-test-dev", overwrite=True)


if __name__ == "__main__":
    create_kubernetes_job()
Traceback (most recent call last):
File "/var/app/templates/kubernetes_block.py", line 1, in <module>
from prefect.infrastructure.kubernetes import KubernetesJob, KubernetesImagePullPolicy
File "/usr/local/lib/python3.11/site-packages/prefect/__init__.py", line 37, in <module>
from prefect.states import State
File "/usr/local/lib/python3.11/site-packages/prefect/states.py", line 14, in <module>
from prefect.client.schemas import State as State
File "/usr/local/lib/python3.11/site-packages/prefect/client/schemas.py", line 6, in <module>
from prefect.server import schemas
File "/usr/local/lib/python3.11/site-packages/prefect/server/__init__.py", line 1, in <module>
from . import models
File "/usr/local/lib/python3.11/site-packages/prefect/server/models/__init__.py", line 1, in <module>
from . import (
File "/usr/local/lib/python3.11/site-packages/prefect/server/models/block_documents.py", line 13, in <module>
from prefect.server import schemas
File "/usr/local/lib/python3.11/site-packages/prefect/server/schemas/__init__.py", line 1, in <module>
from . import states, schedules, core, sorting, filters, responses, actions
File "/usr/local/lib/python3.11/site-packages/prefect/server/schemas/states.py", line 13, in <module>
from prefect.server.utilities.schemas import DateTimeTZ, IDBaseModel, PrefectBaseModel
File "/usr/local/lib/python3.11/site-packages/prefect/server/utilities/schemas.py", line 17, in <module>
from pydantic import BaseModel, Field, SecretField
ImportError: cannot import name 'SecretField' from 'pydantic' (/usr/local/lib/python3.11/site-packages/pydantic/__init__.py)
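An ImportError like the one above is typically what shows up when pydantic 2.x is installed next to a Prefect release written against pydantic 1.x: `SecretField` is importable from the top-level `pydantic` package in 1.x but not in 2.x. Below is a minimal sketch for checking which series is installed, using only the standard library. The helper names `major_version` and `pydantic_is_v1` are made up for illustration and are not part of Prefect or pydantic.

```python
from importlib import metadata


def major_version(version: str) -> int:
    """Return the leading integer of a version string such as '1.10.11'."""
    return int(version.split(".")[0])


def pydantic_is_v1(version: str) -> bool:
    """True when the given pydantic version string belongs to the 1.x series."""
    return major_version(version) == 1


if __name__ == "__main__":
    try:
        installed = metadata.version("pydantic")
    except metadata.PackageNotFoundError:
        print("pydantic is not installed in this environment")
    else:
        # Assumption: a 1.x pydantic is what this Prefect release expects.
        series = "1.x" if pydantic_is_v1(installed) else "not 1.x"
        print(f"pydantic {installed} ({series})")
```

If this reports a 2.x version inside the image, pinning `pydantic<2` there (or moving to a Prefect release that supports pydantic 2) is the usual way out, though whether that applies to this particular environment is an assumption.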
Any reason why I am getting this error?
nicholasnet
07/09/2023, 11:49 PM
isabel obregon
07/11/2023, 8:18 PM
kubernetes-job, but there is little info on how that infra block is set up or configured. Wondering if anyone has any further information on setting up infrastructure blocks specifically for Kubernetes Jobs; we're confused about how to configure a Kubernetes Job with our existing GCP infra and how to reference one of our GCP images using GCP secrets (we've also already read this doc and haven't been able to find further info on that). Thanks in advance!
Kyle Hoffman
07/12/2023, 1:08 PM
kubernetes-job block is not adhering to the namespace or service account name input fields. Also, when deploying both the agent and server into the "prefect" namespace, I get an error when running a flow:
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:prefect:prefect-agent\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
This is even after setting the service account name to some dummy value as well as setting the namespace to "prefect".
The next step was "well, if it needs access to the default namespace, let's just deploy it into the default namespace". After doing that, I get the exact same issue, except now the error points to the job wanting to be created within the "kube-system" namespace?
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"namespaces \"kube-system\" is forbidden: User \"system:serviceaccount:default:prefect-agent\" cannot get resource \"namespaces\" in API group \"\" in the namespace \"kube-system\"","reason":"Forbidden","details":{"name":"kube-system","kind":"namespaces"},"code":403}
So yeah, I'm pretty lost as to what to do. I've even double-checked the prefect-agent service account, and it 100% has access to the resources it claims it doesn't.
Versions running:
- name: prefect-agent
  version: 2023.07.07
  repository: https://prefecthq.github.io/prefect-helm
- name: prefect-server
  version: 2023.07.07
  repository: https://prefecthq.github.io/prefect-helm
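Both Forbidden responses above name the service account, the verb, the resource, and the namespace the request actually targeted, and that target ("default", then "kube-system") is not the namespace set on the block. Below is a rough sketch, assuming the message format shown in those two responses, that pulls the fields out and builds the matching `kubectl auth can-i` check. `parse_forbidden` and `can_i_command` are hypothetical helper names, not part of Prefect or the Kubernetes client.

```python
import json
import re

# Assumption: the message follows the wording seen in the two responses above.
FORBIDDEN = re.compile(
    r'User "system:serviceaccount:(?P<sa_namespace>[^:"]+):(?P<sa_name>[^"]+)" '
    r'cannot (?P<verb>\w+) resource "(?P<resource>[^"]+)" '
    r'in API group "(?P<api_group>[^"]*)" '
    r'in the namespace "(?P<target_namespace>[^"]+)"'
)


def parse_forbidden(body: str) -> dict:
    """Extract who was denied what, and where, from a Kubernetes Status body."""
    status = json.loads(body)
    match = FORBIDDEN.search(status["message"])
    if match is None:
        raise ValueError("message does not look like an RBAC 'forbidden' error")
    return match.groupdict()


def can_i_command(info: dict) -> str:
    """Build a kubectl command that re-checks the denied permission."""
    resource = info["resource"]
    if info["api_group"]:
        # Non-core resources are addressed as <resource>.<group>, e.g. jobs.batch.
        resource = f'{resource}.{info["api_group"]}'
    return (
        f'kubectl auth can-i {info["verb"]} {resource} '
        f'--as=system:serviceaccount:{info["sa_namespace"]}:{info["sa_name"]} '
        f'--namespace {info["target_namespace"]}'
    )


# The first response body quoted above, kept as raw JSON text.
EXAMPLE_BODY = (
    r'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure",'
    r'"message":"jobs.batch is forbidden: User '
    r'\"system:serviceaccount:prefect:prefect-agent\" cannot create resource '
    r'\"jobs\" in API group \"batch\" in the namespace \"default\"",'
    r'"reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}'
)

if __name__ == "__main__":
    print(can_i_command(parse_forbidden(EXAMPLE_BODY)))
```

Running the printed command against the cluster shows whether the Role and RoleBinding cover the namespace the job is really being created in, rather than the one configured on the block.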
Zachary Lee
07/12/2023, 2:42 PM
nicholasnet
07/12/2023, 4:56 PM
403 when I am trying to update the deployment all of a sudden.
Deployment YAML created at '/var/app/load_bullhorn_data-deployment.yaml'.
Deployment storage None does not have upload capabilities; no files uploaded.
Pass --skip-upload to suppress this warning.
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 255, in coroutine_wrapper
return call()
^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 383, in __call__
return self.result()
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
result = await coro
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/cli/deployment.py", line 1210, in build
deployment_id = await deployment.apply()
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/deployments/deployments.py", line 739, in apply
await client.delete_resource_owned_automations(
File "/usr/local/lib/python3.11/site-packages/prefect/client/orchestration.py", line 2527, in delete_resource_owned_automations
await self._client.delete(f"/automations/owned-by/{resource_id}")
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1952, in delete
return await self.request(
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1530, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 280, in send
response.raise_for_status()
File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 138, in raise_for_status
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '403 Forbidden' for url '***/automations/owned-by/prefect.deployment.db3765f1-ed5d-42f6-82ac-217dcbbe63e7'
Response: {'detail': 'Forbidden'}
Even though this user has deployment edit permission.
nicholasnet
07/12/2023, 6:20 PM
/opt/prefect/flows/venv/lib/python3.11/site-packages/prefect/context.py:505: UserWarning: Failed to create the Prefect home directory at /home/python/.prefect
with SettingsContext(profile=profile, settings=new_settings) as ctx:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/pathlib.py", line 1108, in touch
fd = os.open(self, flags, mode)
^^^^^^^^^^^^^^^^^^^^^^^^^^
FileNotFoundError: [Errno 2] No such file or directory: '/home/python/.prefect/prefect.db'
Stream closed EOF for dev-bidw/orchid-jaguarundi-5vq8g-lbml8 (prefect-job)
This is the YAML:
apiVersion: batch/v1
kind: Job
metadata:
  # labels are required, even if empty
  labels: { }
spec:
  template:
    spec:
      completions: 1
      containers: # the first container is required
        - env:
            - name: PREFECT_LOCAL_STORAGE_PATH
              value: /mnt/data
          name: prefect-job
          envFrom:
            - secretRef:
                name: "bidw-cloud"
          volumeMounts:
            - name: bidw-volume
              mountPath: /mnt/data
      parallelism: 1
      restartPolicy: Never
      volumes:
        - name: bidw-volume
          persistentVolumeClaim:
            claimName: "px-prefect-pvc"
The previous version was this, and it's working as expected:
apiVersion: batch/v1
kind: Job
metadata:
  # labels are required, even if empty
  labels: { }
spec:
  template:
    spec:
      completions: 1
      containers: # the first container is required
        - env:
            - name: PREFECT_LOCAL_STORAGE_PATH
              value: /mnt/data
          name: prefect-job
          envFrom:
            - secretRef:
                name: "bidw-cloud"
      parallelism: 1
      restartPolicy: Never
Is there anything that I need to add/edit?
nicholasnet
07/12/2023, 8:42 PM
2.10.19 to 2.10.8 it's working now.
Ankit
07/13/2023, 1:52 PM
AssertionError
Traceback (most recent call last):
File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
self.run_forever()
File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
self._run_once()
File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once
event_list = self._selector.select(timeout)
File "/usr/lib/python3.8/selectors.py", line 468, in select
fd_event_list = self._selector.poll(timeout, max_ev)
File "/usr/local/lib/python3.8/dist-packages/prefect/engine.py", line 1670, in cancel_flow_run
raise TerminationSignal(signal=signal.SIGTERM)
prefect.exceptions.TerminationSignal
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/prefect/engine.py", line 1533, in orchestrate_task_run
result = await run_sync(task.fn, *args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/asyncutils.py", line 165, in run_sync_in_interruptible_worker_thread
assert result is not NotSet
AssertionError
Finished in state Failed('Task run encountered an exception: Traceback (most recent call last):\n File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run\n return loop.run_until_complete(main)\n File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete\n self.run_forever()\n File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever\n self._run_once()\n File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once\n event_list = self._selector.select(timeout)\n File "/usr/lib/python3.8/selectors.py", line 468, in select\n fd_event_list = self._selector.poll(timeout, max_ev)\n File "/usr/local/lib/python3.8/dist-packages/prefect/engine.py", line 1670, in cancel_flow_run\n raise TerminationSignal(signal=signal.SIGTERM)\nprefect.exceptions.TerminationSignal\n\nDuring handling of the above exception, another exception occurred:\n\nAssertionError\n')
Aaron Gee-Clough
07/16/2023, 3:27 PM
YSF
07/18/2023, 8:50 PM
pull:
  - prefect.projects.steps.git_clone_project:
      repository: myscmprovider/myrepo.git
      branch: main
      access_token: '{{ prefect.blocks.secret.token }}'
  - prefect.deployments.steps.set_working_directory:
      id: chdir
      directory: /opt/prefect/myrepo/k8-multi-container/
deployments:
  - name: deployment-1
    entrypoint: .\workflow\flow_demo.py:flow_1
    work_pool:
      name: aks-worker
      job_variables:
        image: myimage_1
        cpu_request: "4"
        cpu_limit: "6"
        memory_request: "4Gi"
        memory_limit: "6Gi"
  - name: deployment-2
    entrypoint: .\workflow\another_flow.py:flo_2
    work_pool:
      name: aks-worker
      job_variables:
        image: myimage_2
        cpu_request: "4"
        cpu_limit: "6"
        memory_request: "4Gi"
        memory_limit: "6Gi"
But I don't completely understand how these two deployments are linked. How does the dependency between flow 1 and flow 2 get established? They don't need to pass data, but flow 2 shouldn't run if flow 1 failed. And how would I kick it all off at once? Right now I do prefect deployments run to run things.
Aaron Gee-Clough
07/23/2023, 6:46 PM
chara
07/27/2023, 2:43 PM
imcom
07/30/2023, 9:20 AM
on_cancellation state change hook? From what I've seen so far, if I try to catch except (asyncio.exceptions.CancelledError, prefect.exceptions.TerminationSignal) and then return Cancelled(message=f"Cancelled: {task_id} is cancelled") in the flow logic, I end up with no cancellation hook fired at all. I've also tried returning a Cancelling state, still no go. Could anyone shed some light on the cancellation hook?
imcom
07/30/2023, 9:38 AM
spot instance shutdown vs. user initiated cancel in k8s? Assume I have an ASG with spot instances; when the nodes are being released, I would get SIGTERM in the running pods too.
John Horn
07/31/2023, 9:03 PM
Kiley Roberson
08/02/2023, 7:23 PM
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:prefect:prefect-worker\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"prefect\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
The Kubernetes work pool has the namespace set to prefect and the service account name set to prefect-worker. At first I was able to run jobs and it was working, but then I had to make edits to the role to allow it to read secrets, and then this started happening. The YAML files I used for the Roles are in the thread! Would really appreciate any insight into this - thanks!
Christine Chen
08/02/2023, 11:45 PM
Noam Banay
08/07/2023, 10:06 AM
Slackbot
08/08/2023, 11:45 AM
Ofir
08/08/2023, 7:08 PM
prefect-server and a prefect-agent.
What if 90% of the day the prefect-agent (which is running on a GPU node on the cluster) is idle?
This means it’s underutilized and we waste money for no good reason.
Reference: Airflow provides Kubernetes Executor - on-demand/ad-hoc worker pods.
Since Prefect thought of everything - I’m sure there is either a built-in capability for that or a design pattern for achieving that.
Thanks!
Luke Segars
08/10/2023, 6:24 PM
Oshri
08/11/2023, 6:00 PM
Mikaël Ferreira de Almeida
08/14/2023, 1:11 PM