# prefect-cloud
hey, I attempted to add an on-fail slack alert with a decorator like so:
from prefect import flow
from prefect import runtime as prt  # `prt` is assumed to alias prefect.runtime
from prefect.blocks.notifications import SlackWebhook

def Flow(*flow_args, **flow_kwargs):
    # basically the same as @prefect.flow but adds Slack webhook
    def decorator(func):
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                slack_webhook_block = SlackWebhook.load("alerts-prefect")

                msg = f"Your job {func.__name__} failed with exception {e}.\n"
                msg += f"See the flow run at: <https://prefect.myinstance.net/flow-runs/flow-run/{prt.flow_run.id}>\n"
                msg += f"Tags: {prt.flow_run.tags}\n"
                msg += f"Scheduled start time = {prt.flow_run.scheduled_start_time}"

                # send the alert to Slack, then re-raise so the flow run still fails
                slack_webhook_block.notify(msg)
                raise e

        return flow(*flow_args, **flow_kwargs)(wrapper)
    return decorator
it works, but it pings for each retry of the flow. any tips to make my flow ping only once the whole thing fails, not just a retry?
✅ 1
hi @Andy Dienes - seems like a great use case for the on_failure hook
from prefect import flow
from prefect.blocks.core import Block
from prefect.server.schemas.core import Flow, FlowRun
from prefect.server.schemas.states import State
from prefect.settings import PREFECT_API_URL

def notify_slack(flow: Flow, flow_run: FlowRun, state: State):
    # hooks receive the flow, the flow run, and the state that was entered
    slack_webhook_block = Block.load("slack-webhook/test")
    slack_webhook_block.notify(
        f"Your job {flow_run.name} entered {state.name} with message:\n\n>{state.message}\n\n"
        f"See <https://{PREFECT_API_URL.value()}/flow-runs/flow-run/{flow_run.id}|the flow run in the UI>\n\n"
        f"Tags: {flow_run.tags}\n\n"
        f"Scheduled start time = {flow_run.expected_start_time}\n"
    )

@flow(on_failure=[notify_slack], retries=1)
def noisy_flow():
    raise ValueError("oops!")

if __name__ == "__main__":
    noisy_flow()
🙌 1
10:18:10.817 | INFO    | Flow run 'radical-oarfish' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
the flow will enter AwaitingRetry and then try again before entering Failed, so it won't run the hook until it enters Failed at the end
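To make that ordering concrete, here is a toy model in plain Python. It is only a sketch and not Prefect's engine: run_with_retries and its hook signature are hypothetical names invented for illustration, and the state names are borrowed from the log line above. It shows failure hooks being skipped while retries remain and called once when the final attempt fails.

```python
from typing import Callable, List, Tuple


def run_with_retries(
    fn: Callable[[], object],
    retries: int,
    on_failure: List[Callable[[str, Exception], None]],
) -> List[str]:
    """Toy retry loop (hypothetical, not Prefect code).

    Returns the sequence of state names the run passed through.
    """
    states: List[str] = []
    attempts = retries + 1
    for attempt in range(1, attempts + 1):
        states.append("Running")
        try:
            fn()
        except Exception as exc:
            if attempt < attempts:
                # Non-final state: retries remain, so hooks are not called.
                states.append("AwaitingRetry")
                continue
            # Final state: no retries left, so each failure hook runs once.
            states.append("Failed")
            for hook in on_failure:
                hook("Failed", exc)
            return states
        states.append("Completed")
        return states
    return states


def always_fails() -> None:
    raise ValueError("oops!")


# Record hook invocations instead of sending a real Slack message.
calls: List[Tuple[str, str]] = []
states = run_with_retries(
    always_fails,
    retries=1,
    on_failure=[lambda state, exc: calls.append((state, str(exc)))],
)
print(states)
print(calls)
```

With retries=1 the function fails twice, but the recorded hook list has a single entry, which is the behaviour the hook-based approach is meant to give.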
looks great, thanks! I'll try it
hey Nate, I tried this, but it doesn't actually seem to work? as in I know my slack webhook block works since my decorator pings it, but with this formulation the on_failure hook does not ping slack. is there a certain version I need to be on? I think I'm currently on 2.10.4
i believe i was on 2.10.7 when i put together and tested the example i shared above, but i dont think anything changed between 2.10.4 and now - just tried again on 2.10.4 and it works for me
can you share your trace? maybe your webhook block is not loading correctly from the hook?
yeah, I see the issue now. I changed your expected_start_time to scheduled_start_time, and it doesn't have that field I suppose. this is a little confusing to me, since I had previously been using prt.flow_run.scheduled_start_time without issue.
does prt.flow_run not return a FlowRun?
hmm let me check on that
no, i dont believe it does. it looks like it only surfaces select flow run attrs from the flow run context
I see. the reason I prefer it is that I have these jobs set on a schedule, e.g. 0 9 * * *, and I want the timestamp it was supposed to run at to be logged. if I use expected_start_time then it gives me whatever timestamp the worker actually started at, which is usually a few millis off of the correct timestamp, but in the worst case, if my pool is unhealthy and the job is late, it can be many hours off of the logical time I wanted
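If only the actual start time is available, one possible workaround is to recompute the logical slot from the schedule. The sketch below is standard-library Python, not a Prefect API: logical_daily_run_time is a hypothetical helper, and it assumes a daily cron like 0 9 * * * evaluated in UTC and a timezone-aware input. A run that is more than 24 hours late would be attributed to the wrong day.

```python
from datetime import datetime, time, timedelta, timezone


def logical_daily_run_time(
    actual_start: datetime, hour: int = 9, minute: int = 0
) -> datetime:
    """Most recent daily slot at or before actual_start (hypothetical helper).

    Assumes a daily schedule in UTC and a timezone-aware actual_start.
    """
    actual_utc = actual_start.astimezone(timezone.utc)
    slot = datetime.combine(actual_utc.date(), time(hour, minute), tzinfo=timezone.utc)
    if slot > actual_utc:
        # Started before today's slot, so the run belongs to yesterday's slot.
        slot -= timedelta(days=1)
    return slot


# A run picked up four and a half hours late by an unhealthy pool.
late_start = datetime(2023, 6, 1, 13, 30, tzinfo=timezone.utc)
logical = logical_daily_run_time(late_start)
lateness = late_start - logical
print(logical.isoformat(), lateness)
```

The difference between the two timestamps also gives the lateness, which could be included in the alert message.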
also, even with fixing that, it still notifies for each failure. I really want it to do all the retries first and then notify at the end only once! this is my decorator call
@flow(on_failure=[notify_slack], retries=1)
in fact, it pings me 9x 😅
the worker logs include this
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 703, in _watch_job
    for event in watch.stream(
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 624, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 816, in read_chunked
    with self._error_catcher():
  File "/usr/local/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 461, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
maybe that is causing more retries than intended?
hmm - i will try to reproduce this!