Vladislav Bogucharov
04/26/2021, 10:51 PMfrom prefect import task, Flow
from prefect.triggers import any_failed
from random import randint
@task()
def extract():
# this task generates random integers from 0,1,2,3,4,5.
random_number = randint(0, 5)
return random_number
@task()
def transform(random_number):
# this task is potentially dangerous, since the random number can be zero, and then there will be division by zero
data = 100 / random_number
return data
@task()
def load(data):
# just print data
print(data)
@task(trigger=any_failed)
def telegram_alarm():
# this task is needed to send notifications about fails in telegram.
print('ERROR') # How can I display a more informative message? For example, which task broke? Or what error caused the fail? (in my case ZeroDivisionError: division by zero)
# import prefect
# print(prefect.context.to_dict()) # tried this method but didn't find the needed logs
with Flow('test_etl') as flow:
random_number = extract()
data = transform(random_number)
load(data)
telegram_alarm(upstream_tasks=[flow.get_tasks()])
flow_state = flow.run()
Kevin Kho
Kevin Kho
Vladislav Bogucharov
04/27/2021, 4:50 AMKevin Kho