Are there any examples of how to use Email Task wi...
# ask-community
s
Are there any examples of how to use Email Task with inputs taken from another task? I have a `monitor_scrape` task that monitors the status of a long-running job. When this task completes, I want to send an email with the stats it outputs. I’m passing the output of the `monitor_scrape` task into my email task as the message, but the email seems to be firing independently (see diagram). Any pointers on what I might be doing wrong? Flow Code:
completion_email_task = EmailTask(email_to='xxx', email_to_cc='xxx')
@task()
def monitor_scraper_run_status() -> Output:
    # task code here
def create_flow(flow_name: str) -> Flow:
    with Flow(flow_name,
              state_handlers=[gmail_notifier(only_states=[Running, Cancelled, Looped, Failed])
                              ]) as flow:
        output = monitor_scraper_run_status()
        completion_email_task(subject="<SUBJECT_GOES_HERE>", msg=output.__repr__())
    return flow
k
Hey @Sahil Chopra, I think the issue here is that `output` is a Task object at build time, and `__repr__` is evaluated at build time, when that output is not populated yet. You will likely need an intermediate task to pull out that `repr` and pass it to `msg` so that execution will be deferred.
s
Conceptually, why does putting this in another task defer execution?
a
I think you need to either pass `output` as a data dependency, without calling the dunder repr:
with Flow(flow_name,
          state_handlers=[gmail_notifier(only_states=[Running, Cancelled, Looped, Failed])
                          ]) as flow:
    output = monitor_scraper_run_status()
    completion_email = completion_email_task(subject="<SUBJECT_GOES_HERE>", msg=output)
or (provided that this repr works as you expect), pass it as a state dependency:
with Flow(flow_name,
          state_handlers=[gmail_notifier(only_states=[Running, Cancelled, Looped, Failed])
                          ]) as flow:
    output = monitor_scraper_run_status()
    completion_email = completion_email_task(subject="<SUBJECT_GOES_HERE>", msg=output.__repr__())
    output.set_downstream(completion_email)
Kevin is right - putting it into another task should work as well. @Sahil Chopra You asked why: execution of all tasks is deferred until runtime, i.e. until a FlowRun is created for this Flow.
k
So tasks in general defer execution to runtime. When you do
with Flow(...) as flow:
    a = task_a()
    b = task_b(a)
This is constructing the DAG but not running it. This is why the flow is not run when you register it: the graph is constructed and serialized. When you run the Flow, these tasks are actually evaluated. So it’s the task that provides this runtime deferral. Anything not in a task is evaluated immediately. In the code snippet below, notice that `test` is evaluated immediately but `task_a` is not when you print them. `task_a` will not print `1`, because it’s deferred.
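from prefect import task, Flow

@task
def task_a():
    return 1

with Flow("param_test") as flow:
    test = [1,2,3,4]
    print(test)  # plain Python: runs now and prints the list
    a = task_a()
    print(a)  # prints the Task object, not 1: task_a has not run yet

flow.register("bristech")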
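To make the build-time vs runtime split concrete without needing Prefect installed, here is a toy model in plain Python. It is only a sketch of the idea, not how Prefect is implemented: `ToyTask`, `get_stats`, `format_stats`, and `send` are all made-up names. Calling `repr()` while building runs on the placeholder object, whereas wrapping the formatting in its own task defers it until the upstream value exists.

```python
# Toy model of build time vs runtime. NOT Prefect's implementation;
# every name here is hypothetical and exists only for illustration.
class ToyTask:
    def __init__(self, fn, upstream=()):
        self.fn = fn
        self.upstream = tuple(upstream)

    def run(self):
        # Runtime: resolve upstream tasks first, then call the function.
        args = [u.run() if isinstance(u, ToyTask) else u for u in self.upstream]
        return self.fn(*args)

    def __repr__(self):
        return f"<ToyTask: {self.fn.__name__}>"


def get_stats():
    return {"scraped": 42}


def format_stats(stats):
    return repr(stats)


def send(msg):
    return f"EMAIL BODY: {msg}"


# Build time: nothing below calls get_stats yet.
stats = ToyTask(get_stats)

# repr() runs right now, on the placeholder, so the email task receives
# a plain string and has no upstream task at all.
eager_email = ToyTask(send, [repr(stats)])

# Formatting lives in its own task, so it waits for the real value.
deferred_email = ToyTask(send, [ToyTask(format_stats, [stats])])

# Runtime.
eager_result = eager_email.run()
deferred_result = deferred_email.run()
print(eager_result)
print(deferred_result)
```

In the eager version the email task ends up with no link to the stats task, which matches the "firing independently" symptom in the original question.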
a
@Sahil Chopra does this function `monitor_scraper_run_status` return anything? If so, I think you don’t need this repr at all; the return value of the function will be passed as a data dependency to the email task automatically.
s
Yep, it returns a dataclass object! I was using the `__repr__` function since downstream libs were having issues encoding the object. Instead I’ll just return the string repr from the `monitor_scraper_run_status` task so that the data dependency is linked.
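For anyone landing here later, a rough sketch of the "return a string instead of the dataclass" idea, using only the standard library. The `ScrapeStats` fields and the `stats_to_message` helper are assumptions for illustration; the real dataclass in this thread was never shown. The string built here is what the monitoring task would return, so the email task gets something it can encode.

```python
from dataclasses import dataclass, asdict


@dataclass
class ScrapeStats:
    # Hypothetical fields; the actual dataclass was not posted in the thread.
    pages_scraped: int
    errors: int
    duration_seconds: float


def stats_to_message(stats: ScrapeStats) -> str:
    # One "field: value" line per dataclass field, in declaration order.
    return "\n".join(f"{name}: {value}" for name, value in asdict(stats).items())


message = stats_to_message(
    ScrapeStats(pages_scraped=120, errors=3, duration_seconds=45.5)
)
print(message)
```

Doing the conversion inside the task body means it runs at runtime on the real object, and the task's return value is still passed along as a data dependency.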
Thank you both for the clear explanation! Super helpful 🙂