KhTan
01/06/2022, 9:40 PM
KhTan
01/06/2022, 9:40 PM
from prefect import task, Flow
from prefect.run_configs import LocalRun
from prefect.engine.state import Failed
from prefect.tasks.notifications.email_task import EmailTask
from prefect.schedules import IntervalSchedule
from prefect.utilities.notifications import gmail_notifier, slack_notifier
from random import sample
from datetime import date, datetime, timedelta
@task(state_handlers=[gmail_notifier(only_states=[Failed])])
def add():
    """Add to a randomly drawn value, failing on purpose some of the time.

    One element is drawn from ``[1, 2, None]``. When ``None`` is drawn,
    ``1 + None`` raises ``TypeError``, which is the intended way to exercise
    the ``Failed``-state notifier attached to this task.

    Returns:
        The drawn value plus two (one is added before printing, one after).
    """
    # One of the three candidates is None, so roughly 1 in 3 runs should fail.
    candidates = [1, 2, None]
    (picked,) = sample(candidates, 1)
    output = 1 + picked
    print(f'output: {output} \n\n\n')
    return 1 + output
def main():
    """Build the ``test_alert`` flow on a short interval schedule and run it.

    The schedule starts now, ends 15 seconds later, and fires every
    3 seconds. The flow carries a ``gmail_notifier`` state handler restricted
    to the ``Failed`` state, runs the ``add`` task, and also calls an
    ``EmailTask`` with a fixed subject and message.
    """
    # Take a single timestamp so the start and end of the schedule window are
    # measured from the same instant.
    now = datetime.utcnow()
    schedule = IntervalSchedule(
        start_date=now,
        end_date=now + timedelta(seconds=15),
        interval=timedelta(seconds=3),
    )

    with Flow(
        "test_alert",
        state_handlers=[gmail_notifier(only_states=[Failed])],
        schedule=schedule,
        run_config=LocalRun(),
    ) as flow:
        add()
        # Plain addresses: the pasted snippet carried chat auto-link markup
        # ("<mailto:...|...>") inside these strings, which is not a valid
        # e-mail address.
        completion_email_task = EmailTask(
            email_to="ALERT.prefect@gmail.com",
            email_from="ALERT.prefect@gmail.com",
        )
        # NOTE(review): this task has no upstream dependency on add(), so it
        # looks like the "run failed" message is sent on every flow run, not
        # only on failures -- confirm whether a trigger/upstream is intended.
        completion_email_task(subject="prefect alert", msg="run failed")

    flow.run()


if __name__ == "__main__":
    main()
Kevin Kho
Kevin Kho
Kevin Kho
from prefect import task, Flow
from prefect.run_configs import LocalRun
from prefect.engine.state import Failed
from prefect.tasks.notifications.email_task import EmailTask
from prefect.utilities.notifications import gmail_notifier, slack_notifier
@task(state_handlers=[gmail_notifier(only_states=[Failed])])
def test(x):
    """Return ``x`` with the string ``"test"`` appended via ``+``.

    A non-string ``x`` (such as an integer) makes the ``+`` raise
    ``TypeError``; the task's state handler is configured for the ``Failed``
    state only.
    """
    suffixed = x + "test"
    return suffixed
# Flow-level and task-level handlers are both limited to the Failed state.
with Flow(
    "test_alert",
    run_config=LocalRun(),
    state_handlers=[gmail_notifier(only_states=[Failed])],
) as flow:
    # Map the task over three inputs; the integer input cannot be
    # concatenated with a string, so that mapped run raises.
    test.map(["a", "b", 1])

flow.run()
This will send two emails.
Kevin Kho
Kevin Kho
KhTan
01/06/2022, 11:06 PM
Kevin Kho
KhTan
01/06/2022, 11:56 PM