Can anyone help me with understanding the concurre...
# ask-community
s
Can anyone help me with understanding the concurrency concept in prefect? I have created a sample flow to understand the concurrency at both queue level and task(tag) level. I used the below script
Copy code
from prefect import task, flow
from time import sleep
from prefect.task_runners import ConcurrentTaskRunner

@task(tags=['concurrent-task'])
def perf_task(input):
    sleep(25)
    print(input)

@flow(task_runner=ConcurrentTaskRunner)
def test_flow():
    for i in range(100):
        perf_task(i)


if __name__ == '__main__':
    test_flow()
I have created a deployment with queue
concurrent_test
and set the concurrency limit to 3. In my understanding this is responsible for the number of flow runs which can be executed concurrently. after that I have created a concurrency limit for the tag i have included in the task.
Copy code
prefect concurrency-limit create concurrent-task 5
My understanding of this setting is for number of tasks in which could be executed concurrently. Once both the settings are done, I have started an agent with
Copy code
prefect agent start -q concurrent_test
When I initiate the flow run, below is the execution log, in which it is very clear there is no concurrency (check the time stamp for every task execution)
Copy code
Starting v2.7.6 agent with ephemeral API...

  ___ ___ ___ ___ ___ ___ _____     _   ___ ___ _  _ _____
 | _ \ _ \ __| __| __/ __|_   _|   /_\ / __| __| \| |_   _|
 |  _/   / _|| _|| _| (__  | |    / _ \ (_ | _|| .` | | |
 |_| |_|_\___|_| |___\___| |_|   /_/ \_\___|___|_|\_| |_|


Agent started! Looking for work from queue(s): concurrent_test...
07:54:51.280 | INFO    | prefect.agent - Submitting flow run '9034dd75-b32e-4b39-8f48-eb03c55e3fe5'
07:54:51.336 | INFO    | prefect.infrastructure.process - Opening process 'strange-starling'...
07:54:51.348 | INFO    | prefect.agent - Completed submission of flow run '9034dd75-b32e-4b39-8f48-eb03c55e3fe5'
/opt/homebrew/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
  warn(RuntimeWarning(msg))
07:54:55.882 | INFO    | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-0' for task 'perf_task'
07:54:55.883 | INFO    | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-0' immediately...
07:54:55.909 | INFO    | Task run 'perf_task-c7bf4036-0' - Finished in state Completed()
07:54:55.921 | INFO    | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-1' for task 'perf_task'
07:54:55.921 | INFO    | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-1' immediately...
07:54:55.945 | INFO    | Task run 'perf_task-c7bf4036-1' - Finished in state Completed()
07:54:55.955 | INFO    | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-2' for task 'perf_task'
07:54:55.955 | INFO    | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-2' immediately...
07:54:55.979 | INFO    | Task run 'perf_task-c7bf4036-2' - Finished in state Completed()
07:54:55.989 | INFO    | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-3' for task 'perf_task'
07:54:55.990 | INFO    | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-3' immediately...
07:54:56.012 | INFO    | Task run 'perf_task-c7bf4036-3' - Finished in state Completed()
07:54:56.022 | INFO    | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-4' for task 'perf_task'
07:54:56.022 | INFO    | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-4' immediately...
07:54:56.045 | INFO    | Task run 'perf_task-c7bf4036-4' - Finished in state Completed()
07:54:56.054 | INFO    | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-5' for task 'perf_task'
07:54:56.054 | INFO    | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-5' immediately...
07:54:56.077 | INFO    | Task run 'perf_task-c7bf4036-5' - Finished in state Completed()
07:54:56.086 | INFO    | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-6' for task 'perf_task'
07:54:56.087 | INFO    | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-6' immediately...
07:54:56.109 | INFO    | Task run 'perf_task-c7bf4036-6' - Finished in state Completed()
07:54:56.120 | INFO    | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-7' for task 'perf_task'
07:54:56.121 | INFO    | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-7' immediately...
07:54:56.144 | INFO    | Task run 'perf_task-c7bf4036-7' - Finished in state Completed()
07:54:56.153 | INFO    | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-8' for task 'perf_task'
07:54:56.153 | INFO    | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-8' immediately...
07:54:56.177 | INFO    | Task run 'perf_task-c7bf4036-8' - Finished in state Completed()
07:54:56.187 | INFO    | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-9' for task 'perf_task'
07:54:56.188 | INFO    | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-9' immediately...
07:54:56.210 | INFO    | Task run 'perf_task-c7bf4036-9' - Finished in state Completed()
07:54:56.221 | INFO    | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-10' for task 'perf_task'
07:54:56.222 | INFO    | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-10' immediately...
07:54:56.244 | INFO    | Task run 'perf_task-c7bf4036-10' - Finished in state Completed()
07:54:56.253 | INFO    | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-11' for task 'perf_task'
07:54:56.253 | INFO    | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-11' immediately...
07:54:56.278 | INFO    | Task run 'perf_task-c7bf4036-11' - Finished in state Completed()
07:54:56.289 | INFO    | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-12' for task 'perf_task'
07:54:56.289 | INFO    | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-12' immediately...
07:54:56.311 | INFO    | Task run 'perf_task-c7bf4036-12' - Finished in state Completed()
07:54:56.323 | INFO    | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-13' for task 'perf_task'
07:54:56.323 | INFO    | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-13' immediately...
07:54:56.345 | INFO    | Task run 'perf_task-c7bf4036-13' - Finished in state Completed()
07:54:56.355 | INFO    | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-14' for task 'perf_task'
07:54:56.355 | INFO    | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-14' immediately...
07:54:56.380 | INFO    | Task run 'perf_task-c7bf4036-14' - Finished in state Completed()
07:54:56.389 | INFO    | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-15' for task 'perf_task'
07:54:56.389 | INFO    | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-15' immediately...
07:54:56.411 | INFO    | Task run 'perf_task-c7bf4036-15' - Finished in state Completed()
07:54:56.424 | INFO    | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-16' for task 'perf_task'
07:54:56.424 | INFO    | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-16' immediately...
07:54:56.447 | INFO    | Task run 'perf_task-c7bf4036-16' - Finished in state Completed()
I am using prefect 2.7.6. Thanks in advance
1
t
try to use
perf_task.submit(i)
2
s
Thanks. It working.
👍 2