Imran Nooraddin
09/02/2024, 11:42 AM
Marvin
09/02/2024, 11:42 AM
Imran Nooraddin
09/02/2024, 11:42 AM
Imran Nooraddin
09/02/2024, 11:44 AM
Marvin
09/02/2024, 11:44 AM
slot_decay_per_second.
- High slot_decay_per_second values mean slots refresh quickly (e.g., 5.0 allows a task roughly every 0.2 seconds). Low values mean slots refresh slowly (e.g., 0.1 means a 10-second wait).
2. Concurrency Limits via Tags:
- Task concurrency limits can be configured through tags and set via the Prefect CLI, API, Python client, or UI.
- You can set limits on specific tags using commands like prefect concurrency-limit create <tag> <limit>.
- For programmatic updates, use PrefectClient.create_concurrency_limit (PrefectClient is defined in prefect.client.orchestration).
3. Rate Limit Example in Code:
- To throttle task submission, you can use the rate_limit function in Prefect.
```python
from prefect import flow, task
from prefect.concurrency.sync import rate_limit


@task
def my_task(i):
    return i


@flow
def my_flow():
    for _ in range(100):
        # Blocks until a slot is free on the "slow-my-flow" limit, which must
        # already exist as a global concurrency limit with slot decay configured.
        rate_limit("slow-my-flow", occupy=1)
        my_task.submit(1)


if __name__ == "__main__":
    my_flow()
```
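The slot_decay_per_second figures quoted above follow from a simple reciprocal. A minimal sketch of that arithmetic, assuming a slot frees up after roughly 1 / slot_decay_per_second seconds (seconds_per_slot and decay_for_interval are hypothetical helper names, not part of Prefect):

```python
def seconds_per_slot(slot_decay_per_second: float) -> float:
    """Approximate time for one occupied slot to become available again."""
    if slot_decay_per_second <= 0:
        raise ValueError("slot_decay_per_second must be positive")
    return 1.0 / slot_decay_per_second


def decay_for_interval(seconds_between_runs: float) -> float:
    """Decay value that frees roughly one slot every `seconds_between_runs`."""
    if seconds_between_runs <= 0:
        raise ValueError("seconds_between_runs must be positive")
    return 1.0 / seconds_between_runs


if __name__ == "__main__":
    print(seconds_per_slot(5.0))    # 0.2
    print(seconds_per_slot(0.1))    # 10.0
    print(decay_for_interval(0.5))  # 2.0, i.e. about two runs per second
```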
To configure your tasks such that one runs once per second and another twice per second, you would create one global concurrency limit per task, each with a limit of 1 and slot_decay_per_second set to 1.0 and 2.0 respectively, and call rate_limit with the matching limit name before each task runs.
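To see why those decay values give roughly one and two runs per second, here is a simplified, stdlib-only model of a limit whose occupied slots drain over time. It runs on simulated time and is only an illustration of the idea, not Prefect's implementation; DecayingLimit and simulate_starts are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class DecayingLimit:
    """Simplified model: occupied slots drain at slot_decay_per_second."""

    limit: int
    slot_decay_per_second: float
    active_slots: float = 0.0
    last_update: float = 0.0

    def acquire_at(self, now: float, occupy: int = 1) -> float:
        """Return the simulated time at which `occupy` slots can be taken."""
        # Drain slots for the time that has passed since the last update.
        elapsed = max(0.0, now - self.last_update)
        drained = elapsed * self.slot_decay_per_second
        self.active_slots = max(0.0, self.active_slots - drained)
        # Wait until enough slots have drained to fit under the limit.
        excess = self.active_slots + occupy - self.limit
        wait = max(0.0, excess / self.slot_decay_per_second)
        self.active_slots = self.active_slots - wait * self.slot_decay_per_second + occupy
        self.last_update = now + wait
        return now + wait


def simulate_starts(limit: DecayingLimit, runs: int) -> list[float]:
    """Start `runs` tasks back to back and record when each may begin."""
    now = 0.0
    starts = []
    for _ in range(runs):
        now = limit.acquire_at(now)
        starts.append(now)
    return starts


if __name__ == "__main__":
    print(simulate_starts(DecayingLimit(1, 1.0), 4))  # [0.0, 1.0, 2.0, 3.0]
    print(simulate_starts(DecayingLimit(1, 2.0), 4))  # [0.0, 0.5, 1.0, 1.5]
```

In this model the first run starts immediately because no slot is occupied yet; after that, the spacing between starts is 1 / slot_decay_per_second.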
So go ahead, manage those precious task slots and keep your system humming smoothly. Or as smoothly as Prefect lets you, anyway.
Imran Nooraddin
09/02/2024, 11:50 AM
```python
from prefect import flow, task


@task  # EDIT THIS TO INCLUDE RATE LIMITS
def add(x: int, y: int) -> int:
    return x + y


@task  # EDIT THIS TO INCLUDE RATE LIMITS
def multiply(x: int, y: int) -> int:
    return x * y


@flow
def add_and_multiply(x: int, y: int):
    sum = add(x, y)
    product = multiply(x, y)
    return sum, product
```
Marvin
09/02/2024, 11:50 AM
```python
import asyncio

from prefect import flow, task
from prefect.client.orchestration import get_client


@task(tags=["add_task"])
def add(x: int, y: int) -> int:
    return x + y


@task(tags=["multiply_task"])
def multiply(x: int, y: int) -> int:
    return x * y


@flow
def add_and_multiply(x: int, y: int):
    sum_ = add(x, y)
    product = multiply(x, y)
    return sum_, product


async def create_tag_limits():
    # get_client() returns an async client, so each call must be awaited.
    # Tag limits cap how many tagged task runs execute at the same time;
    # they do not enforce a per-second rate.
    async with get_client() as client:
        await client.create_concurrency_limit(tag="add_task", concurrency_limit=1)
        await client.create_concurrency_limit(tag="multiply_task", concurrency_limit=2)


if __name__ == "__main__":
    asyncio.run(create_tag_limits())
    add_and_multiply(x=5, y=4)
```
You’ll set up those limits only once, and they’ll apply whenever the tasks with corresponding tags are executed. It's not quite the decorator addition you sought, but it reduces redundancy.
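Worth noting: a tag limit of 1 or 2 caps how many tagged task runs execute at the same time; it does not by itself pace them per second. A rough stdlib-only model of that behavior, with a semaphore standing in for the tag limit (an illustration, not Prefect's implementation; peak_concurrency is a hypothetical helper):

```python
import threading
import time


def peak_concurrency(limit: int, runs: int, work_seconds: float = 0.02) -> int:
    """Run `runs` workers behind a limit of `limit`; return the peak overlap."""
    slots = threading.BoundedSemaphore(limit)  # stands in for the tag limit
    lock = threading.Lock()
    running = 0
    peak = 0

    def worker() -> None:
        nonlocal running, peak
        with slots:  # a slot is held for the whole run, then released
            with lock:
                running += 1
                peak = max(peak, running)
            time.sleep(work_seconds)  # pretend to do the task's work
            with lock:
                running -= 1

    threads = [threading.Thread(target=worker) for _ in range(runs)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak


if __name__ == "__main__":
    print(peak_concurrency(limit=1, runs=5))  # never more than 1 at a time
    print(peak_concurrency(limit=2, runs=5))  # never more than 2 at a time
```

Nothing here limits how quickly a finished run is followed by the next one, which is why very short tasks can still start far more often than once or twice per second under such a limit.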
To read more about Prefect's approach:
- Global Concurrency Limits
- Task Run Limits
In the grand universe of infeasible expectations, I hope this at least somewhat helps.