Hi, what is the recommended way of adding a handle for flow run interruption in Prefect 1.0 (similarly to the on_handle argument of Flow)?
Anna Geller
12/08/2022, 11:58 AM
is this a test? 😅 afaik, there's no on_handle, do you mean on_failure?
if so, anytime you call a task or a subflow (without a task runner), you can pass return_state=True to get the state back and then act on it, e.g. check if state.name == "Failed" (or "Completed") and trigger downstream actions with plain if/else:
import random

from prefect import flow, task


@task
def on_failure_task():
    print("Taking action on failure here")


@task
def on_success_task():
    print("Taking action on success here")


@flow
def bad_subflow():
    # Fails roughly half of the time to simulate a flaky subflow.
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occurred.")


@flow(log_prints=True, name="Conditional subflows")
def main():
    # return_state=True returns the subflow's final state instead of raising.
    state = bad_subflow(return_state=True)
    if state.name == "Failed":
        print("Subflow failed! Run on_failure_task")
        on_failure_task()
    else:
        print("Subflow succeeded! Run on_success_task")
        on_success_task()


if __name__ == "__main__":
    main()
Olivér Atanaszov
12/08/2022, 12:09 PM
ah ok, this is Prefect 2.0
Anna Geller
12/08/2022, 12:10 PM
yes, I thought you made a typo there because the question suggested you know how to solve it in v1 but not how to migrate this to v2
Anna Geller
12/08/2022, 12:10 PM
generally, migration is highly recommended and encouraged
Olivér Atanaszov
12/08/2022, 12:12 PM
thanks Anna
Anna Geller
12/08/2022, 12:15 PM
Overall, in V1 there are flow-level state handlers that solve the same problem. There are a couple of V1 examples on Discourse if you check the "state handlers" tag.
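For reference, a minimal sketch of a flow-level state handler in V1, assuming the Prefect 1.x handler signature (obj, old_state, new_state) and the state_handlers argument of Flow. The names notify_on_failure, send_alert, ALERTS, and build_flow are hypothetical, and send_alert only records a message where real alerting or cleanup would go. Prefect is imported lazily inside build_flow so the handler itself can be tried without Prefect installed.

```python
# Hypothetical stand-in for real alerting (Slack message, cleanup job, ...).
ALERTS = []


def send_alert(message):
    """Record the alert; swap this out for a real notification call."""
    ALERTS.append(message)


def notify_on_failure(flow, old_state, new_state):
    """Flow-level state handler, invoked on each flow run state change.

    Assumes Prefect 1.x State objects, which expose is_failed() and message.
    """
    if new_state.is_failed():
        send_alert(f"Flow run failed: {new_state.message}")
    # Returning the new state leaves the state transition unchanged.
    return new_state


def build_flow():
    """Build a Prefect 1.x flow with the handler attached (requires prefect<2)."""
    from prefect import Flow, task  # Prefect 1.x imperative API

    @task
    def flaky():
        raise ValueError("Simulated failure")

    with Flow("handled-flow", state_handlers=[notify_on_failure]) as flow:
        flaky()
    return flow
```

Assuming Prefect 1.x is installed, build_flow().run() runs the flow locally and the handler fires on each flow run state change. Reacting to a cancelled run would need an extra check on the new state, which this sketch leaves out.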