domagoj jercic
07/13/2021, 2:50 PMdomagoj jercic
07/13/2021, 2:51 PM
from functools import wraps
from typing import Callable, Any, Union
import prefect
class FunctionTask(prefect.tasks.core.function.FunctionTask):
    """Subclass of Prefect's ``FunctionTask`` with hook points for shared code.

    Both overrides currently delegate straight to the parent implementation.
    The marker comments show where cross-cutting concerns code is meant to be
    inserted before the delegation.
    """

    def __call__(self, *args, **kwargs):
        """Forward all call arguments to the parent ``__call__`` and return its result."""
        # here put the cross cutting concerns code
        return super().__call__(*args, **kwargs)

    def run(self) -> None:
        """Delegate to the parent ``run`` and return whatever it returns."""
        # here put the cross cutting concerns code
        # NOTE(review): confirm the parent class does not rebind ``run`` on
        # the instance during ``__init__``; if it does, this override is
        # never reached.
        return super().run()
def task(
    fn: Callable = None, **task_init_kwargs: Any
) -> Union[
    "prefect.tasks.core.function.FunctionTask",
    Callable[[Callable], "prefect.tasks.core.function.FunctionTask"],
]:
    """Wrap ``fn`` in the local ``FunctionTask``, with or without arguments.

    Args:
        fn: The function to wrap. When omitted, a decorator is returned so
            the call can be written as ``@task(**task_init_kwargs)``.
        **task_init_kwargs: Keyword arguments forwarded unchanged to the
            ``FunctionTask`` constructor.

    Returns:
        A ``FunctionTask`` built from ``fn`` when ``fn`` is given, otherwise
        a one-argument callable that builds one from the function it receives.
    """
    if fn is not None:
        # Direct form: ``@task`` or ``task(fn, ...)``.
        return FunctionTask(fn=fn, **task_init_kwargs)

    # Parameterised form: defer construction until the function arrives.
    def decorate(fn):
        return FunctionTask(
            fn=fn,
            **task_init_kwargs,
        )

    return decorate
domagoj jercic
07/13/2021, 2:53 PMKevin Kho
domagoj jercic
07/13/2021, 4:04 PM
flow.executor = LocalDaskExecutor(scheduler="multiprocessing")
Is there a way to make the decorator picklable?
Kevin Kho
domagoj jercic
07/13/2021, 4:11 PMdomagoj jercic
07/14/2021, 1:28 PMKevin Kho
domagoj jercic
07/14/2021, 1:43 PMKevin Kho
import traceback
from prefect import task, Flow
from functools import partial, wraps
import prefect
from prefect.executors import LocalDaskExecutor
def custom_task(func=None, **task_init_kwargs):
    """Turn ``func`` into a task whose failures surface as ``RuntimeError``.

    Usable bare (``@custom_task``) or with options
    (``@custom_task(**task_init_kwargs)``).

    Args:
        func: The function to wrap. When ``None``, a ``partial`` of this
            decorator carrying ``task_init_kwargs`` is returned instead.
        **task_init_kwargs: Keyword arguments forwarded to ``task``.

    Returns:
        The result of ``task(wrapper, **task_init_kwargs)``, or a ``partial``
        of ``custom_task`` when ``func`` is ``None``.
    """
    if func is None:
        # Called with options only; wait for the function to be supplied.
        return partial(custom_task, **task_init_kwargs)

    @wraps(func)
    def guarded(**kwargs):
        # The wrapper accepts keyword arguments only and forwards them as-is.
        try:
            outcome = func(**kwargs)
        except Exception as exc:
            trace_text = traceback.format_exc()
            print(f"Full Traceback: {trace_text}")
            # from None is necessary to not log the stacktrace
            raise RuntimeError(type(exc)) from None
        return outcome

    # Explicitly mirror the wrapped function's name on the wrapper.
    guarded.__name__ = func.__name__
    return task(guarded, **task_init_kwargs)
@custom_task
def abc(x):
    """Return ``x`` unchanged; a minimal task used to exercise ``custom_task``."""
    return x
# Build a flow that maps the ``abc`` task over five integer inputs.
with Flow("custom-decorator-test") as flow:
    abc.map([1, 2, 3, 4, 5])

# Attach a LocalDaskExecutor configured with scheduler="processes".
flow.executor = LocalDaskExecutor(scheduler="processes")
Kevin Kho
domagoj jercic
07/14/2021, 3:06 PMdomagoj jercic
07/14/2021, 3:09 PMdomagoj jercic
07/14/2021, 3:21 PMafter
my code, when I changed the ordering everything worked.