Madison Schott
04/29/2022, 5:42 PM
Anna Geller
@task(trigger=any_failed)
def send_slack_alert_on_failure(output):
    SlackTask(message=output).run()
this post provides a full example
Madison Schott
04/29/2022, 6:01 PM
Madison Schott
04/29/2022, 6:02 PM
17:56:23 8 of 17 PASS freshness of google_sheets.misc_marketing_spend_winc_influencers... [PASS in 4.13s]
10:56:24
INFO
DbtShellTask
17:56:24 7 of 17 WARN freshness of google_sheets.misc_marketing_spend_winc............... [WARN in 4.39s]
Madison Schott
04/29/2022, 6:02 PM
Anna Geller
@task(trigger=any_failed)
def send_slack_alert_on_failure(output):
    for line in output:
        if "WARN" in line:
            SlackTask(message=output).run()
Madison Schott
04/29/2022, 6:54 PM
Beginning health checks...
Traceback (most recent call last):
  File "/opt/prefect/healthcheck.py", line 152, in <module>
    flows = cloudpickle_deserialization_check(flow_file_paths)
  File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
    flows.append(cloudpickle.loads(flow_bytes))
TypeError: code() takes at most 15 arguments (16 given)
Anna Geller
Madison Schott
04/29/2022, 7:31 PM
Madison Schott
04/29/2022, 7:31 PM
@task(trigger=any_failed, checkpoint=False)
def send_slack_alert_on_test(output):
    SlackTask(message=output).run()
Anna Geller
ignore_healthchecks=True, but the health checks are there for a reason. It looks like something in your flow code cannot be serialized with cloudpickle. If you need help debugging this, can you share your flow code?
Anna Geller
Anna Geller
Anna Geller
Madison Schott
04/29/2022, 7:42 PM
Anna Geller
Madison Schott
04/29/2022, 7:43 PM
Madison Schott
04/29/2022, 8:05 PM
Anna Geller
Madison Schott
04/29/2022, 8:12 PM
Madison Schott
04/29/2022, 8:13 PM
FROM python:3.9-slim
RUN python -m pip install
Madison Schott
04/29/2022, 9:11 PM
# specify a base image
FROM prefecthq/prefect:latest
Madison Schott
04/29/2022, 9:15 PM
Madison Schott
04/29/2022, 9:44 PM
Unexpected error: AttributeError("'FunctionTask' object has no attribute 'retry_on'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 1000, in check_for_retry
    self.task.retry_on
AttributeError: 'FunctionTask' object has no attribute 'retry_on'
Madison Schott
04/29/2022, 9:44 PM
Madison Schott
04/29/2022, 9:59 PM
Anna Geller
retry_on is a recently added argument to the Task class, which makes me believe there is still some version mismatch between your registration and your prod runtime environments. Looks like your registration/build environment uses a more recent version of Prefect where this new argument exists, while your runtime/production environment is on an older version of Prefect and doesn't yet have the retry_on argument.
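One way to confirm a mismatch like the one described above is to print the same fingerprint in both environments and compare. This is a minimal sketch using only the standard library. The helper names `installed_version` and `environment_fingerprint` are hypothetical and not part of Prefect, and including the Python version is an added assumption: pickled flows are generally not portable across Python minor versions, which may also be related to the earlier health-check error.

```python
import sys
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def environment_fingerprint() -> dict:
    """Collect the versions that should agree between registration and runtime."""
    return {
        # Major.minor of the interpreter running this script.
        "python": "{}.{}".format(*sys.version_info[:2]),
        # Installed Prefect version, or None if Prefect is missing here.
        "prefect": installed_version("prefect"),
    }


if __name__ == "__main__":
    # Run once in the registration/build environment and once inside the
    # runtime image, then compare the two printed dictionaries.
    print(environment_fingerprint())
```

If the two dictionaries differ, aligning them (for example by pinning the same Prefect version in both places) is the usual next step.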
Madison Schott
05/02/2022, 7:17 PM
Madison Schott
05/02/2022, 7:46 PM
Anna Geller
pip install prefect --upgrade in both
Madison Schott
05/02/2022, 7:47 PM
Anna Geller
Madison Schott
05/02/2022, 9:08 PM
@task(trigger=all_finished, checkpoint=False)
def send_slack_alert_on_test(output):
    for line in output:
        if "WARN" in line:
            print(line)
            SlackTask(message=line).run()
Madison Schott
05/02/2022, 9:41 PM
@task(trigger=all_finished, checkpoint=False, log_stdout=True)
def send_slack_alert_on_test(output):
    for line in output:
        if "WARN" in line:
            print(line)
            SlackTask(message=line).run()
Anna Geller
will printing out the line here print to the Prefect logs?
A really good question! I think it should work, as long as you add log_stdout=True to your task decorator (which you did) 🤔
Alternatively, you could do the same using logger which is safer:
logger = prefect.context.get("logger")
for line in output:
    if "WARN" in line:
        logger.info(line)
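The logger-based variant above could be factored into a small helper that takes the logger as an argument. This is a rough sketch, assuming `output` is a list of lines. `log_warn_lines` is a hypothetical name, and the standard-library logger used in the demo only stands in for the one returned by `prefect.context.get("logger")` inside a task.

```python
import logging
from typing import Iterable, List


def log_warn_lines(lines: Iterable[str], logger: logging.Logger) -> List[str]:
    """Log every line containing "WARN" at INFO level and return those lines."""
    warned = [line for line in lines if "WARN" in line]
    for line in warned:
        logger.info(line)
    return warned


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Illustrative lines shaped like the dbt output earlier in the thread;
    # the source names are made up.
    sample = [
        "17:56:23 8 of 17 PASS freshness of a_source [PASS in 4.13s]",
        "17:56:24 7 of 17 WARN freshness of another_source [WARN in 4.39s]",
    ]
    log_warn_lines(sample, logging.getLogger("demo"))
```

Returning the matched lines keeps the helper easy to reuse, for example to pass the same lines on to a Slack task.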
Madison Schott
05/03/2022, 5:15 PM
Madison Schott
05/03/2022, 5:56 PM
Anna Geller
Madison Schott
05/03/2022, 7:51 PM
Madison Schott
05/03/2022, 8:40 PM
Madison Schott
05/03/2022, 8:41 PM
Madison Schott
05/03/2022, 8:41 PM
def send_slack_alert_on_test(output):
    print(output)
Madison Schott
05/03/2022, 8:41 PM
20:35:55 Done.
message
Madison Schott
05/03/2022, 8:42 PM
Anna Geller
logger = prefect.context.get("logger")
if "WARN" in output:
    logger.info(output)  # or put it into Slack task
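The snippets in this thread treat `output` both as a list of lines (`for line in output`) and as a single string (`"WARN" in output`). Below is a small sketch that accepts either shape before filtering. `warn_lines` is a hypothetical helper, and the assumption that the upstream task returns either one string or a list of strings is mine.

```python
from typing import List, Optional, Sequence, Union


def warn_lines(output: Optional[Union[str, Sequence[str]]]) -> List[str]:
    """Return the lines of `output` that contain "WARN".

    Accepts a single (possibly multi-line) string, a list of lines, or None.
    """
    if output is None:
        return []
    if isinstance(output, str):
        # A plain string is split into lines rather than iterated directly.
        lines = output.splitlines()
    else:
        lines = list(output)
    return [line for line in lines if "WARN" in line]


if __name__ == "__main__":
    print(warn_lines("20:35:55 Done."))
    print(warn_lines(["1 of 2 PASS ...", "2 of 2 WARN ..."]))
```

The string branch matters because iterating over a plain string yields single characters, so a `for line in output` loop over a lone last line such as `20:35:55 Done.` can never match "WARN".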
Anna Geller
Madison Schott
05/03/2022, 8:56 PM
Madison Schott
05/03/2022, 8:56 PM
return_all=True to my dbt task
Anna Geller
Anna Geller
Madison Schott
05/04/2022, 10:27 PM