Eric
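12/07/2023, 9:07 PM
```python
from prefect import flow, task

@task
def random_task(arg: int):
    # ... do something here
    return arg ** 2  # ** is exponentiation; ^ is bitwise XOR in Python

@flow
def my_flow():
    for num in [1, 2, 3, 4, 5]:
        random_task.submit(arg=num)
```
How would I get the return value of each task so I can aggregate them after all the tasks are finished?
Marvin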
12/07/2023, 9:08 PM
1. Use `submit` to run the tasks concurrently. Each call returns a `PrefectFuture` object.
2. Collect these `PrefectFuture` objects in a list.
3. Call `.result()` on each future. It blocks until that task has finished and then returns the task's return value.
4. Aggregate the results.
Here's an example of how you might write the flow:
```python
from prefect import flow, task

@task
def random_task(arg: int):
    # ... do something here ...
    return arg ** 2

@flow
def my_flow():
    state_results = []
    for num in [1, 2, 3, 4, 5]:
        # submit() schedules the task to run concurrently and returns a PrefectFuture
        state_results.append(random_task.submit(arg=num))
    # .result() blocks until each task has finished, then returns its return value
    results = [future.result() for future in state_results]
    # Now you have all the results
    # ... aggregate them as needed ...
    return results

# To run the flow
results = my_flow()
```
In this code:
- `random_task.submit(arg=num)` submits the task for each number in the list to run concurrently.
- `state_results` is a list of the `PrefectFuture` objects returned by `submit`.
- `future.result()` waits for each task to finish and returns its value. Because the futures are collected in submission order, `results` lists the values in the same order as the inputs.
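The four steps follow the same submit/collect/wait/aggregate pattern as Python's standard `concurrent.futures` module. The sketch below is a plain-Python analogy, not Prefect code: it assumes a thread pool stands in for Prefect's task runner, and `run_and_aggregate` is a hypothetical helper that sums the squares as its "aggregation".

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def random_task(arg: int) -> int:
    # Stand-in for the real task body
    return arg ** 2


def run_and_aggregate(args: List[int]) -> int:
    with ThreadPoolExecutor() as executor:
        # Step 1: submit() returns a Future immediately, analogous to task.submit()
        futures = [executor.submit(random_task, a) for a in args]
        # Steps 2-3: keep the futures, then result() blocks until each call finishes
        results = [f.result() for f in futures]
    # Step 4: aggregate once every result is in
    return sum(results)


print(run_and_aggregate([1, 2, 3, 4, 5]))  # 55
```

The key point carries over to Prefect: nothing is aggregated until every `.result()` call has returned, so the aggregation only ever sees finished tasks.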
Please note that this example is synthesized from the Prefect documentation. For more complex scenarios involving task dependencies and result handling, refer to the Prefect documentation directly or check the Prefect community forums for specific use cases and examples.
Nate
12/07/2023, 9:13 PM
```python
results = [future.result() for future in random_task.map(range(3))]
```
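Nate's one-liner works because `map` submits one run per input element and hands back one future per element, in input order. The toy class below only illustrates those semantics. `ToyTask` and its methods are hypothetical and built on `concurrent.futures`; this is not Prefect's implementation.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Iterable, List

_executor = ThreadPoolExecutor()


class ToyTask:
    """Hypothetical stand-in for a Prefect task (illustration only)."""

    def __init__(self, fn: Callable[[int], int]):
        self.fn = fn

    def submit(self, arg: int) -> Future:
        # Schedule a single run and return its future right away
        return _executor.submit(self.fn, arg)

    def map(self, args: Iterable[int]) -> List[Future]:
        # One future per input element, kept in input order
        return [self.submit(a) for a in args]


random_task = ToyTask(lambda arg: arg ** 2)
results = [future.result() for future in random_task.map(range(3))]
print(results)  # [0, 1, 4]
```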
Eric
12/07/2023, 9:14 PM
```python
state_results = []
for num in [1, 2, 3, 4, 5]:
    state_results.append(random_task.submit(arg=num))

finished_tasks = []
while len(state_results) > 0:
    for running_task in state_results:
        if running_task.get_state().is_completed():
            state_results.remove(running_task)
            finished_tasks.append(running_task)
        # < add other checks for failures and whatnot >
```
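Eric's loop re-checks every task on each pass until all of them are done. For comparison, Python's standard library offers `concurrent.futures.as_completed`, which yields each future as soon as it finishes. This is a plain-Python analogy that assumes a thread pool stands in for Prefect's task runner. It is not Prefect's API, and `collect_as_completed` is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Iterable


def random_task(arg: int) -> int:
    # Stand-in for the real task body
    return arg ** 2


def collect_as_completed(args: Iterable[int]) -> Dict[int, int]:
    finished_tasks: Dict[int, int] = {}
    with ThreadPoolExecutor() as executor:
        # Remember which input each future belongs to
        futures = {executor.submit(random_task, a): a for a in args}
        # as_completed yields each future once it is done, in completion order
        for fut in as_completed(futures):
            finished_tasks[futures[fut]] = fut.result()
    return finished_tasks


print(collect_as_completed([1, 2, 3, 4, 5]))
```

The dictionary's insertion order follows completion order, so it can differ between runs, but its contents are always the same.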
Marvin
12/07/2023, 9:15 PM
```python
import time

from prefect import flow, get_run_logger

# Assuming random_task is a task already defined elsewhere

@flow
def my_flow():
    logger = get_run_logger()  # only available inside a flow or task run
    state_results = []
    for num in [1, 2, 3, 4, 5]:
        state_results.append(random_task.submit(arg=num))

    finished_tasks = {}
    while len(state_results) > 0:
        for running_task in state_results.copy():  # Use copy to avoid modifying the list while iterating
            state = running_task.get_state()
            if state.is_completed():
                state_results.remove(running_task)
                try:
                    result = state.result()  # Retrieve the result if the task is completed
                    finished_tasks[running_task] = result
                    logger.info(f"Task {running_task} completed with result: {result}")
                except Exception as e:
                    logger.error(f"Failed to retrieve result for {running_task}: {e}")
            elif state.is_final():
                # Failed, crashed or cancelled: stop polling this task so the loop can end
                state_results.remove(running_task)
                logger.error(f"Task {running_task} finished in state {state.name}")
        time.sleep(1)  # pause between polls instead of busy-waiting

    # At this point, the finished_tasks dictionary holds your completed tasks and their results.
    return finished_tasks
```
Here:
- Inside the `while` loop, we iterate over a copy of `state_results` to avoid modifying the list while it is being iterated over.
- The `state.result()` method fetches the result of the completed task.
- The results are stored in a dictionary called `finished_tasks`, where the keys are the `PrefectFuture` objects returned by `submit` and the values are the task results.
- Logging reports each completed task and any error raised while fetching its result.
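A polling loop that only removes *completed* tasks never terminates if one task fails, because the failed task stays in the list forever. The sketch below shows the fix as a plain-Python analogy using `concurrent.futures`. There, `Future.done()` plays the role of a terminal-state check and `Future.exception()` separates failures from successes. The thread pool, the `run_and_poll` helper, and the simulated failure for input 3 are all assumptions made for illustration; none of this is Prefect's API.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Iterable, Tuple


def random_task(arg: int) -> int:
    # Hypothetical failure so the error-handling branch is exercised
    if arg == 3:
        raise ValueError("simulated failure")
    return arg ** 2


def run_and_poll(args: Iterable[int], interval: float = 0.01) -> Tuple[Dict[int, int], Dict[int, str]]:
    finished: Dict[int, int] = {}
    failed: Dict[int, str] = {}
    with ThreadPoolExecutor() as executor:
        # Map each pending future back to the input that produced it
        pending = {executor.submit(random_task, a): a for a in args}
        while pending:
            for fut in list(pending):  # copy the keys so entries can be deleted mid-loop
                if not fut.done():
                    continue
                arg = pending.pop(fut)
                exc = fut.exception()  # None if the call returned normally
                if exc is None:
                    finished[arg] = fut.result()
                else:
                    # Failed calls leave `pending` too, so the while loop always ends
                    failed[arg] = repr(exc)
            time.sleep(interval)  # pause between polls instead of spinning the CPU
    return finished, failed


finished, failed = run_and_poll([1, 2, 3, 4, 5])
print(finished)
print(failed)  # {3: "ValueError('simulated failure')"}
```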
Overall, the loop keeps the submitted tasks in the `state_results` list and repeatedly checks their state. When a task is found to be completed, its result is retrieved and stored, and the task is removed from the list. The loop continues until every task has been removed from `state_results`.
Eric
12/07/2023, 9:17 PM