#prefect-community

Amogh Kulkarni

04/22/2022, 4:20 PM
Hi Prefect Team. Early yesterday morning, two of our flows failed at the same time, at 3:00 AM PST, with the same error. Both flows ran fine when we restarted them at around 9 AM. This is a really weird, non-deterministic issue. We didn’t change or modify the SLACK_WEBHOOK_URL secret before re-running the flows, yet the tasks ran fine. We did a root cause analysis but still couldn’t find out why the flows failed. Can you please take a look and help us out?
1. https://cloud.prefect.io/immersa-prod/flow-run/0ae8fb55-fa89-4a1b-aa86-9d122f490751
2. https://cloud.prefect.io/immersa-prod/flow-run/7c59423a-f7b7-4438-959b-9259058d22b1
Exception raised while calling state handlers: ClientError([{'path': ['secret_value'], 'message': 'An unknown error occurred.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/client/secrets.py", line 137, in get
    value = secrets[self.name]
KeyError: 'SLACK_WEBHOOK_URL'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/cloud/task_runner.py", line 64, in call_runner_target_handlers
    new_state = super().call_runner_target_handlers(
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 113, in call_runner_target_handlers
    new_state = handler(self.task, old_state, new_state) or new_state
  File "/usr/local/lib/python3.9/site-packages/toolz/functoolz.py", line 306, in __call__
    return self._partial(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/notifications/notifications.py", line 299, in slack_notifier
    str, prefect.client.Secret(webhook_secret or "SLACK_WEBHOOK_URL").get()
  File "/usr/local/lib/python3.9/site-packages/prefect/client/secrets.py", line 161, in get
    raise exc
  File "/usr/local/lib/python3.9/site-packages/prefect/client/secrets.py", line 145, in get
    result = self.client.graphql(
  File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 570, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['secret_value'], 'message': 'An unknown error occurred.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
03:02:29

Kevin Kho

04/22/2022, 4:22 PM
I think this is an API call to fetch the Secret that is failing. You can add retries to the Secret task.
with Flow(...) as flow:
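    # Prefect 1.x also requires a retry_delay (a datetime.timedelta) when max_retries > 0
    PrefectSecret("SLACK_WEBHOOK_URL", max_retries=3, retry_delay=timedelta(seconds=10))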

Amogh Kulkarni

04/22/2022, 4:41 PM
The failing tasks in both flows were Snowflake tasks that have nothing to do with a Prefect secret. I will send you the signature of the tasks in a minute…
@task(log_stdout=True, state_handlers=[handler], name=f"truncate_snowflake_play_table@{dag_name}")
def truncate_snowflake_play_table():
    sfq = <function to connect to snowflake>
    sfq.run(query=<sql query to execute in snowflake>)

Kevin Kho

04/22/2022, 5:14 PM
Ah I see, so it’s in the state handler? Maybe what you can do is add the SLACK_WEBHOOK_URL secret to your storage.
flow.storage = ["SLACK_WEBHOOK_URL"]
That way, the secret is only pulled once. If you map over a lot of these tasks with the state handler, it will be pulled each time, and some of those API calls may fail.

Amogh Kulkarni

04/27/2022, 12:59 AM
@Kevin Kho We set the SLACK_WEBHOOK_URL secret as a Prefect secret so that we get Slack notifications when a flow fails, based on this link: https://docs.prefect.io/core/advanced_tutorials/slack-notifications.html#customizing-your-alerts So now, should we remove the SLACK_WEBHOOK_URL secret from the cloud and add it to flow.storage during flow registration? Is my understanding correct? Will the Slack notifications still work?

Kevin Kho

04/27/2022, 1:08 AM
No no. What this does is pull the secret one time, during Flow initialization. If you do
def mystatehandler(..):
    Secret("SLACK_WEBHOOK_URL").get()
on its own, that will pull the secret from the API for each task, or each failed task.
With the secret listed on the storage, the same call will just pull from the context, where the secret already is.
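The difference between pulling once and pulling per task can be sketched in plain Python. This is only an illustration of the caching idea under stated assumptions. `SecretCache`, `fake_api_fetch`, and `state_handler` are hypothetical names, the fetch function is a stand-in for a remote API call, and none of this is Prefect's actual implementation.

```python
from typing import Callable, Dict, Iterable, List


class SecretCache:
    """Fetch each secret at most once and serve later reads from memory."""

    def __init__(self, fetch: Callable[[str], str]) -> None:
        self._fetch = fetch
        self._values: Dict[str, str] = {}

    def preload(self, names: Iterable[str]) -> None:
        # Done once up front, analogous to loading secrets at flow start.
        for name in names:
            self._values[name] = self._fetch(name)

    def get(self, name: str) -> str:
        # Only fall back to the remote fetch when the value is not cached.
        if name not in self._values:
            self._values[name] = self._fetch(name)
        return self._values[name]


# Hypothetical stand-in for the remote secrets API; records every call.
api_calls: List[str] = []


def fake_api_fetch(name: str) -> str:
    api_calls.append(name)
    return f"value-of-{name}"


cache = SecretCache(fake_api_fetch)
cache.preload(["SLACK_WEBHOOK_URL"])


def state_handler(task_name: str) -> str:
    # Reads from the cache, so no extra API call is made per task.
    url = cache.get("SLACK_WEBHOOK_URL")
    return f"notify {task_name} via {url}"


messages = [state_handler(f"task-{i}") for i in range(100)]
```

With 100 handler invocations the stand-in API is called only once, so a single flaky response matters far less than it would with one call per task.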

Amogh Kulkarni

04/27/2022, 1:14 AM
Got it. Makes sense. Thanks Kevin

Kevin Kho

04/27/2022, 1:18 AM
Oh sorry, that piece of code I wrote was wrong. I meant
flow.storage = SomeStorage(..., secrets=["SLACK_WEBHOOK_URL"])
Just add this.

Amogh Kulkarni

04/29/2022, 5:21 PM
Thanks, Kevin.

Rainer Schülke

05/10/2022, 8:13 AM
Same problem here with two flows which ran successfully around 2 AM CEST. Maybe it was just an issue on Slack’s end during that night? This error has not shown up since then, and all our flows have a notifier.

Kevin Kho

05/10/2022, 2:43 PM
Ah I see. Thanks for reporting, though we really have had some latency with Secrets, which would explain this too.