Danny Vilela
09/21/2021, 5:51 PM
Task is retrying. It tells me which Task failed, when it’ll retry, how many times this task has retried, etc. I think I’d really rather just have that same state handler on the top-level Flow to avoid setting that state handler on each task. I noticed the signatures for Task state handlers are different from those for the Flow state handlers. Is there an easy way to have a Flow state handler that can access Task-level information?
For example, given a Flow that’s changing states, can we readily access the specific Task in question?
Kevin Kho
@task instead to add your state handler to your own decorator @mytask and then it will have the state handler attached?
I think this is easier than using the Flow state handler, and I’m not even sure that would work, because a Flow state handler fires when the Flow changes state rather than at the task level.
Kevin Kho
import traceback
from prefect import task, Flow
from functools import partial, wraps
import prefect
from prefect.executors import LocalDaskExecutor


def custom_task(func=None, **task_init_kwargs):
    if func is None:
        return partial(custom_task, **task_init_kwargs)

    @wraps(func)
    def safe_func(**kwargs):
        try:
            return func(**kwargs)
        except Exception as e:
            print(f"Full Traceback: {traceback.format_exc()}")
            raise RuntimeError(type(e)) from None  # from None is necessary to not log the stacktrace

    safe_func.__name__ = func.__name__
    return task(safe_func, **task_init_kwargs)


@custom_task
def abc(x):
    return x


with Flow("custom-decorator-test") as flow:
    abc.map([1, 2, 3, 4, 5])
Danny Vilela
09/21/2021, 5:56 PM
FunctionTask, no. Also yeah, totally forgot that the Flow changing state is different from the individual Task changing state.
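For reference, a task-level handler of the kind described at the top of the thread might look roughly like the sketch below. It assumes the Prefect 1.x convention that task state handlers are called as `(task, old_state, new_state)` and return a state. The message format is made up for illustration, and the `SimpleNamespace` objects are hypothetical stand-ins so the snippet runs without Prefect installed.

```python
from datetime import datetime
from types import SimpleNamespace


def notify_on_retry(task, old_state, new_state):
    """Task state handler: report when a task enters a retrying state."""
    # Assumption: called as (task, old_state, new_state), as in Prefect 1.x.
    if new_state.is_retrying():
        message = (
            f"Task {task.name!r} is retrying: "
            f"run_count={new_state.run_count}, "
            f"next attempt at {new_state.start_time}"
        )
        print(message)  # swap in a Slack or email call here
    # Hand the state back unchanged so the run proceeds as usual.
    return new_state


# Hypothetical duck-typed stand-ins for a Task and a Retrying state.
fake_task = SimpleNamespace(name="do_the_thing")
retrying = SimpleNamespace(
    is_retrying=lambda: True,
    run_count=2,
    start_time=datetime(2021, 9, 21, 18, 0),
)
result = notify_on_retry(fake_task, None, retrying)
```

Because the handler receives the task object itself, it can report which task is retrying, which is the piece a Flow-level handler does not get handed directly. Attaching it is then a matter of passing `state_handlers=[notify_on_retry]` when the task is created.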
I’m guessing you mean something like:
retry_task = lambda func, **kwargs: task(func, state_handlers=[notify_on_retry], **kwargs)

@retry_task
def do_the_thing(...) -> ...:
    ...
Danny Vilela
09/21/2021, 5:57 PM
Kevin Kho