#prefect-community
    emre

    2 years ago
    Hi folks, in Prefect `0.13.1` I can’t get the `slack_notifier` to work. This used to work in `0.12.1`. Here is a minimal example of a flow:
    from prefect import Flow, task, context
    from prefect.utilities.notifications import slack_notifier
    
    @task
    def log_run_id():
        context.get("logger").warning(context.get("flow_run_id"))
    
    
    @task(state_handlers=[slack_notifier])
    def get_1():
        return 1
    
    
    with Flow("slack_example") as flow:
        log = log_run_id()
        result = get_1(upstream_tasks=[log])
    
    
    flow.run()
    Apparently `slack_notifier` attempts to communicate with a backend server, but I am not going to use one. More info in thread.
    Complete logs, in case anyone needs them:
    [2020-08-11 07:47:52] INFO - prefect.FlowRunner | Beginning Flow run for 'slack_example'
    [2020-08-11 07:47:52] INFO - prefect.FlowRunner | Starting flow run.
    [2020-08-11 07:47:52] INFO - prefect.TaskRunner | Task 'log_run_id': Starting task run...
    [2020-08-11 07:47:52] WARNING - prefect.log_run_id | d36c58d1-4318-4a22-a317-bf7b79e38fd8
    [2020-08-11 07:47:52] INFO - prefect.TaskRunner | Task 'log_run_id': finished task run for task with final state: 'Success'
    [2020-08-11 07:47:52] INFO - prefect.TaskRunner | Task 'get_1': Starting task run...
    [2020-08-11 07:47:53] ERROR - prefect.TaskRunner | Unexpected error while calling state handlers: ConnectionError(MaxRetryError("HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f9a2742de50>: Failed to establish a new connection: [Errno 61] Connection refused'))"))
    Traceback (most recent call last):
      File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connection.py", line 159, in _new_conn
        conn = connection.create_connection(
      File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/util/connection.py", line 84, in create_connection
        raise err
      File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/util/connection.py", line 74, in create_connection
        sock.connect(sa)
    ConnectionRefusedError: [Errno 61] Connection refused
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connectionpool.py", line 670, in urlopen
        httplib_response = self._make_request(
      File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connectionpool.py", line 392, in _make_request
        conn.request(method, url, **httplib_request_kw)
      File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/http/client.py", line 1230, in request
        self._send_request(method, url, body, headers, encode_chunked)
      File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/http/client.py", line 1276, in _send_request
        self.endheaders(body, encode_chunked=encode_chunked)
      File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/http/client.py", line 1225, in endheaders
        self._send_output(message_body, encode_chunked=encode_chunked)
      File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/http/client.py", line 1004, in _send_output
        self.send(msg)
      File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/http/client.py", line 944, in send
        self.connect()
      File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connection.py", line 187, in connect
        conn = self._new_conn()
      File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connection.py", line 171, in _new_conn
        raise NewConnectionError(
    urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f9a2742de50>: Failed to establish a new connection: [Errno 61] Connection refused
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/requests/adapters.py", line 439, in send
        resp = conn.urlopen(
      File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connectionpool.py", line 724, in urlopen
        retries = retries.increment(
      File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/util/retry.py", line 439, in increment
        raise MaxRetryError(_pool, url, error or ResponseError(cause))
    urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f9a2742de50>: Failed to establish a new connection: [Errno 61] Connection refused'))
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/engine/runner.py", line 162, in handle_state_change
        new_state = self.call_runner_target_handlers(old_state, new_state)
      File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/engine/task_runner.py", line 111, in call_runner_target_handlers
        new_state = handler(self.task, old_state, new_state) or new_state
      File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/toolz/functoolz.py", line 303, in __call__
        return self._partial(*args, **kwargs)
      File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/utilities/notifications/notifications.py", line 305, in slack_notifier
        form_data = slack_message_formatter(tracked_obj, new_state)
      File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/utilities/notifications/notifications.py", line 159, in slack_message_formatter
        url = prefect.client.Client().get_cloud_url(
      File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/client/client.py", line 117, in __init__
        tenant_info = self.graphql({"query": {"tenant": {"id"}}})
      File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/client/client.py", line 268, in graphql
        result = self.post(
      File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/client/client.py", line 223, in post
        response = self._request(
      File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/client/client.py", line 393, in _request
        response = self._send_request(
      File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/client/client.py", line 312, in _send_request
        response = session.post(url, headers=headers, json=params, timeout=30)
      File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/requests/sessions.py", line 578, in post
        return self.request('POST', url, data=data, json=json, **kwargs)
      File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/requests/sessions.py", line 530, in request
        resp = self.send(prep, **send_kwargs)
      File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/requests/sessions.py", line 643, in send
        r = adapter.send(request, **kwargs)
      File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/requests/adapters.py", line 516, in send
        raise ConnectionError(e, request=request)
    requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f9a2742de50>: Failed to establish a new connection: [Errno 61] Connection refused'))
    [2020-08-11 07:47:53] INFO - prefect.TaskRunner | Task 'get_1': finished task run for task with final state: 'Failed'
    [2020-08-11 07:47:53] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
    As far as I understand, if a flow run is connected to a backend, a `flow_run_id` is present in `prefect.context`. Through the existence of `flow_run_id`, `slack_notifier` knows that it should ask the backend for a relevant URL and supply the notification with said URL.
    In `0.13.1`, a `flow_run_id` is available even without a backend, therefore `slack_notifier` gets a Connection Refused error. Line 151 in `slack_notifier`:
    if prefect.context.get("flow_run_id"):
        url = None
    
        if isinstance(tracked_obj, prefect.Flow):
            url = prefect.client.Client().get_cloud_url(
                "flow-run", prefect.context["flow_run_id"], as_user=False
            )
        elif isinstance(tracked_obj, prefect.Task):
            url = prefect.client.Client().get_cloud_url(
                "task-run", prefect.context.get("task_run_id", ""), as_user=False
            )
    
        if url:
            notification_payload.update(title_link=url)
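    The reasoning above can be illustrated with plain dictionaries standing in for `prefect.context`. This is only a sketch: the `has_backend` key and both helper functions are hypothetical names made up for the illustration, not part of Prefect.

```python
def uses_backend_old_check(context):
    """Mirror of the quoted check: any truthy flow_run_id counts as 'has a backend'."""
    return bool(context.get("flow_run_id"))


def uses_backend_explicit(context):
    """Hypothetical stricter check: require an explicit flag in addition to a run id."""
    return bool(context.get("has_backend")) and bool(context.get("flow_run_id"))


# Core-only run as described for 0.12.1: no flow_run_id in the context.
core_run_0_12_1 = {}
# Core-only run as described for 0.12.2 and later: a flow_run_id is always present.
core_run_0_13_1 = {"flow_run_id": "d36c58d1-4318-4a22-a317-bf7b79e38fd8"}
# Run that really is backed by a server (the flag name is invented for this sketch).
server_run = {
    "flow_run_id": "d36c58d1-4318-4a22-a317-bf7b79e38fd8",
    "has_backend": True,
}

for label, ctx in [
    ("core 0.12.1", core_run_0_12_1),
    ("core 0.13.1", core_run_0_13_1),
    ("server run", server_run),
]:
    print(label, uses_backend_old_check(ctx), uses_backend_explicit(ctx))
```

    With the old check, the `0.13.1` core-only context is treated like a server-backed run, which is where the client call and the refused connection come from.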
    emmanuel

    2 years ago
    You could probably override the class to fetch the web-hook from somewhere else, couldn’t you?
    h’mm it’s actually just a function really…
    - webhook_secret (str, optional): the name of the Prefect Secret that stores your slack
                webhook URL; defaults to `"SLACK_WEBHOOK_URL"`
    maybe you can simply “inject” it with an environment variable in which case Prefect won’t try to fetch it from a remote location
    webhook_url = cast(
            str, prefect.client.Secret(webhook_secret or "SLACK_WEBHOOK_URL").get()
        )
    nope 😄
    Well, I guess you could write your own notification function which would work just the same but take the webhook url from somewhere else
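    A minimal sketch of such a hand-written notifier, using only the standard library. It assumes the `(obj, old_state, new_state)` handler signature visible in the traceback above, and it assumes the webhook URL is read directly from the `PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL` environment variable instead of going through a Prefect Secret. The names `build_slack_payload` and `standalone_slack_notifier` are made up for this example, and the message format is not the one Prefect produces.

```python
import json
import os
import urllib.request


def build_slack_payload(tracked_obj, new_state):
    """Build a plain-text Slack message body without contacting any Prefect backend."""
    name = getattr(tracked_obj, "name", repr(tracked_obj))
    state_name = type(new_state).__name__
    text = f"{name} is now in a {state_name} state"
    message = getattr(new_state, "message", None)
    if message:
        text += f": {message}"
    return {"text": text}


def standalone_slack_notifier(tracked_obj, old_state, new_state):
    """State handler that posts to a Slack incoming webhook and returns the new state."""
    # Assumption: the webhook URL is taken straight from the environment.
    webhook_url = os.environ.get("PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL")
    if webhook_url:
        body = json.dumps(build_slack_payload(tracked_obj, new_state)).encode("utf-8")
        request = urllib.request.Request(
            webhook_url,
            data=body,
            headers={"Content-Type": "application/json"},
        )
        # Network errors propagate to the caller, as with any state handler.
        urllib.request.urlopen(request, timeout=30)
    # Returning the state unchanged keeps the run's outcome as it was.
    return new_state
```

    It would be attached the same way as in the original example, e.g. `@task(state_handlers=[standalone_slack_notifier])`. Unlike the built-in notifier, it fires on every state change and never adds a link to a server UI.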
    emre

    2 years ago
    Sure, I could always write my own. Still, it is functionality supplied by Prefect that used to work but now doesn’t. Felt like something to report.
    emmanuel

    2 years ago
    How did you provide the URL in `0.12.1`? (out of curiosity)
    emre

    2 years ago
    That’s the point, I did not. The Prefect client uses a default depending on your backend choice.
    And I believe a default URL always exists.
    Nothing changed with `slack_notifier`. But starting from `0.12.2`, flow objects started to supply a `flow_run_id` to their `flow_run_context`. Search for `flow_run_id` here: https://github.com/PrefectHQ/prefect/blob/0.12.1/src/prefect/core/flow.py and then do the same in: https://github.com/PrefectHQ/prefect/blob/0.12.2/src/prefect/core/flow.py
    emmanuel

    2 years ago
    Weird, because Slack webhooks are customer-specific https://api.slack.com/messaging/webhooks 🤔
    emre

    2 years ago
    The Prefect Slack notifier used `flow_run_id` to distinguish core-only runs from runs backed by a server, but starting from `0.12.2` this separation doesn’t work anymore.
    Oh, you mean the Slack webhook. Sure, it is an env var secret in my example. I was talking about the Prefect server URL.
    The connection issue isn’t to Slack, it is to the Prefect server. My flow run is without a backend server, but the notifier attempts to connect to a Prefect backend regardless.
    emmanuel

    2 years ago
    > The connection issue isn’t to slack, it is to prefect server.
    Yeah I got that. It tries to connect to the (graphql?) server endpoint to retrieve the slack endpoint to use, right? At least that’s what I understand reading the `slack_notifier` code.
    emre

    2 years ago
    Exactly!
    The check for whether the flow run is backed by a server or standalone is old; it is no longer valid.
    emmanuel

    2 years ago
    h’mmm it looks like the notifier code is the same in `0.12.x` https://github.com/PrefectHQ/prefect/blob/0.12.1/src/prefect/utilities/notifications/notifications.py#L278 so probably it’s a change in the client class 😕
    what’s the name of the env var you use? `SLACK_WEBHOOK_URL`?
    according to the doc it should probably be something like `PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL` now
    well that’s my understanding only 😕
    emre

    2 years ago
    Yeah, my variable is named `PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL`. I think I understood the issue, will open a GitHub issue.
    Ben Davison

    2 years ago
    If you’re on `0.13.1`, could you use the new webhook feature? (I haven't tried it yet)
    emre

    2 years ago
    Doesn’t that require a server/cloud? I am going to run this standalone.
    Ben Davison

    2 years ago