Brian
06/09/2023, 4:04 PM
@task(trigger=always_run)
def notify(job_name, **kwargs):
    tasks = kwargs['upstream_tasks']
    if any(s.is_failed() for s in tasks.values()):
        send_fail_notification(job_name)
    else:
        send_success_notification(job_name)
It looks like a decorated task has no access to the actual upstream_tasks property. I think I could do something similar by creating a custom trigger (which is passed a dictionary of tasks), but I'd like the logic to be in an actual task so that it shows up in the UI and schematics.
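To make the intended logic concrete, here is a minimal sketch of the decision the task would make, written against plain Python stand-ins rather than the real library. `StubState`, `pick_notification`, and the task names are hypothetical; the only thing assumed about the real state objects is the `is_failed()` method used in the snippet above.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class StubState:
    """Hypothetical stand-in for a task state; only is_failed() is modelled."""
    failed: bool = False

    def is_failed(self) -> bool:
        return self.failed


def pick_notification(upstream_states: Dict[str, StubState]) -> str:
    """Return "fail" if any upstream state failed, otherwise "success"."""
    if any(state.is_failed() for state in upstream_states.values()):
        return "fail"
    return "success"


# Hypothetical upstream tasks: one succeeded, one failed.
states = {"extract": StubState(), "load": StubState(failed=True)}
outcome = pick_notification(states)
```

The same function body could sit inside either a task or a custom trigger; which of the two actually receives the upstream states is exactly the open question here.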
Our current implementation is to define separate tasks for success and failure:
@task()
def notify_success():
    # ...
@task(trigger=any_failed)
def notify_failed():
    # ...
The issue with that solution is that the notify_failed() task will itself fail if there are no failed tasks, which means the Flow will fail unless we manually override the reference tasks to include everything except notify_failed(). That works, but it is messy and error-prone, since any new tasks have to get added to three different task lists.
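One way to cut the bookkeeping down to a single list would be to derive the reference tasks by exclusion instead of listing them by hand. The sketch below only shows that filtering step on plain strings; `reference_tasks_excluding` and the task names are hypothetical, and handing the result to the flow (for example through Prefect 1.x's `set_reference_tasks`) is an assumption that is left out here.

```python
from typing import Iterable, List


def reference_tasks_excluding(all_tasks: Iterable[str], excluded: Iterable[str]) -> List[str]:
    """Return every task from all_tasks that is not in excluded, preserving order."""
    skip = set(excluded)
    return [task for task in all_tasks if task not in skip]


# Hypothetical task names; a real flow would supply its own task objects.
all_tasks = ["extract", "transform", "load", "notify_success", "notify_failed"]
refs = reference_tasks_excluding(all_tasks, ["notify_failed"])
```

With this approach a newly added task would land in the reference set automatically, as long as it appears in the flow's overall task collection.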