rilshok
12/22/2021, 8:56 AM
@task
def get_paths() -> List[Path]:
    """Placeholder: supply the list of paths the flow maps over (body elided)."""
    ...
@task
def dosmth(path: Path) -> Any:
    """Placeholder: process a single path; the run may raise (body elided)."""
    ...
# NOTE(review): `finally` is a reserved keyword in Python, so this `def` is a
# SyntaxError as written. Treat the name as pseudocode; a real flow needs a
# different identifier (e.g. `finalize`) here and at every `finally.map(...)`
# call site.
@task
def finally(path: Path, smth: Any):
    # Placeholder: receives one path together with the `dosmth` result for it.
    ...
# Problem statement: map `dosmth` over every path, then map the final task
# over the (path, result) pairs.
with Flow('fucking-prefect') as flow:
    paths = get_paths()
    smth = dosmth.map(paths) # maybe there are exceptions
    # TODO: need to synchronize paths and smth
    # (presumably: entries whose `dosmth` run failed must be dropped from BOTH
    # lists so they stay index-aligned -- confirm against Prefect's mapping docs)
    # NOTE(review): `finally` is a reserved keyword, so this line does not
    # parse as written.
    finally.map(paths, smth)
dusty stick
I implemented my own filter, modeled on prefect.tasks.control_flow.FilterTask:
from typing import List, Any, Tuple, Union
from prefect import Task
from prefect.triggers import all_finished
class CrossSkip(Task):
    """Filter one or more equally long result lists in lockstep.

    A value is *skipped* when it is an instance of one of the types passed to
    the constructor, or when it compares equal to one of the non-type values
    passed to the constructor. With no constructor arguments only ``None`` is
    skipped.

    When several lists are given, an index is dropped from *all* of them as
    soon as any list holds a skipped value at that index, so the surviving
    items stay aligned.

    NOTE(review): ``run`` takes ``*task_results``. Prefect rejects tasks whose
    ``run`` declares ``*args`` with the ``ValueError`` quoted in this thread;
    the signature is kept unchanged here so existing call sites still match.
    """

    def __init__(self, *skip, **kwargs) -> None:
        """Split ``skip`` into types and plain values; forward ``kwargs`` to ``Task``.

        Args:
            *skip: Types (matched with ``isinstance``) and/or values (matched
                with ``==``) that mark a result as skipped.
            **kwargs: Passed to ``Task.__init__``. ``skip_on_upstream_skip``
                and ``trigger`` only get defaults, so callers may override them.
        """
        # Defaults presumably chosen so the filter still runs after upstream
        # failures or skips -- confirm against Prefect's trigger docs.
        kwargs.setdefault("skip_on_upstream_skip", False)
        kwargs.setdefault("trigger", all_finished)
        self._types = tuple(s for s in skip if isinstance(s, type))
        self._values = [s for s in skip if not isinstance(s, type)]
        if not skip:
            # No explicit criteria: fall back to skipping ``None`` only.
            self._types = (type(None),)
        super().__init__(**kwargs)

    def _filter(self, value) -> bool:
        """Return ``True`` when ``value`` should be kept."""
        if isinstance(value, self._types):
            return False
        return not any(value == v for v in self._values)

    def run(self, *task_results: List[Any]) -> Union[List[Any], Tuple[List[Any], ...]]:
        """Drop skipped entries from the given result lists.

        Args:
            *task_results: One or more sequences of identical length.

        Returns:
            A single filtered list when exactly one sequence is given;
            otherwise a tuple with one filtered list per input sequence
            (each possibly empty).

        Raises:
            ValueError: If no sequence is given or the lengths differ.
        """
        # Explicit raises instead of ``assert``: asserts vanish under ``-O``.
        if not task_results:
            raise ValueError("CrossSkip.run() needs at least one result list")
        if len({len(results) for results in task_results}) != 1:
            raise ValueError("all result lists passed to CrossSkip.run() must have the same length")

        if len(task_results) == 1:
            return [r for r in task_results[0] if self._filter(r)]

        kept_rows = [
            row for row in zip(*task_results)
            if all(self._filter(v) for v in row)
        ]
        if not kept_rows:
            # zip(*[]) yields nothing, which would collapse the result to ``()``
            # and break ``a, b = task(...)`` unpacking; keep one list per input.
            return tuple([] for _ in task_results)
        return tuple(list(column) for column in zip(*kept_rows))
The flow should then have looked something like this, and everything would have worked fine:
# Desired flow: filter `paths` and `smth` together with CrossSkip before the
# final mapped task runs.
with Flow('best-prefect-flow') as flow:
    paths = get_paths()
    smth = dosmth.map(paths) # maybe there are exceptions
    # >>>
    # Skip criteria: instances of Exception, and values equal to None.
    # NOTE(review): unpacking a task result inside a Flow may additionally
    # require `nout=2` on the task -- confirm in the Prefect Task docs.
    paths, smth = CrossSkip(Exception, None)(paths, smth)
    # <<<
    # NOTE(review): `finally` is a reserved keyword, so this line does not
    # parse as written.
    finally.map(paths, smth)
BUT
ValueError: Tasks with variable positional arguments (*args) are not supported, because all Prefect arguments are stored as keywords. As a workaround, consider modifying the run() method to accept **kwargs and feeding the values to *args.
I know how to work around this with some Python magic, but I refuse to conjure any further 🙂
TL;DR: Prefect tasks cannot declare variable positional arguments (*args):
@task
def todosmth(*arg) -> Any:
    """Minimal reproduction: a task declared with variable positional arguments."""
    ...
Anna Geller