    matta

    1 year ago
    Ooh, just found that the toolz library works pretty seamlessly with Prefect maps. So you can use pipe, compose, thread_first or thread_last to chain operations. https://toolz.readthedocs.io/en/latest/api.html#toolz.functoolz.pipe So like, from the Horizontal Mapping blog post, this:
    with Flow("mapping-test") as flow:
      sleep.map(sleep.map(times))
    can become this (with import toolz as tz):
    with Flow("mapping-test") as flow:
        tz.pipe(times, 
                sleep.map, 
                sleep.map)
    which I find a bit more readable! It also lets you turn:
    with Flow('iterated map') as flow:
        mapped_result = add_ten.map([1, 2, 3])
        mapped_result_2 = add_ten.map(mapped_result)
    into
    with Flow('asdasds') as flow:
        mapped_result = tz.pipe([1, 2, 3],
                               add_ten.map,
                               add_ten.map)
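For anyone who hasn't used toolz: pipe(data, f, g) is just g(f(data)). Here's a rough pure-Python sketch of that behaviour so it runs without Prefect or toolz installed. It is not toolz's actual source, and pipe_sketch and FakeTask are made-up stand-ins (a real Prefect .map builds a graph rather than computing eagerly):

```python
from functools import reduce


def pipe_sketch(data, *funcs):
    """Feed data through funcs left to right, the way toolz.pipe does."""
    return reduce(lambda acc, fn: fn(acc), funcs, data)


class FakeTask:
    """Hypothetical stand-in for a Prefect task: .map applies fn per element."""

    def __init__(self, fn):
        self.fn = fn

    def map(self, items):
        return [self.fn(item) for item in items]


add_ten = FakeTask(lambda x: x + 10)

# Nested calls read inside-out; the piped version reads top to bottom.
nested = add_ten.map(add_ten.map([1, 2, 3]))
piped = pipe_sketch([1, 2, 3], add_ten.map, add_ten.map)
```

Both spellings give the same result; the piped one just lists the steps in the order they run.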
    thread_last and thread_first would also be useful for mapping functions with arguments! https://toolz.readthedocs.io/en/latest/api.html#toolz.functoolz.thread_last example:
    import toolz as tz
    from prefect import Flow, task, unmapped

    @task
    def add(x, y):
        return x + y
    
    with Flow("unmapped inputs") as flow:
        result = tz.thread_first(
            [1, 2, 3],
            (add.map, unmapped(10)),
            (add.map, unmapped(15)),
        )
    or this!
    @task
    def add(x, y):
        return x + y
    
    @task
    def mult(x, y):
        return x * y
    
    with Flow("unmapped inputs") as flow:
    
        transformations = [
            (add, 10),
            (mult, 5),
            (add, 10),
            (mult, 5),
        ]
    
        fns = [
            (fn.map, unmapped(arg))
            for fn, arg in transformations
        ]
    
        result = tz.thread_first([1, 2, 3], *fns)
    or even!
    @task
    def mult_3(x, y, z):
        return x * y * z
    
    with Flow("unmapped inputs") as flow:
        transformations = [
            (add, 10),
            (mult, 5),
            (add, 10),
            (mult, 5),
            (mult_3, 4, 11)
        ]
        fns = [
            (fn.map, *map(unmapped, args))
            for fn, *args in transformations
        ]
        result = tz.thread_first([1, 2, 3], *fns)
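In case the tuple forms look like magic: thread_first slots the running value in as the first argument of each step, thread_last as the last. A minimal plain-Python sketch of that rule follows. The *_sketch functions and the add_each / mult_each helpers are hypothetical stand-ins for illustration, not toolz or Prefect code:

```python
def thread_first_sketch(value, *forms):
    """Each form is a callable or a (callable, *extra_args) tuple;
    the running value is passed as the FIRST argument."""
    for form in forms:
        if callable(form):
            value = form(value)
        else:
            fn, *args = form
            value = fn(value, *args)
    return value


def thread_last_sketch(value, *forms):
    """Same idea, but the running value is passed as the LAST argument."""
    for form in forms:
        if callable(form):
            value = form(value)
        else:
            fn, *args = form
            value = fn(*args, value)
    return value


def add_each(xs, y):
    # Stand-in for add.map(xs, unmapped(y))
    return [x + y for x in xs]


def mult_each(xs, y):
    # Stand-in for mult.map(xs, unmapped(y))
    return [x * y for x in xs]


result = thread_first_sketch([1, 2, 3], (add_each, 10), (mult_each, 5))

# Argument position matters for non-commutative functions:
first = thread_first_sketch(2, (pow, 3))  # pow(2, 3)
last = thread_last_sketch(2, (pow, 3))    # pow(3, 2)
```

So for the mapped tasks above, thread_first is the right choice whenever the mapped list is the task's first parameter.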
    if you wanted to be REALLY lazy (which I generally do!), you could even omit typing map yourself by unpacking a list comprehension:
    with Flow("asdasds") as flow:
        mapped_result = tz.pipe(
            [1, 2, 3], *[x.map for x in [add_ten, add_ten]]
        )
    for a slightly less trivial example, take this one from the docs: https://docs.prefect.io/core/concepts/mapping.html#flat-mapping so this:
    with Flow('flat map') as f:
        a = A() # [1, 2, 3]
        b = B.map(x=a) # [[0], [0, 1], [0, 1, 2]]
        c = C.map(y=flatten(b)) # [100, 100, 101, 100, 101, 102]
    can become this:
    with Flow('flat map') as f:
        a = A() # [1, 2, 3]
        b = tz.pipe(a,
                   B.map,
                   flatten,
                   C.map)
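A plain-Python walk-through of that flat-map chain, assuming (from the comments above) that B turns x into list(range(x)) and C adds 100. All the names here are hypothetical stand-ins, and the flatten here is eager whereas Prefect's is a marker on the upstream result. It also shows the compose spelling, which lists the same steps right to left:

```python
from functools import reduce
from itertools import chain


def pipe_sketch(data, *funcs):
    """Left-to-right application, like toolz.pipe."""
    return reduce(lambda acc, fn: fn(acc), funcs, data)


def compose_sketch(*funcs):
    """Right-to-left composition, the ordering toolz.compose uses."""
    return lambda data: pipe_sketch(data, *reversed(funcs))


def b_map(xs):
    # Assumed behaviour of B.map: each x becomes [0, ..., x - 1]
    return [list(range(x)) for x in xs]


def flatten_sketch(nested):
    # Eager one-level flatten of a list of lists
    return list(chain.from_iterable(nested))


def c_map(ys):
    # Assumed behaviour of C.map: add 100 to each element
    return [y + 100 for y in ys]


a = [1, 2, 3]
piped = pipe_sketch(a, b_map, flatten_sketch, c_map)
composed = compose_sketch(c_map, flatten_sketch, b_map)(a)
```

pipe keeps the steps in execution order, which is why it tends to read better than compose for this kind of chain.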
    but yeah. Method Chaining rocks imho. https://tomaugspurger.github.io/method-chaining.html