Jason Motley
12/08/2021, 3:09 PMKevin Kho
Kevin Kho
Jason Motley
12/08/2021, 3:32 PMKevin Kho
Jason Motley
12/08/2021, 3:37 PMJason Motley
12/08/2021, 3:37 PMKevin Kho
Jason Motley
12/08/2021, 3:45 PMKevin Kho
`wait_for_flow_run` has a `raise_final_state` argument. Set this to True. This means that if the sub-Flow fails, the `wait_for_flow_run` task will fail also.
Kevin Kho
# Sketch only: the `...` placeholders stand for arguments left out in the chat.
# Pattern: start each sub-flow, wait on it, then hand every wait result to one
# reporting task.
with Flow(...) as flow:
    a = create_flow_run(...)
    # raise_final_state=True: if the sub-flow fails, this wait task fails too.
    wait_a = wait_for_flow_run(a, raise_final_state=True)
    b = create_flow_run(...)
    wait_b = wait_for_flow_run(b, raise_final_state=True)
    # Gather every wait result so a single task can report on all sub-flows.
    collect_all_subflows = [wait_a, wait_b, ...]
    print_to_slack(collect_all_subflows)
Kevin Kho
run
method inside.Kevin Kho
always_run
triggerJason Motley
12/08/2021, 3:53 PM`always_run`, that would be within `create_flow_run`, right?
Kevin Kho
Kevin Kho
from prefect import task, Flow
import prefect
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.triggers import always_run
from prefect.tasks.notifications import SlackTask
@task
def get_values(x):
    """Return ``x + 1``, raising ``ValueError`` when ``x`` equals 2.

    NOTE(review): the failure for ``x == 2`` looks like a deliberate way to
    make any flow that calls ``get_values(2)`` fail, for demonstration.

    Args:
        x: value to increment.

    Returns:
        ``x + 1`` for any ``x`` other than 2.

    Raises:
        ValueError: when ``x == 2``.
    """
    if x == 2:
        raise ValueError()
    return x + 1
@task(trigger=always_run)
def notifier(wait_a, wait_b, wait_c):
    """Log and post to Slack one status line per awaited sub-flow run.

    The task is declared with the ``always_run`` trigger so the report can be
    produced even when an upstream wait task did not succeed.

    Args:
        wait_a: result of the first ``wait_for_flow_run`` task; its ``name``
            and ``state`` attributes are read.
        wait_b: same, for the second sub-flow run.
        wait_c: same, for the third sub-flow run.

    Returns:
        The message that was logged and sent to Slack.
    """
    flow_runs = [wait_a, wait_b, wait_c]
    # One line per sub-flow run, joined in a single pass.
    msg = "".join(
        f"{flow_run.name} had a state {flow_run.state} \n" for flow_run in flow_runs
    )
    prefect.context.logger.info(msg)
    # Call .run() directly: we are already inside a running task, so the
    # SlackTask is executed inline instead of being added to the flow.
    SlackTask(message=msg).run()
    return msg
# Three minimal sub-flows, each calling get_values once.
with Flow("flowA") as flow_a:
    get_values(1)

# get_values(2) raises ValueError, so this sub-flow's task fails.
with Flow("flowB") as flow_b:
    get_values(2)

with Flow("flowC") as flow_c:
    get_values(1)
# Override the trigger on the imported task objects so that starting and
# waiting on a sub-flow still happens when an upstream task has failed.
# NOTE: this mutates the shared task objects imported from
# prefect.tasks.prefect, so it applies to every later use in this module.
create_flow_run.trigger = always_run
wait_for_flow_run.trigger = always_run

with Flow("main") as flow:
    # Sub-flows run one after another: each create_flow_run is placed
    # downstream of the previous wait via upstream_tasks.
    a = create_flow_run(flow_name="flowA", project_name="bristech")
    wait_a = wait_for_flow_run(a, raise_final_state=True)

    b = create_flow_run(
        flow_name="flowB", project_name="bristech", upstream_tasks=[wait_a]
    )
    wait_b = wait_for_flow_run(b, raise_final_state=True)

    c = create_flow_run(
        flow_name="flowC", project_name="bristech", upstream_tasks=[wait_b]
    )
    wait_c = wait_for_flow_run(c, raise_final_state=True)

    # Report on all three sub-flow runs in one place.
    notifier(wait_a, wait_b, wait_c)
# Register each sub-flow in the "bristech" project; the main flow refers to
# them by flow name and project name in its create_flow_run calls.
for sub_flow in (flow_a, flow_b, flow_c):
    sub_flow.register("bristech")

# Registering the main flow is left disabled; it is run directly instead.
# flow.register("bristech")
flow.run()
Jason Motley
12/08/2021, 4:28 PMKevin Kho
In `notifier`, I think you can check if something failed by doing
if wait_a.state.is_failed():
    # add some different message
    ...