I want to add slack notifications in case there is...
# ask-community
d
I want to add slack notifications in case there is an error while running the flow. I have the lines below as part of my code:
Copy code
from prefect.utilities.notifications import slack_notifier
from prefect.engine.state import Failed
SLACK_WEBHOOK_URL = "the slack secret webhook url"
# os.environ["PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL"] = "<https://hooks.slack.com/services/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>"
handler = slack_notifier(only_states=[Failed])
prefect.context.setdefault("secrets", {})
prefect.context.secrets["SLACK_WEBHOOK_URL"] = "the slack secret webhook url"
I have even created config.toml file in .predict directory with the content below:
Copy code
[context.secrets]
SLACK_WEBHOOK_URL = "the slack secret webhook url"
My task stats with:
Copy code
@task(nout=2, max_retries=1, retry_delay=timedelta(seconds=1), checkpoint=False, state_handlers=[handler])
I also tried to add prefect app to slack. But when running my flows I am seeing this error (part of the error): see thread To make it clear, when I remove the
state_handlers=[handler]
from my task definition the flow runs fine. When I visit the prefect app in slack I see this too, which I am not sure if it connected to my problem or not. Any comments on how I can solve the slack notification problem?
k
Hey @Davoud Hejazi, can we just try sticking to the
config.toml
first? The os.environ won’t help because Prefect has already initialized the context by that point. You also can’t mutate the context like the following lines. For now, I would just try:
Copy code
from prefect.utilities.notifications import slack_notifier
from prefect.engine.state import Failed

