
Andy Dienes

05/10/2023, 2:13 PM
hey, I tried to add an on-failure Slack alert with a decorator like so:
```python
from functools import wraps

import prefect.runtime as prt
from prefect import flow
from prefect.blocks.notifications import SlackWebhook


def Flow(*flow_args, **flow_kwargs):
    # basically the same as @prefect.flow but adds a Slack webhook alert on failure
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                slack_webhook_block = SlackWebhook.load("alerts-prefect")

                msg = f"Your job {func.__name__} failed with exception {e}.\n"
                msg += f"See the flow run at: <https://prefect.myinstance.net/flow-runs/flow-run/{prt.flow_run.id}>\n"
                msg += f"Tags: {prt.flow_run.tags}\n"
                msg += f"Scheduled start time = {prt.flow_run.scheduled_start_time}"

                slack_webhook_block.notify(msg)
                raise  # re-raise, keeping the original traceback

        return flow(*flow_args, **flow_kwargs)(wrapper)
    return decorator
```
it works, but it pings on every retry of the flow. any tips for making my flow ping only once, when the whole run has failed rather than on each retry?
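Why the decorator pings on every retry: each retry re-invokes the wrapped function, so the `except` branch, and the Slack call inside it, runs once per attempt. A minimal, library-free sketch under that assumption; `notify_on_exception` mirrors the decorator's try/except, `run_with_retries` is a hypothetical stand-in for Prefect's flow-level retries, and `messages.append` stands in for `SlackWebhook.notify`:

```python
from functools import wraps


def notify_on_exception(notify):
    """Mirror of the decorator above: notify, then re-raise, on every exception."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                notify(f"{func.__name__} failed: {exc}")
                raise
        return wrapper
    return decorator


def run_with_retries(func, retries):
    """Hypothetical stand-in for flow-level retries: call func up to retries + 1 times."""
    for attempt in range(retries + 1):
        try:
            return func()
        except Exception:
            if attempt == retries:
                raise  # out of retries: let the final failure propagate


messages = []


@notify_on_exception(messages.append)
def always_fails():
    raise ValueError("oops!")


try:
    run_with_retries(always_fails, retries=2)
except ValueError:
    pass

print(len(messages))  # one notification per attempt, not per run
```

With `retries=2` that is three notifications for a single failed run.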

Nate

05/10/2023, 3:18 PM
hi @Andy Dienes - seems like a great use case for the `on_failure` hook!
```python
from prefect import flow
from prefect.blocks.core import Block
from prefect.server.schemas.core import Flow, FlowRun
from prefect.server.schemas.states import State
from prefect.settings import PREFECT_UI_URL

def notify_slack(flow: Flow, flow_run: FlowRun, state: State):
    # load the Slack webhook block by its "<block-type-slug>/<block-name>" slug
    slack_webhook_block = Block.load("slack-webhook/test")

    # link to the UI URL (it already includes http:// or https://), not the API URL
    slack_webhook_block.notify(
        f"Your job {flow_run.name} entered {state.name} with message:\n\n>{state.message}\n\n"
        f"See <{PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run.id}|the flow run in the UI>\n\n"
        f"Tags: {flow_run.tags}\n\n"
        f"Scheduled start time = {flow_run.expected_start_time}\n"
    )

# on_failure hooks run once the run reaches a final Failed state, i.e. after retries
@flow(on_failure=[notify_slack], retries=1)
def noisy_flow():
    raise ValueError("oops!")

if __name__ == "__main__":
    noisy_flow()
```
10:18:10.817 | INFO    | Flow run 'radical-oarfish' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
the flow will enter `AwaitingRetry` and then try again before entering `Failed`, so it won't run the hook until it enters `Failed` at the end
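The state sequence described above can be modeled in a few lines of plain Python. This is a toy model, not Prefect's engine: `run_flow` is a hypothetical helper and the hook list only mimics `on_failure`. A failed attempt with retries left becomes `AwaitingRetry` (no hook), and only the final failure becomes `Failed` and runs the hooks:

```python
def run_flow(func, retries, on_failure):
    """Toy model of flow-run states with retries; returns the states visited."""
    states = []
    for attempt in range(retries + 1):
        states.append("Running")
        try:
            func()
        except Exception as exc:
            if attempt < retries:
                states.append("AwaitingRetry")  # non-final state: hooks do not run
                continue
            states.append("Failed")  # final state: each on_failure hook runs once
            for hook in on_failure:
                hook(exc)
            break
        else:
            states.append("Completed")
            break
    return states


pings = []


def noisy_flow():
    raise ValueError("oops!")


states = run_flow(noisy_flow, retries=1, on_failure=[pings.append])
print(states)      # ['Running', 'AwaitingRetry', 'Running', 'Failed']
print(len(pings))  # 1
```

With `retries=1` the hook fires exactly once, whereas a try/except around the function body fires on every attempt.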

Andy Dienes

05/10/2023, 3:24 PM
looks great, thanks! I'll try it
hey Nate, I tried this, but it doesn't actually seem to work. I know my Slack webhook block works, since my decorator pings it, but with this formulation the `on_failure` hook does not ping Slack. is there a certain version I need to be on? I think I'm currently on 2.10.4

Nate

05/12/2023, 1:37 PM
i believe i was on 2.10.7 when i put together and tested the example i shared above, but i don't think anything relevant changed between 2.10.4 and now. i just tried again on 2.10.4 and it works for me
can you share your trace? maybe your webhook block is not loading correctly from the hook?

Andy Dienes

05/12/2023, 1:38 PM
yeah, I see the issue now. I changed your `expected_start_time` to `flow_run.scheduled_start_time`, and I suppose `FlowRun` doesn't have that field. this is a little confusing to me, since I had previously been using `prefect.runtime.flow_run.scheduled_start_time` without issue.
does `prefect.runtime.flow_run` not return a `FlowRun` type?

Nate

05/12/2023, 1:43 PM
hmm, let me check on that
no, i don't believe it does. it looks like it only surfaces select flow run attributes from the flow run context
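Because the hook receives a `FlowRun` object rather than the `prefect.runtime.flow_run` module, an attribute that exists on one but not the other raises `AttributeError` inside the hook, and the ping is never sent. A defensive sketch of the message-building step; `build_message` is a hypothetical helper, and `types.SimpleNamespace` objects stand in for the real `FlowRun` and `State`:

```python
from types import SimpleNamespace


def build_message(flow_run, state):
    """Build the Slack text for a failure hook (hypothetical helper).

    Optional fields are read with getattr() so a missing attribute degrades
    to a placeholder instead of raising AttributeError inside the hook.
    """
    start = getattr(flow_run, "expected_start_time", None)
    tags = getattr(flow_run, "tags", [])
    return (
        f"Your job {flow_run.name} entered {state.name} with message: {state.message}\n"
        f"Tags: {tags}\n"
        f"Expected start time = {start if start is not None else 'unknown'}"
    )


# Stand-ins for FlowRun / State; this run deliberately has no start-time attribute.
run = SimpleNamespace(name="radical-oarfish", tags=["nightly"])
state = SimpleNamespace(name="Failed", message="oops!")
print(build_message(run, state))
```

The trade-off is that a typo in an attribute name now shows up as "unknown" in Slack rather than as an error in the logs, so it is worth logging the placeholder case too.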

Andy Dienes

05/12/2023, 1:54 PM
I see. the reason I prefer it is that I have these jobs set on a schedule, e.g. `0 9 * * *`, and I want the timestamp it was supposed to run at to be logged. if I use `expected_start_time`, it gives me whatever timestamp the worker actually started at, which is usually a few milliseconds off from the correct one, but in the worst case, if my pool is unhealthy and the job is late, it can be many hours off from the logical time I wanted
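The distinction between the logical time of a scheduled run and the moment it actually started can be made concrete with plain `datetime` arithmetic. A sketch with made-up timestamps (UTC assumed); `lateness` is a hypothetical helper, and logging both values plus their difference keeps a late run from being mistaken for an on-time one:

```python
from datetime import datetime, timedelta, timezone


def lateness(scheduled: datetime, actual: datetime) -> timedelta:
    """Drift between the logical (scheduled) time and the actual start."""
    return actual - scheduled


# Illustrative values only: the logical 09:00 slot of a `0 9 * * *` schedule.
scheduled = datetime(2023, 5, 12, 9, 0, tzinfo=timezone.utc)

on_time = scheduled + timedelta(milliseconds=350)  # healthy pool: off by milliseconds
late = scheduled + timedelta(hours=3, minutes=5)   # unhealthy pool: off by hours

print(lateness(scheduled, on_time))  # 0:00:00.350000
print(lateness(scheduled, late))     # 3:05:00
```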
also, even with that fixed, it still notifies for each failure. I really want it to do all the retries first and then notify only once at the end! this is my decorator call: `@flow(on_failure=[notify_slack], retries=1)`
in fact, it pings me 9x 😅
the worker logs include this:
```
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 703, in _watch_job
    for event in watch.stream(
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 624, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 816, in read_chunked
    with self._error_catcher():
  File "/usr/local/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 461, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
```
maybe that is causing more retries than intended?

Nate

05/12/2023, 3:48 PM
hmm - i will try to reproduce this!