Alex Shtuchkin
09/18/2023, 7:15 PMmy_task.map(..., concurrency=10)
, which would make sure to start new tasks only if less than 10 are running in the current flow run. Not sure how possible that is.import numpy as np
from prefect import flow, task
from prefect.concurrency.sync import concurrency
@task(tags=["test-concurrency"]) # either use tag-based concurrency limit
def test_task(i: int) -> float:
with concurrency("test-concurrency"): # or context-manager based
arr = np.random.random((1000, 1000))
return np.matmul(arr, arr).sum() # Example of task work, takes ~1 second on my machine.
@flow
def test_flow():
return test_task.map(range(1000))
if __name__ == "__main__":
test_flow()