Eric
11/17/2023, 1:06 PM

Marvin
11/17/2023, 1:06 PM
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner

@task
def say_hello(name):
    print(f"hello {name}")

@task
def say_goodbye(name):
    print(f"goodbye {name}")

@flow(task_runner=RayTaskRunner())
def greetings(names):
    for name in names:
        say_hello.submit(name)
        say_goodbye.submit(name)

if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])
In this example, the say_hello and say_goodbye tasks are submitted for parallel execution for each name in the names list.
Also, Prefect provides the concept of work pools and workers that bridge the Prefect orchestration environment with your execution environment. When a deployment creates a flow run, it is submitted to a specific work pool for scheduling. A worker running in the execution environment can poll its respective work pool for new runs to execute, or the work pool can submit flow runs to serverless infrastructure directly, depending on your configuration.
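As a rough illustration of that setup, a work pool and a worker can be created from the CLI (a sketch, assuming a Prefect 2.x installation where work pools and workers are available; the pool name "my-process-pool" is a placeholder):
prefect work-pool create "my-process-pool" --type process
prefect worker start --pool "my-process-pool"
The first command creates a process-type work pool, and the second starts a worker that polls that pool for scheduled flow runs and executes them locally.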
For more details, you can refer to the Prefect documentation and this discussion on the Prefect forum.

Eric
11/17/2023, 1:08 PM

Marvin
11/17/2023, 1:08 PM
1. Global Concurrency Limits: Prefect provides a rate_limit function that you can use to ensure that tasks are executed at a controlled pace (a sketch follows this list). You can find more details in the Prefect documentation on Global Concurrency Limits.
2. Task Concurrency Limits: Prefect also allows you to set task concurrency limits using task tags. You can specify an optional concurrency limit as the maximum number of concurrent task runs in a Running state for tasks with a given tag. The specified concurrency limit applies to any task to which the tag is applied. Here is an example:
from prefect import task

@task(tags=["database", "aws"])
def my_task():
    pass
In this example, the my_task function is tagged with "database" and "aws". You can then set a concurrency limit for these tags.
3. Flow Run Limits: Prefect Cloud allows setting flow run limits based on flow run labels. You can assign a concurrency limit to a specific label. More on that in the Prefect documentation.
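Here is a minimal sketch of the rate_limit approach from point 1, assuming Prefect 2.13+ (where rate_limit is importable from prefect.concurrency.sync) and a global concurrency limit named "api-calls" with a slot decay rate already created in the UI or via the CLI; the limit name is a placeholder:
from prefect import flow, task
from prefect.concurrency.sync import rate_limit

@task
def call_api(i):
    # Block until the "api-calls" limit has a free slot, throttling how fast these tasks proceed
    rate_limit("api-calls")
    print(f"calling API for item {i}")

@flow
def throttled_flow():
    for i in range(100):
        call_api.submit(i)
Each rate_limit call waits for capacity on the named limit before the task continues, so the tasks are paced rather than all hitting the API at once.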
Please note that these features might require a specific version of Prefect or Prefect Cloud. For more detailed information, please refer to the Prefect documentation.

Eric
11/17/2023, 1:14 PM

Marvin
11/17/2023, 1:14 PM
import threading
import time

from prefect import Flow, Task
from prefect.engine.executors import LocalDaskExecutor

# Create a semaphore with a maximum count of 5
semaphore = threading.Semaphore(5)

class MyTask(Task):
    def __init__(self, sleep_time: int, *args, **kwargs):
        self.sleep_time = sleep_time
        super().__init__(*args, **kwargs)

    def run(self):
        # Acquire the semaphore before doing the work and release it when done
        with semaphore:
            time.sleep(self.sleep_time)
        return "Task Complete"

# Create an instance of the task
my_task = MyTask(sleep_time=1)

# Define the flow
with Flow("My Flow") as flow:
    for i in range(1000):
        my_task()

# Run the flow with the LocalDaskExecutor
flow.run(executor=LocalDaskExecutor())
In this example, we're creating a threading.Semaphore with a maximum count of 5. This means that only 5 instances of the task can be doing their work at the same time. Each task acquires the semaphore before running its body and releases it when it finishes, which ensures that no more than 5 tasks are running concurrently.
Please note that this example uses the LocalDaskExecutor (with its default thread-based scheduler), which allows for parallel execution of tasks within one process. If you're using the LocalExecutor, tasks will be executed sequentially and the semaphore will not have any effect.

Eric
11/17/2023, 1:41 PM

Eric
11/17/2023, 4:02 PM

Marvin
11/17/2023, 4:03 PM
from prefect import task, Flow

@task(tags=["limited-concurrency"])
def limited_task():
    # This task will be limited in concurrency
    pass

with Flow("My Flow") as flow:
    for _ in range(1000):
        limited_task()
In this example, the limited_task is tagged with "limited-concurrency". You can then set a concurrency limit for this tag in your Prefect server or cloud settings. This will ensure that no more than the specified number of limited_task instances are running at the same time.
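For instance, a limit for that tag could be set from the CLI (a sketch, assuming the Prefect CLI is pointed at your server or Cloud workspace; the value 10 is arbitrary):
prefect concurrency-limit create limited-concurrency 10
With this in place, at most 10 task runs tagged "limited-concurrency" can be in a Running state at once; additional runs wait until a slot frees up.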
Please note that task concurrency limits are a feature of Prefect Server and Prefect Cloud, and may not be available in local mode.
There are also some open issues and feature requests related to task concurrency in Prefect, such as making the delay for tasks configurable when they are delayed by task concurrency limits, and adding support for weight-based task run concurrency limits. You can follow these issues on the Prefect GitHub page for updates.

Eric
11/17/2023, 4:06 PM

Marvin
11/17/2023, 4:07 PM
from time import sleep

from prefect import task, Flow, Parameter

@task
def generate_tasks(n):
    # Generate the inputs for the downstream task at runtime
    return list(range(n))

@task
def run_task(i):
    print(f"Running task {i}")
    sleep(1)  # simulate an API call

with Flow("Dynamic Task Generation") as flow:
    n = Parameter("n")
    items = generate_tasks(n)
    run_task.map(items)
In this example, the generate_tasks task produces its outputs at runtime, and run_task is mapped over them. The mapped task runs are not all defined upfront; instead, they are created dynamically from whatever generate_tasks returns when the flow runs.
Please note that this is a very basic example and might not fit your exact use case. Depending on your needs, you might need to add more logic to control when and how tasks are generated and submitted.