Danny Vilela
09/21/2021, 5:51 PMTask
is retrying. It tells me which Task
failed, when it’ll retry, how many times this task has retried, etc. I think I’d really rather just have that same state handler on the top-level Flow
to avoid setting that state handler on each task. I noticed the signatures for Task
state handlers are different than those for the Flow
state handlers — is there an easy way to have a Flow
state handler that can access Task-level information?
For example, given a Flow
that’s changing states, can we readily access the specific Task
in question?Kevin Kho
@task
instead to add your state handler to your own decorator @mytask
and then it will have the state handler attached?
I think this is easier than using the Flow state handler, and I’m not even sure that would work because it’s when Flows change state rather than the task levelKevin Kho
import traceback
from prefect import task, Flow
from functools import partial, wraps
import prefect
from prefect.executors import LocalDaskExecutor
def custom_task(func=None, **task_init_kwargs):
if func is None:
return partial(custom_task, **task_init_kwargs)
@wraps(func)
def safe_func(**kwargs):
try:
return func(**kwargs)
except Exception as e:
print(f"Full Traceback: {traceback.format_exc()}")
raise RuntimeError(type(e)) from None # from None is necessary to not log the stacktrace
safe_func.__name__ = func.__name__
return task(safe_func, **task_init_kwargs)
@custom_task
def abc(x):
return x
with Flow("custom-decorator-test") as flow:
abc.map([1,2,3,4,5])
Danny Vilela
09/21/2021, 5:56 PMFunctionTask
, no. Also yeah, totally forgot that the Flow
changing state is different from the individual Task
changing state.
I’m guessing you mean something like:
retry_task = lambda func, **kwargs: task(func, state_handlers=[notify_on_retry], **kwargs)
@retry_task
def do_the_thing(...) -> ...:
...
Danny Vilela
09/21/2021, 5:57 PMKevin Kho