To get to know the community better, I'll use a sm...
# ask-community
r
To get to know the community better, I'll use a small example to tell you what kind of jungle a newbie can get into. The prefect has a native way of filtering result items, but it only works at the level of one list. piggy Like a fool, I figured I just needed a super flexible filter to keep in sync as many argument lists as I want. So, let's say we have a goal: to process the list of arguments of one task, depending on how the other task handled them.
Copy code
@task
def get_paths() -> List[Path]:
    ...

@task
def dosmth(path: Path) -> Any:
    ...

@task
def finally(path: Path, smth: Any):
    ...

with Flow('fucking-prefect') as flow:
    paths = get_paths()
    smth = dosmth.map(paths) # maybe there are exceptions
    # TODO: need to synchronize paths and smth
    finally.map(paths, smth)
dusty stickI implemented my filter like
prefect.tasks.control_flow.FilterTask
Copy code
from typing import List, Any, Tuple, Union

from prefect import Task
from prefect.triggers import all_finished


class CrossSkip(Task):
    def __init__(self, *skip, **kwargs) -> None:
        kwargs.setdefault("skip_on_upstream_skip", False)
        kwargs.setdefault("trigger", all_finished)
        self._types = tuple([s for s in skip if isinstance(s, type)])
        self._values = [s for s in skip if not isinstance(s, type)]
        if not skip:
            self._types = (type(None), )
        super().__init__(**kwargs)

    def _filter(self, value) -> bool:
        return not isinstance(value, self._types) and not any([value == v for v in self._values])

    def run(self, *task_results: List[Any]) -> Union[List[Any], Tuple[List[Any], ...]]:
        """Task run method."""
        assert task_results
        assert len({*map(len, task_results)}) == 1
        if len(task_results) == 1:
            return [r for r in task_results[0] if self._filter(r)]
        return tuple([*map(list, zip(*[
            r for r in zip(*task_results)
            if all([self._filter(v) for v in r])
        ]))])
The flow should have turned into something like this, and everything would have worked fine
Copy code
with Flow('best-prefect-flow') as flow:
    paths = get_paths()
    smth = dosmth.map(paths) # maybe there are exceptions
    # >>>
    paths, smth = CrossSkip(Exception, None)(paths, smth)
    # <<<
    finally.map(paths, smth)
BUT
ValueError: Tasks with variable positional arguments (*args) are not supported, because all Prefect arguments are stored as keywords. As a workaround, consider modifying the run() method to accept **kwargs and feeding the values to *args.
In general, I know how to use python magic to solve this problem, but I refuse to conjure further 🙂 TLDR: Prefect's tasks can't unpack arguments:
Copy code
@task
def todosmth(*arg) -> Any:
    ...
a
@rilshok thanks for sharing. It looks like you know how to solve it? LMK if you have any specific question I can help with
👌 1