Patrick Alves
02/04/2022, 2:03 PM
@task(trigger=any_failed)
def alert_email(email, message):
    sendmail(email, '[Prefect] ADCheck - [FAILED]', message)

with Flow("AD Check") as flow:
    # Parameters
    admin_email = Parameter("admin_email", default="patrick@xyz.br")
    # Tasks
    message = "some message"
    try:
        ad_conn = connect_active_directory(ad_server, ad_user, ad_pass)
    except Exception as e:
        message = f'Erro ao executar workflow: {e}'
    # If all tasks are successful
    notify_email(admin_email, message)
    # If any task fails
    alert_email(admin_email, message)
When the task fails, I would like to get the exception and save it to the message variable, but I am getting: Trigger was "all_successful" but some of the upstream tasks failed
@Anna Geller, any tips?
I've tried to use the state_handlers, but I need to be able to change parameters of the alert function for each workflow (email, subject, etc.)
So triggers seem better, but I could not get the error.
Anna Geller
prefect.context.parameters["admin_email"]
Patrick Alves
02/04/2022, 2:29 PM
def sendmail_on_failure_handler(flow, old_state, new_state):
    for task in flow.tasks:
        #if task["result"]:
        logger.error(f"Task {new_state.result}")
the output is:
[2022-02-04 12:16:28-0300] ERROR - prefect | Task {<Task: connect_rh_db>: <Failed: "Error during execution of task: OperationalError((18456, b"Login failed for user 'usrr_cis'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (limos.tce.pa)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (limos.tce.pa)\n"))">, <Task: notify_email>: <TriggerFailed: "Trigger was "all_successful" but some of the upstream tasks failed.">, <Parameter: admin_email>: <Success: "Task run succeeded.">, <Task: connect_active_directory>: <Success: "Task run succeeded.">,
That is, all tasks and their states are in the new_state.result object, but I could not find a way to access them.
It looks like a dict, but I tried all the dict methods and none of them worked.
Any tips on getting the state of each task, so I can send a single e-mail?
Anna Geller
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor

@task
def generate_random_numbers():
    return list(range(1, 200))

@task
def add_one(x):
    return x + 1

@task(log_stdout=True, state_handlers=[your_state_handler_function])
def print_results(res):
    print(res)

with Flow("mapping", executor=LocalDaskExecutor()) as flow:
    numbers = generate_random_numbers()
    result = add_one.map(numbers)
    print_results(result)
Patrick Alves
02/04/2022, 4:30 PM
def sendmail_on_failure_handler(flow, old_state, new_state):
    """
    Send e-mail with details of failed tasks
    """
    if new_state.is_failed():
        message = ""
        # Get error message of Failed tasks
        for task in str(new_state.result).split('<Task'):
            for i in ["<", ">", "{", "Task:"]:
                task = task.replace(i, "")
            if ": Failed:" in task:
                # Get only task name
                message += f"Task [{task.split(':')[1].strip()}] - Failed\n"
                error = task.split('Failed:')[1].strip().replace('\"','')
                message += f"{error}\n-----------------------\n"
        sendmail(
            prefect.context.parameters["admin_email"],
            f'[Prefect] {prefect.context["flow_name"]} - [FAILED]',
            message)
    return new_state
The result:
Task [connect_active_directory] - Failed
Error during execution of task: LDAPBindError('automatic bind not successful - invalidCredentials'),
-----------------------
Task [connect_rh_db] - Failed
Error during execution of task: OperationalError((18456, bLogin failed for user 'usrr_cis'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (server.com)\n)),
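The handler above builds the report by parsing str(new_state.result). A possible alternative, sketched here under assumptions, is to iterate the mapping directly. It assumes that new_state.result in a flow-level state handler is a mapping of Task objects to State objects (as the logged output earlier in the thread suggests), and that each task exposes .name and each state exposes .is_failed() and .message. The build_failure_report helper and the FakeTask/FakeState classes are hypothetical names used only so the sketch runs without Prefect installed; this has not been verified against a real flow run.

```python
from dataclasses import dataclass


def build_failure_report(task_states):
    """Build an e-mail body from a mapping of task -> state.

    Assumption: each task has `.name`, and each state has `.is_failed()`
    and `.message`.
    """
    lines = []
    for task, state in task_states.items():
        # TriggerFailed only reflects an upstream failure, so it is skipped,
        # matching what the string-parsing handler reports.
        if not state.is_failed() or type(state).__name__ == "TriggerFailed":
            continue
        lines.append(f"Task [{task.name}] - Failed")
        lines.append(str(state.message))
        lines.append("-----------------------")
    return "\n".join(lines)


# Hypothetical stand-ins for Prefect's Task and State objects.
@dataclass(frozen=True)
class FakeTask:
    name: str


@dataclass
class FakeState:
    message: str
    failed: bool = False

    def is_failed(self):
        return self.failed


states = {
    FakeTask("connect_rh_db"): FakeState("Login failed", failed=True),
    FakeTask("connect_active_directory"): FakeState("Task run succeeded."),
}
report = build_failure_report(states)
print(report)
```

Inside the handler, the string parsing could then be replaced by message = build_failure_report(new_state.result) within the new_state.is_failed() branch, if the assumptions above hold for your Prefect version.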
Anna Geller