Sahil Chopra
10/29/2021, 5:25 PM
I have a monitor_scrape task that is monitoring the status of a long-running job. At the completion of this task, I want to shoot out an email that provides stats outputted from that task. I’m passing the output of the monitor_scrape task into my email task as the message, but the email seems to be firing independently (see diagram). Any pointers on what I might be doing wrong?
Flow Code:
completion_email_task = EmailTask(email_to='xxx', email_to_cc='xxx')

@task()
def monitor_scraper_run_status() -> Output:
    # task code here

def create_flow(flow_name: str) -> Flow:
    with Flow(
        flow_name,
        state_handlers=[gmail_notifier(only_states=[Running, Cancelled, Looped, Failed])],
    ) as flow:
        output = monitor_scraper_run_status()
        completion_email_task(subject="<SUBJECT_GOES_HERE>", msg=output.__repr__())
    return flow
Kevin Kho
output is a Task at build time, and __repr__ is evaluated during build time, when that output is not populated yet. You will likely need an intermediate task to pull out that repr and pass it to msg so that execution will be deferred.
Sahil Chopra
10/29/2021, 5:29 PM
Anna Geller
with Flow(
    flow_name,
    state_handlers=[gmail_notifier(only_states=[Running, Cancelled, Looped, Failed])],
) as flow:
    output = monitor_scraper_run_status()
    completion_email = completion_email_task(subject="<SUBJECT_GOES_HERE>", msg=output)
or (provided that this repr works as you expect), pass it as state dependency:
with Flow(
    flow_name,
    state_handlers=[gmail_notifier(only_states=[Running, Cancelled, Looped, Failed])],
) as flow:
    output = monitor_scraper_run_status()
    completion_email = completion_email_task(subject="<SUBJECT_GOES_HERE>", msg=output.__repr__())
    output.set_downstream(completion_email)
Anna Geller
Kevin Kho
with Flow(...) as flow:
    a = task_a()
    b = task_b(a)
This is constructing the DAG but not running it. This is why the flow is not run when you register it. The graph is only constructed and serialized.
When you run the Flow, these tasks are actually evaluated. So it’s the task that provides this runtime deferral. Anything that is not in a task is evaluated immediately.
In the code snippet below, notice that when you print them, test is evaluated immediately but task_a is not. Printing a will not print 1, because the evaluation of task_a is deferred.
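from prefect import Flow, task

@task
def task_a():
    return 1

with Flow("param_test") as flow:
    test = [1,2,3,4]
    print(test)  # a plain list, evaluated immediately
    a = task_a()
    print(a)  # prints the Task object, not 1

flow.register("bristech")
Anna Geller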
Sahil Chopra
10/29/2021, 6:03 PM
Sahil Chopra
10/29/2021, 6:03 PM
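Kevin's point about build time versus run time, and his suggestion of an intermediate task, can be illustrated without Prefect. The sketch below is a toy model only, not Prefect's implementation: the `Deferred` class and the `send_email` function are hypothetical stand-ins for a task and for EmailTask, and the stats dictionary is invented for illustration. It shows why calling repr on the placeholder produces a constant string with no dependency on the monitoring step, and how wrapping repr in its own deferred step restores that dependency.

```python
class Deferred:
    """Toy placeholder created at build time: a function plus its inputs."""

    def __init__(self, fn, *upstream):
        self.fn = fn
        self.upstream = upstream

    def run(self):
        # Resolve any upstream placeholders first, then call the function.
        args = [u.run() if isinstance(u, Deferred) else u for u in self.upstream]
        return self.fn(*args)

    def __repr__(self):
        return f"<Deferred: {self.fn.__name__}>"


def monitor_scraper_run_status():
    # Invented stats; the real task body is not shown in the thread.
    return {"pages_scraped": 42, "errors": 0}


def send_email(msg):
    # Stand-in for the email task: returns the message instead of sending it.
    return msg


# Build time: nothing has run yet, so `output` is only a placeholder.
output = Deferred(monitor_scraper_run_status)

# Bug: repr() runs now, on the placeholder. The email step receives a plain
# string, so it has no upstream dependency on `output`.
buggy_email = Deferred(send_email, repr(output))

# Fix: make repr its own deferred step so it runs on the real result.
message = Deferred(repr, output)
fixed_email = Deferred(send_email, message)

# Run time: only now are the functions actually called.
buggy_result = buggy_email.run()
fixed_result = fixed_email.run()
print(buggy_result)  # the placeholder's repr, not the stats
print(fixed_result)  # the repr of the stats dictionary
```

In the toy model, `buggy_email` has only a string as input, which mirrors the email task firing independently in the diagram. In the real flow, the equivalent of `message` would be a small task that takes the monitoring output and returns its repr.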