Oscar Krantz
06/17/2022, 1:33 PMAnna Geller
06/17/2022, 1:36 PMany_successful
, any_failed
Oscar Krantz
06/17/2022, 1:40 PMKevin Kho
06/17/2022, 1:43 PMOscar Krantz
06/17/2022, 1:45 PMKevin Kho
06/17/2022, 1:59 PMOscar Krantz
06/17/2022, 2:11 PMdef state_handler(...):
if task.failed:
new = other_task(...)
return new.state
else:
continue on
Kevin Kho
06/17/2022, 2:29 PMtask.run()
but it’s just normal Python. It’s not a task with retriesOscar Krantz
06/17/2022, 2:47 PMKevin Kho
06/17/2022, 2:52 PMfrom prefect import task, Flow
import prefect
from prefect.triggers import any_failed, any_successful
@task
def task_one(x):
if x == False:
raise ValueError
return x
@task(trigger=any_successful)
def task_one_success():
<http://prefect.context.logger.info|prefect.context.logger.info>("TASK ONE SUCCESS")
return 1
@task(trigger=any_failed)
def task_one_failed():
<http://prefect.context.logger.info|prefect.context.logger.info>("TASK ONE FAILED")
return 2
@task
def task_two(x):
if x == False:
raise ValueError
return x
@task(trigger=any_successful)
def task_two_success():
<http://prefect.context.logger.info|prefect.context.logger.info>("TASK TWO SUCCESS")
return 1
@task(trigger=any_failed)
def task_two_failed():
<http://prefect.context.logger.info|prefect.context.logger.info>("TASK TWO FAILED")
return 2
@task(trigger=any_successful)
def merge():
return True
@task()
def final_task():
<http://prefect.context.logger.info|prefect.context.logger.info>("FINAL TASK")
return 1
with Flow("..") as flow:
a = task_one(False)
a_s = task_one_success(upstream_tasks=[a])
a_f = task_one_failed(upstream_tasks=[a])
b = task_two(True)
b_s = task_two_success(upstream_tasks=[b])
b_f = task_two_failed(upstream_tasks=[b])
a_res = merge(upstream_tasks=[a_s, a_f])
b_res = merge(upstream_tasks=[b_s, b_f])
final_task(upstream_tasks=[a_res, b_res])
flow.run()
Oscar Krantz
06/17/2022, 3:59 PMcase
as well. Thanks :)