# prefect-cloud
Andy Dienes
hey, I attempted to add an on-fail slack alert with a decorator like so:
```python
from functools import wraps

import prefect.runtime as prt  # `prt` is assumed to be an alias for prefect.runtime
from prefect import flow
from prefect.blocks.notifications import SlackWebhook


def Flow(*flow_args, **flow_kwargs):
    # basically the same as @prefect.flow but adds Slack webhook
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                slack_webhook_block = SlackWebhook.load("alerts-prefect")

                msg = f"Your job {func.__name__} failed with exception {e}.\n"
                msg += f"See the flow run at: <https://prefect.myinstance.net/flow-runs/flow-run/{prt.flow_run.id}>\n"
                msg += f"Tags: {prt.flow_run.tags}\n"
                msg += f"Scheduled start time = {prt.flow_run.scheduled_start_time}"

                slack_webhook_block.notify(msg)
                raise  # re-raise the original exception with its traceback

        return flow(*flow_args, **flow_kwargs)(wrapper)
    return decorator
```
it works, but it pings for each retry of the flow. any tips to make my flow ping only once the whole thing fails, not just a retry?
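One way around this, if you want to keep the decorator approach: move the retry loop into the decorator itself, so the alert only fires after the last attempt. This is a plain-Python sketch, not a Prefect API; `retry_then_notify` and its `notify` callback are hypothetical names. In a real flow, `notify` could be the Slack block's `notify` method, and you would drop `retries=` from `@flow` so Prefect does not retry on top of it.

```python
import functools
import time
from typing import Callable, List


def retry_then_notify(retries: int, notify: Callable[[str], None], delay_seconds: float = 0.0):
    """Hypothetical decorator: retry in-process, alert once after the final failure."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    if attempt < retries:
                        # Attempts remain: stay quiet and try again.
                        time.sleep(delay_seconds)
                        continue
                    # Last attempt failed: send exactly one alert, then re-raise.
                    notify(f"Your job {func.__name__} failed after {retries + 1} attempts: {exc!r}")
                    raise

        return wrapper

    return decorator


# Usage: a function that always fails triggers one alert, not one per attempt.
alerts: List[str] = []


@retry_then_notify(retries=2, notify=alerts.append)
def always_fails():
    raise ValueError("oops!")


try:
    always_fails()
except ValueError:
    pass

print(len(alerts))  # 1
```

The trade-off is that Prefect then only sees a single attempt, so the individual retries no longer show up as separate states in the UI.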
Nate
hi @Andy Dienes - seems like a great use case for the `on_failure` hook!
```python
from prefect import flow
from prefect.blocks.core import Block
from prefect.server.schemas.core import Flow, FlowRun
from prefect.server.schemas.states import State
from prefect.settings import PREFECT_UI_URL

def notify_slack(flow: Flow, flow_run: FlowRun, state: State):
    # on_failure hook: called when the run enters Failed, not on AwaitingRetry
    slack_webhook_block = Block.load("slack-webhook/test")

    # PREFECT_UI_URL is the UI base URL and already includes the scheme, so don't
    # prepend "https://" (PREFECT_API_URL points at the REST API, not the UI)
    slack_webhook_block.notify(
        f"Your job {flow_run.name} entered {state.name} with message:\n\n>{state.message}\n\n"
        f"See <{PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run.id}|the flow run in the UI>\n\n"
        f"Tags: {flow_run.tags}\n\n"
        f"Scheduled start time = {flow_run.expected_start_time}\n"
    )

@flow(on_failure=[notify_slack], retries=1)
def noisy_flow():
    raise ValueError("oops!")

if __name__ == "__main__":
    noisy_flow()
```
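If you want to check the alert text without a Prefect server, one option is to pull the formatting into a plain function and have the hook call it. A minimal sketch, assuming the caller passes the base URL of the Prefect UI (for example the `https://prefect.myinstance.net` instance from the first snippet); `build_failure_message` is a hypothetical helper, and `<url|label>` is Slack's link syntax.

```python
from datetime import datetime, timezone
from typing import List, Optional


def build_failure_message(
    run_name: str,
    state_name: str,
    state_message: Optional[str],
    ui_url: str,
    flow_run_id: str,
    tags: List[str],
    scheduled: datetime,
) -> str:
    """Format a Slack alert for a flow run that reached a failure state."""
    # Strip a trailing slash so joining the path never yields "//flow-runs".
    link = f"{ui_url.rstrip('/')}/flow-runs/flow-run/{flow_run_id}"
    return (
        f"Your job {run_name} entered {state_name} with message:\n\n>{state_message}\n\n"
        # Slack renders <url|label> as a link that shows only the label.
        f"See <{link}|the flow run in the UI>\n\n"
        f"Tags: {tags}\n\n"
        f"Scheduled start time = {scheduled.isoformat()}\n"
    )


# Example with placeholder values (not real run data).
msg = build_failure_message(
    run_name="example-run",
    state_name="Failed",
    state_message="oops!",
    ui_url="https://prefect.myinstance.net/",
    flow_run_id="00000000-0000-0000-0000-000000000000",
    tags=["nightly"],
    scheduled=datetime(2023, 5, 1, 9, 0, tzinfo=timezone.utc),
)
print(msg)
```

Inside `notify_slack` you would pass `flow_run.name`, `state.name`, `state.message`, `flow_run.id`, `flow_run.tags` and `flow_run.expected_start_time`.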
```
10:18:10.817 | INFO    | Flow run 'radical-oarfish' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
```
the flow will enter `AwaitingRetry` and then try again before entering `Failed`, so it won't run the hook until it enters `Failed` at the end
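To make that ordering concrete, here is a toy model in plain Python. It is not Prefect's engine: `run_with_retries` is a hypothetical helper and the state names are simplified. Each failed attempt that still has retries left is recorded as `AwaitingRetry`, and the `on_failure` hooks are called only once the run lands in the final `Failed` state.

```python
from typing import Callable, List


def run_with_retries(
    fn: Callable[[], object],
    retries: int,
    on_failure: List[Callable[[str], None]],
) -> List[str]:
    """Toy model of flow-run states: hooks fire only on the terminal Failed state."""
    states = ["Running"]
    for attempt in range(retries + 1):
        try:
            fn()
        except Exception:
            if attempt < retries:
                # Retries remain, so the proposed Failed becomes the non-final
                # AwaitingRetry state and the run goes around again.
                states += ["AwaitingRetry", "Running"]
                continue
            # Out of retries: enter Failed and call each hook exactly once.
            states.append("Failed")
            for hook in on_failure:
                hook(states[-1])
            return states
        states.append("Completed")
        return states
    return states  # only reached when retries is negative


def noisy_flow():
    raise ValueError("oops!")


fired: List[str] = []
print(run_with_retries(noisy_flow, retries=1, on_failure=[fired.append]))
# ['Running', 'AwaitingRetry', 'Running', 'Failed']
print(fired)  # ['Failed']
```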
Andy Dienes
looks great, thanks! I'll try it
hey Nate, I tried this, but it doesn't actually seem to work? as in, I know my Slack webhook block works since my decorator pings it, but with this formulation the `on_failure` hook does not ping Slack. is there a certain version I need to be on? I think I'm currently on 2.10.4
Nate
i believe i was on 2.10.7 when i put together and tested the example i shared above, but i don't think anything changed between 2.10.4 and now - just tried again on 2.10.4 and it works for me
can you share your trace? maybe your webhook block is not loading correctly from the hook?
Andy Dienes
yeah, I see the issue now. I changed your `expected_start_time` to `flow_run.scheduled_start_time`, and it doesn't have that field I suppose. this is a little confusing to me, since I had previously been using `prefect.runtime.flow_run.scheduled_start_time` without issue.
does `prefect.runtime.flow_run` not return a `FlowRun` type?
Nate
hmm let me check on that
no, i don't believe it does. it looks like it only surfaces select flow run attributes from the flow run context
Andy Dienes
I see. the reason I prefer it is that I have these jobs set on a schedule, e.g. `0 9 * * *`, and I want the timestamp each run was supposed to start at to be logged. if I use `expected_start_time`, it gives me whatever timestamp the worker actually started at, which is usually a few milliseconds off the correct one; in the worst case, if my pool is unhealthy and the job is late, it can be many hours off the logical time I wanted
also, even with that fixed, it still notifies for each failure. I really want it to do all the retries first and then notify only once at the end! this is my decorator call:
`@flow(on_failure=[notify_slack], retries=1)`
in fact, it pings me 9x 😅
the worker logs include this
```
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 703, in _watch_job
    for event in watch.stream(
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 624, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 816, in read_chunked
    with self._error_catcher():
  File "/usr/local/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 461, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
```
maybe that is causing more retries than intended?
Nate
hmm - i will try to reproduce this!