<@ULVA73B9P> How can I use the `.result()` functio...
# ask-marvin
a
@Marvin How can I use the `.result()` function on tasks but run tasks concurrently?
m
To run tasks concurrently and still use `.result()` in Prefect, use a task runner that supports parallel execution. Prefect provides several task runners, such as `ThreadPoolTaskRunner`, `DaskTaskRunner`, and `RayTaskRunner`, to handle concurrent or parallel task execution.
1. Concurrent Task Execution: Use task runners to execute tasks concurrently. For example, the `ThreadPoolTaskRunner` runs tasks concurrently within a thread pool, allowing you to submit tasks using the `.submit()` method. Each submitted task returns a `PrefectFuture` object, which you can use to access the task's state and result. Learn more about task runners for concurrent execution here: Prefect Task Runners Documentation
2. Retrieving Task Results: After submitting tasks using `.submit()`, you can retrieve their results with the `.result()` method on the returned `PrefectFuture` objects. This is useful when you want to wait for tasks to complete and get their output. For details on using `.result()`, explore this: Deferred Tasks Documentation
Note that `.result()` blocks until that particular task has finished, so submit all of your tasks first and only then call `.result()` on the futures. That way the tasks run concurrently and you still collect every result. It's like having a team of diligent scribes working in parallel but still getting your important notes together in the end. How utterly delightful!
n
In [1]: from prefect import flow, task

In [2]: @flow
   ...: def f(): task(lambda x: x +1).map(range(3)).result()

In [3]: f()
09:24:14.479 | INFO    | prefect.engine - Created flow run 'literate-prawn' for flow 'f'
09:24:14.839 | INFO    | Task run '<lambda>-8d2' - Finished in state Completed()
09:24:14.840 | INFO    | Task run '<lambda>-2cc' - Finished in state Completed()
09:24:14.841 | INFO    | Task run '<lambda>-df1' - Finished in state Completed()
09:24:15.009 | INFO    | Flow run 'literate-prawn' - Finished in state Completed()

https://www.youtube.com/watch?v=k74tEYSK_t8

a
Hmm. After adding `.result(raise_on_failure)` after submitting tasks, it seems to only run 1 task at a time
I’m trying to figure out what else could be causing it
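One possible cause, offered as a guess since no code was shared: if `.result()` is called on each future inside the same loop that submits it, the loop blocks on that task before the next one is submitted. The sketch below illustrates the effect with the standard library's `ThreadPoolExecutor` rather than Prefect (a swapped-in analogy, not Prefect's API). The names `work` and `peak_concurrency` are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

_lock = threading.Lock()
_active = 0  # number of work() calls currently running
_peak = 0    # highest value _active has reached


def work(x: int) -> int:
    """Sleep briefly while tracking how many calls overlap."""
    global _active, _peak
    with _lock:
        _active += 1
        _peak = max(_peak, _active)
    time.sleep(0.1)
    with _lock:
        _active -= 1
    return x + 1


def peak_concurrency(result_inside_loop: bool, n: int = 4) -> int:
    """Run n jobs and report the largest number that ran at once."""
    global _active, _peak
    _active = _peak = 0
    with ThreadPoolExecutor(max_workers=n) as pool:
        if result_inside_loop:
            # Waiting right after each submit: the next job is not
            # submitted until the previous one has finished.
            for i in range(n):
                pool.submit(work, i).result()
        else:
            # Submit everything first, then wait on the futures.
            futures = [pool.submit(work, i) for i in range(n)]
            for future in futures:
                future.result()
    return _peak


if __name__ == "__main__":
    print("result inside loop:", peak_concurrency(True))
    print("submit all, then result:", peak_concurrency(False))
```

If the real flow has this shape, moving the `.result()` calls into a second loop after all submissions should restore concurrency.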
n
hmm do you have an MRE?
a
basically I have a set of tasks I’m submitting in a for loop. If any of those tasks return an Exception object, I skip the subsequent related task runs in a similar for loop
for any of the first tasks that succeeded, they get submitted for a subsequent task run
else skip
something like that