emre
08/11/2020, 7:52 AM
In 0.13.1, I can’t get the slack_notifier to work. This used to work in 0.12.1. Here is a minimal example of a flow:
from prefect import Flow, task, context
from prefect.utilities.notifications import slack_notifier

@task
def log_run_id():
    context.get("logger").warning(context.get("flow_run_id"))

@task(state_handlers=[slack_notifier])
def get_1():
    return 1

with Flow("slack_example") as flow:
    log = log_run_id()
    result = get_1(upstream_tasks=[log])

flow.run()
Apparently slack_notifier attempts to communicate with a backend server, but I am not going to use one. More info in thread.
[2020-08-11 07:47:52] INFO - prefect.FlowRunner | Beginning Flow run for 'slack_example'
[2020-08-11 07:47:52] INFO - prefect.FlowRunner | Starting flow run.
[2020-08-11 07:47:52] INFO - prefect.TaskRunner | Task 'log_run_id': Starting task run...
[2020-08-11 07:47:52] WARNING - prefect.log_run_id | d36c58d1-4318-4a22-a317-bf7b79e38fd8
[2020-08-11 07:47:52] INFO - prefect.TaskRunner | Task 'log_run_id': finished task run for task with final state: 'Success'
[2020-08-11 07:47:52] INFO - prefect.TaskRunner | Task 'get_1': Starting task run...
[2020-08-11 07:47:53] ERROR - prefect.TaskRunner | Unexpected error while calling state handlers: ConnectionError(MaxRetryError("HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f9a2742de50>: Failed to establish a new connection: [Errno 61] Connection refused'))"))
Traceback (most recent call last):
File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connection.py", line 159, in _new_conn
conn = connection.create_connection(
File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/util/connection.py", line 84, in create_connection
raise err
File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/util/connection.py", line 74, in create_connection
sock.connect(sa)
ConnectionRefusedError: [Errno 61] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connectionpool.py", line 670, in urlopen
httplib_response = self._make_request(
File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connectionpool.py", line 392, in _make_request
conn.request(method, url, **httplib_request_kw)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/http/client.py", line 1230, in request
self._send_request(method, url, body, headers, encode_chunked)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/http/client.py", line 1276, in _send_request
self.endheaders(body, encode_chunked=encode_chunked)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/http/client.py", line 1225, in endheaders
self._send_output(message_body, encode_chunked=encode_chunked)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/http/client.py", line 1004, in _send_output
self.send(msg)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/http/client.py", line 944, in send
self.connect()
File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connection.py", line 187, in connect
conn = self._new_conn()
File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connection.py", line 171, in _new_conn
raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f9a2742de50>: Failed to establish a new connection: [Errno 61] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/requests/adapters.py", line 439, in send
resp = conn.urlopen(
File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/connectionpool.py", line 724, in urlopen
retries = retries.increment(
File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/urllib3/util/retry.py", line 439, in increment
raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f9a2742de50>: Failed to establish a new connection: [Errno 61] Connection refused'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/engine/runner.py", line 162, in handle_state_change
new_state = self.call_runner_target_handlers(old_state, new_state)
File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/engine/task_runner.py", line 111, in call_runner_target_handlers
new_state = handler(self.task, old_state, new_state) or new_state
File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/toolz/functoolz.py", line 303, in __call__
return self._partial(*args, **kwargs)
File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/utilities/notifications/notifications.py", line 305, in slack_notifier
form_data = slack_message_formatter(tracked_obj, new_state)
File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/utilities/notifications/notifications.py", line 159, in slack_message_formatter
url = prefect.client.Client().get_cloud_url(
File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/client/client.py", line 117, in __init__
tenant_info = self.graphql({"query": {"tenant": {"id"}}})
File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/client/client.py", line 268, in graphql
result = self.post(
File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/client/client.py", line 223, in post
response = self._request(
File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/client/client.py", line 393, in _request
response = self._send_request(
File "/Users/emreakgun/Documents/prefect_server/prefect/src/prefect/client/client.py", line 312, in _send_request
response = session.post(url, headers=headers, json=params, timeout=30)
File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/requests/sessions.py", line 578, in post
return self.request('POST', url, data=data, json=json, **kwargs)
File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/requests/sessions.py", line 530, in request
resp = self.send(prep, **send_kwargs)
File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/requests/sessions.py", line 643, in send
r = adapter.send(request, **kwargs)
File "/Users/emreakgun/Documents/prefect_server/venv/lib/python3.8/site-packages/requests/adapters.py", line 516, in send
raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f9a2742de50>: Failed to establish a new connection: [Errno 61] Connection refused'))
[2020-08-11 07:47:53] INFO - prefect.TaskRunner | Task 'get_1': finished task run for task with final state: 'Failed'
[2020-08-11 07:47:53] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
flow_run_id is present in prefect.context. Through the existence of flow_run_id, slack_notifier knows that it should ask the backend for a relevant URL and supply the notification with that URL.
In 0.13.1, a flow_run_id is available even without a backend, so slack_notifier gets a Connection Refused error.
Line 151 in slack_notifier:
if prefect.context.get("flow_run_id"):
    url = None

    if isinstance(tracked_obj, prefect.Flow):
        url = prefect.client.Client().get_cloud_url(
            "flow-run", prefect.context["flow_run_id"], as_user=False
        )
    elif isinstance(tracked_obj, prefect.Task):
        url = prefect.client.Client().get_cloud_url(
            "task-run", prefect.context.get("task_run_id", ""), as_user=False
        )

    if url:
        notification_payload.update(title_link=url)
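Since the lookup quoted above runs whenever flow_run_id is set, one possible workaround for core-only runs is a small custom state handler that posts to the Slack webhook directly and never constructs a Client. The following is only a sketch under assumptions, not Prefect's own implementation: the names build_slack_payload and make_plain_slack_handler and the injectable send parameter are hypothetical, and the only thing taken from the traceback is the handler call shape handler(task, old_state, new_state).

```python
import json
import urllib.request


def build_slack_payload(tracked_obj, new_state):
    """Build a minimal Slack message body; no backend URL lookup is attempted."""
    # Fall back to repr() for objects without a `name` attribute.
    name = getattr(tracked_obj, "name", repr(tracked_obj))
    state_name = type(new_state).__name__
    message = getattr(new_state, "message", None)
    text = f"{name} is now in a {state_name} state"
    if message:
        text += f": {message}"
    return {"text": text}


def make_plain_slack_handler(webhook_url, send=None):
    """Return a state handler with the (obj, old_state, new_state) call shape.

    `send` is a hypothetical hook for swapping out the HTTP call (e.g. in tests);
    by default the payload is POSTed as JSON to `webhook_url`.
    """

    def _post(url, payload):
        request = urllib.request.Request(
            url,
            data=json.dumps(payload).encode("utf-8"),
            headers={"Content-Type": "application/json"},
        )
        # Supplying `data` makes this a POST request.
        with urllib.request.urlopen(request, timeout=30):
            pass

    sender = send or _post

    def handler(tracked_obj, old_state, new_state):
        sender(webhook_url, build_slack_payload(tracked_obj, new_state))
        # Hand the state back unchanged so the runner continues with it.
        return new_state

    return handler
```

Assuming the webhook URL is available as a plain string, the handler could be attached the same way as in the minimal example, e.g. @task(state_handlers=[make_plain_slack_handler(url)]). Unlike slack_notifier, this sketch adds no link to a flow-run or task-run page, which is exactly the part that requires a backend.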
emmanuel
08/11/2020, 8:06 AM
- webhook_secret (str, optional): the name of the Prefect Secret that stores your slack webhook URL; defaults to `"SLACK_WEBHOOK_URL"`
maybe you can simply “inject” it with an environment variable, in which case Prefect won’t try to fetch it from a remote location
webhook_url = cast(
    str, prefect.client.Secret(webhook_secret or "SLACK_WEBHOOK_URL").get()
)
nope 😄
emre
08/11/2020, 8:57 AM
emmanuel
08/11/2020, 8:58 AM
0.12.1?
emre
08/11/2020, 9:18 AM
slack_notifier. But starting from 0.12.2, flow objects started to supply a flow_run_id to their flow_run_context.
Search for flow_run_id here:
https://github.com/PrefectHQ/prefect/blob/0.12.1/src/prefect/core/flow.py
and then do the same in:
https://github.com/PrefectHQ/prefect/blob/0.12.2/src/prefect/core/flow.py
emmanuel
08/11/2020, 9:20 AM
emre
08/11/2020, 9:23 AM
flow_run_id to distinguish core-only runs and runs backed by a server, but starting from 0.12.2, this separation doesn’t work anymore.
emmanuel
08/11/2020, 9:25 AM
> The connection issue isn’t to slack, it is to prefect server.
Yeah I got that. It tries to connect to the (graphql?) server endpoint to retrieve the slack endpoint to use, right? At least that’s what I understand reading the slack_notifier code.
emre
08/11/2020, 9:26 AM
emmanuel
08/11/2020, 9:30 AM
0.12.x https://github.com/PrefectHQ/prefect/blob/0.12.1/src/prefect/utilities/notifications/notifications.py#L278 so probably it’s a change in the client class 😕
SLACK_WEBHOOK_URL?
PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL
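For reference, the environment variable mentioned here can also be set from Python before Prefect is imported. This is a minimal sketch, assuming Prefect reads its configuration from the environment when it is first imported and that local secrets are enabled; the URL is a placeholder, not a real webhook. Note that this only supplies the webhook secret locally; it does not affect the get_cloud_url lookup shown in the traceback.

```python
import os

# Placeholder value (assumption): substitute a real Slack incoming-webhook URL.
PLACEHOLDER_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"

# Set this before `import prefect`, on the assumption that configuration is
# loaded from the environment at import time. setdefault() leaves any value
# already exported in the shell untouched.
os.environ.setdefault("PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL", PLACEHOLDER_URL)
```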
emre
08/11/2020, 9:36 AM
PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL. I think I understood the issue, will open a GitHub issue.
Ben Davison
08/11/2020, 11:48 AM
0.13.1 could you use the new webhook feature?
(I haven't tried it yet)
emre
08/11/2020, 11:57 AM
Ben Davison
08/11/2020, 12:02 PM