KhTan
01/06/2022, 9:40 PMKhTan
01/06/2022, 9:40 PMfrom prefect import task, Flow
from prefect.run_configs import LocalRun
from prefect.engine.state import Failed
from prefect.tasks.notifications.email_task import EmailTask
from prefect.schedules import IntervalSchedule
from prefect.utilities.notifications import gmail_notifier, slack_notifier
from random import sample
from datetime import date, datetime, timedelta
@task(state_handlers=[gmail_notifier(only_states=[Failed])]) 
def add():
    list1 = [1, 2, None] #it should fail roughly 1 out 3 tries
    output = 1 + sample(list1,1)[0]
    print(f'output: {output} \n\n\n')
    return 1 + output
def main():
    
    schedule = IntervalSchedule(
        start_date=datetime.utcnow() + timedelta(seconds=0),
        end_date=datetime.utcnow() + timedelta(seconds=15),
        interval=timedelta(seconds=3),
    )
    with Flow(
        "test_alert",
        state_handlers=[gmail_notifier(only_states=[Failed])],
        schedule=schedule,
        run_config=LocalRun(),
    ) as flow:
        
        output = add()
        completion_email_task = EmailTask(email_to='<mailto:ALERT.prefect@gmail.com|ALERT.prefect@gmail.com>', email_from='<mailto:ALERT.prefect@gmail.com|ALERT.prefect@gmail.com>')
        completion_email = completion_email_task(subject="prefect alert", msg='run failed')
    flow.run()
if __name__ == "__main__":
    main()Kevin Kho
Kevin Kho
Kevin Kho
from prefect import task, Flow
from prefect.run_configs import LocalRun
from prefect.engine.state import Failed
from prefect.tasks.notifications.email_task import EmailTask
from prefect.utilities.notifications import gmail_notifier, slack_notifier
@task(state_handlers=[gmail_notifier(only_states=[Failed])]) 
def test(x):
    return x + "test"
with Flow(
    "test_alert",
    state_handlers=[gmail_notifier(only_states=[Failed])],
    run_config=LocalRun(),
) as flow:
    
    test.map(["a","b", 1])
flow.run()Kevin Kho
Kevin Kho
KhTan
01/06/2022, 11:06 PMKevin Kho
KhTan
01/06/2022, 11:56 PM