Madison Schott
04/29/2022, 5:42 PMAnna Geller
04/29/2022, 5:47 PM@task(trigger=any_failed)
def send_slack_alert_on_failure(output):
SlackTask(message=output).run()
this post provides a full exampleMadison Schott
04/29/2022, 6:01 PM17:56:23 8 of 17 PASS freshness of google_sheets.misc_marketing_spend_winc_influencers... [PASS in 4.13s]
10:56:24
INFO
DbtShellTask
17:56:24 7 of 17 WARN freshness of google_sheets.misc_marketing_spend_winc............... [WARN in 4.39s]
Anna Geller
04/29/2022, 6:09 PM@task(trigger=any_failed)
def send_slack_alert_on_failure(output):
for line in output:
if "WARN" in line:
SlackTask(message=output).run()
Madison Schott
04/29/2022, 6:54 PMBeginning health checks...
Traceback (most recent call last):
File "/opt/prefect/healthcheck.py", line 152, in <module>
flows = cloudpickle_deserialization_check(flow_file_paths)
File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
flows.append(cloudpickle.loads(flow_bytes))
TypeError: code() takes at most 15 arguments (16 given)
Anna Geller
04/29/2022, 7:28 PMMadison Schott
04/29/2022, 7:31 PM@task(trigger=any_failed, checkpoint=False)
def send_slack_alert_on_test(output):
SlackTask(message=output).run()
Anna Geller
04/29/2022, 7:31 PMignore_healthchecks=True
, but the health checks are for a reason, looks like something in your flow code cannot be serialized with cloudpickle. If you need help debugging this, can you share your flow code?Madison Schott
04/29/2022, 7:42 PMAnna Geller
04/29/2022, 7:43 PMMadison Schott
04/29/2022, 7:43 PMAnna Geller
04/29/2022, 8:07 PMMadison Schott
04/29/2022, 8:12 PMFROM python:3.9-slim
RUN python -m pip install
# specify a base image
FROM prefecthq/prefect:latest
Unexpected error: AttributeError("'FunctionTask' object has no attribute 'retry_on'")
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 1000, in check_for_retry
self.task.retry_on
AttributeError: 'FunctionTask' object has no attribute 'retry_on'
Anna Geller
05/01/2022, 4:13 PMretry_on
is a recently added argument to the Task class, which makes me believe there is still some version mismatch between your registration and your prod runtime environments. Looks like your registration/build environment uses a more recent version of Prefect where this new argument exists, while your runtime/production environment is on an older version of Prefect and doesn't yet have this argument retry_on
Madison Schott
05/02/2022, 7:17 PMAnna Geller
05/02/2022, 7:46 PMpip install prefect --upgrade
in bothMadison Schott
05/02/2022, 7:47 PMAnna Geller
05/02/2022, 7:48 PMMadison Schott
05/02/2022, 9:08 PM@task(trigger=all_finished, checkpoint=False)
def send_slack_alert_on_test(output):
for line in output:
if "WARN" in line:
print(line)
SlackTask(message=line).run()
@task(trigger=all_finished, checkpoint=False, log_stdout=True)
def send_slack_alert_on_test(output):
for line in output:
if "WARN" in line:
print(line)
SlackTask(message=line).run()
Anna Geller
05/03/2022, 12:31 AMwill printing out the line here print to the Prefect logs?A really good question! I think it should work, as long as you add
log_stdout=True
to your task decorator (which you did) π€
Alternatively, you could do the same using logger which is safer:
logger = prefect.context.get("logger")
for line in output:
if "WARN" in line:
<http://logger.info|logger.info>(line)
Madison Schott
05/03/2022, 5:15 PMAnna Geller
05/03/2022, 7:04 PMMadison Schott
05/03/2022, 7:51 PMdef send_slack_alert_on_test(output):
print(output)
20:35:55 Done.
messageAnna Geller
05/03/2022, 8:51 PMlogger = prefect.context.get("logger")
if "WARN" in output:
<http://logger.info|logger.info>(output) # or put it into Slack task
Madison Schott
05/03/2022, 8:56 PMreturn_all=True
to my dbt taskAnna Geller
05/03/2022, 9:27 PMMadison Schott
05/04/2022, 10:27 PM