Santhosh Solomon (Fluffy)
01/06/2023, 2:31 AMfrom prefect import task, flow
from time import sleep
from prefect.task_runners import ConcurrentTaskRunner
@task(tags=['concurrent-task'])
def perf_task(input):
sleep(25)
print(input)
@flow(task_runner=ConcurrentTaskRunner)
def test_flow():
for i in range(100):
perf_task(i)
if __name__ == '__main__':
test_flow()
I have created a deployment with queue concurrent_test
and set the concurrency limit to 3. In my understanding this is responsible for the number of flow runs which can be executed concurrently.
after that I have created a concurrency limit for the tag i have included in the task.
prefect concurrency-limit create concurrent-task 5
My understanding of this setting is for number of tasks in which could be executed concurrently.
Once both the settings are done, I have started an agent with
prefect agent start -q concurrent_test
When I initiate the flow run, below is the execution log, in which it is very clear there is no concurrency (check the time stamp for every task execution)
Starting v2.7.6 agent with ephemeral API...
___ ___ ___ ___ ___ ___ _____ _ ___ ___ _ _ _____
| _ \ _ \ __| __| __/ __|_ _| /_\ / __| __| \| |_ _|
| _/ / _|| _|| _| (__ | | / _ \ (_ | _|| .` | | |
|_| |_|_\___|_| |___\___| |_| /_/ \_\___|___|_|\_| |_|
Agent started! Looking for work from queue(s): concurrent_test...
07:54:51.280 | INFO | prefect.agent - Submitting flow run '9034dd75-b32e-4b39-8f48-eb03c55e3fe5'
07:54:51.336 | INFO | prefect.infrastructure.process - Opening process 'strange-starling'...
07:54:51.348 | INFO | prefect.agent - Completed submission of flow run '9034dd75-b32e-4b39-8f48-eb03c55e3fe5'
/opt/homebrew/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
warn(RuntimeWarning(msg))
07:54:55.882 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-0' for task 'perf_task'
07:54:55.883 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-0' immediately...
07:54:55.909 | INFO | Task run 'perf_task-c7bf4036-0' - Finished in state Completed()
07:54:55.921 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-1' for task 'perf_task'
07:54:55.921 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-1' immediately...
07:54:55.945 | INFO | Task run 'perf_task-c7bf4036-1' - Finished in state Completed()
07:54:55.955 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-2' for task 'perf_task'
07:54:55.955 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-2' immediately...
07:54:55.979 | INFO | Task run 'perf_task-c7bf4036-2' - Finished in state Completed()
07:54:55.989 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-3' for task 'perf_task'
07:54:55.990 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-3' immediately...
07:54:56.012 | INFO | Task run 'perf_task-c7bf4036-3' - Finished in state Completed()
07:54:56.022 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-4' for task 'perf_task'
07:54:56.022 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-4' immediately...
07:54:56.045 | INFO | Task run 'perf_task-c7bf4036-4' - Finished in state Completed()
07:54:56.054 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-5' for task 'perf_task'
07:54:56.054 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-5' immediately...
07:54:56.077 | INFO | Task run 'perf_task-c7bf4036-5' - Finished in state Completed()
07:54:56.086 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-6' for task 'perf_task'
07:54:56.087 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-6' immediately...
07:54:56.109 | INFO | Task run 'perf_task-c7bf4036-6' - Finished in state Completed()
07:54:56.120 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-7' for task 'perf_task'
07:54:56.121 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-7' immediately...
07:54:56.144 | INFO | Task run 'perf_task-c7bf4036-7' - Finished in state Completed()
07:54:56.153 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-8' for task 'perf_task'
07:54:56.153 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-8' immediately...
07:54:56.177 | INFO | Task run 'perf_task-c7bf4036-8' - Finished in state Completed()
07:54:56.187 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-9' for task 'perf_task'
07:54:56.188 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-9' immediately...
07:54:56.210 | INFO | Task run 'perf_task-c7bf4036-9' - Finished in state Completed()
07:54:56.221 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-10' for task 'perf_task'
07:54:56.222 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-10' immediately...
07:54:56.244 | INFO | Task run 'perf_task-c7bf4036-10' - Finished in state Completed()
07:54:56.253 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-11' for task 'perf_task'
07:54:56.253 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-11' immediately...
07:54:56.278 | INFO | Task run 'perf_task-c7bf4036-11' - Finished in state Completed()
07:54:56.289 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-12' for task 'perf_task'
07:54:56.289 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-12' immediately...
07:54:56.311 | INFO | Task run 'perf_task-c7bf4036-12' - Finished in state Completed()
07:54:56.323 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-13' for task 'perf_task'
07:54:56.323 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-13' immediately...
07:54:56.345 | INFO | Task run 'perf_task-c7bf4036-13' - Finished in state Completed()
07:54:56.355 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-14' for task 'perf_task'
07:54:56.355 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-14' immediately...
07:54:56.380 | INFO | Task run 'perf_task-c7bf4036-14' - Finished in state Completed()
07:54:56.389 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-15' for task 'perf_task'
07:54:56.389 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-15' immediately...
07:54:56.411 | INFO | Task run 'perf_task-c7bf4036-15' - Finished in state Completed()
07:54:56.424 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-16' for task 'perf_task'
07:54:56.424 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-16' immediately...
07:54:56.447 | INFO | Task run 'perf_task-c7bf4036-16' - Finished in state Completed()
I am using prefect 2.7.6.
Thanks in advanceTim Galvin
01/06/2023, 3:25 AMperf_task.submit(i)
Santhosh Solomon (Fluffy)
01/06/2023, 3:36 AM