Sahil Chopra
10/29/2021, 5:25 PM
I have a monitor_scrape task that is monitoring the status of a long-running job. At the completion of this task, I want to send out an email that provides stats output by that task. I'm passing the output of the monitor_scrape task into my email task as the message, but the email seems to be firing independently (see diagram). Any pointers on what I might be doing wrong?
Flow Code:
completion_email_task = EmailTask(email_to='xxx', email_to_cc='xxx')
@task()
def monitor_scraper_run_status() -> Output:
    # task code here
def create_flow(flow_name: str) -> Flow:
    with Flow(
        flow_name,
        state_handlers=[gmail_notifier(only_states=[Running, Cancelled, Looped, Failed])],
    ) as flow:
        output = monitor_scraper_run_status()
        completion_email_task(subject="<SUBJECT_GOES_HERE>", msg=output.__repr__())
    return flow
Kevin Kho
output is a Task at build time, and __repr__ is evaluated during build time, when that output is not populated yet. You will likely need an intermediate task to pull out that repr and pass it to msg, so that execution will be deferred.
Sahil Chopra
10/29/2021, 5:29 PM
Anna Geller
with Flow(
    flow_name,
    state_handlers=[gmail_notifier(only_states=[Running, Cancelled, Looped, Failed])],
) as flow:
    output = monitor_scraper_run_status()
    completion_email = completion_email_task(subject="<SUBJECT_GOES_HERE>", msg=output)
or (provided that this repr works as you expect), pass it as a state dependency:
with Flow(
    flow_name,
    state_handlers=[gmail_notifier(only_states=[Running, Cancelled, Looped, Failed])],
) as flow:
    output = monitor_scraper_run_status()
    completion_email = completion_email_task(subject="<SUBJECT_GOES_HERE>", msg=output.__repr__())
    output.set_downstream(completion_email)
Anna Geller
Kevin Kho
with Flow(...) as flow:
    a = task_a()
    b = task_b(a)
This constructs the DAG but does not run it. This is why the flow is not run when you register it: the graph is only constructed and serialized.
When you run the Flow, these tasks are actually evaluated. So it's the task that provides this runtime deferral. Anything not in a task is evaluated immediately.
In the code snippet below, notice that when you print them, test is evaluated immediately but task_a is not. Printing the result of task_a will not print 1, because it's deferred.
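from prefect import Flow, task
@task
def task_a():
    return 1
with Flow("param_test") as flow:
    test = [1, 2, 3, 4]
    print(test)  # plain Python, evaluated immediately: prints [1, 2, 3, 4]
    a = task_a()
    print(a)  # prints the Task object's repr, not 1
flow.register("bristech")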
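To make the build-time versus run-time distinction concrete without needing Prefect installed, here is a rough toy model in plain Python. It is only a sketch of the idea, not how Prefect is implemented: the Deferred class, the monitor, to_message and send_email functions, and the rows_scraped value are all made up for illustration.

```python
# Toy stand-in for a workflow library. This is NOT Prefect's implementation;
# every name here is hypothetical and exists only to illustrate deferral.
class Deferred:
    """A placeholder for a value that only exists once the graph is run."""

    def __init__(self, fn, *upstream):
        self.fn = fn
        self.upstream = upstream

    def run(self):
        # Resolve upstream placeholders first, then call the wrapped function.
        args = [u.run() if isinstance(u, Deferred) else u for u in self.upstream]
        return self.fn(*args)

    def __repr__(self):
        return f"<Deferred: {self.fn.__name__}>"


def monitor():
    # Made-up stats standing in for the real monitoring output.
    return {"rows_scraped": 42}


def to_message(stats):
    return repr(stats)


def send_email(msg):
    return f"EMAIL BODY: {msg}"


# Build time: nothing has run yet, so repr() describes the placeholder.
output = Deferred(monitor)
eager_msg = repr(output)
# The email node only receives a plain string, so it has no link to monitor.
eager_email = Deferred(send_email, eager_msg)

# Wrapping repr in its own node defers it until the upstream value exists.
deferred_email = Deferred(send_email, Deferred(to_message, output))

eager_result = eager_email.run()
deferred_result = deferred_email.run()
print(eager_result)     # EMAIL BODY: <Deferred: monitor>
print(deferred_result)  # EMAIL BODY: {'rows_scraped': 42}
```

In the eager version, the email node's only input is a string computed while the graph was being built, so nothing ties it to the monitoring step. That matches the symptom of the email firing independently. In the deferred version, the repr is taken inside a node, so it runs after the upstream value is available.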
Anna Geller
Sahil Chopra
10/29/2021, 6:03 PM