Title
m

Mahesh

06/29/2021, 3:38 PM
Hello Team, I am running prefect flow of tasks in for loop with task_run_names, Similar to below example.
@task
def get_values():
    return ["value", "test", "demo"]

@task(task_run_name="{val}", state_handlers=[post_to_mail])
def compute(val):
    if val == "demo":
        raise ValueError("Nope!")

with Flow("task_run_names") as flow:
    vals = get_values()
    compute.map(vals)
I want to enble statehandlers for the tasks to notify the failed tasks with task_run_names, Since it is looping tasks, am getting task name but not task_run_name. Is there any prefect.context to get task_run_name or any way to get task_run_name in statehandlers.
k

Kevin Kho

06/29/2021, 3:41 PM
Hi @Mahesh, I can look into it. Do you have minimal code for the state_handler I can start with?
m

Mahesh

06/29/2021, 3:44 PM
from prefect import task, Flow
from prefect import Flow, task
from prefect.engine.state import Failed
from prefect.client.secrets import Secret
import smtplib, ssl
import prefect

port = 587  # For starttls
smtp_server = "<http://smtp.gmail.com|smtp.gmail.com>"
sender_email=Secret('EMAIL_USERNAME')
sender_email=sender_email.get()

receiver_email=Secret('receiver_email')
receiver_email=receiver_email.get()

password=Secret('EMAIL_PASSWORD')
password=password.get()

def post_to_mail(task, old_state, new_state):
    if new_state.is_failed():
        flow_run_id = prefect.context.get("flow_run_id")
        flow_id = prefect.context.get("flow_id")
        msg = "Task {0} failed and is retrying at {1}""".format(task, new_state)
        message = "Subject: {0}\n\n{1}".format(flow_id, msg)
        context = ssl.create_default_context()
        with smtplib.SMTP(smtp_server, port) as server:
            server.starttls(context=context)
            server.login(sender_email, password)
            for i in receiver_email:
                server.sendmail(sender_email, i, message)

    if new_state.is_successful():
        flow_run_id = prefect.context.get("flow_run_id")
        flow_id = prefect.context.get("flow_id")
        msg = "Task {0} failed and is retrying at {1}""".format(task, new_state)
        message = "Subject: {0}\n\n{1}".format(flow_id, msg)
        context = ssl.create_default_context()
        with smtplib.SMTP(smtp_server, port) as server:
            server.starttls(context=context)
            server.login(sender_email, password)
            for i in receiver_email:
                server.sendmail(sender_email, i, message)
k

Kevin Kho

06/29/2021, 3:44 PM
Thanks!
👍 1
@Marvin open “Add task_run_name to context”
k

Kevin Kho

06/29/2021, 4:12 PM
You would need to use the
task_run_id
and query the GraphQL API to get the
task_run_name
. We’ll work on getting task_run_name in the context though
k

kumar

06/30/2021, 12:33 AM
@Kevin Kho can you share the example code to query graphql API.
k

Kevin Kho

06/30/2021, 12:36 AM
Get task_run_id from context and then
query = """
query {task_run_by_pk (id: "5144f2c0-a29c-4182-a06f-4402341aebda") {
  name
}}
"""
client.graphql(query)
k

kumar

06/30/2021, 12:39 AM
Thanks
👍 1