Ben Fogelson
03/26/2021, 3:26 PM
all_finished but that does different things depending on whether it would have been triggered by all_successful or some_failed.
nicholas
custom_trigger(upstream_states: Set[State]) -> bool
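A minimal sketch of the idea behind that signature, written in plain Python so it runs without Prefect installed. `FakeState` is a hypothetical stand-in for Prefect's `State` (only the `is_successful()` and `is_failed()` checks are mirrored), and `classify_upstream` is an illustrative helper, not part of Prefect. It shows how one function could tell apart the cases that all_successful and some_failed would each have matched, while the trigger itself still behaves like all_finished.

```python
from dataclasses import dataclass
from typing import Set


@dataclass(frozen=True)
class FakeState:
    """Hypothetical stand-in for a finished Prefect State; not the real class."""
    name: str
    succeeded: bool

    def is_successful(self) -> bool:
        return self.succeeded

    def is_failed(self) -> bool:
        return not self.succeeded


def classify_upstream(upstream_states: Set[FakeState]) -> str:
    """Name the built-in trigger that the finished upstream states would match."""
    if all(s.is_successful() for s in upstream_states):
        return "all_successful"
    return "some_failed"


def custom_trigger(upstream_states: Set[FakeState]) -> bool:
    """Let the task run whatever the upstream outcome, like all_finished."""
    return True


states = {FakeState("task_a", True), FakeState("task_b", False)}
print(custom_trigger(states), classify_upstream(states))
```

Because the signature above returns only a bool, the classification is not handed to the task body by the trigger; in this sketch it has to be computed separately.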
Ben Fogelson
03/26/2021, 3:30 PM
Ben Fogelson
03/26/2021, 3:32 PM
@task(trigger=custom_trigger)
def my_task():
    do_something_based_on_return_value_of_custom_trigger()
Ben Fogelson
03/26/2021, 3:33 PM
resource_manager decorator, if the cleanup task can do different things depending on the state of the tasks in the resource manager block
nicholas
nicholas
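from prefect import Flow, task
from prefect.triggers import all_successful, some_failed, all_finished

@task
def task1():
    # placeholder for the real upstream task
    pass

@task(trigger=some_failed)
def failed_cleanup():
    # do something
    pass

@task(trigger=all_successful)
def successful_cleanup():
    # do something
    pass

@task(trigger=all_finished)
def finish_cleanup():
    # do something
    pass

with Flow("my flow") as flow:
    task_1 = task1()
    # make each cleanup task downstream of task_1 so its trigger sees task_1's state
    failed_cleanup(upstream_tasks=[task_1])
    successful_cleanup(upstream_tasks=[task_1])
    finish_cleanup(upstream_tasks=[task_1])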
nicholas
Ben Fogelson
03/26/2021, 3:58 PM
Zanie
import prefect

FAIL = True

@prefect.task()
def fail_task():
    if FAIL:
        raise ValueError("Hello")
    # otherwise
    return "Hello"

@prefect.task()
def succeed_task():
    return "world"

@prefect.task(log_stdout=True, trigger=prefect.triggers.all_finished)
def message(a, b):
    if isinstance(a, ValueError):
        print(f"Oh no! Upstream `a` failed with {a}")
    else:
        print(f"{a} {b}")

with prefect.Flow("test") as flow:
    a, b = fail_task(), succeed_task()
    message(a, b)

flow.run()
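The same branching can be sketched in plain Python, outside any flow, to show how a single cleanup step might choose its behavior. This assumes, as in the example above, that a failed upstream task reaches the downstream task as an exception instance. `split_results` and `cleanup` are hypothetical names used only for illustration.

```python
from typing import Any, Dict, List, Tuple


def split_results(results: Dict[str, Any]) -> Tuple[List[str], List[str]]:
    """Return (failed_names, succeeded_names); a value counts as failed if it is an exception."""
    failed = [name for name, value in results.items() if isinstance(value, Exception)]
    succeeded = [name for name, value in results.items() if not isinstance(value, Exception)]
    return failed, succeeded


def cleanup(results: Dict[str, Any]) -> str:
    """Pick a cleanup path depending on whether any upstream result is an exception."""
    failed, _ = split_results(results)
    if failed:
        return f"cleanup after failure in: {', '.join(failed)}"
    return "cleanup after success"


print(cleanup({"a": ValueError("Hello"), "b": "world"}))
print(cleanup({"a": "Hello", "b": "world"}))
```

Keeping the check in one helper means the cleanup logic does not need a separate task per trigger, at the cost of inspecting every input by hand.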
Ben Fogelson
03/26/2021, 9:40 PM
resource_manager.py to allow my desired behavior, and have a draft PR up. I was hoping someone from the Prefect team could take a quick look and tell me if this would be a welcome feature before I invest the time to write tests, update docstrings, etc. CC @Jim Crist-Harif since you wrote the resource manager.
https://github.com/PrefectHQ/prefect/pull/4305