Stephen Lloyd
03/17/2023, 10:48 AM
.submit(), but I cannot find a way to reduce the results into a format that can be used by a non-mapped/non-async task.
I need to find the equivalent in Prefect 2, preferably using ConcurrentTaskRunner rather than asyncio. It seems like it would be a common use case, and I'm not sure why it is not available in Prefect 2.
Emil Christensen
03/20/2023, 9:56 PM
.map() function, which is essentially doing .submit() over a list of inputs. In either case, what you get back is a future. These can be passed to other tasks, which will subsequently wait for and resolve them, or you can manually wait for / resolve them.
import time

from prefect import flow, task


@task
def my_task(i: int):
    time.sleep(1)
    return True


# log_prints belongs on the decorator, not in the flow's parameters
@flow(log_prints=True)
def my_flow():
    # This will run concurrently
    futures = my_task.map(range(10))
    # This will wait for completion
    print("Waiting for tasks to finish")
    for f in futures:
        f.wait()
    print("Tasks are finished")
    # This will get the results
    print(f"Results: {[f.result() for f in futures]}")


my_flow()
Stephen Lloyd
03/21/2023, 1:05 AM
.submit() could leverage the concurrent task runner. So, in this case, the main difference from Prefect 1 is that we just need to gather the list ourselves, correct?
Emil Christensen
03/21/2023, 2:28 PM