https://prefect.io logo
p

Patrick Alves

02/04/2022, 2:03 PM
Hi there, I have this task with a trigger:
Copy code
@task(trigger=any_failed)
def alert_email(email, message):
    sendmail(email, '[Prefect] ADCheck - [FAILED]', message)

with Flow("AD Check") as flow:

    # Parameters
    admin_email = Parameter("admin_email", default="<mailto:patrick@xyz.br|patrick@xyz.br>")

    # Tasks
    message = "some message"

    try:
        ad_conn = connect_active_directory(ad_server, ad_user, ad_pass)

    except Exception as e:
        message = f'Erro ao executar workflow: {e}'

    # If all tasks are successful
    notify_email(admin_email, message)

    # If any task fails
    alert_email(admin_email, message)
When the task fails, I would like to get the Exception error and save it o message variable, but I am getting
Trigger was "all_successful" but some of the upstream tasks failed
@Anna Geller, any tips? I've tried to use the state_handlers, but I need to be able to change parameters of the alert function for each workflow (email, subject, etc.) So triggers seems better. But I could not get the error.
a

Anna Geller

02/04/2022, 2:25 PM
I see. I think you are on the right track with a state handler. Do you know that you can retrieve flow’s parameters within the state handler by using the Prefect context? Here is how you can do it:
Copy code
prefect.context.parameters["admin_email"]
1
One more thing: you actually shouldn’t need/use try-except blocks within your flows, since that’s part of Prefect’s value prop that you shouldn’t have to think about trapping any exceptions and Prefect will: • give you visibility into any errors that occurred when running your flow • give you mechanisms such as triggers, state handlers and signals to react to various task run states in the way you need it
p

Patrick Alves

02/04/2022, 2:29 PM
I think the parameters solves the problem
👍 1
Thanks a lot @Anna Geller!
@Anna Geller New question. I have tasks that run in parallel, so for every task that fails I'll get an e-mail. I would like to change that using a flow level state_handler:
Copy code
def sendmail_on_failure_handler(flow, old_state, new_state):    
    for task in flow.tasks:

        #if task["result"]:
        logger.error(f"Task {new_state.result}")
the output is:
Copy code
[2022-02-04 12:16:28-0300] ERROR - prefect | Task {<Task: connect_rh_db>: <Failed: "Error during execution of task: OperationalError((18456, b"Login failed for user 'usrr_cis'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (limos.tce.pa)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (limos.tce.pa)\n"))">, <Task: notify_email>: <TriggerFailed: "Trigger was "all_successful" but some of the upstream tasks failed.">, <Parameter: admin_email>: <Success: "Task run succeeded.">, <Task: connect_active_directory>: <Success: "Task run succeeded.">,
That is, all tasks and states are in the
new_state.result
object. But I could not found a way to access them. It looks like a dict, but I've tried to access all methods of a dict and did not worked. Any tips to get the states of each task? So I can send a single e-mail.
a

Anna Geller

02/04/2022, 3:49 PM
This would be hard on a flow level state handler, but there is a much easier way to solve it. You can send an email on a reduce task rather than map task or on a flow-level. E.g. given the flow below, attach a state handler to the print_results task that already has results of all child tasks.
Copy code
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor


@task
def generate_random_numbers():
    return list(range(1, 200))


@task
def add_one(x):
    return x + 1


@task(log_stdout=True, state_handlers=[your_state_handler_function])
def print_results(res):
    print(res)


with Flow("mapping", executor=LocalDaskExecutor()) as flow:
    numbers = generate_random_numbers()
    result = add_one.map(numbers)
    print_results(result)
1
p

Patrick Alves

02/04/2022, 4:30 PM
I've built another solution 😅
Copy code
def sendmail_on_failure_handler(flow, old_state, new_state):
    """
    Send e-mail with details of failed tasks
    """
    if new_state.is_failed():
        message = ""
        # Get error message of Failed tasks
        for task in str(new_state.result).split('<Task'):
            for i in ["<", ">", "{", "Task:"]:
                task = task.replace(i, "")

            if ": Failed:" in task:
                # Get only task name
                message += f"Task [{task.split(':')[1].strip()}] - Failed\n"
                error = task.split('Failed:')[1].strip().replace('\"','')
                message += f"{error}\n-----------------------\n"

        sendmail(
            prefect.context.parameters["admin_email"],
            f'[Prefect] {prefect.context["flow_name"]} - [FAILED]',
            message)

    return new_state
The result:
Copy code
Task [connect_active_directory] - Failed
Error during execution of task: LDAPBindError('automatic bind not successful - invalidCredentials'),
-----------------------
Task [connect_rh_db] - Failed
Error during execution of task: OperationalError((18456, bLogin failed for user 'usrr_cis'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (<http://server.com|server.com>)\n)),
a

Anna Geller

02/04/2022, 5:12 PM
I believe that a state handler attached to a reduce task would be much easier than a flow-level state handler in your use case, but the decision is yours. LMK if you have any question about it I can help with
6 Views