https://prefect.io logo
Title
s

Stephen Lloyd

03/17/2023, 10:48 AM
In prefect 1.0 I could map() a task and then downstream I could process the list that resulted from that mapping of tasks. In Prefect 2, I can approximate the parallel mapping of tasks using
.submit()
, but I cannot find a way to reduce the results into a format that can be used by a non-mapped/non-async task. I need to find the equivalent in Prefect 2, preferrably using ConcurrentTaskRunner rather than asyncio. It seems like it would be a common use case and I'm not sure why it is not available in Prefect 2.
👀 1
e

Emil Christensen

03/20/2023, 9:56 PM
Hey @Stephen Lloyd 👋 Prefect 2 also has a
.map()
function which is essentially doing
.submit()
over a list of inputs. In either case what you get back is a future. These can be passed to other tasks which will subsequently wait for and resolve them, or you can manually wait for / resolve them.
import time
from prefect import flow, task

@task
def my_task(i: int):
    time.sleep(1)
    return True

@flow
def my_flow(log_prints=True):
    # This will run concurrently
    futures = my_task.map(range(10))

    # This will wait for completion
    print("Waiting for tasks to finish")
    [f.wait() for f in futures]
    print("Tasks are finished")

    # This will get the results
    print(f"Results: {[f.result() for f in futures]}")

my_flow()
s

Stephen Lloyd

03/21/2023, 1:05 AM
Ok, thanks. On our end, something about the docs led us to believe that only
.submit()
could leverage the concurrent task runner So, in this case, the main difference from Prefect 1 is that we just need to gather the list ourselves, correct?
e

Emil Christensen

03/21/2023, 2:28 PM
That’s right… though if you pass the futures into another task, that task should wait for them, essentially gathering them.