Thread
#prefect-community
    KhTan

    8 months ago
    Hi! I want Prefect to send an alert only when a flow fails, but it’s sending an email on every run. What have I done wrong here (code attached)? I’d also like an alert when the output is above a certain value (output > 2): should I use a state_handler or something else? Thank you!
    from prefect import task, Flow
    from prefect.run_configs import LocalRun
    from prefect.engine.state import Failed
    from prefect.tasks.notifications.email_task import EmailTask
    from prefect.schedules import IntervalSchedule
    from prefect.utilities.notifications import gmail_notifier, slack_notifier
    from random import sample
    from datetime import date, datetime, timedelta
    
    @task(state_handlers=[gmail_notifier(only_states=[Failed])]) 
    def add():
        list1 = [1, 2, None] #it should fail roughly 1 out 3 tries
        output = 1 + sample(list1,1)[0]
        print(f'output: {output} \n\n\n')
        return 1 + output
    
    def main():
        
        schedule = IntervalSchedule(
            start_date=datetime.utcnow() + timedelta(seconds=0),
            end_date=datetime.utcnow() + timedelta(seconds=15),
            interval=timedelta(seconds=3),
        )
    
        with Flow(
            "test_alert",
            state_handlers=[gmail_notifier(only_states=[Failed])],
            schedule=schedule,
            run_config=LocalRun(),
        ) as flow:
            
            output = add()
            completion_email_task = EmailTask(email_to='ALERT.prefect@gmail.com', email_from='ALERT.prefect@gmail.com')
            completion_email = completion_email_task(subject="prefect alert", msg='run failed')
    
        flow.run()
    
    
    if __name__ == "__main__":
        main()
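On the second question (an alert when output > 2), one possible approach is a custom state handler. The sketch below assumes the Prefect 1.x handler signature `(task, old_state, new_state)` and that the new state exposes `is_successful()` and `result`. The names `make_threshold_handler`, `FakeState` and the `notify` callback are hypothetical, introduced only so the example runs without Prefect or SMTP credentials.

```python
def make_threshold_handler(threshold, notify):
    """Return a state handler that calls notify(message) when a
    successful numeric result is above threshold."""

    def handler(task, old_state, new_state):
        result = getattr(new_state, "result", None)
        # Only look at successful states that carry a plain number.
        if (
            new_state.is_successful()
            and isinstance(result, (int, float))
            and result > threshold
        ):
            name = getattr(task, "name", task)
            notify(f"{name}: output {result} is above {threshold}")
        # A state handler has to hand the (possibly unchanged) state back.
        return new_state

    return handler


class FakeState:
    """Hypothetical stand-in exposing only what the handler reads."""

    def __init__(self, result, successful=True):
        self.result = result
        self._successful = successful

    def is_successful(self):
        return self._successful


alerts = []  # stand-in outbox; a real notify would send an email
handler = make_threshold_handler(2, alerts.append)
handler("add", None, FakeState(3))                      # above threshold
handler("add", None, FakeState(1))                      # below threshold
handler("add", None, FakeState(None, successful=False))  # failed run
```

In a real flow the handler would presumably be attached with `@task(state_handlers=[make_threshold_handler(2, notify)])`, with `notify` replaced by whatever actually sends the message.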
    Kevin Kho

    8 months ago
    I am looking at this, but for the output > 2 case, I think the EmailTask is the best option
    I literally can’t see anything wrong right now. I’ll test it
    It is working for me:
    from prefect import task, Flow
    from prefect.run_configs import LocalRun
    from prefect.engine.state import Failed
    from prefect.tasks.notifications.email_task import EmailTask
    from prefect.utilities.notifications import gmail_notifier, slack_notifier
    
    
    @task(state_handlers=[gmail_notifier(only_states=[Failed])]) 
    def test(x):
        return x + "test"
    
    with Flow(
        "test_alert",
        state_handlers=[gmail_notifier(only_states=[Failed])],
        run_config=LocalRun(),
    ) as flow:
        
        test.map(["a","b", 1])
    
    flow.run()
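    This will send two emails, presumably one from the task-level handler for the failing mapped input (1 + "test" raises a TypeError) and one from the flow-level handler
    Take out the 1 in the map and it doesn’t try to send anything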
    Are you sure it’s not the email task that is firing for all flows?
    KhTan

    8 months ago
    Thanks a ton. I might have misused the email task. How can I use EmailTask for a value alert? I wasn’t able to find examples for it. I was trying to do something like this:
    with Flow("test_alert") as flow:
        output = test("a")
        if output == 'atest':
            completion_email_task = EmailTask(email_to='ALERT.prefect@gmail.com', email_from='ALERT.prefect@gmail.com')
            completion_email = completion_email_task(subject="prefect value alert")
    Kevin Kho

    8 months ago
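    You would need to use the case task instead of if. A Python if is evaluated once while the flow is being built, when output is still a task rather than its result; case makes the branch part of the flow, so the check happens at run time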
    KhTan

    8 months ago
    thanks!