handler = slack_notifier(only_states=[Failed])
and then you can see if your secret is pullable with
Copy code
Secret("SLACK_WEBHOOK_URL").get()
and try printing the value of that. Do you get the error from
flow.run()
or running with an agent?
Could you move the error to the thread so we dont crowd the main channel too much?
d
To make it simple I am just doing "prefect run --path ..." in terminal
k
Ah I see. Let me look into that a bit
d
Copy code
└── 16:12:20 | ERROR   | Unexpected error while calling state handlers: ValueError('Local Secret "SLACK_WEBHOOK_URL" was not found.')
Traceback (most recent call last):
  File "/home/davoud/.local/share/virtualenvs/S3-reorganizing-_0Ro3yi6/lib/python3.8/site-packages/prefect/engine/runner.py", line 161, in handle_state_change
    new_state = self.call_runner_target_handlers(old_state, new_state)
  File "/home/davoud/.local/share/virtualenvs/S3-reorganizing-_0Ro3yi6/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 113, in call_runner_target_handlers
    new_state = handler(self.task, old_state, new_state) or new_state
  File "/home/davoud/.local/share/virtualenvs/S3-reorganizing-_0Ro3yi6/lib/python3.8/site-packages/toolz/functoolz.py", line 306, in __call__
    return self._partial(*args, **kwargs)
  File "/home/davoud/.local/share/virtualenvs/S3-reorganizing-_0Ro3yi6/lib/python3.8/site-packages/prefect/utilities/notifications/notifications.py", line 295, in slack_notifier
    str, prefect.client.Secret(webhook_secret or "SLACK_WEBHOOK_URL").get()
  File "/home/davoud/.local/share/virtualenvs/S3-reorganizing-_0Ro3yi6/lib/python3.8/site-packages/prefect/client/secrets.py", line 167, in get
    raise ValueError(
ValueError: Local Secret "SLACK_WEBHOOK_URL" was not found.
os.environ is commented out in my code; I am trying to put the necessary steps in my code rather than a separate config file, unless I cannot help it
k
Import Prefect already initialized the context so you need to do it before:
Copy code
import os
os.environ["SLAC_WEBHOOK_URL"] = ...

#prefect imports
because one it gets imported, it can’t be mutated
d
I did :
Copy code
import os
os.environ["SLAC_WEBHOOK_URL"] = ...
but after importing prefect
didn't work; do you mean to do
Copy code
import os
os.environ["SLAC_WEBHOOK_URL"] = ...
almost before the rest of the packages? In the future we will shift the code and components into AWS, so I want to be independent from my local variables.
k
Yes I am telling you to move it before otherwise it won’t take effect and get loaded into the context. Actually the format is:
Copy code
import os
os.environ["PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL"] = ...
d
Just tried. same error
k
Can you try this script?
Copy code
import os
os.environ["PREFECT__CONTEXT__SECRETS__SOME_SECRET"] = "test"

import prefect
print(prefect.context.secrets.get("SOME_SECRET"))
Are you on Prefect Cloud or Server?
d
cloud
well at this point I am runnig it locally, just to test
k
I know but you can pull a Cloud secret too for local testing because you are moving it to AWS eventually right?
d
yes, I have pulled a cloud secret I think, that's the webhook, isn't it?
I saved the script you gave in a test.py file and here is the error:
Copy code
((S3-reorganizing) ) davoud@TitanLenovoL1501:~/s3$ prefect run --path test.py
Retrieving local flow... Error
  Traceback (most recent call last):
    File "/home/davoud/.local/share/virtualenvs/S3-reorganizing-_0Ro3yi6/lib/python3.8/site-packages/prefect/cli/run.py", line 75, in try_error_done
    yield
    File "/home/davoud/.local/share/virtualenvs/S3-reorganizing-_0Ro3yi6/lib/python3.8/site-packages/prefect/cli/run.py", line 589, in run
    flow = get_flow_from_path_or_module(path=path, module=module, name=name)
    File "/home/davoud/.local/share/virtualenvs/S3-reorganizing-_0Ro3yi6/lib/python3.8/site-packages/prefect/cli/run.py", line 173, in get_flow_from_path_or_module
    flows = load_flows_from_script(path) if path else load_flows_from_module(module)
    File "/home/davoud/.local/share/virtualenvs/S3-reorganizing-_0Ro3yi6/lib/python3.8/site-packages/prefect/cli/run.py", line 121, in load_flows_from_script
    namespace = runpy.run_path(path, run_name="<flow>")
    File "/home/davoud/.pyenv/versions/3.8.11/lib/python3.8/runpy.py", line 265, in run_path
    return _run_module_code(code, init_globals, run_name,
    File "/home/davoud/.pyenv/versions/3.8.11/lib/python3.8/runpy.py", line 97, in _run_module_code
    _run_code(code, mod_globals, init_globals,
    File "/home/davoud/.pyenv/versions/3.8.11/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
    File "test.py", line 5, in <module>
    print(prefect.context.secrets.get("SOME_SECRET"))
  AttributeError: 'Context' object has no attribute 'secrets'
k
But your error is saying that it is pulling from Local (config.toml or env variable). In order to use the Cloud secret, you can
export PREFECT___CLOUD____USE__LOCAL_SECRETS=false
and it will pull the Cloud secret for local runs as well. When you run with an agent on AWS, the default gets set to true
Ah ok let me try to replicate that
Wait, how does it even run? Do you have a Flow in it? I get
Copy code
Found no flows at xxx.py.
I was intending for you to do
python xxx.py
d
no, let me add a flow quickly
k
Oh no I didn’t mean to add a flow. I just wanted to test the 4 line script as is
d
yes it runs with the output of "test"
k
Yes exactly so if add the SLACK_WEBHOOK_URL there that same way, it will get loaded into the context (this was the solution for using a local secret without the
config.toml
)
d
what should I try now?
k
Defining it that way should work for your local test with the Flow as long as the imports are in that order. If you want to test by pulling the Cloud secret, you can export the
USE_LOCAL_SECRETS
env variable above and it will pull the
SLACK_WEBHOOK_URL
secret stored in Prefect Cloud and that experience should be the same on AWS and on local
d
I am trying to decipher what you just said
Let's say those 4 lines are the first lines of my code. What do add afterwards? this?
export PREFECT___CLOUD____USE__LOCAL_SECRETS=false
k
To get your initial problem working: 1. Replace SOME_SECRET with SLACK_WEBHOOK_URL in the os.environ 2. Add that above your Prefect imports 3. Run flow like you did before and hopefully it works To switch to using Cloud secrets for local testing (equivalent setup to AWS in future): 1. Add SLACK_WEBHOOK_URL secret to Cloud (you may have already) 2. Add this env variable locally
export PREFECT___CLOUD____USE__LOCAL_SECRETS=false
3. Run the flow as is
d
The first few lines of my code are like this:
Copy code
import os
os.environ["PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL"] = "<https://hooks.slack.com/services/> ... the rest of url"

from prefect import task, Flow, unmapped, Parameter
 ---- and some more imports here ----
handler = slack_notifier(only_states=[Failed])
# export PREFECT__CLOUD__USE_LOCAL_SECRETS=false
Then there are flows, and finally I run it using:
Copy code
prefect run --path mycode.py
I am not sure why I see the same error
k
Man that’s weird. I’ll test some more and really use the CLI
d
I guess I am missing one crucial step somewhere
k
No no I can replicate when using the CLI now. I don’t understand
prefect run
well enough. Using
flow.run()
with
python myflow.py
will work for this.
👍 1
I’ll look into it a bit more
d
Huh! you are right! adding
flow.run()
and then
python myflow.py
actually sends the error notifications to slack!
Now, after
handler = slack_notifier(only_states=[Failed])
if I add
export PREFECT__CLOUD__USE_LOCAL_SECRETS=false
it will try to use cloud secret key?
k
Ok I think I understand now.
Copy code
import prefect
Loads in and creates the context. But for the
prefect run …
CLI command, it creates a new flow run with a new context here so the context from the
os.environ
does not carry over.
👍 1
Yes exactly it will pull the Secret from Cloud with that setting. For runs on an agent, the default for that is True
d
I tried and that also works; thank you man. Now I am going to customize the error message, because the current message has no information from the step it failed:
k
I don’t you think can customize the message, I recommend using the SlackTask for that
👍 1
Something like this:
Copy code
def message_on_err(obj, old_state, new_state):
    msg = "Err running task"
    if new_state.is_failed():
        send_message = SlackTask(webhook_secret="SLACK_WEBHOOK_URL")
        send_message.run(message=msg)

@task(state_handlers=[message_on_err])
def abc(x):
    return x
Then you get information from the step that Failed, use
prefect.context.get("task_name")
or something like that and add it to the message. You can pull anything from the Context in the state handler to construct the message
🙏 1
d
@Kevin Kho When adding
export PREFECT__CLOUD__USE_LOCAL_SECRETS = False
I am getting error below:
Copy code
export PREFECT__CLOUD__USE_LOCAL_SECRETS = False
           ^
SyntaxError: invalid syntax
k
This is for bash/command line. Not for Python script. It sets an environment variable
I guess the command can vary depending on your OS but you want to set this as an environment variable