Austin Weisgrau

02/24/2023, 9:08 PM
`task.map()` is useful for setting up a set of tasks to run in parallel. Is it possible to set up many parallel tasks with some substructure, e.g. a task structure like the one in this image?
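For comparison, the flat fan-out that `task.map()` provides has the same shape as the standard library's `ThreadPoolExecutor.map`: one function applied to every element of an iterable, with the calls running concurrently. The sketch below is plain Python, not Prefect; a thread pool stands in for Prefect's task runner, and `double` and `fan_out` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def double(value: int) -> int:
    # Hypothetical task body: simulate slow work, then return a result.
    time.sleep(0.2)
    return value * 2


def fan_out(values: list[int]) -> list[int]:
    # Apply `double` to every value concurrently.
    # max_workers must be >= 1, hence the fallback for an empty input.
    with ThreadPoolExecutor(max_workers=len(values) or 1) as pool:
        # pool.map yields results in input order, not completion order.
        return list(pool.map(double, values))


if __name__ == "__main__":
    print(fan_out([1, 2, 3]))  # [2, 4, 6]
```

The three calls overlap in time, so the whole fan-out takes about as long as a single call, and the results still come back in input order.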
Similarly, how can mapped tasks spawn their own mapped tasks in parallel?
🤔 The impression I'm getting from the docs is that task structures like these are not possible: any dependent structure like this has to be hidden inside a single task, and if I want this kind of parallelization I need to use asyncio or some other concurrency mechanism.
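For reference, the asyncio route would look roughly like this for the first structure: each value goes through two dependent steps, and the chains for different values run concurrently. This is plain asyncio rather than Prefect, and `step_one`, `step_two`, `chain`, and `run_chains` are hypothetical names.

```python
import asyncio


async def step_one(value: str) -> str:
    # First stage of a chain: simulate I/O-bound work.
    await asyncio.sleep(0.2)
    return value


async def step_two(value: str) -> str:
    # Second stage: only starts once step_one has produced its result.
    await asyncio.sleep(0.2)
    return value


async def chain(value: str) -> str:
    # The two dependent steps for a single value run in sequence.
    return await step_two(await step_one(value))


async def run_chains(values: list[str]) -> list[str]:
    # Chains for different values run concurrently; gather preserves input order.
    return await asyncio.gather(*(chain(v) for v in values))


if __name__ == "__main__":
    results = asyncio.run(run_chains(["1", "2", "3"]))
    print(f"Results: {', '.join(results)}")  # Results: 1, 2, 3
```

With two 0.2 s steps per value, all three chains finish in roughly 0.4 s instead of 1.2 s: `asyncio.gather` overlaps the chains, while `chain` keeps the two steps for each value in order.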
I changed my mind: I think these structures are possible using `task.submit()`, which tells Prefect that tasks can run concurrently. I put together a minimal working example for the first task dependency setup from the start of this thread:
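```python
import time

from prefect import flow, get_run_logger, task


@task
def get_values():
    return ["1", "2", "3"]


def concurrent_method(value):
    # Plain helper (not a task) called from inside the flow.
    # .submit() hands the task to the task runner and returns a PrefectFuture
    # immediately, so the loop in the flow does not block.
    result_one_future = concurrent_task_one.submit(value)
    # Passing a future as an argument makes concurrent_task_two wait for
    # concurrent_task_one to finish and receive its result.
    result_two_future = concurrent_task_two.submit(result_one_future)
    return result_two_future


@task
def concurrent_task_one(value):
    get_run_logger().info(f"starting: {value}")
    time.sleep(3)
    return value


@task
def concurrent_task_two(value):
    get_run_logger().info(f"second: {value}")
    time.sleep(3)
    return value


@task
def log_results(values):
    # Prefect resolves the futures in the list to their results before this runs.
    get_run_logger().info(f"Results: {', '.join(values)}")


@flow
def helloworld() -> None:
    # Called directly (not submitted), so this returns the list itself.
    values = get_values()

    # One two-step chain per value; the chains run concurrently with each other.
    result_futures = []
    for value in values:
        result_future = concurrent_method(value)
        result_futures.append(result_future)

    log_results(result_futures)


if __name__ == "__main__":
    helloworld()
```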
I'm still working on figuring out how to set up the second, branching example.
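One way to get the branching structure (mapped work that spawns its own mapped work) is to drive both levels from the orchestrating code instead of nesting one fan-out inside another: submit the first-level calls, and as each one finishes, fan out the second level over its output. Below is a minimal plain-Python sketch of that shape, assuming a thread pool as the executor; `expand`, `process`, and `branch_out` are hypothetical stand-ins for the two levels of tasks. In Prefect the analogous move would presumably be calling `.map()` on the second task from the flow once each first-level result is available, but that translation is an assumption, not something confirmed in this thread.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def expand(value: str) -> list[str]:
    # Hypothetical first-level task: each value branches into three children.
    time.sleep(0.2)
    return [f"{value}.{i}" for i in range(3)]


def process(child: str) -> str:
    # Hypothetical second-level task, run once per child.
    time.sleep(0.2)
    return f"done:{child}"


def branch_out(values: list[str]) -> dict[str, list[str]]:
    # Assumes the input values are unique, since they are used as dict keys.
    # 16 workers lets 3 parents and 9 children all overlap in this toy example.
    with ThreadPoolExecutor(max_workers=16) as pool:
        # Level 1: submit one expand() call per value without waiting.
        parents = {pool.submit(expand, v): v for v in values}
        children = {}
        # As each parent finishes, fan out level 2 over its children right away,
        # so a slow parent never delays the branches of a fast one.
        for parent_future in as_completed(parents):
            value = parents[parent_future]
            children[value] = [
                pool.submit(process, child) for child in parent_future.result()
            ]
        # Collect level-2 results, keeping input order and child order.
        return {value: [f.result() for f in children[value]] for value in values}


if __name__ == "__main__":
    for parent, results in branch_out(["1", "2", "3"]).items():
        print(parent, results)
```

Only the orchestrating thread waits on futures, so worker threads never block on each other and the pool cannot deadlock even though the second level depends on the first.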