Austin Weisgrau
02/24/2023, 9:08 PMtask.map()
is useful for setting up a set of tasks to run in parallel.
Is it possible to set up a lot of tasks to run in parallel, with some substructure? E.G. a task structure that looks like this imageimport time
from prefect import flow, get_run_logger, task
@task
def get_values():
return ["1", "2", "3"]
def concurrent_method(value):
result_one_future = concurrent_task_one.submit(value)
result_two_future = concurrent_task_two.submit(result_one_future)
return result_two_future
@task
def concurrent_task_one(value):
get_run_logger().info(f"starting: {value}")
time.sleep(3)
return value
@task
def concurrent_task_two(value):
get_run_logger().info(f"second: {value}")
time.sleep(3)
return value
@task
def log_results(values):
get_run_logger().info(f"Results: {', '.join(values)}")
@flow
def helloworld() -> None:
values = get_values()
result_futures = []
for value in values:
result_future = concurrent_method(value)
result_futures.append(result_future)
log_results(result_futures)
if __name__ == "__main__":
helloworld()