Thomas Fredriksen
09/10/2021, 8:02 AMFlow
class so that all tasks will report state to an external service. From what I undersand, subclassing the Flow
class may not be a good idea as the prefect server will not use the subclass when managing the flows.
I like that prefect has support for state change hooks (state_handlers
), and I was wondering if I can use such a hook to modify the tasks of all tasks of a flow in order to add tasks state handlers that will report the task state change.
Something like this:
def task_state_handler(task, old_state, new_state):
report_state(task, old_state, new_state)
def flow_state_handler(flow, old_state, new_state):
was_pending = old_state.is_queued() or old_state.is_scheduled() or old_state.is_pending() or old_state.is_submitted()
if was_pending and new_state.is_running():
for task in flow.tasks:
task.state_handlers.append(task_state_handler)
Kevin Kho
09/10/2021, 1:56 PMfrom functools import partial, wraps
def custom_task(func=None, **task_init_kwargs):
if func is None:
return partial(custom_task, **task_init_kwargs)
@wraps(func)
def safe_func(**kwargs):
try:
return func(**kwargs)
except Exception as e:
print(f"Full Traceback: {traceback.format_exc()}")
raise RuntimeError(type(e)) from None # from None is necessary to not log the stacktrace
safe_func.__name__ = func.__name__
return task(safe_func, **task_init_kwargs)
Thomas Fredriksen
09/10/2021, 4:51 PMKevin Kho
09/10/2021, 4:55 PMThomas Fredriksen
09/10/2021, 5:54 PMKevin Kho
09/10/2021, 6:01 PM