Thread
#prefect-community
    Krzysztof Nawara

    1 year ago
    Hi, I need all of the tasks in the flow to share some configuration, so I decided to follow the Prefect example and create a custom decorator. But I have a problem correctly forwarding the arguments to the task. EDIT: details (code and stack traces) moved to the thread. Any suggestions on how to make it work?
    Jenny

    1 year ago
    Hi @Krzysztof Nawara - it's hard to know, but it looks like there are too many arguments somewhere. Can you share the code you're using when you take out *args? And could you also please move the long error messages into this thread, so we can follow them more easily and so they don't block up the main thread? Thanks!
    Krzysztof Nawara

    1 year ago
    If I understand correctly, since there is no *args in the task signature, even 1 positional argument is too many. And when I build my flow, I'm using positional arguments. I'll try to build a minimal example and post it here.
    Details: the code I started with:
    @curry
    def wrapped_task(fn: Callable, **task_init_kwargs: Any) -> "prefect.tasks.core.function.FunctionTask":
        def wrapped_fn(*args, **kwargs):
            return fn(*args, **kwargs)
        wrapped_fn.__name__ = fn.__name__
    
        t = prefect.tasks.core.function.FunctionTask(fn=wrapped_fn, **task_init_kwargs)
        return t
    But this fails Prefect signature validation:
    Traceback (most recent call last):
      File "<frozen importlib._bootstrap>", line 991, in _find_and_load
      File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
      File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
      File "<frozen importlib._bootstrap_external>", line 783, in exec_module
      File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
      File "/Users/knawara/fun/analytics/mllib/v1/prefect_utils/crossvalidation.py", line 10, in <module>
        def generate_cv_splits(data, k=7, rep_count=1):
      File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/toolz/functoolz.py", line 303, in __call__
        return self._partial(*args, **kwargs)
      File "/Users/knawara/fun/analytics/mllib/v1/prefect_utils/flow.py", line 23, in wrapped_task
        t = prefect.tasks.core.function.FunctionTask(fn=wrapped_fn, **task_init_kwargs)
      File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/prefect/tasks/core/function.py", line 60, in __init__
        prefect.core.task._validate_run_signature(fn)  # type: ignore
      File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/prefect/core/task.py", line 52, in _validate_run_signature
        raise ValueError(
    ValueError: Tasks with variable positional arguments (*args) are not supported, because all Prefect arguments are stored as keywords. As a workaround, consider modifying the run() method to accept **kwargs and feeding the values to *args.
    So I tried without *args, but then it fails on bind:
    Traceback (most recent call last):
      File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/contextlib.py", line 131, in __exit__
        self.gen.throw(type, value, traceback)
      File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/prefect/core/flow.py", line 342, in _flow_context
        yield self
      File "/Users/knawara/fun/analytics/mllib/v1/prefect_utils/flow.py", line 81, in flow_builder
        task_configurator(mode=mode)
      File "/Users/knawara/fun/analytics/titanic_pipeline/analysis_flow.py", line 213, in configure_tasks
        prepared_data = wrap_task(prepare_data, **kwargs)(raw_data)
      File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/prefect/core/task.py", line 470, in __call__
        new.bind(
      File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/site-packages/prefect/core/task.py", line 511, in bind
        callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
      File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/inspect.py", line 3025, in bind
        return self._bind(args, kwargs)
      File "/Users/knawara/Applications/miniconda3/envs/titanic_pipeline/lib/python3.8/inspect.py", line 2951, in _bind
        raise TypeError(
    TypeError: too many positional arguments
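The second error can be reproduced with the standard library alone, which may make it easier to reason about. This is a minimal sketch that assumes, based on the traceback above, that binding goes through `inspect.signature(...).bind`. The names `kwargs_only`, `fixed`, and `try_bind` are hypothetical stand-ins and are not part of Prefect.

```python
import inspect


def kwargs_only(**kwargs):
    """Hypothetical stand-in for a wrapper declared with **kwargs only."""
    return kwargs


def fixed(data, k=7):
    """Hypothetical stand-in for a task function with named parameters."""
    return data, k


def try_bind(fn, *args, **kwargs):
    """Return the bound arguments as a dict, or the TypeError message."""
    try:
        return dict(inspect.signature(fn).bind(*args, **kwargs).arguments)
    except TypeError as exc:
        return str(exc)


# A **kwargs-only signature has no slot for a positional value, so this fails.
positional_result = try_bind(kwargs_only, "raw_data")

# The same value passed by keyword lands inside the **kwargs mapping.
keyword_result = try_bind(kwargs_only, data="raw_data")

# A signature with a named parameter accepts the positional value.
fixed_result = try_bind(fixed, "raw_data")
```

Here `positional_result` holds the same "too many positional arguments" message as the traceback, while the other two calls bind successfully.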
    Jenny

    1 year ago
    Thanks @Krzysztof Nawara - if you put together a reproducible example, you might want to post it on our GitHub discussions board - it's easier for us to track issues there!
    Krzysztof Nawara

    1 year ago
    Okay, I've got the MRE. First, the case of 2 independent tasks works perfectly:
    import prefect
    from toolz import curry
    from typing import Callable, Any
    
    @curry
    def wrapped_task(fn: Callable, **task_init_kwargs: Any) -> "prefect.tasks.core.function.FunctionTask":
        def wrapped_fn(**kwargs):
            print("wrapped")
            return fn(**kwargs)
        wrapped_fn.__name__ = fn.__name__
    
        t = prefect.tasks.core.function.FunctionTask(fn=wrapped_fn, **task_init_kwargs)
        return t
    
    from prefect import Flow
    
    @wrapped_task
    def t1():
        print("executing t1")
        return 10
    
    @wrapped_task
    def t2():
        print("executing t2")
        return 20
    
    with Flow("tf") as tf:
        t1()
        t2()
    
    s = tf.run()
    Now, when I introduce dependencies, I get the errors mentioned above:
    import prefect
    from toolz import curry
    from typing import Callable, Any
    
    @curry
    def wrapped_task(fn: Callable, **task_init_kwargs: Any) -> "prefect.tasks.core.function.FunctionTask":
        def wrapped_fn(*args, **kwargs):
            print("wrapped")
            return fn(*args, **kwargs)
        wrapped_fn.__name__ = fn.__name__
    
        t = prefect.tasks.core.function.FunctionTask(fn=wrapped_fn, **task_init_kwargs)
        return t
    
    from prefect import Flow
    
    @wrapped_task
    def t1():
        print("executing t1")
        return 10
    
    @wrapped_task
    def t2(t1):
        print("executing t2")
        return t1*2
    
    with Flow("tf") as tf:
        t2(t1())
    
    s = tf.run()
    Chris White

    1 year ago
    Hi Krzysztof and thanks for the code example - as the error states, Prefect does not support arbitrary positional arguments (usually denoted *args). Only a fixed set of positional arguments or keyword arguments are supported. Your wrapped_fn signature contains *args and this is where the error is originating from.
    Krzysztof Nawara

    1 year ago
    That's correct. But if I remove *args (and leave only **kwargs), I get the 2nd error (too many positional arguments). What signature should a generic wrapper have?
    Chris White

    1 year ago
    you’re going to have to do some metaprogramming on the signature of your wrapped_fn, unfortunately; right now your wrapped function only takes keyword arguments, but your task accepts both a single positional argument as well as a keyword argument. There are ways of updating the signature of the wrapped_fn to match that of the provided fn, but you’ll have to learn about __signature__ and inspect. Assuming you can document this for yourself and collaborators, a workaround is to always use keyword arguments when binding tasks:
    with Flow("tf") as tf:
        t2(t1=t1())
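The signature metaprogramming mentioned above can be sketched with the standard library only. This is an illustration rather than Prefect's own mechanism: `with_copied_signature` and `prepare` are hypothetical names, and whether a given Prefect version accepts a wrapper built this way is an assumption that would need checking.

```python
import inspect
from typing import Any, Callable


def with_copied_signature(fn: Callable) -> Callable:
    """Hypothetical helper: wrap fn and advertise fn's own signature."""

    def wrapped_fn(*args: Any, **kwargs: Any) -> Any:
        return fn(*args, **kwargs)

    wrapped_fn.__name__ = fn.__name__
    # inspect.signature() returns an explicit __signature__ attribute when
    # one is set, instead of inspecting the (*args, **kwargs) definition.
    wrapped_fn.__signature__ = inspect.signature(fn)
    return wrapped_fn


def prepare(data, k=7):
    """Hypothetical task function with one required parameter."""
    return data * k


wrapped = with_copied_signature(prepare)

reported = str(inspect.signature(wrapped))                   # signature seen by introspection
bound = dict(inspect.signature(wrapped).bind(3).arguments)   # positional bind now succeeds
result = wrapped(3)                                          # the call still reaches prepare
```

With the copied signature, introspection reports `(data, k=7)` for the wrapper, so a positional argument can be bound to `data` by name.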
    Krzysztof Nawara

    1 year ago
    That's what I was afraid of, I'll have to look at it. Thanks 🙂
    Apparently Python is more than happy to do the heavy lifting for me: https://stackoverflow.com/a/147878/12640922
    @functools.wraps(fn)
    def wrapped_fn(*args, **kwargs):
        print(shared_storage)
        return fn(*args, **kwargs)
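A stdlib-only check may help explain why `functools.wraps` does the job: it copies metadata such as `__name__` and sets `__wrapped__`, and `inspect.signature` follows `__wrapped__` by default. In the sketch below, `shared_config_wrapper` and its `shared_storage` default are hypothetical, and how Prefect itself inspects the wrapper is not shown.

```python
import functools
import inspect


def shared_config_wrapper(fn, shared_storage="hypothetical-shared-config"):
    """Hypothetical decorator body: forward all arguments to fn."""

    @functools.wraps(fn)
    def wrapped_fn(*args, **kwargs):
        # Shared behaviour (for example, using shared_storage) would go here.
        return fn(*args, **kwargs)

    return wrapped_fn


def t2(t1):
    return t1 * 2


wrapped_t2 = shared_config_wrapper(t2)

# By default inspect.signature() follows __wrapped__ back to t2.
followed = str(inspect.signature(wrapped_t2))
# With follow_wrapped=False the wrapper's literal definition is reported.
literal = str(inspect.signature(wrapped_t2, follow_wrapped=False))
# Binding a positional argument works against the followed signature.
bound = dict(inspect.signature(wrapped_t2).bind(10).arguments)
result = wrapped_t2(10)
```

The wrapper still accepts `*args` when called, but introspection sees the original `(t1)` signature, which matches what the thread reports working.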
    Chris White

    Chris White

    1 year ago
    Ahhh good call!