Mahesh
06/29/2021, 3:38 PM
@task
def get_values():
    """Return the fixed list of sample values used by the example."""
    return ["value", "test", "demo"]
@task(task_run_name="{val}", state_handlers=[post_to_mail])
def compute(val):
    """Raise for the sample value "demo"; do nothing for any other value.

    The decorator templates the task run name from ``val`` and attaches
    ``post_to_mail`` as a state handler.

    Args:
        val: The value to check.

    Raises:
        ValueError: If ``val`` equals "demo".
    """
    if val == "demo":
        raise ValueError("Nope!")
# Build the flow: fetch the sample values, then map `compute` over them.
with Flow("task_run_names") as flow:
    vals = get_values()
    compute.map(vals)
I want to enable state handlers for the tasks so that failed tasks are reported with their task_run_name.
Since the task is mapped (looped), I am getting the task name but not the task_run_name.
Is there anything in prefect.context that holds task_run_name, or any other way to get task_run_name in state handlers?
Kevin Kho
06/29/2021, 3:41 PMMahesh
06/29/2021, 3:44 PM
from prefect import task, Flow
from prefect import Flow, task
from prefect.engine.state import Failed
from prefect.client.secrets import Secret
import smtplib, ssl
import prefect
# SMTP settings used by the mail state handler.
port = 587  # For starttls
# Must be a bare hostname: the previous value was chat link markup
# ("<http://...|...>"), which smtplib cannot resolve as a host.
smtp_server = "smtp.gmail.com"

# Credentials and recipients are read from Prefect Secrets when this module
# is loaded.
sender_email = Secret('EMAIL_USERNAME').get()
# NOTE(review): the handler iterates over this value, so it presumably holds a
# list of addresses — confirm how the secret is stored.
receiver_email = Secret('receiver_email').get()
password = Secret('EMAIL_PASSWORD').get()
def post_to_mail(task, old_state, new_state):
    """State handler that e-mails a notice when a task fails or succeeds.

    Any other state transition is ignored. The mail subject is the flow id
    read from ``prefect.context``; the body names the task and its new state.

    Args:
        task: The task whose state is changing.
        old_state: The state being left (unused).
        new_state: The state being entered.

    Returns:
        ``new_state``, unchanged, so the transition proceeds as proposed.
    """
    if new_state.is_failed():
        msg = "Task {0} failed and is retrying at {1}".format(task, new_state)
    elif new_state.is_successful():
        # The success branch previously reused the failure text verbatim.
        msg = "Task {0} succeeded at {1}".format(task, new_state)
    else:
        return new_state

    flow_id = prefect.context.get("flow_id")
    message = "Subject: {0}\n\n{1}".format(flow_id, msg)

    # A single address stored as a plain string would otherwise be iterated
    # character by character.
    if isinstance(receiver_email, str):
        recipients = [receiver_email]
    else:
        recipients = receiver_email

    context = ssl.create_default_context()
    with smtplib.SMTP(smtp_server, port) as server:
        server.starttls(context=context)
        server.login(sender_email, password)
        for recipient in recipients:
            server.sendmail(sender_email, recipient, message)
    return new_state
Kevin Kho
06/29/2021, 3:44 PMMarvin
06/29/2021, 4:10 PMKevin Kho
06/29/2021, 4:12 PM
Use the task_run_id and query the GraphQL API to get the task_run_name. We’ll work on getting task_run_name in the context, though.
kumar
06/30/2021, 12:33 AMKevin Kho
06/30/2021, 12:36 AM
query = """
query {task_run_by_pk (id: "5144f2c0-a29c-4182-a06f-4402341aebda") {
name
}}
"""
client.graphql(query)
kumar
06/30/2021, 12:39 AM