# ask-community
d
Separate question: has anyone mapped a Prefect task over the product of iterables? E.g., say I have a task `EvaluateModelOnMetric(model: Model = …, metric: Metric = …)` that evaluates a single model against a single metric, plus the lists `models: List[Model] = [model_a, model_b, model_c]` and `metrics: List[Metric] = [precision, recall, f1_score]`. It’s pretty simple to design a task `EvaluateModelsOnMetrics(models: List[Model] = …, metrics: List[Metric] = …)` that uses `itertools.product` internally and delegates to an `EvaluateModelOnMetric`, but I’m wondering if Prefect has something like this out of the box 🙂
k
You definitely need a task to make the cross product. Wanna PR one? 😆
a
@Danny Vilela we had a similar question 4 days ago, and I think you may apply a similar principle: create a separate task that builds a product of iterables and turns those into a list. Then you can map some other task over this list of inputs - here is an example
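A minimal sketch of that principle, written in plain Python so it runs without Prefect installed. The function names and the string stand-ins for models and metrics are assumptions for illustration only; in a flow, both functions would presumably be tasks and the list comprehension would become a mapped call.

```python
import itertools
from typing import List, Tuple


def cross_product(models: List[str], metrics: List[str]) -> List[Tuple[str, str]]:
    """Materialize every (model, metric) pair as a list that can be mapped over."""
    return list(itertools.product(models, metrics))


def evaluate_model_on_metric(pair: Tuple[str, str]) -> str:
    """Hypothetical placeholder for the single-model, single-metric evaluation."""
    model, metric = pair
    return f"{model}:{metric}"


pairs = cross_product(
    ["model_a", "model_b", "model_c"],
    ["precision", "recall", "f1_score"],
)
# Stands in for mapping the evaluation task over the list of pairs.
results = [evaluate_model_on_metric(pair) for pair in pairs]
```

Each element of `pairs` is a single object, so the downstream function only needs one mapped argument.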
d
Ah yeah! I guess there could just be a `ProductMapTask`. Thank you @Anna Geller!
Hi @Anna Geller / @Kevin Kho, sorry to revive a stale thread, but would this approach of using `itertools.product` work with task checkpointing? This example is a bit incomplete, but I think it shows what I’m trying to do:
```python
import itertools
from typing import Any, Dict, List

from prefect import Flow, task
from prefect.engine.state import State


@task()
def product_map(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    keys: List[str] = [*data.keys()]
    vals: List[Any] = [*data.values()]
    return [dict(zip(keys, pair)) for pair in itertools.product(*vals)]


@task()
def print_abc(a: int, b: str, c: str) -> str:
    print(a, b, c)
    return f"a={a}, b={b}, c={c}"


params: Dict[str, Any] = dict(
    a=[1, 2],
    b=["foo", "bar"],
    c=["hello", "world"]
)

with Flow(name="test") as flow:
    mapped_params = product_map(data=params)
    print_abc.map(mapped_params)  # Note: this does not work!

flow_state: State = flow.run()
assert flow_state.is_successful()
print(flow_state.result[mapped_params].result)
```
`mapped_params` is a list of dictionaries, where each dictionary is one combination from the product of the values in `params` (e.g., `[{'a': 1, 'b': 'foo', 'c': 'hello'}, {'a': 1, 'b': 'foo', 'c': 'world'}, …]`). But I’m not sure how (or whether) it’s possible to tell Prefect that each dictionary should be expanded into the `print_abc` task’s keyword arguments. Am I missing something? I assume that if I can get each individual dictionary’s `a`, `b`, and `c` mapped to a task, then checkpointing could work as expected.
a
I think the issue is that you can't map over multiple keyword arguments at the same time, and you would need to freeze those that shouldn't be mapped over using `unmapped` (`from prefect import unmapped`). But I agree that it's difficult to do in the current data format... I would try to somehow flatten this list of dictionaries into a more map-friendly format, e.g. a list of lists? And then you could perhaps use `flatten` (`from prefect import flatten`) to flatten the nested lists for mapping. Just ideas...
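One way to sidestep the keyword-expansion problem, sketched here in plain Python so it runs without Prefect: keep the list of dictionaries and map over a thin wrapper that takes a single dictionary and unpacks it. `print_abc_from_dict` is a hypothetical helper, not something from the thread or from Prefect, and this sketch assumes that in a flow the wrapper would be the mapped task; whether checkpointing then behaves as hoped has not been verified.

```python
import itertools
from typing import Any, Dict, List


def product_map(data: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Build one dict per combination of the input lists."""
    keys = list(data.keys())
    return [dict(zip(keys, combo)) for combo in itertools.product(*data.values())]


def print_abc(a: int, b: str, c: str) -> str:
    return f"a={a}, b={b}, c={c}"


def print_abc_from_dict(kwargs: Dict[str, Any]) -> str:
    """Hypothetical wrapper: takes one mapped item and expands it into keyword arguments."""
    return print_abc(**kwargs)


params = dict(a=[1, 2], b=["foo", "bar"], c=["hello", "world"])
rows = product_map(params)
# Stands in for mapping the wrapper task over the list of dictionaries.
outputs = [print_abc_from_dict(row) for row in rows]
```

Because the wrapper has exactly one argument, each mapped child receives one complete combination and nothing needs to be frozen or flattened.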