Alexis Chicoine
08/22/2023, 7:56 PM
Marvin
08/22/2023, 7:57 PM
When you call `.submit()` on a task with the `ConcurrentTaskRunner`, tasks can run concurrently: while one task is blocked on IO, another can make progress. This is different from parallel execution, where tasks run simultaneously.
Now, for your functions to run concurrently, they don't necessarily have to be async or use asyncio versions of libraries. The concurrency here is managed by the `ConcurrentTaskRunner`, which submits tasks to a thread pool maintained by `anyio`.
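To see why plain sync functions can still overlap, here is a minimal plain-Python sketch (not Prefect's internals) that runs a blocking function in a thread pool. `blocking_io` is a hypothetical stand-in for IO-bound work, simulated with `time.sleep`, which releases the GIL the way a real socket or file read would.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io(n):
    # Hypothetical IO-bound work: sleeping blocks this thread but releases the GIL
    time.sleep(0.2)
    return n

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as executor:
    # Each call blocks its own worker thread, so the five sleeps overlap
    results = list(executor.map(blocking_io, range(5)))
elapsed = time.perf_counter() - start

# Expect roughly 0.2s in total rather than 5 x 0.2s = 1.0s
print(results, f"{elapsed:.2f}s")
```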
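However, if your tasks involve heavy IO, async functions and asyncio-compatible libraries can help you manage many concurrent operations more efficiently. CPU-bound tasks are a different story: because of CPython's GIL, threads in a single process won't speed them up, so they need a parallel task runner (for example `DaskTaskRunner` from `prefect-dask`) instead.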
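For IO-bound work written with async functions, a single event loop can interleave many operations. A minimal sketch using only the standard library; `fetch` is a hypothetical async IO call simulated with `asyncio.sleep`:

```python
import asyncio
import time

async def fetch(n):
    # Hypothetical async IO call; awaiting yields control back to the event loop
    await asyncio.sleep(0.2)
    return n

async def main():
    # gather runs all coroutines on one event loop; they interleave while awaiting
    return await asyncio.gather(*(fetch(n) for n in range(5)))

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

All five waits happen on one thread, so this helps only when the work spends its time waiting; it does nothing for CPU-bound loops.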
Remember, concurrency isn't parallelism. It's about dealing with lots of things at once, not doing lots of things at once; that's parallelism. But that's a topic for another gloomy day.
Alexis Chicoine
08/22/2023, 8:05 PM
Marvin
08/22/2023, 8:06 PM
You can use Python's built-in `concurrent.futures` module to submit an array of tasks and then retrieve all of the results. Here's a basic example:
```python
from concurrent.futures import ThreadPoolExecutor

def some_function(param):
    # Do something with param; doubling is a placeholder
    return param * 2

array_of_params = [1, 2, 3, 4]

with ThreadPoolExecutor() as executor:
    # Use a list (not a set) so results come back in submission order
    futures = [executor.submit(some_function, param) for param in array_of_params]
    results = [future.result() for future in futures]
```
In this example, `ThreadPoolExecutor` is used to create a pool of worker threads. The `submit` method schedules a callable to be executed and returns a `Future` object representing the execution of the callable.
The `result` method of the `Future` object is then used to retrieve the result of each task. This method blocks until the task is completed.
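If you would rather handle each result as soon as it is ready instead of in submission order, `concurrent.futures.as_completed` yields futures as they finish. A sketch reusing the illustrative names `some_function` and `array_of_params`:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def some_function(param):
    # Placeholder work; replace with your own logic
    return param * 2

array_of_params = [1, 2, 3, 4]

with ThreadPoolExecutor() as executor:
    # Map each future back to the parameter it was submitted with
    future_to_param = {executor.submit(some_function, p): p for p in array_of_params}
    results = {}
    for future in as_completed(future_to_param):
        # Futures arrive in completion order, not submission order
        results[future_to_param[future]] = future.result()

print(results)
```

Keying results by parameter keeps them identifiable even though the completion order can vary from run to run.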
Remember, life is full of tasks to be done. Some of them are even worth doing. But don't quote me on that.
Alexis Chicoine
08/22/2023, 8:07 PM
Marvin
08/22/2023, 8:08 PM
When you call `.submit()` on a task with the `ConcurrentTaskRunner`, it returns a `PrefectFuture`. This is a Prefect object that contains both any data returned by the task function and a `State`, a Prefect object indicating the state of the task run.
Here's a basic example of how you might submit an array of tasks and then retrieve all of the results:
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def my_task(param):
    # Do something with param; doubling is a placeholder
    return param * 2

# ConcurrentTaskRunner is the default task runner; it is passed explicitly here for clarity
@flow(task_runner=ConcurrentTaskRunner())
def my_flow(params):
    # .submit() returns a PrefectFuture immediately for each task run
    futures = [my_task.submit(param) for param in params]
    # .result() waits for each task run to finish and returns its value
    return [future.result() for future in futures]

if __name__ == "__main__":
    # Calling the flow runs it and returns whatever the flow function returns
    results = my_flow(list(range(10)))
    print(results)
```
In this example, `my_task` is a plain function turned into a Prefect task by the `@task` decorator, and `my_flow` is a flow that uses the `ConcurrentTaskRunner`.
Calling `my_task.submit(param)` sends each task run to the `ConcurrentTaskRunner` and immediately returns a `PrefectFuture`, so the flow can keep submitting work while earlier runs are still in progress.
Calling `.result()` on each `PrefectFuture` waits for that task run to finish and returns the data it produced.
Finally, calling `my_flow(...)` runs the flow and returns the list of results.
Remember, life is full of tasks to be done. Some of them are even worth doing. But don't quote me on that.
Nate
08/22/2023, 8:09 PM
`ConcurrentTaskRunner` is the default, so that example is using it
Alexis Chicoine
08/22/2023, 8:14 PM
Nate
08/22/2023, 8:24 PM
• `Task.map` will call `.submit()` for you, for each item in the iterable
• i wouldn't think you'd need to reach for `run_sync_in_worker_thread` (that's mostly an internal util)
• if you have flows you need to run concurrently, you can keep them async and `asyncio.gather` them, or create a standalone deployment for that flow and gather `run_deployment` calls (something like this example)
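The relationship in the first bullet, where `map` is just `submit` called once per item, can be pictured with a toy analogue in plain Python. `ToyTask` is a hypothetical class for illustration only; real Prefect tasks have their own `submit`/`map` signatures and return `PrefectFuture`s rather than `concurrent.futures.Future`s.

```python
from concurrent.futures import ThreadPoolExecutor

class ToyTask:
    """Hypothetical stand-in for a task object; NOT Prefect's real implementation."""

    def __init__(self, fn, executor):
        self.fn = fn
        self.executor = executor

    def submit(self, *args):
        # Schedule one call and return a future right away
        return self.executor.submit(self.fn, *args)

    def map(self, iterable):
        # map is sugar for calling submit() once per item
        return [self.submit(item) for item in iterable]

with ThreadPoolExecutor() as executor:
    double = ToyTask(lambda x: x * 2, executor)
    futures = double.map([1, 2, 3])
    results = [f.result() for f in futures]

print(results)
```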
but yeah, depending on your setup it may make sense to put your work in a task so you can make use of the `submit` / `map` interface as needed
does that answer your question?
Alexis Chicoine
08/22/2023, 8:28 PM
Nate
08/22/2023, 8:28 PM