Andy Dienes
05/12/2023, 2:48 PMretries=2
. the worker logs include this. has anyone seen it before?
│ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ │
│ File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread │
│ return await anyio.to_thread.run_sync( │
│ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ │
│ File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 31, in run_sync │
│ return await get_asynclib().run_sync_in_worker_thread( │
│ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ │
│ File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread │
│ return await future │
│ ^^^^^^^^^^^^ │
│ File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 867, in run │
│ result = context.run(func, *args) │
│ ^^^^^^^^^^^^^^^^^^^^^^^^ │
│ File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 703, in _watch_job │
│ for event in watch.stream( │
│ File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream │
│ for line in iter_resp_lines(resp): │
│ File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines │
│ for seg in resp.stream(amt=None, decode_content=False): │
│ File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 624, in stream │
│ for line in self.read_chunked(amt, decode_content=decode_content): │
│ File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 816, in read_chunked │
│ with self._error_catcher(): │
│ File "/usr/local/lib/python3.11/contextlib.py", line 155, in __exit__ │
│ self.gen.throw(typ, value, traceback) │
│ File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 461, in _error_catcher │
│ raise ProtocolError("Connection broken: %r" % e, e) │
│ urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
Jeff Hale
05/12/2023, 3:27 PMprefect version
?Andy Dienes
05/12/2023, 3:27 PMassert False
flow that will guarantee failJeff Hale
05/12/2023, 3:37 PMprefect version
from the CLI? Should be like 10 lines of output.Andy Dienes
05/12/2023, 3:46 PM2.10.6
, lost track of versions
I have no name!@prefect-server-88bc95567-4vhw9:~$ prefect version
15:42:46.035 | DEBUG | prefect.profiles - Using profile 'default'
15:42:46.466 | DEBUG | prefect._internal.concurrency.calls - <function version at 0x7f9c03bef250> --> run async in new loop
15:42:46.467 | DEBUG | prefect._internal.concurrency.calls - Running call version() in thread 'MainThread' with cancel context <CancelContext at 0x7f9c03413d90>
15:42:46.467 | DEBUG | prefect._internal.concurrency.timeouts - Entered synchronous alarm based cancel context <CancelContext at 0x7f9c03413c70>
15:42:46.468 | DEBUG | prefect._internal.concurrency.timeouts - Marked <CancelContext at 0x7f9c03413c70> as completed
15:42:46.468 | DEBUG | prefect._internal.concurrency.calls - Executing coroutine for call version() in new loop
15:42:46.469 | DEBUG | prefect._internal.concurrency.calls - version() using async cancel scope <CancelContext at 0x7f9c03413eb0 timeout=inf>
15:42:47.199 | DEBUG | prefect.client - Using ephemeral application with database at <postgresql+asyncpg://prefect:test@prefect-server-postgresql.prefect:5432/server>
Version: 2.10.6
API version: 0.8.4
Python version: 3.10.11
Git commit: d7a8f6bf
Built: Thu, Apr 27, 2023 2:23 PM
OS/Arch: linux/x86_64
Profile: default
Server type: ephemeral
Server:
Database: postgresql
15:42:47.202 | DEBUG | prefect._internal.concurrency.timeouts - Marked <CancelContext at 0x7f9c03413eb0 timeout=inf> as completed
15:42:47.202 | DEBUG | prefect._internal.concurrency.timeouts - Marked <CancelContext at 0x7f9c03413d90> as completed
15:42:47.202 | DEBUG | prefect._internal.concurrency.calls - Finished async call version()
Jeff Hale
05/12/2023, 3:53 PMAndy Dienes
05/12/2023, 3:53 PMZanie
05/12/2023, 7:18 PM# usage: python <file>.py <FLOW_RUN_ID>
from prefect import get_client
async def main(flow_run_id):
async with get_client() as client:
states = await client.read_flow_run_states(flow_run_id)
for state in states:
print(state.timestamp, state.type.name, state.name)
import asyncio
import sys
asyncio.run(main(sys.argv[1]))
run_deployment
or a schedule?Andy Dienes
05/12/2023, 7:49 PM~/.../research-processes/flows/ (main *) $ python3 src/states.py 6981884f-d3c7-4eb1-8dd1-bb938c47d4cc
2023-05-12T14:17:10.797035+00:00 SCHEDULED Scheduled
2023-05-12T14:17:17.075959+00:00 PENDING Pending
2023-05-12T14:17:54.402251+00:00 RUNNING Running
2023-05-12T14:17:54.480732+00:00 SCHEDULED AwaitingRetry
2023-05-12T14:17:54.508268+00:00 RUNNING Running
2023-05-12T14:17:54.568193+00:00 FAILED Failed
2023-05-12T14:18:37.167279+00:00 RUNNING Running
2023-05-12T14:18:37.230867+00:00 FAILED Failed
2023-05-12T14:19:29.594897+00:00 RUNNING Running
2023-05-12T14:19:29.655453+00:00 FAILED Failed
2023-05-12T14:20:32.642992+00:00 RUNNING Running
2023-05-12T14:20:32.703913+00:00 FAILED Failed
2023-05-12T14:21:57.416785+00:00 RUNNING Running
2023-05-12T14:21:57.462251+00:00 FAILED Failed
2023-05-12T14:22:40.189321+00:00 RUNNING Running
2023-05-12T14:22:40.231155+00:00 FAILED Failed
2023-05-12T14:24:42.191423+00:00 RUNNING Running
2023-05-12T14:24:42.229487+00:00 FAILED Failed
2023-05-12T14:28:03.415072+00:00 RUNNING Running
2023-05-12T14:28:03.456862+00:00 FAILED Failed
2023-05-12T14:33:45.681419+00:00 FAILED Failed
2023-05-12T14:37:43.813331+00:00 FAILED Failed
2023-05-12T14:44:05.301809+00:00 FAILED Failed
2023-05-12T14:50:25.558292+00:00 FAILED Failed
2023-05-12T14:56:46.971012+00:00 FAILED Failed
2023-05-12T15:03:08.962017+00:00 FAILED Failed
2023-05-12T15:09:30.292505+00:00 FAILED Failed
2023-05-12T15:15:51.224763+00:00 FAILED Failed
from prefect import flow
from utils.prefect_utils import notify_slack, when_flow
from prefect import get_run_logger
# prefect deploy src/flow_fails.py:fails -n "fails_test" -p mtspool -q default
@flow(on_failure=[notify_slack], retries=1)
def fails():
assert False
where notify_slack
is
def notify_slack(flow: Flow, flow_run: FlowRun, state: State):
slack_webhook_block = SlackWebhook.load("alerts-prefect")
slack_webhook_block.notify(
f"Your job {flow_run.name} entered {state.name} with message:\n\n>{state.message}\n\n"
f"See <https://{PREFECT_API_URL.value()}/flow-runs/flow-run/{flow_run.id}|the flow run in the UI>\n\n"
f"Tags: {flow_run.tags}\n\n"
f"Approximate start time = {flow_run.expected_start_time}\n"
)
Zanie
05/12/2023, 8:04 PMAndy Dienes
05/12/2023, 8:13 PM04.10
Zanie
05/12/2023, 8:21 PMAndy Dienes
05/12/2023, 8:23 PMZanie
05/12/2023, 8:24 PMjawnsy
05/12/2023, 8:38 PMkubectl describe pod --selec
tor=<http://app.kubernetes.io/component=worker|app.kubernetes.io/component=worker>
Scott Cressi
05/12/2023, 8:46 PM