Stephen Lloyd

02/28/2023, 5:46 AM
Suppose I have a mapped task that returns a tuple... I want to further map out each element in the tuple's list to a downstream task. How do I do this? And how do I gather the results back such that I can associate them with the original tuple?
@task
def some_task():
  ...
  return name, list

@task
def mapped_process():
  ...
  return ??

@flow
def my_flow():
  names = ['test1', 'test2']
  x = some_task.map(names)
  # x = [('test1', [1,2,3,4,5]), ('test2', [6,7,8,9,10])]

  lots_of_data = mapped_process.map()

jpuris

02/28/2023, 8:14 AM
What I'd try (a rough sketch follows the list):
1. Enumerate the list.
2. Loop through the elements, passing the params plus the enum (your ID), and run the task function.
3. (Optional) Switch from the default sequential task runner to the concurrent task runner in my_flow.
4. The task also needs to return the enum (ID) as part of its return value.
5. Save the task run returns into a list.
6. Zip up the list of tuples by the returned ID.
That said, this is just what comes to mind on the spot. It can probably be done in a much more Pythonic way, or maybe Prefect has a mechanism for this 🤷
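A minimal sketch of that idea (an editor's illustration, not from the thread; process_one and pairs are hypothetical names, and it assumes Prefect 2's ConcurrentTaskRunner and task.submit()):

from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def process_one(idx: int, name: str, num: int):
    # return the enumeration ID alongside the result so it can be matched up later
    return idx, name, num * num

@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    pairs = [('test1', [1, 2, 3]), ('test2', [4, 5, 6])]
    futures = []
    for idx, (name, nums) in enumerate(pairs):
        for num in nums:
            futures.append(process_one.submit(idx, name, num))
    results = [f.result() for f in futures]
    # group results back by the returned ID, so completion order doesn't matter
    return [(pairs[idx], [r for (i, _, r) in results if i == idx])
            for idx in range(len(pairs))]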

Ryan Peden

02/28/2023, 3:02 PM
Would using the output of the first mapped task as the input to the second mapped task do what you need? Something like this:
from prefect import task, flow, get_run_logger

@task
def some_task(name: str):
    # read the int from the last character of the name
    current = int(name[-1])
    start = (current - 1) * 5
    nums = [i for i in range(start, start + 5)]
    return name, nums


@task
def mapped_process(input: tuple[str, list[int]]):
    name, data = input
    # sum of data
    data_sum = sum(data)
    return f"sum for {name} is {data_sum}"


@flow
def my_flow():
    names = ['test1', 'test2', 'test3', 'test4', 'test5']
    x = some_task.map(names)

    lots_of_data = mapped_process.map(x)
    results = [d.result() for d in lots_of_data]
    tuples = [t.result() for t in x]

    # zip the results with the original tuples
    output = list(zip(tuples, results))

    logger = get_run_logger()
    for o in output:
        logger.info(o)



if __name__ == "__main__":
    my_flow()
This results in output like:
10:00:07.185 | INFO    | Flow run 'zippy-chihuahua' - (('test1', [0, 1, 2, 3, 4]), 'sum for test1 is 10')
10:00:07.186 | INFO    | Flow run 'zippy-chihuahua' - (('test2', [5, 6, 7, 8, 9]), 'sum for test2 is 35')
10:00:07.186 | INFO    | Flow run 'zippy-chihuahua' - (('test3', [10, 11, 12, 13, 14]), 'sum for test3 is 60')
10:00:07.186 | INFO    | Flow run 'zippy-chihuahua' - (('test4', [15, 16, 17, 18, 19]), 'sum for test4 is 85')
10:00:07.186 | INFO    | Flow run 'zippy-chihuahua' - (('test5', [20, 21, 22, 23, 24]), 'sum for test5 is 110')

Stephen Lloyd

03/01/2023, 8:52 AM
Thanks. This is close, and it is what I currently do, but I actually would like to run mapped_process() for each element in x[1]. I can do:
@task
def mapped_process(val: str, num: int):
    square = num * num
    return val, square

@flow
def my_flow():
    ### stuff
    lots_of_data = mapped_process.map(unmapped(x[0]), x[1])
but I was unsure of how to tie the results back together correctly. Is the order of returned results in x and lots_of_data guaranteed?
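One way to run the downstream task for each element of every tuple and still tie the results back (an editor's sketch, not from the thread; it assumes Prefect 2's unmapped and the some_task defined earlier):

from prefect import flow, task, unmapped

@task
def mapped_process(val: str, num: int):
    return val, num * num

@flow
def my_flow():
    x = some_task.map(['test1', 'test2'])
    output = []
    for fut in x:
        name, nums = fut.result()
        # map over this tuple's list, holding the name constant with unmapped
        squares = mapped_process.map(unmapped(name), nums)
        output.append((name, [s.result() for s in squares]))
    return output

Resolving each tuple before mapping keeps the association explicit, at the cost of some concurrency across tuples.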

jpuris

03/01/2023, 8:53 AM
> Is the order of returned results in x and lots_of_data guaranteed?
With the sequential runner, I'd say yes. With the Dask, concurrent, etc. runners, I don't think so.

Stephen Lloyd

03/01/2023, 9:01 AM
OK, thanks. I am planning to run this concurrently. x is a collection of IDs generated from an API call. Each ID then requires its own API call. I need to cut down the overall time required by mapping and running these calls concurrently.
I should have included that. I think I'll have to use the ID as a key to tie the results together.
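A sketch of that keyed approach (an editor's illustration; fetch_detail and the placeholder IDs are hypothetical stand-ins for the real API calls):

from prefect import flow, task

@task
def fetch_detail(record_id: str):
    detail = ...  # second API call for this ID (placeholder)
    # return the ID with the payload so results can be keyed rather than ordered
    return record_id, detail

@flow
def my_flow():
    ids = ['a1', 'b2']  # stand-in for the IDs from the first API call
    futures = fetch_detail.map(ids)
    # completion order no longer matters: build a dict keyed by ID
    return dict(f.result() for f in futures)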

jpuris

03/01/2023, 9:05 AM
If the ID is idempotent and unique, then 👍, you should be able to use that as the PK between the request and response

Stephen Lloyd

03/01/2023, 9:13 AM
eh... that's a good point.
My problem is that I've got multiple layers of mapping going on. I'm not sure I can guarantee a unique ID, but I could concatenate some values together to synthesize one, along these lines:
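# combine one value from each mapping layer into a single key (field names are hypothetical)
composite_key = f"{parent_name}:{record_id}:{index}"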
Thank you both for your help!