https://prefect.io logo
e

emre

08/11/2020, 7:52 AM
Hi folks, In prefect
0.13.1
, I can’t get the
slack_notifier
to work. This used to work in
0.12.1
. Here is a minimal example of a flow:
Copy code
from prefect import Flow, task, context
from prefect.utilities.notifications import slack_notifier

@task
def log_run_id():
    context.get("logger").warning(context.get("flow_run_id"))


@task(state_handlers=[slack_notifier])
def get_1():
    return 1


with Flow("slack_example") as flow:
    log = log_run_id()
    result = get_1(upstream_tasks=[log])


flow.run()
Apparently
slack_notifier
attempts to communicate with a backend server, but I am not going to use one. More info in thread.
Complete logs in case anyone needs:
Copy code
[2020-08-11 07:47:52] INFO - prefect.FlowRunner | Beginning Flow run for 'slack_example'
[2020-08-11 07:47:52] INFO - prefect.FlowRunner | Starting flow run.
[2020-08-11 07:47:52] INFO - prefect.TaskRunner | Task 'log_run_id': Starting task run...
[2020-08-11 07:47:52] WARNING - prefect.log_run_id | d36c58d1-4318-4a22-a317-bf7b79e38fd8
[2020-08-11 07:47:52] INFO - prefect.TaskRunner | Task 'log_run_id': finished task run for task with final state: 'Success'
[2020-08-11 07:47:52] INFO - prefect.TaskRunner | Task 'get_1': Starting task run...
[2020-08-11 07:47:53] ERROR - prefect.TaskRunner | Unexpected error while calling state handlers: ConnectionError(MaxRetryError("HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f9a2742de50>: Failed to establish a new connection: [Errno 61] Connection refused'))"))
Traceback (most recent call last):
  File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connection.py", line 159, in _new_conn
    conn = connection.create_connection(
  File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/util/connection.py", line 84, in create_connection
    raise err
  File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/util/connection.py", line 74, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 61] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connectionpool.py", line 670, in urlopen
    httplib_response = self._make_request(
  File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connectionpool.py", line 392, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/http/client.py", line 1230, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/http/client.py", line 1276, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/http/client.py", line 1225, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/http/client.py", line 1004, in _send_output
    self.send(msg)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/http/client.py", line 944, in send
    self.connect()
  File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connection.py", line 187, in connect
    conn = self._new_conn()
  File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connection.py", line 171, in _new_conn
    raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f9a2742de50>: Failed to establish a new connection: [Errno 61] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/requests/adapters.py", line 439, in send
    resp = conn.urlopen(
  File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connectionpool.py", line 724, in urlopen
    retries = retries.increment(
  File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/util/retry.py", line 439, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f9a2742de50>: Failed to establish a new connection: [Errno 61] Connection refused'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/engine/runner.py", line 162, in handle_state_change
    new_state = self.call_runner_target_handlers(old_state, new_state)
  File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/engine/task_runner.py", line 111, in call_runner_target_handlers
    new_state = handler(self.task, old_state, new_state) or new_state
  File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/toolz/functoolz.py", line 303, in __call__
    return self._partial(*args, **kwargs)
  File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/utilities/notifications/notifications.py", line 305, in slack_notifier
    form_data = slack_message_formatter(tracked_obj, new_state)
  File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/utilities/notifications/notifications.py", line 159, in slack_message_formatter
    url = prefect.client.Client().get_cloud_url(
  File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/client/client.py", line 117, in __init__
    tenant_info = self.graphql({"query": {"tenant": {"id"}}})
  File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/client/client.py", line 268, in graphql
    result = <http://self.post|self.post>(
  File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/client/client.py", line 223, in post
    response = self._request(
  File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/client/client.py", line 393, in _request
    response = self._send_request(
  File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/client/client.py", line 312, in _send_request
    response = <http://session.post|session.post>(url, headers=headers, json=params, timeout=30)
  File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/requests/sessions.py", line 578, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/requests/sessions.py", line 530, in request
    resp = self.send(prep, **send_kwargs)
  File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/requests/sessions.py", line 643, in send
    r = adapter.send(request, **kwargs)
  File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/requests/adapters.py", line 516, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f9a2742de50>: Failed to establish a new connection: [Errno 61] Connection refused'))
[2020-08-11 07:47:53] INFO - prefect.TaskRunner | Task 'get_1': finished task run for task with final state: 'Failed'
[2020-08-11 07:47:53] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
As far as I understand, if a flow run is connected to a backend, a
flow_run_id
is present in
prefect.context
. Through the existence of
flow_run_id
,
slack_notifier
knows that it should ask the backend for a relevant url, and supply the notification with said url. In
0.13.1
, a
flow_run_id
is available even without a backend, therefore
slack_notifier
gets a Connection Refused error. Line 151 in
slack_notifier
:
Copy code
if prefect.context.get("flow_run_id"):
    url = None

    if isinstance(tracked_obj, prefect.Flow):
        url = prefect.client.Client().get_cloud_url(
            "flow-run", prefect.context["flow_run_id"], as_user=False
        )
    elif isinstance(tracked_obj, prefect.Task):
        url = prefect.client.Client().get_cloud_url(
            "task-run", prefect.context.get("task_run_id", ""), as_user=False
        )

    if url:
        notification_payload.update(title_link=url)
e

emmanuel

08/11/2020, 8:06 AM
You could probably override the class to fetch the web-hook from somewhere else, couldn’t you?
h’mm it’s actually just a function really…
Copy code
- webhook_secret (str, optional): the name of the Prefect Secret that stores your slack
            webhook URL; defaults to `"SLACK_WEBHOOK_URL"`
maybe you can simply “inject” it with an environment variable in which case Prefect won’t try to fetch it from a remote location
Copy code
webhook_url = cast(
        str, prefect.client.Secret(webhook_secret or "SLACK_WEBHOOK_URL").get()
    )
nope 😄
Well, I guess you could write your own notification function which would work just the same but take the webhook url from somewhere else
e

emre

08/11/2020, 8:57 AM
Sure I could always write my own. Still, it is a functionality supplied by prefect, that used to work, but now doesn’t. Felt like something to report.
e

emmanuel

08/11/2020, 8:58 AM
how did you provide the url in
0.12.1
?
(out of curiosity)
e

emre

08/11/2020, 9:18 AM
Thats the point, I did not. The prefect client uses a default depending on your backend choice.
And I believe a default url always exists.
Nothing changed with
slack_notifier
. But starting from
0.12.2
, flow objects started to supply a
flow_run_id
to their
flow_run_context
. Search for
flow_run_id
here: https://github.com/PrefectHQ/prefect/blob/0.12.1/src/prefect/core/flow.py and then do the same in: https://github.com/PrefectHQ/prefect/blob/0.12.2/src/prefect/core/flow.py
e

emmanuel

08/11/2020, 9:20 AM
weird because Slack web-hook are customer specific https://api.slack.com/messaging/webhooks 🤔
e

emre

08/11/2020, 9:23 AM
prefect slack notifier used
flow_run_id
to distinguish core only runs and runs backed by a server, but this starting from
0.12.2
, this separation doesn’t work anymore.
Oh you mean slack webhook. Sure it is an env var secret in my example. I was talking about prefect server url
The connection issue isn’t to slack, it is to prefect server. My flow run is without a backend server, but the notifier attempts to connect to a prefect backend regardless
e

emmanuel

08/11/2020, 9:25 AM
The connection issue isn’t to slack, it is to prefect server.
Yeah I got that. It tries to connect to the (graphql?) server endpoint to retrieve the slack endpoint to use, right? At least that’s what I understand reading the
slack_notifier
code.
e

emre

08/11/2020, 9:26 AM
Exactly!
The check for whether the flow run is backed by a server or standalone is old, it is not valid anymore.
e

emmanuel

08/11/2020, 9:30 AM
h’mmm it looks like the notifier code is the same in
0.12.x
https://github.com/PrefectHQ/prefect/blob/0.12.1/src/prefect/utilities/notifications/notifications.py#L278 so probably it’s a change in the client class 😕
what’s the name of the env var you use?
SLACK_WEBHOOK_URL
?
according to the doc it should probably be something like that now
PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL
well that’s my understanding only 😕
e

emre

08/11/2020, 9:36 AM
yeah my variable is name
PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL
. I think I understood the issue, will open a github issue.
b

Ben Davison

08/11/2020, 11:48 AM
If your on
0.13.1
could you use the new webhook feature? (I haven't tried it yet)
e

emre

08/11/2020, 11:57 AM
Doesn’t that require a server/cloud? I am going to run this standalone.
b

Ben Davison

08/11/2020, 12:02 PM