Hi everyone, How do I use the input of a mapped t...
# ask-community
f
Hi everyone, How do I use the input of a mapped task as input for another mapped task? I have two apply map functions where the first one returns a list of lists, then in the second mapping function I want to do a mapping for each element of this list. I flattened the list but for some reason instead of using the elements, it uses the lists separately.
k
Hi @Fabrice Toussaint, it should work. Could you show me your code?
f
I got a "fake" example:
Copy code
@task
def offset_for_number_of_pages(pages: int = 1):
    if pages < 1:
        raise ValueError('Cannot be lower than 1!')
    else:
        return [100 * n for n in range(pages)]

@task
def get_items(offset, url):
    """Retrieves the orders response of the API request."""
    request = requests.get(url, offset=offset)
    return request.json

@task
def do_something(item):
    return item + 1

def execute_for_item(item):
    return do_something(item)
    
with Flow as flow:
    pages = Parameter('pages', default=1)
    offsets = offset_for_number_of_pages(pages=pages) #[0, 100, 200, 300, 500]
    items = get_items.map(offset=offsets, url=unmapped(url)) #[[0..100], [100..200], [200..300], [300..400]
    results = apply_map(execute_for_item, item=flatten(items)) #[0..400]
in the "execute_for_item" I call multiple tasks but I did not add all of them in this fake example
Copy code
from prefect import task, Flow, apply_map, flatten, unmapped, Parameter


@task
def generate_list(amount: int = 1):
    if amount < 1:
        raise ValueError('Cannot be lower than 1!')
    else:
        return [100 * n for n in range(amount)]


@task
def get_items(item):
    """Retrieves the orders response of the API request."""
    return [0, 1, 2, 3, item]


@task
def add_one(item):
    print(item)
    return item + 1


@task
def add_two(item):
    return item + 2


def execute_for_item(item):
    item_plus_one = add_one(item)
    return add_two(item_plus_one)


with Flow("Test") as flow:
    amount = Parameter('amount', default=1)
    l = generate_list(amount=amount)  # [0, 100, 200, 300, 500]
    items = get_items.map(item=l)  # [[0..100], [100..200], [200..300], [300..400]
    results = apply_map(execute_for_item, item=flatten(items))  # [0..400]

flow.run()
this example actually works
you see that the apply_map doesnt work for execute_for_item, as add_one prints a list instead of the items from items
it uses the list and not the elements of the list
k
I see. This may take me a bit to figure out. I'll respond to you tomorrow since today is a US holiday
f
Thank you! Enjoy your day off tomorrow :)
Today I mean!
👍 1
@Kevin Kho any updates 😄? I hope you had a nice Memorial Day yesterday btw!
k
Just in looking at it now
I need to do some tests on my end
You’re right that the flatten does not work in apply_map. I don’t know why. Will ask the team but seems like you’ll need an intermediate task to flatten things like this:
Copy code
from prefect import Flow, task, flatten, case, apply_map
import prefect

@task
def generate_numbers():
    return [[1],[2,3],[4,5,6]]

@task
def up(x):
    return x + 1

@task
def is_even(x):
    return x % 2 ==0

def up_or_down(x):
    cond = is_even(x)
    with case(cond, True):
        res = up(x)
    return res

@task
def simple_flatten(x):
    return [num for elem in x for num in elem]

with Flow('test') as flow:
     x = generate_numbers()
     y = simple_flatten(x)
     apply_map(up_or_down, y)

flow.run()
f
that works, shouldve thought about it myself :')
thanks a lot again, a bit weird that flatten doesnt work with apply_map
k
no problem! I think it has to do with the fact that apply map works on a function and not a task, but i’m not super sure yet