<@ULVA73B9P> give an example of efficient high par...
# ask-marvin
b
@Marvin give an example of an efficient, highly parallel flow in Prefect 3.0
m
To create an efficient, highly parallel flow in Prefect 3.0, you can leverage task runners like `DaskTaskRunner` or `RayTaskRunner`, which enable parallel execution of tasks. Here's a concise example using the `DaskTaskRunner`:
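```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner  # provided by the prefect-dask package

@task
def compute(x):
    return x * x

# Run submitted tasks on a temporary local Dask cluster with 4 workers.
@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 4}))
def parallel_flow():
    # Submit every task first so they can run in parallel...
    results = [compute.submit(i) for i in range(10)]
    # ...then block on each future to collect the values.
    return [r.result() for r in results]

if __name__ == "__main__":
    print(parallel_flow())
```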
### Key Points:
- Task Runners: Use `DaskTaskRunner` for parallel execution. You can configure the number of workers to control parallelism.
- Concurrency vs. Parallelism: Understand that concurrency allows tasks to run without blocking each other, while parallelism allows tasks to run simultaneously.
- Futures: Use `.submit()` to run tasks and handle their results with `.result()`.

For more details, you can refer to the Prefect documentation on task runners. Also, remember that in Prefect 3.x, `.map` and `.submit` are synchronous operations, and you should resolve futures explicitly if needed. If you're interested in more advanced configurations or have specific constraints, feel free to ask!
b
@Marvin using default runner
m
If you want to execute a high-parallel flow using Prefect 3.0's default task runner, you can use the `ThreadPoolTaskRunner`, which allows concurrent execution within a thread pool. Here's how you can set it up:
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def compute(x):
    return x * x

@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def parallel_flow():
    results = [compute.submit(i) for i in range(10)]
    return [r.result() for r in results]

if __name__ == "__main__":
    print(parallel_flow())
```
### Explanation:
- ThreadPoolTaskRunner: This is the default task runner in Prefect 3.0, which allows tasks to run concurrently using threads. You can specify the `max_workers` parameter to control the number of concurrent threads.
- Task Submission: Use `compute.submit(i)` to run tasks concurrently.
- Result Handling: Collect results using `.result()` to ensure all tasks complete and return their results.

This setup runs tasks concurrently on the default thread pool, which suits I/O-bound work best, since Python threads do not execute CPU-bound code in parallel. For more information, you can check out the Prefect documentation on task runners.
b
@Marvin is this better than .map in prefect 2.0?
m
In Prefect 3.0, using `.submit()` with task runners like `ThreadPoolTaskRunner` or `DaskTaskRunner` offers more explicit control over task execution compared to `.map()` in Prefect 2.0. Here's a comparison:

### Prefect 3.0 Advantages:
1. Explicit Control: `.submit()` provides more explicit control over the execution and handling of task results, making it easier to manage parallelism and concurrency.
2. Task Runners: Prefect 3.0 introduces flexible task runners that allow you to choose between thread-based or process-based parallelism, or even distributed execution with Dask or Ray.
3. Synchronous `.map()`: In Prefect 3.0, `.map()` is always synchronous, which means tasks are submitted immediately, but their execution might not be parallel unless explicitly configured with a task runner.

### Prefect 2.0 `.map()`:
- Implicit Parallelism: `.map()` in Prefect 2.0 is more implicit and might not offer the same level of control over task execution as Prefect 3.0's task runners.
- Ease of Use: While convenient for simple cases, it might not be as flexible for complex parallel execution scenarios.

Overall, Prefect 3.0's approach with task runners and `.submit()` can be considered better for scenarios requiring explicit and flexible control over task execution. However, the choice depends on your specific use case and requirements. For more details, you can refer to the Prefect documentation on task runners.
n
hi @Bruno Nirello .map is just shorthand for .submit + list comprehension

https://www.youtube.com/watch?v=k74tEYSK_t8&t=60s

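The "shorthand for .submit + list comprehension" point can be sketched with the standard library, whose executor `map` follows the same pattern. This is an analogy only, not Prefect's implementation; `via_submit` and `via_map` are hypothetical names, and unlike this stdlib `map`, which yields results directly, the answers above treat Prefect's `.map` as returning futures that still need to be resolved:

```python
from concurrent.futures import ThreadPoolExecutor


def compute(x):
    return x * x


def via_submit(values):
    # Explicit form: one submit per input, then resolve each future.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(compute, v) for v in values]
        return [f.result() for f in futures]


def via_map(values):
    # Shorthand form: map submits one call per input for you and
    # yields the results in input order; list() drains that iterator.
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(compute, values))


if __name__ == "__main__":
    print(via_submit(range(5)))
    print(via_map(range(5)))
```

Both forms schedule the same calls on the same pool, so the choice between them is about readability, not performance.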
b
@Nate is it worth putting async on all tasks in Prefect 3.0?
n
you certainly don’t have to. the submit / map interface works regardless. I would only make tasks async if you plan to use them like normal async functions, or if they contain code that needs to be async
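A rough illustration of that distinction in plain `asyncio` (not Prefect code; `square`, `fetch_square`, and `main` are hypothetical names, and the sleep stands in for an awaitable I/O call such as an async HTTP request):

```python
import asyncio


def square(x):
    # Pure computation: nothing to await, so a plain function is enough.
    return x * x


async def fetch_square(x):
    # Stand-in for code that needs to be async: it awaits something.
    await asyncio.sleep(0.05)
    return square(x)


async def main(values):
    # gather runs the coroutines concurrently on one event loop
    # and returns their results in input order.
    return await asyncio.gather(*(fetch_square(v) for v in values))


if __name__ == "__main__":
    print(asyncio.run(main([1, 2, 3])))
```

Only `fetch_square` gains anything from `async def`, because it is the only function with something to `await`. Marking `square` async would add ceremony without adding concurrency.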