a

Andy Dienes

05/12/2023, 2:48 PM
my flows are retrying dozens of times even though I only set retries=2. the worker logs include this. has anyone seen it before?
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 703, in _watch_job
    for event in watch.stream(
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 624, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 816, in read_chunked
    with self._error_catcher():
  File "/usr/local/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 461, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
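The traceback above ends in a long-lived watch stream whose connection broke mid-read. As a rough illustration only (this is not how prefect-kubernetes handles it, and the thread does not establish that this error causes the extra retries), a generic wrapper that reopens a stream after a transient error might look like the sketch below. The names resilient_stream, open_stream and fake_watch are hypothetical and exist only for this example.

```python
import time
from typing import Callable, Iterable, Iterator, Tuple, Type, TypeVar

T = TypeVar("T")


def resilient_stream(
    open_stream: Callable[[], Iterable[T]],
    transient: Tuple[Type[BaseException], ...] = (ConnectionError,),
    max_reconnects: int = 3,
    delay_seconds: float = 0.0,
) -> Iterator[T]:
    """Yield items from open_stream(), reopening it after a transient error.

    Hypothetical helper: gives up and re-raises once max_reconnects is exceeded.
    """
    reconnects = 0
    while True:
        try:
            for item in open_stream():
                yield item
            return  # the stream ended normally
        except transient:
            reconnects += 1
            if reconnects > max_reconnects:
                raise
            time.sleep(delay_seconds)


# Stand-in for a watch stream that breaks once, then works.
attempts = {"n": 0}


def fake_watch():
    attempts["n"] += 1
    yield f"event-{attempts['n']}-a"
    if attempts["n"] == 1:
        raise ConnectionError("Connection broken")
    yield f"event-{attempts['n']}-b"


events = list(resilient_stream(fake_watch))
print(events)
```

With a real Kubernetes watch, the transient tuple would need to include urllib3.exceptions.ProtocolError (the exception in the traceback), and a reopened watch may replay events that were already seen, so the consumer would have to tolerate duplicates.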
j

Jeff Hale

05/12/2023, 3:27 PM
What’s the result of prefect version?
a

Andy Dienes

05/12/2023, 3:27 PM
2.10.4
in this case the flow is just a test assert False flow that is guaranteed to fail
:thank-you: 1
j

Jeff Hale

05/12/2023, 3:37 PM
Thank you. What’s the result of running prefect version from the CLI? It should be about 10 lines of output.
a

Andy Dienes

05/12/2023, 3:46 PM
my bad, it is 2.10.6, lost track of versions
I have no name!@prefect-server-88bc95567-4vhw9:~$ prefect version
15:42:46.035 | DEBUG   | prefect.profiles - Using profile 'default'
15:42:46.466 | DEBUG   | prefect._internal.concurrency.calls - <function version at 0x7f9c03bef250> --> run async in new loop
15:42:46.467 | DEBUG   | prefect._internal.concurrency.calls - Running call version() in thread 'MainThread' with cancel context <CancelContext at 0x7f9c03413d90>
15:42:46.467 | DEBUG   | prefect._internal.concurrency.timeouts - Entered synchronous alarm based cancel context <CancelContext at 0x7f9c03413c70>
15:42:46.468 | DEBUG   | prefect._internal.concurrency.timeouts - Marked <CancelContext at 0x7f9c03413c70> as completed
15:42:46.468 | DEBUG   | prefect._internal.concurrency.calls - Executing coroutine for call version() in new loop
15:42:46.469 | DEBUG   | prefect._internal.concurrency.calls - version() using async cancel scope <CancelContext at 0x7f9c03413eb0 timeout=inf>
15:42:47.199 | DEBUG   | prefect.client - Using ephemeral application with database at <postgresql+asyncpg://prefect:test@prefect-server-postgresql.prefect:5432/server>
Version:             2.10.6
API version:         0.8.4
Python version:      3.10.11
Git commit:          d7a8f6bf
Built:               Thu, Apr 27, 2023 2:23 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          postgresql
15:42:47.202 | DEBUG   | prefect._internal.concurrency.timeouts - Marked <CancelContext at 0x7f9c03413eb0 timeout=inf> as completed
15:42:47.202 | DEBUG   | prefect._internal.concurrency.timeouts - Marked <CancelContext at 0x7f9c03413d90> as completed
15:42:47.202 | DEBUG   | prefect._internal.concurrency.calls - Finished async call version()
:thank-you: 1
j

Jeff Hale

05/12/2023, 3:53 PM
Thank you! What kind of worker are you using?
a

Andy Dienes

05/12/2023, 3:53 PM
kube worker
not sure if that's sufficient; lmk if there's any other detail I can give
z

Zanie

05/12/2023, 7:18 PM
Can you get us the state transitions for the run?
# usage: python <file>.py <FLOW_RUN_ID>
from prefect import get_client

async def main(flow_run_id):
    async with get_client() as client:
        states = await client.read_flow_run_states(flow_run_id)
        for state in states:
            print(state.timestamp, state.type.name, state.name)


import asyncio
import sys

asyncio.run(main(sys.argv[1]))
Additionally:
• Is it a subflow run?
• Was the run triggered by run_deployment or a schedule?
a

Andy Dienes

05/12/2023, 7:49 PM
yes, sorry for the delay
~/.../research-processes/flows/ (main *) $ python3 src/states.py 6981884f-d3c7-4eb1-8dd1-bb938c47d4cc
2023-05-12T14:17:10.797035+00:00 SCHEDULED Scheduled
2023-05-12T14:17:17.075959+00:00 PENDING Pending
2023-05-12T14:17:54.402251+00:00 RUNNING Running
2023-05-12T14:17:54.480732+00:00 SCHEDULED AwaitingRetry
2023-05-12T14:17:54.508268+00:00 RUNNING Running
2023-05-12T14:17:54.568193+00:00 FAILED Failed
2023-05-12T14:18:37.167279+00:00 RUNNING Running
2023-05-12T14:18:37.230867+00:00 FAILED Failed
2023-05-12T14:19:29.594897+00:00 RUNNING Running
2023-05-12T14:19:29.655453+00:00 FAILED Failed
2023-05-12T14:20:32.642992+00:00 RUNNING Running
2023-05-12T14:20:32.703913+00:00 FAILED Failed
2023-05-12T14:21:57.416785+00:00 RUNNING Running
2023-05-12T14:21:57.462251+00:00 FAILED Failed
2023-05-12T14:22:40.189321+00:00 RUNNING Running
2023-05-12T14:22:40.231155+00:00 FAILED Failed
2023-05-12T14:24:42.191423+00:00 RUNNING Running
2023-05-12T14:24:42.229487+00:00 FAILED Failed
2023-05-12T14:28:03.415072+00:00 RUNNING Running
2023-05-12T14:28:03.456862+00:00 FAILED Failed
2023-05-12T14:33:45.681419+00:00 FAILED Failed
2023-05-12T14:37:43.813331+00:00 FAILED Failed
2023-05-12T14:44:05.301809+00:00 FAILED Failed
2023-05-12T14:50:25.558292+00:00 FAILED Failed
2023-05-12T14:56:46.971012+00:00 FAILED Failed
2023-05-12T15:03:08.962017+00:00 FAILED Failed
2023-05-12T15:09:30.292505+00:00 FAILED Failed
2023-05-12T15:15:51.224763+00:00 FAILED Failed
it is not a subflow run. I just have
from prefect import flow
from utils.prefect_utils import notify_slack, when_flow
from prefect import get_run_logger

# prefect deploy src/flow_fails.py:fails -n "fails_test" -p mtspool -q default
@flow(on_failure=[notify_slack], retries=1)
def fails():
    assert False
where notify_slack is
def notify_slack(flow: Flow, flow_run: FlowRun, state: State):
    slack_webhook_block = SlackWebhook.load("alerts-prefect")
            
    slack_webhook_block.notify(
        f"Your job {flow_run.name} entered {state.name} with message:\n\n>{state.message}\n\n"
        f"See <https://{PREFECT_API_URL.value()}/flow-runs/flow-run/{flow_run.id}|the flow run in the UI>\n\n"
        f"Tags: {flow_run.tags}\n\n"
        f"Approximate start time = {flow_run.expected_start_time}\n"
    )
the retries occur whether it is scheduled (I have these deployed with a cron schedule) or a quick run from the ui
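The state transitions pasted above can be tallied to show how far the run exceeded its retry budget. This is a minimal sketch using only the output shared in the thread; count_state_types is a hypothetical helper name, and the "expected" figure assumes one initial attempt plus the retries=1 shown in the flow decorator.

```python
from collections import Counter

# State transitions copied from the output shared in the thread.
STATE_LOG = """\
2023-05-12T14:17:10.797035+00:00 SCHEDULED Scheduled
2023-05-12T14:17:17.075959+00:00 PENDING Pending
2023-05-12T14:17:54.402251+00:00 RUNNING Running
2023-05-12T14:17:54.480732+00:00 SCHEDULED AwaitingRetry
2023-05-12T14:17:54.508268+00:00 RUNNING Running
2023-05-12T14:17:54.568193+00:00 FAILED Failed
2023-05-12T14:18:37.167279+00:00 RUNNING Running
2023-05-12T14:18:37.230867+00:00 FAILED Failed
2023-05-12T14:19:29.594897+00:00 RUNNING Running
2023-05-12T14:19:29.655453+00:00 FAILED Failed
2023-05-12T14:20:32.642992+00:00 RUNNING Running
2023-05-12T14:20:32.703913+00:00 FAILED Failed
2023-05-12T14:21:57.416785+00:00 RUNNING Running
2023-05-12T14:21:57.462251+00:00 FAILED Failed
2023-05-12T14:22:40.189321+00:00 RUNNING Running
2023-05-12T14:22:40.231155+00:00 FAILED Failed
2023-05-12T14:24:42.191423+00:00 RUNNING Running
2023-05-12T14:24:42.229487+00:00 FAILED Failed
2023-05-12T14:28:03.415072+00:00 RUNNING Running
2023-05-12T14:28:03.456862+00:00 FAILED Failed
2023-05-12T14:33:45.681419+00:00 FAILED Failed
2023-05-12T14:37:43.813331+00:00 FAILED Failed
2023-05-12T14:44:05.301809+00:00 FAILED Failed
2023-05-12T14:50:25.558292+00:00 FAILED Failed
2023-05-12T14:56:46.971012+00:00 FAILED Failed
2023-05-12T15:03:08.962017+00:00 FAILED Failed
2023-05-12T15:09:30.292505+00:00 FAILED Failed
2023-05-12T15:15:51.224763+00:00 FAILED Failed
"""


def count_state_types(log_text):
    """Count state types from lines shaped like '<timestamp> <TYPE> <name>'."""
    counts = Counter()
    for line in log_text.splitlines():
        parts = line.split()
        if len(parts) >= 3:
            counts[parts[1]] += 1
    return counts


counts = count_state_types(STATE_LOG)
retries = 1  # from @flow(retries=1) in the shared flow code
expected_max_runs = retries + 1  # assumption: one initial attempt plus retries

print(f"RUNNING: {counts['RUNNING']} (expected at most {expected_max_runs})")
print(f"FAILED:  {counts['FAILED']}")
# RUNNING: 9 (expected at most 2)
# FAILED:  16
```

Only one AwaitingRetry state appears in the log, so the later RUNNING and FAILED transitions do not look like Prefect-scheduled retries. That would be consistent with the same pod being restarted repeatedly, though the thread does not confirm the cause.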
z

Zanie

05/12/2023, 8:04 PM
Thanks and this is running via a Kubernetes worker defined by our Helm chart?
Can you share more worker logs? Is it submitting the flow run once or are multiple jobs and pods created for the run?
a

Andy Dienes

05/12/2023, 8:13 PM
yes, kubernetes worker using the chart here https://github.com/PrefectHQ/prefect-helm/commits/main/charts/prefect-worker. I think the version from 04.10
I can get more logs, give me a few min 🙂 . I believe it is only one job/pod that is crashing / restarting many times
z

Zanie

05/12/2023, 8:21 PM
Ah why is the pod restarting?
a

Andy Dienes

05/12/2023, 8:23 PM
no idea, I was hoping someone here would know
z

Zanie

05/12/2023, 8:24 PM
Can you get Kubernetes events information for the job / pod?
j

jawnsy

05/12/2023, 8:38 PM
Can you share the output of the following command in a DM with Zanie and me?
kubectl describe pod --selector=app.kubernetes.io/component=worker
s

Scott Cressi

05/12/2023, 8:46 PM
Untitled