@Marvin I want to submit tasks async from my ...
# ask-marvin
e
@Marvin I want to submit tasks async from my flow, for example
@task
def random_task(arg: int):
    # ... do something here
    return arg ** 2

for num in [1, 2, 3, 4, 5]:
    random_task.submit(arg=num)
How would I get the returned value of each task so I can aggregate them after all the tasks are finished?
m
To submit tasks concurrently in Prefect 2.x and aggregate their results once they have all finished, you can do the following:
1. Use `submit` to hand each task run to the flow's task runner. Each call returns a `PrefectFuture` right away instead of blocking.
2. Collect these `PrefectFuture` objects in a list.
3. Call `.result()` on each future. It blocks until that task run has finished and then returns its value; in a synchronous flow no `await` is needed.
4. Aggregate the results.
Here's an example of how you might write the flow:
from prefect import flow, task

@task
def random_task(arg: int):
    # ... do something here ...
    # ** is exponentiation; ^ would be bitwise XOR
    return arg ** 2

@flow
def my_flow():
    futures = []
    for num in [1, 2, 3, 4, 5]:
        futures.append(random_task.submit(arg=num))

    # Block on each future in turn and collect the return values
    results = [future.result() for future in futures]

    # Now you have all the results
    # ... aggregate them as needed ...
    return results

# To run the flow
results = my_flow()
In this code:
- `random_task.submit(arg=num)` submits a task run for each number in the list without waiting for it to finish.
- `futures` is a list of the `PrefectFuture` objects returned by `submit`.
- `future.result()` waits for that task run to complete and returns its value. By default it raises if the task run failed.
Please note that this example synthesizes the approach from the Prefect documentation. For more complex scenarios involving task dependencies and result handling, I would refer to the Prefect documentation directly or check the Prefect community and forums for specific use cases and examples.
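To try the same submit-then-collect shape without a Prefect installation, here is a minimal sketch that uses the standard library's `concurrent.futures` as a stand-in. It is an analogy, not Prefect's API: `ThreadPoolExecutor` and the helper name `run_all` are assumptions made for illustration. The pattern is the same, though: submit everything first, then call `result()` on each future.

```python
from concurrent.futures import ThreadPoolExecutor


def random_task(arg: int) -> int:
    # Stand-in for real work; ** is exponentiation (^ would be bitwise XOR).
    return arg ** 2


def run_all(nums):
    # Hypothetical helper: submit everything first, then collect in order.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(random_task, num) for num in nums]
        # result() blocks until that particular future has finished.
        return [future.result() for future in futures]


results = run_all([1, 2, 3, 4, 5])
total = sum(results)  # one possible aggregation
print(results, total)
```

Collecting in submission order keeps the results aligned with the inputs, even if the tasks finish in a different order.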
n
I think Marvin is overcomplicating this. Wouldn't you just want:
results = [future.result() for future in random_task.map(range(3))]
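The standard library has a similar one-liner, which makes the idea easy to try without Prefect. This is a rough sketch that assumes `concurrent.futures` as a stand-in. One difference to keep in mind: `Executor.map` yields the return values directly, whereas the Prefect `map` call above returns futures that still need `.result()`.

```python
from concurrent.futures import ThreadPoolExecutor


def random_task(arg: int) -> int:
    # Stand-in for the real task body.
    return arg ** 2


with ThreadPoolExecutor() as pool:
    # map() fans the calls out to worker threads and yields results in input order.
    mapped = list(pool.map(random_task, range(3)))

print(mapped)
```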
e
@Marvin I'm maintaining a list of the task submits and checking for their state before moving on, like below. How can I retrieve the result values from the finished tasks?
for num in [1, 2, 3, 4, 5]:
    state_results.append(random_task.submit(arg=num))

finished_tasks = []

while len(state_results) > 0:
    for running_task in state_results:
        if running_task.get_state().is_completed():
            state_results.remove(running_task)
            finished_tasks.append(running_task)
        # < add other checks for failures and whatnot >
m
To retrieve the result values from finished tasks in Prefect 2.x, you should check the state of each task to determine if it has been completed and then retrieve the result. Based on the provided code snippet, here is an updated version that includes the retrieval of task results:
import time

from prefect import get_run_logger

# Assuming random_task is a task already defined elsewhere and that this
# code runs inside a flow (get_run_logger needs a run context).
state_results = []
for num in [1, 2, 3, 4, 5]:
    state_results.append(random_task.submit(arg=num))

finished_tasks = {}
logger = get_run_logger()

while len(state_results) > 0:
    for running_task in state_results.copy():  # Use a copy to avoid modifying the list while iterating
        state = running_task.get_state()
        if state.is_completed():
            state_results.remove(running_task)
            try:
                # Ask the future for the value once the task is completed
                result = running_task.result()
                finished_tasks[running_task] = result
                logger.info(f"Task {running_task} completed with result: {result}")
            except Exception as e:
                logger.error(f"Failed to retrieve result for {running_task}: {e}")
        # Add other checks for failures and manage them accordingly; a task
        # that never reaches Completed would otherwise keep this loop running.
    time.sleep(1)  # Pause between polls instead of spinning

# At this point, the finished_tasks dictionary holds your completed tasks and their results.
Here:
- Inside the while loop, we iterate over a copy of `state_results` to avoid editing the list while it's being iterated over.
- `running_task.result()` fetches the result of the completed task. It is called on the future rather than on the state returned by `get_state()`, because a state read back from the API may not carry the data when result persistence is disabled.
- The results are stored in a dictionary called `finished_tasks`, where the keys are the futures returned by `submit` and the values are the task results.
- Logging reports each completed task and any error raised while fetching its result.
This code keeps a list of the submitted tasks and checks their state once per second. When a task is found to be completed, its result is retrieved and stored, and the task is removed from the list. The loop continues until every task has been removed from the `state_results` list, so tasks that fail need their own branch for the loop to exit.
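The "other checks for failures" part can be sketched without Prefect as well. The following is a minimal illustration that uses `concurrent.futures` as a stand-in, so `done()` and `exception()` here are the standard library's future methods, not Prefect's state checks. The helper name `poll_until_done` and the simulated failure for `arg == 3` are assumptions made for the example. The point is that both outcomes remove the future from the pending set, so the loop always terminates.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def random_task(arg: int) -> int:
    if arg == 3:
        # Hypothetical failure case, only here to exercise the error branch.
        raise ValueError("simulated failure")
    return arg ** 2


def poll_until_done(nums, interval=0.01):
    finished, failed = {}, {}
    with ThreadPoolExecutor(max_workers=2) as pool:
        pending = {pool.submit(random_task, num): num for num in nums}
        while pending:
            for future in list(pending):  # iterate over a copy
                if not future.done():
                    continue
                num = pending.pop(future)
                error = future.exception()
                if error is None:
                    finished[num] = future.result()
                else:
                    failed[num] = error
            time.sleep(interval)  # avoid a busy loop between polls
    return finished, failed


finished, failed = poll_until_done([1, 2, 3, 4, 5])
print(finished, failed)
```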
e
This was what I needed! Thanks