Hi everyone. I am trying to wrap the `Flow` class ...
# prefect-server
t
Hi everyone. I am trying to wrap the
Flow
class so that all tasks will report state to an external service. From what I undersand, subclassing the
Flow
class may not be a good idea as the prefect server will not use the subclass when managing the flows. I like that prefect has support for state change hooks (
state_handlers
), and I was wondering if I can use such a hook to modify the tasks of all tasks of a flow in order to add tasks state handlers that will report the task state change. Something like this:
Copy code
def task_state_handler(task, old_state, new_state):
  report_state(task, old_state, new_state)

def flow_state_handler(flow, old_state, new_state):
  was_pending = old_state.is_queued() or old_state.is_scheduled() or old_state.is_pending() or old_state.is_submitted()
  if was_pending and new_state.is_running():
    for task in flow.tasks:
      task.state_handlers.append(task_state_handler)
k
Hey @Thomas Fredriksen, I think this approach seems good. Some people just subclass task and add the state_handlers to all of their tasks. I have a sample here if it helps you get started.
Copy code
from functools import partial, wraps
def custom_task(func=None, **task_init_kwargs):
    if func is None:
        return partial(custom_task, **task_init_kwargs)

    @wraps(func)
    def safe_func(**kwargs):
        try:
            return func(**kwargs)
        except Exception as e:
            print(f"Full Traceback: {traceback.format_exc()}")
            raise RuntimeError(type(e)) from None  # from None is necessary to not log the stacktrace

    safe_func.__name__ = func.__name__
    return task(safe_func, **task_init_kwargs)
t
This works for custom tasks, but I would not get the same functionality with the built-in tasks such as bigquery. I am looking for a general approach so that all tasks are wrapped with said state handler.
k
Ah I don’t think there is a global way to attach it to all tasks. Maybe at the end of your script, you can loop through the tasks in the Flow object and attach it to each one?
t
that's what I was thinking. The state handler seems like an elegant way of achieving this
k
I just understood what you are saying. I’m not 100% sure this will work but that’s a great idea!