Skip Breidbach
07/28/2020, 9:28 PM
Is there a way for a resource manager's cleanup() method to detect whether the tasks in the context succeeded or failed? I see the comment "A Task is automatically added to call the cleanup method (closing the Dask cluster) after all tasks under the context have completed." in the documentation, but can't see a way to access that created Task.
(I have a situation in which it would be convenient to execute only part of my cleanup task when something in that context fails.)
Jim Crist-Harif
07/28/2020, 9:31 PM
You can set a custom trigger for the cleanup task through the resource_manager decorator using cleanup_task_kwargs={"trigger": your_trigger}.
Skip Breidbach
07/28/2020, 9:43 PM
I saw cleanup_task_kwargs, but it seemed one-or-the-other (I know my case is a bit odd). Inside cleanup, I'd like to know whether those context tasks all succeeded or whether one or more failed.
Jim Crist-Harif
07/28/2020, 9:45 PM
import prefect
from prefect import Flow, task, resource_manager
from prefect.engine import signals
from prefect.tasks.core.resource_manager import resource_cleanup_trigger


def my_fancy_trigger(upstream_states):
    # Defer to the default cleanup trigger, then stash the upstream
    # states in the context so that cleanup() can inspect them.
    if resource_cleanup_trigger(upstream_states):
        prefect.context.update(upstream_states=upstream_states)
        return True
    return False


@resource_manager(
    cleanup_task_kwargs={"trigger": my_fancy_trigger}
)
class MyResource:
    def setup(self):
        return 1

    def cleanup(self, resource):
        # The states stored by the trigger are available here.
        print(prefect.context.upstream_states)


@task
def inc(x):
    return x + 1


@task
def fail():
    raise ValueError


with Flow("example") as flow:
    with MyResource() as x:
        y = inc(x)
        inc(y)
        fail()

flow.run()
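
A possible follow-on to the snippet above (not from the thread): once the trigger has stored the upstream states, cleanup could check them and skip part of its work when something failed. The sketch below models only that decision with the standard library. FakeState, any_upstream_failed, cleanup_steps, and the step names are all hypothetical; FakeState stands in for a Prefect State (which exposes is_failed()), and plain strings stand in for the Edge keys that a real upstream_states mapping would use.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class FakeState:
    """Hypothetical stand-in for a Prefect State; only is_failed() is modeled."""
    failed: bool = False

    def is_failed(self) -> bool:
        return self.failed


def any_upstream_failed(upstream_states: Dict[str, FakeState]) -> bool:
    """Return True if at least one upstream state reports failure."""
    return any(state.is_failed() for state in upstream_states.values())


def cleanup_steps(upstream_states: Dict[str, FakeState]) -> List[str]:
    """Pick which (hypothetical) cleanup steps to run for the given states."""
    steps = ["close_cluster"]  # assumed to be needed in every case
    if not any_upstream_failed(upstream_states):
        # Assumed optional step, skipped when a task in the context failed.
        steps.append("delete_scratch_data")
    return steps


# One failed task: only the unconditional step is selected.
print(cleanup_steps({"inc": FakeState(), "fail": FakeState(failed=True)}))
# All tasks succeeded: both steps are selected.
print(cleanup_steps({"inc": FakeState()}))
```

In the real flow, the same check would presumably run inside cleanup() against prefect.context.upstream_states, which also contains the states of the resource manager's own setup tasks.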
Skip Breidbach
07/28/2020, 10:03 PM
Jim Crist-Harif
07/28/2020, 10:05 PM