<@ULVA73B9P> can I run multiple tasks in parallel ...
# ask-marvin
e
@Marvin can I run multiple tasks in parallel using a thread pool?
m
Yes, you can run multiple tasks in parallel using a thread pool in Prefect. By default, Prefect 2 flows use the ConcurrentTaskRunner, which runs submitted tasks concurrently using threads. You can also use the RayTaskRunner, which automatically creates a parallel execution environment. Here is an example using the RayTaskRunner:
from prefect import flow, task 
from prefect_ray.task_runners import RayTaskRunner 

@task 
def say_hello(name): 
    print(f"hello {name}") 

@task 
def say_goodbye(name): 
    print(f"goodbye {name}") 

@flow(task_runner=RayTaskRunner()) 
def greetings(names): 
    for name in names: 
        say_hello.submit(name) 
        say_goodbye.submit(name) 

if __name__ == "__main__": 
    greetings(["arthur", "trillian", "ford", "marvin"])
In this example, the `say_hello` and `say_goodbye` tasks are submitted for parallel execution for each name in the `names` list.
Also, Prefect provides the concept of work pools and workers that bridge the Prefect orchestration environment with your execution environment. When a deployment creates a flow run, it is submitted to a specific work pool for scheduling. A worker running in the execution environment can poll its respective work pool for new runs to execute, or the work pool can submit flow runs to serverless infrastructure directly, depending on your configuration. For more details, you can refer to the Prefect documentation and this discussion on the Prefect forum.
e
@Marvin what if I wanted to manage the threadpool, for example wanted to use a maximum of 5 parallel threads to limit concurrency?
m
In Prefect, you can manage concurrency in a couple of ways:
1. Global Concurrency Limits: Prefect provides a `rate_limit` function that you can use to ensure that tasks are executed at a controlled pace. You can find more details in the Prefect documentation on Global Concurrency Limits.
2. Task Concurrency Limits: Prefect also allows you to set task concurrency limits using task tags. You can specify an optional concurrency limit as the maximum number of concurrent task runs in a Running state for tasks with a given tag. The specified concurrency limit applies to any task to which the tag is applied. Here is an example:
from prefect import task

@task(tags=["database", "aws"])
def my_task():
    pass
In this example, the `my_task` function is tagged with "database" and "aws". You can then set a concurrency limit for these tags.
3. Flow Run Limits: Prefect Cloud allows setting flow run limits based on flow run labels. You can assign a concurrency limit to a specific label. More on that in the Prefect documentation.
Please note that these features might require a specific version of Prefect or Prefect Cloud. For more detailed information, please refer to the Prefect documentation.
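For the literal question of capping a thread pool at five threads, a minimal plain-Python sketch may help make the idea concrete. It uses only the standard library and is not Prefect's API; the names `run_with_pool` and `work` are hypothetical, and the short sleep stands in for real I/O-bound work. How a particular Prefect task runner sizes its own pool is not covered here.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_with_pool(items, max_threads=5):
    """Run a stand-in job for every item on a pool of at most max_threads threads."""

    def work(item):
        # Hypothetical task body: pretend to do I/O, then report which
        # worker thread handled this item.
        time.sleep(0.01)
        return item * 2, threading.current_thread().name

    # The executor never creates more than max_threads worker threads,
    # so at most that many calls to work() overlap.
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        pairs = list(pool.map(work, items))

    results = [value for value, _ in pairs]
    threads_used = {name for _, name in pairs}
    return results, threads_used


if __name__ == "__main__":
    values, threads = run_with_pool(range(20))
    print(len(values), "results from", len(threads), "threads")
```

Calling `run_with_pool(range(20))` returns the doubled values in input order together with the set of worker thread names, which never holds more than `max_threads` entries.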
e
@Marvin give an example of a flow that has to submit several thousand instances of the same task; however, only 5 instances of that task should ever be executing in that flow at maximum
m
Sure, here's an example of how you can achieve this using Prefect's `Semaphore`:
from prefect import Flow, Task, Semaphore
from prefect.engine.executors import LocalDaskExecutor
import time

class MyTask(Task):
    def __init__(self, sleep_time: int, *args, **kwargs):
        self.sleep_time = sleep_time
        super().__init__(*args, **kwargs)

    def run(self):
        time.sleep(self.sleep_time)
        return "Task Complete"

# Create a semaphore with a maximum count of 5
semaphore = Semaphore(max_count=5)

# Create an instance of the task
my_task = MyTask(sleep_time=1)

# Define the flow
with Flow("My Flow") as flow:
    for i in range(1000):
        semaphore.acquire()  # Acquire the semaphore before running the task
        my_task()
        semaphore.release()  # Release the semaphore after the task has run

# Run the flow with the LocalDaskExecutor
flow.run(executor=LocalDaskExecutor())
In this example, we're creating a `Semaphore` with a maximum count of 5. This means that only 5 instances of the task can be running at the same time. Before each task runs, it acquires the semaphore, and after it finishes, it releases the semaphore. This ensures that no more than 5 tasks are running concurrently. Please note that this example uses the `LocalDaskExecutor`, which allows for parallel execution of tasks. If you're using the `LocalExecutor`, tasks will be executed sequentially, and the semaphore will not have any effect.
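The acquire-before, release-after idea can be sketched with Python's standard-library `threading.BoundedSemaphore`. This is only an illustration of the semantics described above, not a Prefect API; `run_limited` and `limited_call` are hypothetical names. One assumption worth stating: for a semaphore to limit anything, it has to be acquired and released inside the work itself, around the part that actually runs, rather than in the loop that schedules the work.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_limited(n_calls, max_concurrent=5, n_threads=20):
    """Run n_calls jobs on n_threads threads, with at most max_concurrent active."""
    slots = threading.BoundedSemaphore(max_concurrent)
    lock = threading.Lock()
    state = {"active": 0, "peak": 0}

    def limited_call(i):
        # Acquire a slot on entry and release it on exit, even on error.
        with slots:
            with lock:
                state["active"] += 1
                state["peak"] = max(state["peak"], state["active"])
            time.sleep(0.01)  # stand-in for the real work
            with lock:
                state["active"] -= 1
        return i

    # Deliberately more threads than slots, so the semaphore is the limiter.
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        results = list(pool.map(limited_call, range(n_calls)))
    return results, state["peak"]


if __name__ == "__main__":
    _, peak = run_limited(50)
    print("peak concurrent calls:", peak)
```

The returned peak is the highest number of calls observed inside the guarded section at once; it stays at or below `max_concurrent` even though 20 threads are available.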
e
@Marvin Is semaphore deprecated in Prefect 2? I don't see that available
@Marvin I have to submit a thousand instances of one task, but I don't want to submit them all at once since that will be a huge strain on the API. Is there a way to limit the number of task instances running at a given time?
m
In Prefect 2.x, you can limit the number of task instances running at a given time using task concurrency limits. You can set a concurrency limit on a task tag, and this limit will apply to any task with that tag. Here's an example:
from prefect import flow, task

@task(tags=["limited-concurrency"])
def limited_task():
    # This task will be limited in concurrency
    pass

@flow(name="My Flow")
def my_flow():
    for _ in range(1000):
        limited_task.submit()
In this example, `limited_task` is tagged with "limited-concurrency". You can then set a concurrency limit for this tag in your Prefect server or cloud settings. This will ensure that no more than the specified number of `limited_task` instances are running at the same time. Please note that task concurrency limits are a feature of Prefect Server and Prefect Cloud, and may not be available in local mode. There are also some open issues and feature requests related to task concurrency in Prefect, such as making the delay for tasks configurable when they are delayed by task concurrency limits, and adding support for weight-based task run concurrency limits. You can follow these issues on the Prefect GitHub page for updates.
e
@Marvin but that still requires submitting all the tasks up front right
m
Yes, you're correct. In the example I provided, all tasks are submitted up front, and the concurrency limit only controls how many of them are running at the same time. If you want to control the submission of tasks as well, you can submit them in batches from the flow and wait for each batch to finish before submitting the next one. Here's a simple example:
from time import sleep

from prefect import flow, task

@task
def run_task(i):
    print(f"Running task {i}")
    sleep(1)  # simulate API call

@flow(name="Dynamic Task Generation")
def dynamic_task_generation(n: int, batch_size: int = 5):
    # Submit one batch, then wait for it before submitting the next
    for start in range(0, n, batch_size):
        stop = min(start + batch_size, n)
        futures = [run_task.submit(i) for i in range(start, stop)]
        for future in futures:
            future.wait()
In this example, the `dynamic_task_generation` flow submits `run_task` in batches of `batch_size`. The tasks are not all submitted up front; each batch is submitted only after the previous one has finished, so at most `batch_size` task runs are outstanding at any time. Please note that this is a very basic example and might not fit your exact use case. Depending on your needs, you might need to add more logic to control when and how tasks are generated and submitted.
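One way to picture "don't submit everything up front" is a sliding window: keep at most N submissions outstanding and only submit the next item once one of them finishes. The sketch below uses only the Python standard library and is not Prefect's API; `call_api` and `run_windowed` are hypothetical names, and the window size of 5 is an assumption taken from the earlier question.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def call_api(i):
    """Hypothetical stand-in for one task instance that hits an API."""
    time.sleep(0.005)
    return i * i


def run_windowed(n_items, window=5):
    """Process n_items while keeping at most `window` submissions outstanding."""
    results = {}
    pending = {}  # future -> item index
    max_outstanding = 0
    with ThreadPoolExecutor(max_workers=window) as pool:
        for i in range(n_items):
            # Block until a slot frees up before submitting more work.
            while len(pending) >= window:
                done, _ = wait(list(pending), return_when=FIRST_COMPLETED)
                for fut in done:
                    results[pending.pop(fut)] = fut.result()
            pending[pool.submit(call_api, i)] = i
            max_outstanding = max(max_outstanding, len(pending))
        # Drain whatever is still in flight.
        for fut, i in pending.items():
            results[i] = fut.result()
    return [results[i] for i in range(n_items)], max_outstanding


if __name__ == "__main__":
    values, outstanding = run_windowed(1000)
    print(len(values), "items processed; max outstanding:", outstanding)
```

Unlike fixed batches, the window refills as soon as any single item completes, so one slow item does not hold up the other slots.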