
Alexis Chicoine

08/22/2023, 7:56 PM
@Marvin If I use .submit on a task with the ConcurrentTaskRunner, do my functions have to be async, using asyncio versions of libraries like subprocess, for them to run concurrently?

Marvin

08/22/2023, 7:57 PM
Ah, the intricacies of concurrent programming. It's a topic that can make even the most seasoned programmers break into a cold sweat. But, as your ever-so-cheerful assistant, I'm here to help. When you use `.submit()` on a task with the `ConcurrentTaskRunner`, tasks run concurrently: one task can make progress while another is blocked on IO. This is different from parallel execution, where tasks are executed simultaneously. For your functions to run concurrently, they don't have to be async or use asyncio versions of libraries. The `ConcurrentTaskRunner` submits synchronous tasks to a thread pool maintained by `anyio`, so a blocking call such as `subprocess.run` only blocks its own worker thread. Async tasks are an alternative for IO-bound work, but they do not help CPU-bound tasks, which need parallelism across processes. Remember, concurrency isn't parallelism. It's about dealing with lots of things at once, not doing lots of things at once. That's parallelism. But that's a topic for another gloomy day.
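To illustrate the thread-pool point, here is a minimal sketch that uses only the standard library rather than Prefect itself. The `run_command` helper and the `labels` list are hypothetical, and the child Python process merely stands in for a long-running command such as dbt. It shows plain blocking `subprocess.run` calls overlapping because each one runs in its own worker thread.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def run_command(label: str) -> str:
    # Ordinary blocking call; a stand-in for a long-running external command
    completed = subprocess.run(
        [sys.executable, "-c", f"print('done {label}')"],
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout.strip()


labels = ["a", "b", "c"]

# Each blocking call occupies one worker thread, so the commands overlap
with ThreadPoolExecutor(max_workers=len(labels)) as pool:
    # pool.map returns results in the same order as the inputs
    outputs = list(pool.map(run_command, labels))
```

No `async def` or asyncio subprocess API is involved; the waiting happens inside the worker threads.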

Alexis Chicoine

08/22/2023, 8:05 PM
@Marvin What is the best way to submit an array of tasks and then retrieve all of the results?

Marvin

08/22/2023, 8:06 PM
Ah, the joy of juggling multiple tasks at once. It's like trying to keep a dozen plates spinning in the air, isn't it? But don't worry, I'm here to help you keep those plates spinning. Or tasks running, as it were. In Python, you can use the `concurrent.futures` module to submit an array of tasks and then retrieve all of the results. Here's a basic example:
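from concurrent.futures import ThreadPoolExecutor

def some_function(param):
    # Placeholder work: replace with the real computation
    return param * 2

# Example inputs; any iterable of parameters works
array_of_params = [1, 2, 3]

with ThreadPoolExecutor() as executor:
    # A list (not a set) keeps the futures in submission order
    futures = [executor.submit(some_function, param) for param in array_of_params]

# result() blocks until the corresponding task has finished
results = [future.result() for future in futures]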
In this example, `ThreadPoolExecutor` is used to create a pool of worker threads. The `submit` method schedules a callable to be executed and returns a `Future` object representing the execution of the callable. The `result` method of the `Future` object is then used to retrieve the results of each task. This method will block until the task is completed. Remember, life is full of tasks to be done. Some of them are even worth doing. But don't quote me on that.

Alexis Chicoine

08/22/2023, 8:07 PM
@Marvin What is the best way to submit an array of Prefect tasks using the ConcurrentTaskRunner and then retrieve all of the results?

Marvin

08/22/2023, 8:08 PM
Ah, the delightful complexity of managing tasks in Prefect. It's like trying to herd cats, isn't it? But fear not, I'm here to guide you through this. In Prefect, when you use the `.submit()` method with the `ConcurrentTaskRunner`, the task run returns a `PrefectFuture`. This is a Prefect object that contains both any data returned by the task function and a `State`, a Prefect object indicating the state of the task run. Here's a basic example of how you might submit an array of tasks and then retrieve all of the results:
from prefect import Flow, Task
from prefect.engine.runners import ConcurrentTaskRunner

class MyTask(Task):
    def run(self, param):
        # Do something with param
        return result

# Create an array of tasks
tasks = [MyTask() for _ in range(10)]

# Create a flow
with Flow("My Flow") as flow:
    results = [task.submit() for task in tasks]

# Run the flow
state = flow.run()

# Retrieve the results
results = [state.result[task].result for task in tasks]
In this example, `MyTask` is a subclass of `Task` that defines a `run` method. This method is what gets executed when the task runs. The `submit` method is used to submit each task to the `ConcurrentTaskRunner`. This returns a `PrefectFuture` for each task. The `flow.run()` method is used to run the flow. This returns a `State` object that represents the state of the flow run. Finally, the results of each task are retrieved from the `State` object using the `state.result[task].result` syntax. Remember, life is full of tasks to be done. Some of them are even worth doing. But don't quote me on that.

Nate

08/22/2023, 8:09 PM
despite marvin's descent into pseudo prefect 1 syntax, the general idea is there
you can do something like this
`ConcurrentTaskRunner` is the default, so that example is using it

Alexis Chicoine

08/22/2023, 8:14 PM
Thanks Nate. We have a bunch of code using async in our codebase that doesn’t seem to be necessary, and I’m looking at getting rid of it without breaking things 🙂 I think for a flow I might have to use it, but then only the flow function will need to be async? Considering the article here, does it still apply, and would I need to put all of the sync code in a separate thread? https://discourse.prefect.io/t/best-practices-for-using-async-in-prefect-2-0/1361 Should I then make the code use async instead if it’s a flow, since I can’t .submit it? Maybe I could rework the flow to simply be a task. The longest part of what we’re executing is a dbt command we start with subprocess.run.

Nate

08/22/2023, 8:24 PM
a few points
• task runners will not affect the execution of flows/subflows, only task execution
• `Task.map` will call `.submit()` for you, for each item in the iterable
• i wouldn't think you'd need to reach for `run_sync_in_worker_thread` (that's mostly an internal util)
• if you have flows you need to run concurrently, you can keep them async and `asyncio.gather` them or create a standalone deployment for that flow and gather `run_deployment` calls (something like this example)
but yeah, depending on your setup it may make sense to put your work in a `task` so you can make use of the `submit`/`map` interface as needed
does that answer your question?

Alexis Chicoine

08/22/2023, 8:28 PM
Yes thank you 🙂

Nate

08/22/2023, 8:28 PM
sure thing!