Apoorva Desai
03/14/2022, 9:40 PM
from prefect.utilities.notifications import slack_notifier
and my tasks now look like this:
task(log_stdout=True, state_handlers=[slack_notifier])
install_snowflake_task = ShellTask(helper_script="pip install boto3 \
snowflake-connector-python[pandas] \
snowflake-ingest && pip install PyJWT==1.7.1", shell="bash", stream_output=True, return_all=True, state_handlers=[slack_notifier])
install_dbt_task = ShellTask(helper_script="pip install dbt==0.18.0", \
shell="bash", stream_output=True, return_all=True, state_handlers=[slack_notifier])
My flow looks like
with Flow("name-of-flow", state_handlers=[slack_notifier]) as flow:
The flow runs successfully, but I see no notifications in the Slack channel that I have authorized for this. What am I doing wrong?
Anna Geller
03/14/2022, 9:42 PM
SlackTask
Apoorva Desai
03/14/2022, 9:46 PM
Anna Geller
03/14/2022, 9:47 PM
Apoorva Desai
03/14/2022, 10:02 PM
Error during execution of task: ValueError('Local Secret "PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL_PREFECT" was not found.')
slack = SlackTask(webhook_secret="PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL_PREFECT")
Anna Geller
03/14/2022, 11:52 PM
export PREFECT__CLOUD__USE_LOCAL_SECRETS=false
Apoorva Desai
03/15/2022, 12:09 AM
Anna Geller
03/15/2022, 1:05 AM
[cloud]
use_local_secrets = false
Apoorva Desai
03/15/2022, 1:14 AM
Anna Geller
03/15/2022, 1:14 AM
prefect agent kubernetes start --env PREFECT__CLOUD__USE_LOCAL_SECRETS=false
spec:
  containers:
  - args:
    - prefect agent kubernetes start
    command:
    - /bin/bash
    - -c
    env:
    - name: PREFECT__CLOUD__USE_LOCAL_SECRETS
      value: "false"
Apoorva Desai
03/15/2022, 4:11 PM
slack = SlackTask()
and passed in a webhook_url instead of a secret in the flow like this: slack(message="Hi from a flow!", webhook_url=SLACK_WEBHOOK_URL_PREFECT). This works and I see the message on the chosen channel. However, I still get a failure from SlackTask at some point in the flow with this message:
Task 'SlackTask': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
logger=self.logger,
File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/usr/local/lib/python3.7/site-packages/prefect/utilities/tasks.py", line 445, in method
return run_method(self, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/tasks/notifications/slack_task.py", line 61, in run
webhook_url = webhook_url or cast(str, Secret(webhook_secret).get())
File "/usr/local/lib/python3.7/site-packages/prefect/client/secrets.py", line 142, in get
) from None
ValueError: Local Secret "SLACK_WEBHOOK_URL" was not found.
Also, the state handler function you shared fails too:
Exception raised while calling state handlers: AttributeError("'Context' object has no attribute 'task_name'")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/flow_runner.py", line 120, in call_runner_target_handlers
old_state=old_state, new_state=new_state
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 116, in call_runner_target_handlers
new_state = handler(self.flow, old_state, new_state) or new_state
File "<string>", line 25, in post_to_slack_on_failure
AttributeError: 'Context' object has no attribute 'task_name'
Are there some imports I am missing? Maybe something for old_state, new_state?
Kevin Kho
03/15/2022, 7:57 PM
webhook_url. It takes in a webhook_secret, a string used to fetch the Secret with that name from Prefect Cloud. Because you used webhook_url, it looked for the default secret name, which is SLACK_WEBHOOK_URL.
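The traceback line `webhook_url = webhook_url or cast(str, Secret(webhook_secret).get())` suggests a simple fallback: a truthy URL is used directly, otherwise the named secret is looked up. The sketch below is a plain-Python stand-in for that behaviour, not Prefect's code; `resolve_webhook` and the `secrets` mapping are hypothetical names introduced only for illustration.

```python
from typing import Mapping, Optional

# Default secret name mentioned in the thread.
DEFAULT_SECRET_NAME = "SLACK_WEBHOOK_URL"


def resolve_webhook(
    secrets: Mapping[str, str],
    webhook_url: Optional[str] = None,
    webhook_secret: str = DEFAULT_SECRET_NAME,
) -> str:
    """Hypothetical helper: prefer an explicit URL, else look up the named secret."""
    # A missing or empty URL falls through to the secret lookup.
    if webhook_url:
        return webhook_url
    try:
        return secrets[webhook_secret]
    except KeyError:
        # Mirrors the wording of the error seen in the thread.
        raise ValueError(f'Local Secret "{webhook_secret}" was not found.') from None
```

Under this reading, any call that reaches the lookup without a URL and without a secret name ends up asking for `SLACK_WEBHOOK_URL`, which would produce the error shown if no secret with that name exists.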
Anna Geller
03/15/2022, 7:57 PM
ValueError: Local Secret "SLACK_WEBHOOK_URL" was not found
Apoorva Desai
03/15/2022, 8:11 PM
Anna Geller
03/15/2022, 8:12 PM
PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN
you need PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL_PREFECT
Apoorva Desai
03/15/2022, 8:37 PM
Anna Geller
03/15/2022, 9:24 PM
webhook_secret rather than webhook_url here:
SlackTask(message=msg, webhook_url=SLACK_WEBHOOK_URL_PREFECT).run()
Also, the state handler will fire only when your task fails. The way you've set this up at the moment, it's super unlikely to fail because it does nothing other than print something to the console 😄
@task(log_stdout=True, state_handlers=[post_to_slack_on_failure])
def print_output(output):
    logger = prefect.context.get("logger")
    for line in output:
        logger.info(line)
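For the handler itself, here is a minimal sketch that posts only on failed states and takes the name from the object passed to the handler rather than from `prefect.context.task_name`, which the AttributeError earlier in the thread shows is not available when the handler runs at flow level. It assumes only the `(obj, old_state, new_state)` call signature visible in the traceback and a state object exposing `is_failed()` and `message`; `make_failure_handler` and `notify` are hypothetical names, and the original `post_to_slack_on_failure` is not shown in the thread, so this is not a reconstruction of it.

```python
from typing import Any, Callable


def make_failure_handler(notify: Callable[[str], None]) -> Callable[[Any, Any, Any], Any]:
    """Build a state handler that calls notify(message) only on failed states."""

    def post_on_failure(obj: Any, old_state: Any, new_state: Any) -> Any:
        if new_state.is_failed():
            # Works for both tasks and flows, since the name is read from
            # the object the runner passes in, not from the context.
            name = getattr(obj, "name", "<unknown>")
            detail = getattr(new_state, "message", "")
            notify(f"{name} failed: {detail}")
        # Return the state unchanged so the run continues as usual.
        return new_state

    return post_on_failure
```

In the setup discussed here, `notify` could be a small function that wraps a `SlackTask(message=...).run()` call, so the same handler could be listed in `state_handlers` for a task or for the flow.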
SlackTask(message=msg, webhook_secret="SLACK_WEBHOOK_URL_PREFECT").run()
provided you set the Secret as we discussed e.g.:
export PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL_PREFECT=xxx
but if you set it to the default name, you don't have to set it explicitly - I would actually recommend that:
export PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL=xxx
then you can just do:
SlackTask(message=msg).run()
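The naming convention in the exports above can be sketched as follows: the environment variable is the `PREFECT__CONTEXT__SECRETS__` prefix plus the secret name, and code refers to the secret by the name without the prefix. The helper names below are hypothetical and not part of Prefect; they only make the mapping explicit.

```python
SECRET_ENV_PREFIX = "PREFECT__CONTEXT__SECRETS__"


def env_var_for_secret(secret_name: str) -> str:
    """Environment variable that would hold a local secret (hypothetical helper)."""
    return SECRET_ENV_PREFIX + secret_name


def secret_name_from_env_var(env_var: str) -> str:
    """Secret name to pass as webhook_secret, given the environment variable name."""
    if not env_var.startswith(SECRET_ENV_PREFIX):
        raise ValueError(f"{env_var!r} does not start with {SECRET_ENV_PREFIX!r}")
    return env_var[len(SECRET_ENV_PREFIX):]
```

This also accounts for the first error in the thread: passing the full `PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL_PREFECT` string as `webhook_secret` asks for a secret with that entire name, whereas the secret's name is just `SLACK_WEBHOOK_URL_PREFECT`.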
Lon Nix
03/15/2022, 9:28 PM
Apoorva Desai
03/15/2022, 9:33 PM