Krzysztof Nawara
10/14/2020, 8:05 AM
Jenny
10/14/2020, 11:20 AM
Krzysztof Nawara
10/14/2020, 11:44 AM
@curry
def wrapped_task(fn: Callable, **task_init_kwargs: Any) -> "prefect.tasks.core.function.FunctionTask":
    """Build a Prefect ``FunctionTask`` around a pass-through wrapper of ``fn``.

    Args:
        fn: The callable the resulting task should run.
        **task_init_kwargs: Forwarded unchanged to the ``FunctionTask``
            constructor.

    Returns:
        The ``FunctionTask`` created from the wrapper function.
    """
    # Pass-through wrapper: forwards every positional and keyword argument to fn.
    # NOTE: the *args in this signature is what the ValueError in the traceback
    # quoted after this snippet complains about.
    def wrapped_fn(*args, **kwargs):
        return fn(*args, **kwargs)

    # Give the wrapper the original function's name.
    wrapped_fn.__name__ = fn.__name__
    t = prefect.tasks.core.function.FunctionTask(fn=wrapped_fn, **task_init_kwargs)
    return t
But this fails Prefect signature validation:
Traceback (most recent call last):
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 783, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/Users/knawara/fun/analytics/mllib/v1/prefect_utils/crossvalidation.py", line 10, in <module>
def generate_cv_splits(data, k=7, rep_count=1):
File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/toolz/functoolz.py", line 303, in __call__
return self._partial(*args, **kwargs)
File "/Users/knawara/fun/analytics/mllib/v1/prefect_utils/flow.py", line 23, in wrapped_task
t = prefect.tasks.core.function.FunctionTask(fn=wrapped_fn, **task_init_kwargs)
File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/prefect/tasks/core/function.py", line 60, in __init__
prefect.core.task._validate_run_signature(fn) # type: ignore
File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/prefect/core/task.py", line 52, in _validate_run_signature
raise ValueError(
ValueError: Tasks with variable positional arguments (*args) are not supported, because all Prefect arguments are stored as keywords. As a workaround, consider modifying the run() method to accept **kwargs and feeding the values to *args.
So I tried without *args, but then it fails on _bind_:
Traceback (most recent call last):
File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/prefect/core/flow.py", line 342, in _flow_context
yield self
File "/Users/knawara/fun/analytics/mllib/v1/prefect_utils/flow.py", line 81, in flow_builder
task_configurator(mode=mode)
File "/Users/knawara/fun/analytics/titanic_pipeline/analysis_flow.py", line 213, in configure_tasks
prepared_data = wrap_task(prepare_data, **kwargs)(raw_data)
File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/prefect/core/task.py", line 470, in __call__
new.bind(
File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/prefect/core/task.py", line 511, in bind
callargs = dict(signature.bind(*args, **kwargs).arguments) # type: Dict
File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/inspect.py", line 3025, in bind
return self._bind(args, kwargs)
File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/inspect.py", line 2951, in _bind
raise TypeError(
TypeError: too many positional arguments
Jenny
10/14/2020, 12:46 PM
Krzysztof Nawara
10/14/2020, 5:52 PM
import prefect
from toolz import curry
from typing import Callable, Any
@curry
def wrapped_task(fn: Callable, **task_init_kwargs: Any) -> "prefect.tasks.core.function.FunctionTask":
    """Build a Prefect ``FunctionTask`` around a keyword-only wrapper of ``fn``.

    Args:
        fn: The callable the resulting task should run.
        **task_init_kwargs: Forwarded unchanged to the ``FunctionTask``
            constructor.

    Returns:
        The ``FunctionTask`` created from the wrapper function.
    """
    # This variant accepts keyword arguments only; it prints a marker line
    # and then forwards the keyword arguments to fn.
    def wrapped_fn(**kwargs):
        print("wrapped")
        return fn(**kwargs)

    # Give the wrapper the original function's name.
    wrapped_fn.__name__ = fn.__name__
    t = prefect.tasks.core.function.FunctionTask(fn=wrapped_fn, **task_init_kwargs)
    return t
from prefect import Flow
@wrapped_task
def t1():
    """Print a marker line and return the constant 10."""
    print("executing t1")
    return 10
@wrapped_task
def t2():
    """Print a marker line and return the constant 20."""
    print("executing t2")
    return 20
# Call both tasks (with no arguments) inside the context of a Flow named "tf",
# then run the flow and keep whatever tf.run() returns in s.
with Flow("tf") as tf:
    t1()
    t2()
s = tf.run()
import prefect
from toolz import curry
from typing import Callable, Any
@curry
def wrapped_task(fn: Callable, **task_init_kwargs: Any) -> "prefect.tasks.core.function.FunctionTask":
    """Build a Prefect ``FunctionTask`` around a pass-through wrapper of ``fn``.

    Args:
        fn: The callable the resulting task should run.
        **task_init_kwargs: Forwarded unchanged to the ``FunctionTask``
            constructor.

    Returns:
        The ``FunctionTask`` created from the wrapper function.
    """
    # This variant accepts both *args and **kwargs; it prints a marker line
    # and then forwards all arguments to fn.
    # NOTE: the reply that follows this snippet identifies the *args in this
    # signature as the origin of the error.
    def wrapped_fn(*args, **kwargs):
        print("wrapped")
        return fn(*args, **kwargs)

    # Give the wrapper the original function's name.
    wrapped_fn.__name__ = fn.__name__
    t = prefect.tasks.core.function.FunctionTask(fn=wrapped_fn, **task_init_kwargs)
    return t
from prefect import Flow
@wrapped_task
def t1():
    """Print a marker line and return the constant 10."""
    print("executing t1")
    return 10
@wrapped_task
def t2(t1):
    """Print a marker line and return twice the value passed in as ``t1``."""
    # The parameter name shadows the module-level t1 task inside this body.
    print("executing t2")
    return t1*2
# Inside the context of a Flow named "tf", call t1 and pass its result to t2
# as a positional argument, then run the flow and keep the result of
# tf.run() in s.
with Flow("tf") as tf:
    t2(t1())
s = tf.run()
Chris White
[…] (`*args`). Only a fixed set of positional arguments or keyword arguments are supported. Your `wrapped_fn` signature contains `*args` and this is where the error is originating from.
Krzysztof Nawara
10/14/2020, 6:09 PM
Chris White
[…] `wrapped_fn` unfortunately; right now your wrapped function only takes keyword arguments, but your task accepts both a single positional argument as well as a keyword argument. There are ways of updating the signature of the `wrapped_fn` to match that of the provided `fn`, but you’ll have to learn about `__signature__` and `inspect`.
Assuming you can document this for yourself and collaborators, a workaround is to always use keyword arguments when binding tasks:
# Workaround form: t1's result is handed to t2 by keyword (t1=...) instead of
# positionally.
with Flow("tf") as tf:
    t2(t1=t1())
Krzysztof Nawara
10/14/2020, 6:18 PM
@functools.wraps(fn)
# Pass-through wrapper: prints shared_storage (a name defined outside this
# snippet) and forwards all positional and keyword arguments to fn.
# NOTE(review): the preceding line applies functools.wraps(fn); presumably
# that makes signature inspection see fn's parameters rather than
# (*args, **kwargs) -- confirm against Prefect's signature validation.
def wrapped_fn(*args, **kwargs):
    print(shared_storage)
    return fn(*args, **kwargs)
Chris White