Thread
#prefect-community
    domagoj jercic

    domagoj jercic

    1 year ago
    Hi everyone, I have a common set of concerns (configuration, logging) I want to setup for each task and my first idea was to override the .run() method of the prefect.tasks.core.function.FunctionTask and use prefect @task decorator with the new FunctionTask, but this method is not being called in the runtime when executing a task? Is it possible to add some cross cutting concern code before running a task itself? Tried a decorator, but seems it is not pickable by cloudpickle. I'll add a code example in the comment
    from functools import wraps
    from typing import Callable, Any, Union
    
    import prefect
    
    class FunctionTask(prefect.tasks.core.function.FunctionTask):
    
        def run(self) -> None:
            # here put the cross cutting concerns code
            return super(FunctionTask, self).run()
    
        def __call__(self, *args, **kwargs):
            # here put the cross cutting concerns code
            return super(FunctionTask, self).__call__(*args, **kwargs)
    
    
    def task(
        fn: Callable = None, **task_init_kwargs: Any
    ) -> Union[
        "prefect.tasks.core.function.FunctionTask",
        Callable[[Callable], "prefect.tasks.core.function.FunctionTask"],
    
    ]:
        if fn is None:
            return lambda fn: FunctionTask(
                fn=fn,
                **task_init_kwargs,
            )
        return FunctionTask(fn=fn, **task_init_kwargs)
    and then in the flow use the newly created task decorator
    Kevin Kho

    Kevin Kho

    1 year ago
    Hey @domagoj jercic, maybe this message will show you how to make a decorator
    domagoj jercic

    domagoj jercic

    1 year ago
    Hi Kevin, helps but still doesn't work when I set flow executor to dask executor
    flow.executor = LocalDaskExecutor(scheduler="multiprocessing")
    Is there a way to make the decorator pickable?
    Kevin Kho

    Kevin Kho

    1 year ago
    Ok I’ll have to try more stuff on my end. I’ll get back to you later today probably
    domagoj jercic

    domagoj jercic

    1 year ago
    Thank you. I'm UTC+1 so I'll probably respond tomorrow 😄
    Hi @Kevin Kho, did you and the team manage to figure out something? Tried to update dill.py library in hopes that the version has resolved issues like this (https://github.com/uqfoundation/dill/issues/383). But no luck I'm afraid
    Kevin Kho

    Kevin Kho

    1 year ago
    Hey I didn’t get a chance yesterday. Will try today
    domagoj jercic

    domagoj jercic

    1 year ago
    thank you. Let me know if I can help somehow
    Kevin Kho

    Kevin Kho

    1 year ago
    Hey it seems to work for me with this setup:
    import traceback
    from prefect import task, Flow
    from functools import partial, wraps
    import prefect
    from prefect.executors import LocalDaskExecutor
    
    def custom_task(func=None, **task_init_kwargs):
        if func is None:
            return partial(custom_task, **task_init_kwargs)
    
        @wraps(func)
        def safe_func(**kwargs):
            try:
                return func(**kwargs)
            except Exception as e:
                print(f"Full Traceback: {traceback.format_exc()}")
                raise RuntimeError(type(e)) from None  # from None is necessary to not log the stacktrace
    
        safe_func.__name__ = func.__name__
        return task(safe_func, **task_init_kwargs)
    
    @custom_task
    def abc(x):
        return x
    
    with Flow("custom-decorator-test") as flow:
        abc.map([1,2,3,4,5])
    
    flow.executor = LocalDaskExecutor(scheduler="processes")
    The decorator gets mapped successfully over the processes. Are you trying something different from what I have?
    domagoj jercic

    domagoj jercic

    1 year ago
    Hm, seems like it. The same code works when I put it as a standalone file. Does that mean that part of the flow is not pickable?
    Even funnier, now I extracted all my code to the file that I created to test the custom_task and it seems to work...
    Found the difference, I've imported prefect
    after
    my code, when I changed the ordering everything worked.