# ask-community
d
Hi all! I have a state handler for alerting me when a `Task` is retrying. It tells me which `Task` failed, when it’ll retry, how many times this task has retried, etc. I think I’d really rather just have that same state handler on the top-level `Flow` to avoid setting that state handler on each task. I noticed the signatures for `Task` state handlers are different than those for the `Flow` state handlers. Is there an easy way to have a `Flow` state handler that can access Task-level information? For example, given a `Flow` that’s changing states, can we readily access the specific `Task` in question?
k
Hey @Danny Vilela, have you considered wrapping `@task` in your own decorator, say `@mytask`, that attaches the state handler for you? I think this is easier than using the Flow state handler, and I’m not even sure that would work, because Flow state handlers fire when the Flow changes state rather than when individual tasks do.
Here’s a sample that was posted in the community before:
```python
import traceback
from functools import partial, wraps

from prefect import task, Flow


def custom_task(func=None, **task_init_kwargs):
    # Used as @custom_task(...): return a decorator that remembers the kwargs.
    if func is None:
        return partial(custom_task, **task_init_kwargs)

    @wraps(func)  # copies func's name and docstring onto the wrapper
    def safe_func(**kwargs):
        try:
            return func(**kwargs)
        except Exception as e:
            print(f"Full Traceback: {traceback.format_exc()}")
            raise RuntimeError(type(e)) from None  # from None is necessary to not log the stacktrace

    return task(safe_func, **task_init_kwargs)


@custom_task
def abc(x):
    return x


with Flow("custom-decorator-test") as flow:
    abc.map([1, 2, 3, 4, 5])
```
d
Ah, that’s fair! I hadn’t thought of wrapping the `FunctionTask`, no. Also yeah, totally forgot that the `Flow` changing state is different from the individual `Task` changing state. I’m guessing you mean something like:
```python
retry_task = lambda func, **kwargs: task(func, state_handlers=[notify_on_retry], **kwargs)

@retry_task
def do_the_thing(...) -> ...:
    ...
```
Oh gotcha! That certainly looks more robust 😁 I’ll definitely think about that — thanks @Kevin Kho!
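For anyone following along: the `notify_on_retry` handler itself isn't shown in this thread. Here's a rough sketch of what a handler like that might look like, assuming the task-level handler takes `(task, old_state, new_state)` and that the state exposes `is_retrying()`, `start_time` and `run_count`. The `send` parameter and the stand-in objects are hypothetical, added only so the sketch runs without Prefect installed.

```python
from datetime import datetime, timezone
from types import SimpleNamespace


def notify_on_retry(task, old_state, new_state, send=print):
    """Task-level state handler: alert only when the task enters a retrying state."""
    # Assumption: the state object exposes is_retrying(), start_time and run_count.
    if new_state.is_retrying():
        send(
            f"Task {task.name!r} failed; retry scheduled for {new_state.start_time} "
            f"(run count: {new_state.run_count})"
        )
    # A state handler hands back the state the task should end up in.
    return new_state


# Stand-ins so the sketch runs without Prefect; a real run would receive
# the actual task and state objects instead.
fake_task = SimpleNamespace(name="do_the_thing")
retrying = SimpleNamespace(
    is_retrying=lambda: True,
    start_time=datetime(2022, 1, 1, 12, 0, tzinfo=timezone.utc),
    run_count=2,
)
running = SimpleNamespace(is_retrying=lambda: False)

sent = []
result = notify_on_retry(fake_task, running, retrying, send=sent.append)
```

Swapping `send` for something that posts to Slack or email would be the obvious next step. The handler only looks at the new state, so it stays quiet on every other transition.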
k
I think your lambda might work, but you may need logic to override the state handler if you want
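A minimal sketch of that kind of override logic, in case it helps. The names `make_task_decorator`, `replace_defaults` and `fake_task` are made up for illustration and are not part of Prefect. The task factory is passed in as an argument so the merging can be tried without Prefect; in a real flow you would presumably pass Prefect's `task` there.

```python
from functools import partial


def make_task_decorator(task_factory, default_handlers):
    """Return a decorator that attaches default_handlers to every task it builds."""

    def decorator(func=None, *, state_handlers=None, replace_defaults=False, **task_kwargs):
        # Used as @decorator(...): remember the options and wait for the function.
        if func is None:
            return partial(
                decorator,
                state_handlers=state_handlers,
                replace_defaults=replace_defaults,
                **task_kwargs,
            )
        extra = list(state_handlers or [])
        # Either override the defaults entirely or run them before the extras.
        handlers = extra if replace_defaults else [*default_handlers, *extra]
        return task_factory(func, state_handlers=handlers, **task_kwargs)

    return decorator


# Hypothetical stand-ins so the sketch runs on its own.
def fake_task(func, **kwargs):
    return {"func": func, **kwargs}


def notify_on_retry(task, old_state, new_state):
    return new_state


def log_state(task, old_state, new_state):
    return new_state


retry_task = make_task_decorator(fake_task, [notify_on_retry])


@retry_task
def plain():
    return 1


@retry_task(state_handlers=[log_state], max_retries=3)
def merged():
    return 2


@retry_task(state_handlers=[log_state], replace_defaults=True)
def overridden():
    return 3
```

The upside compared to the lambda is that a caller passing their own `state_handlers` no longer collides with the default one, and the decorator works both bare and with arguments.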