https://prefect.io logo
Title
m

matta

03/26/2021, 2:44 AM
Ooh, just found that the
toolz
library works pretty seamlessly with Prefect maps. So you can use
pipe
,
compose
,
thread_first
or
thread_last
to chain operations. https://toolz.readthedocs.io/en/latest/api.html#toolz.functoolz.pipe So like, from the Horizontal Mapping blog post, this:
with Flow("mapping-test") as flow:
  sleep.map(sleep.map(times))
cam become this:
with Flow("mapping-test") as flow:
    tz.pipe(times, 
            sleep.map, 
            sleep.map)
which I find a bit more readable! Or lets you turn:
with Flow('iterated map') as flow:
    mapped_result = add_ten.map([1, 2, 3])
    mapped_result_2 = add_ten.map(mapped_result)
into
with Flow('asdasds') as flow:
    mapped_result = tz.pipe([1, 2, 3],
                           add_ten.map,
                           add_ten.map)
🙌🏿 1
👍🏿 1
👍 4
👀 10
🙌 5
thread_last
and
thread_first
would also be useful for mapping functions with arguments! https://toolz.readthedocs.io/en/latest/api.html#toolz.functoolz.thread_last example:
@task
def add(x, y):
    return x + y

with Flow("unmapped inputs") as flow:
    result = tz.thread_first(
        [1, 2, 3],
        (add.map, unmapped(10)),
        (add.map, unmapped(15)),
    )
or this!
@task
def add(x, y):
    return x + y

@task
def mult(x, y):
    return x * y

with Flow("unmapped inputs") as flow:

    transformations = [
        (add, 10),
        (mult, 5),
        (add, 10),
        (mult, 5),
    ]

    fns = [
        (fn.map, unmapped(arg))
        for fn, arg in transformations
    ]

    result = tz.thread_first([1, 2, 3], *fns)
or even!
@task
def mult_3(x, y, z):
    return x * y * z

with Flow("unmapped inputs") as flow:
    transformations = [
        (add, 10),
        (mult, 5),
        (add, 10),
        (mult, 5),
        (mult_3, 4, 11)
    ]
    fns = [
        (fn.map, *map(unmapped, args))
        for fn, *args in transformations
    ]
    result = tz.thread_first([1, 2, 3], *fns)
if you wanted to be REALLY lazy (which I generally do!), could even omit typing
map
yourself by unpacking a list comprehension:
with Flow("asdasds") as flow:
    mapped_result = tz.pipe(
        [1, 2, 3], *[x.map for x in [add_ten, add_ten]]
    )
for a slightly less trivial example, this one from the docs https://docs.prefect.io/core/concepts/mapping.html#flat-mapping so this:
with Flow('flat map') as f:
    a = A() # [1, 2, 3]
    b = B.map(x=a) # [[0], [0, 1], [0, 1, 2]]
    c = C.map(y=flatten(b)) # [100, 100, 101, 100, 101, 102]
can become this:
with Flow('flat map') as f:
    a = A() # [1, 2, 3]
    b = tz.pipe(a,
               B.map,
               flatten,
               C.map)
but yeah. Method Chaining rocks imho. https://tomaugspurger.github.io/method-chaining.html