Hi everyone, I have a common set of concerns (conf...
# ask-community
d
Hi everyone, I have a common set of concerns (configuration, logging) I want to setup for each task and my first idea was to override the .run() method of the prefect.tasks.core.function.FunctionTask and use prefect @task decorator with the new FunctionTask, but this method is not being called in the runtime when executing a task? Is it possible to add some cross cutting concern code before running a task itself? Tried a decorator, but seems it is not pickable by cloudpickle. I'll add a code example in the comment
Copy code
from functools import wraps
from typing import Callable, Any, Union

import prefect

class FunctionTask(prefect.tasks.core.function.FunctionTask):

    def run(self) -> None:
        # here put the cross cutting concerns code
        return super(FunctionTask, self).run()

    def __call__(self, *args, **kwargs):
        # here put the cross cutting concerns code
        return super(FunctionTask, self).__call__(*args, **kwargs)


def task(
    fn: Callable = None, **task_init_kwargs: Any
) -> Union[
    "prefect.tasks.core.function.FunctionTask",
    Callable[[Callable], "prefect.tasks.core.function.FunctionTask"],

]:
    if fn is None:
        return lambda fn: FunctionTask(
            fn=fn,
            **task_init_kwargs,
        )
    return FunctionTask(fn=fn, **task_init_kwargs)
and then in the flow use the newly created task decorator
k
Hey @domagoj jercic, maybe this message will show you how to make a decorator
d
Hi Kevin, helps but still doesn't work when I set flow executor to dask executor
Copy code
flow.executor = LocalDaskExecutor(scheduler="multiprocessing")
Is there a way to make the decorator pickable?
k
Ok I’ll have to try more stuff on my end. I’ll get back to you later today probably
d
Thank you. I'm UTC+1 so I'll probably respond tomorrow 😄
Hi @Kevin Kho, did you and the team manage to figure out something? Tried to update dill.py library in hopes that the version has resolved issues like this (https://github.com/uqfoundation/dill/issues/383). But no luck I'm afraid
k
Hey I didn’t get a chance yesterday. Will try today
d
thank you. Let me know if I can help somehow
k
Hey it seems to work for me with this setup:
Copy code
import traceback
from prefect import task, Flow
from functools import partial, wraps
import prefect
from prefect.executors import LocalDaskExecutor

def custom_task(func=None, **task_init_kwargs):
    if func is None:
        return partial(custom_task, **task_init_kwargs)

    @wraps(func)
    def safe_func(**kwargs):
        try:
            return func(**kwargs)
        except Exception as e:
            print(f"Full Traceback: {traceback.format_exc()}")
            raise RuntimeError(type(e)) from None  # from None is necessary to not log the stacktrace

    safe_func.__name__ = func.__name__
    return task(safe_func, **task_init_kwargs)

@custom_task
def abc(x):
    return x

with Flow("custom-decorator-test") as flow:
    abc.map([1,2,3,4,5])

flow.executor = LocalDaskExecutor(scheduler="processes")
The decorator gets mapped successfully over the processes. Are you trying something different from what I have?
d
Hm, seems like it. The same code works when I put it as a standalone file. Does that mean that part of the flow is not pickable?
Even funnier, now I extracted all my code to the file that I created to test the custom_task and it seems to work...
Found the difference, I've imported prefect
after
my code, when I changed the ordering everything worked.