# prefect-community
k
Hi, I need all of the tasks in the flow to share some configuration, so I decided to follow the Prefect example and create a custom decorator. But I have a problem with correctly forwarding the arguments to the task. EDIT: details (code and stack traces) moved to the thread. Any suggestions on how to make it work?
j
Hi @Krzysztof Nawara - it's hard to know, but it looks like there are too many arguments somewhere. Can you share the code you are using when you take out *args? And could you also please move the long error messages into this thread, so we can help more easily and so they don't block up the main thread? Thanks!
k
If I understand it correctly, since there is no *args in the task signature, even 1 positional argument is too many. And when I build my flow, I'm using positional arguments. I'll try to build a minimal example and post it here.
Details: the code I started with:
@curry
def wrapped_task(fn: Callable, **task_init_kwargs: Any) -> "prefect.tasks.core.function.FunctionTask":
    def wrapped_fn(*args, **kwargs):
        return fn(*args, **kwargs)
    wrapped_fn.__name__ = fn.__name__

    t = prefect.tasks.core.function.FunctionTask(fn=wrapped_fn, **task_init_kwargs)
    return t
But this fails Prefect signature validation:
Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 783, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/Users/knawara/fun/analytics/mllib/v1/prefect_utils/crossvalidation.py", line 10, in <module>
    def generate_cv_splits(data, k=7, rep_count=1):
  File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/toolz/functoolz.py", line 303, in __call__
    return self._partial(*args, **kwargs)
  File "/Users/knawara/fun/analytics/mllib/v1/prefect_utils/flow.py", line 23, in wrapped_task
    t = prefect.tasks.core.function.FunctionTask(fn=wrapped_fn, **task_init_kwargs)
  File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/prefect/tasks/core/function.py", line 60, in __init__
    prefect.core.task._validate_run_signature(fn)  # type: ignore
  File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/prefect/core/task.py", line 52, in _validate_run_signature
    raise ValueError(
ValueError: Tasks with variable positional arguments (*args) are not supported, because all Prefect arguments are stored as keywords. As a workaround, consider modifying the run() method to accept **kwargs and feeding the values to *args.
So I tried without *args, but then it fails on _bind_:
Traceback (most recent call last):
  File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/prefect/core/flow.py", line 342, in _flow_context
    yield self
  File "/Users/knawara/fun/analytics/mllib/v1/prefect_utils/flow.py", line 81, in flow_builder
    task_configurator(mode=mode)
  File "/Users/knawara/fun/analytics/titanic_pipeline/analysis_flow.py", line 213, in configure_tasks
    prepared_data = wrap_task(prepare_data, **kwargs)(raw_data)
  File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/prefect/core/task.py", line 470, in __call__
    new.bind(
  File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/prefect/core/task.py", line 511, in bind
    callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
  File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/inspect.py", line 3025, in bind
    return self._bind(args, kwargs)
  File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/inspect.py", line 2951, in _bind
    raise TypeError(
TypeError: too many positional arguments
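Note: the second traceback can be reproduced without Prefect. The sketch below assumes only what the traceback itself shows, namely that the call is bound through `inspect.Signature.bind`. The function name `kwargs_only` is made up for illustration.

```python
import inspect

def kwargs_only(**kwargs):
    """Stand-in for a wrapper that accepts keyword arguments only."""
    return kwargs

sig = inspect.signature(kwargs_only)

# Binding by keyword succeeds, because **kwargs absorbs any name.
bound = sig.bind(t1=10)

# Binding positionally fails: the signature has no positional slot.
try:
    sig.bind(10)
    error_message = None
except TypeError as exc:
    error_message = str(exc)
```

So a `**kwargs`-only wrapper can only ever be called with keyword arguments, whatever the wrapped function itself accepts.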
j
Thanks @Krzysztof Nawara - if you put together a reproducible example, you might want to post it on our GitHub discussions board - it's easier for us to track issues there!
k
Okay, I've got the MRE: First, the case of 2 independent tasks works perfectly:
import prefect
from toolz import curry
from typing import Callable, Any

@curry
def wrapped_task(fn: Callable, **task_init_kwargs: Any) -> "prefect.tasks.core.function.FunctionTask":
    def wrapped_fn(**kwargs):
        print("wrapped")
        return fn(**kwargs)
    wrapped_fn.__name__ = fn.__name__

    t = prefect.tasks.core.function.FunctionTask(fn=wrapped_fn, **task_init_kwargs)
    return t

from prefect import Flow

@wrapped_task
def t1():
    print("executing t1")
    return 10

@wrapped_task
def t2():
    print("executing t2")
    return 20

with Flow("tf") as tf:
    t1()
    t2()

s = tf.run()
Now, when I introduce dependencies, I get the errors mentioned above:
import prefect
from toolz import curry
from typing import Callable, Any

@curry
def wrapped_task(fn: Callable, **task_init_kwargs: Any) -> "prefect.tasks.core.function.FunctionTask":
    def wrapped_fn(*args, **kwargs):
        print("wrapped")
        return fn(*args, **kwargs)
    wrapped_fn.__name__ = fn.__name__

    t = prefect.tasks.core.function.FunctionTask(fn=wrapped_fn, **task_init_kwargs)
    return t

from prefect import Flow

@wrapped_task
def t1():
    print("executing t1")
    return 10

@wrapped_task
def t2(t1):
    print("executing t2")
    return t1*2

with Flow("tf") as tf:
    t2(t1())

s = tf.run()
c
Hi Krzysztof and thanks for the code example - as the error states, Prefect does not support arbitrary positional arguments (usually denoted *args). Only a fixed set of positional arguments or keyword arguments is supported. Your wrapped_fn signature contains *args, and this is where the error originates.
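Note: a check of this kind can be written with the standard library alone. The helper below is a hypothetical sketch, not Prefect's actual validation code; `has_var_positional`, `with_args` and `fixed` are names invented for the example.

```python
import inspect

def has_var_positional(fn):
    """Return True if fn declares a *args-style parameter."""
    params = inspect.signature(fn).parameters.values()
    return any(p.kind is inspect.Parameter.VAR_POSITIONAL for p in params)

def with_args(*args, **kwargs):
    """Generic wrapper shape that triggers the first error in the thread."""
    return args, kwargs

def fixed(t1, scale=2):
    """Fixed set of named parameters, which is the supported shape."""
    return t1 * scale

rejected = has_var_positional(with_args)
accepted = not has_var_positional(fixed)
```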
k
That's correct. But if I remove *args (and leave only **kwargs), I get the second error (too many positional arguments). What signature should a generic wrapper have?
c
You’re going to have to do some metaprogramming on the signature of your wrapped_fn, unfortunately; right now your wrapped function only takes keyword arguments, but your task accepts both a single positional argument as well as a keyword argument. There are ways of updating the signature of the wrapped_fn to match that of the provided fn, but you’ll have to learn about __signature__ and inspect. Assuming you can document this for yourself and collaborators, a workaround is to always use keyword arguments when binding tasks:
with Flow("tf") as tf:
    t2(t1=t1())
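Note: the `__signature__` route mentioned above could look roughly like the sketch below. It relies on the documented behaviour that `inspect.signature()` honours a `__signature__` attribute when one is set. The functions are illustrative stand-ins, and whether this satisfies the validation in a given Prefect version is not tested here.

```python
import inspect

def fn(t1, scale=2):
    """Stand-in for the user's task function."""
    return t1 * scale

def wrapped_fn(**kwargs):
    """Keyword-only wrapper, as in the thread."""
    return fn(**kwargs)

# Advertise fn's signature on the wrapper; inspect.signature() will report it.
wrapped_fn.__signature__ = inspect.signature(fn)

# A positional call can now be bound and turned into keywords.
bound = inspect.signature(wrapped_fn).bind(10)
result = wrapped_fn(**bound.arguments)
```

The wrapper body still only accepts keywords, so this works only when the caller converts positional arguments to keywords after binding, as the error message in the first traceback says Prefect does.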
k
That's what I was afraid of, I'll have to look at it. Thanks 🙂
Apparently Python is more than happy to do the heavy lifting for me: https://stackoverflow.com/a/147878/12640922
    @functools.wraps(fn)
    def wrapped_fn(*args, **kwargs):
        print(shared_storage)
        return fn(*args, **kwargs)
c
Ahhh good call!
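Note: a minimal, Prefect-free sketch of why `functools.wraps` helps here. It sets `__wrapped__` on the wrapper, and `inspect.signature()` follows that attribute by default, so the wrapper reports the wrapped function's signature even though its own definition uses `*args, **kwargs`. `make_wrapper` is a hypothetical helper name; in the thread, the returned function would be what gets passed to `FunctionTask(fn=...)`.

```python
import functools
import inspect

def make_wrapper(fn):
    """Wrap fn while preserving its name, docstring and reported signature."""
    @functools.wraps(fn)
    def wrapped_fn(*args, **kwargs):
        # Shared behaviour for every task would go here.
        return fn(*args, **kwargs)
    return wrapped_fn

def t2(t1):
    """Stand-in for the dependent task from the thread."""
    return t1 * 2

wrapped = make_wrapper(t2)
sig = inspect.signature(wrapped)

# Positional binding now resolves to the named parameter of t2.
bound = sig.bind(10)
result = wrapped(*bound.args, **bound.kwargs)
```

Because `functools.wraps` also copies `__name__`, the manual `wrapped_fn.__name__ = fn.__name__` assignment from the earlier snippets becomes unnecessary.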