Justin Trautmann
10/17/2022, 2:35 PM
We are seeing a task that still occupies an active slot in the prefect concurrency-limit inspect command but is in fact in status Completed. This effectively lowers the concurrency limit for all subsequent tasks, as these pseudo-active tasks keep occupying a slot for the tag. Are we at all supposed to adapt the concurrency limit while tagged tasks are being executed, or is this considered bad practice?
Please find in the thread a self-contained example flow that can be used to reproduce the behavior. Any advice is much appreciated. Thank you.
Justin Trautmann
10/17/2022, 2:35 PM
import time

from prefect import flow, task, tags, context, get_run_logger
from prefect.utilities.asyncutils import sync_compatible

tag = "test-limit"


@sync_compatible
async def create_limit(n):
    # set concurrency limit
    await context.get_run_context().client.create_concurrency_limit(tag=tag, concurrency_limit=n)


@sync_compatible
async def get_active_tasks():
    # list all "active" tasks for the tag
    return (await context.get_run_context().client.read_concurrency_limit_by_tag(tag=tag)).active_slots


@sync_compatible
async def get_running_tasks():
    # list all task ids in state "Running"
    running_tasks = []
    for future in context.get_run_context().task_run_futures:
        if (await future._get_state()).name == "Running":
            running_tasks.append(future.task_run.id)
    return running_tasks


def monitor_tasks(t):
    # print a list of "active" and "Running" tasks every two seconds for t seconds
    logger = get_run_logger()
    start = time.time()
    while time.time() - start < t:
        running_tasks = get_running_tasks()
        active_tasks = get_active_tasks()
        if running_tasks or active_tasks:
            logger.info(f"Running tasks ({len(running_tasks)}): {running_tasks}")
            logger.info(f"Active tasks ({len(active_tasks)}): {active_tasks}")
            time.sleep(2)
        else:
            break


@task
def sleep_task(t):
    # sleep for t seconds
    time.sleep(t)


@flow
def test_limits():
    logger = get_run_logger()
    # setting concurrency limit 5 for tag
    create_limit(5)
    # submit 20 parallel tagged tasks that sleep for 20s each
    with tags(tag):
        for _ in range(0, 20):
            sleep_task.submit(20)
            # time.sleep(0.2)  # This leads to expected behaviour
    # monitor "Running" and "active" tasks for 30s
    # see issue 1 in output: more tasks "Running" than "active"
    monitor_tasks(30)
    # increase concurrency limit while tasks are still running
    logger.info("Updating limits")
    create_limit(10)
    # continue monitoring
    # see issue 2 in output: No tasks in "Running" but still tasks in "active"
    monitor_tasks(120)
    # After the flow exits, check limits via `prefect concurrency-limit inspect test-limit`
    # see that there are still "Active Task Runs" even though the flow "Finished in state Completed('All states completed.')"


if __name__ == "__main__":
    test_limits()
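For reference, one way to clear the stale active slots described above is to delete and recreate the limit for the tag. Below is a minimal sketch, assuming the client also exposes delete_concurrency_limit_by_tag alongside the create_concurrency_limit call used in the example; the reset_limit helper name is hypothetical, and the CLI equivalents would be prefect concurrency-limit delete and prefect concurrency-limit create.

from prefect import context
from prefect.utilities.asyncutils import sync_compatible

tag = "test-limit"


@sync_compatible
async def reset_limit(n):
    # hypothetical helper: drop the limit (and with it any stale active slots),
    # then recreate it with the desired concurrency
    client = context.get_run_context().client
    await client.delete_concurrency_limit_by_tag(tag=tag)
    await client.create_concurrency_limit(tag=tag, concurrency_limit=n)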
Justin Trautmann
10/17/2022, 3:00 PM
Jeff Hale
10/17/2022, 11:25 PM
Justin Trautmann
10/18/2022, 7:18 AM
Jeff Hale
10/18/2022, 6:42 PM
Justin Trautmann
10/19/2022, 8:22 AM
Justin Trautmann
10/21/2022, 3:31 PM
Justin Trautmann
10/21/2022, 3:32 PM
import time

from prefect import flow, task, context, get_run_logger
from prefect.utilities.asyncutils import sync_compatible


@sync_compatible
async def get_running_tasks():
    # list all task ids in state "Running"
    running_tasks = []
    for future in context.get_run_context().task_run_futures:
        if (await future._get_state()).name == "Running":
            running_tasks.append(future.task_run.id)
    return running_tasks


def monitor_tasks(t):
    logger = get_run_logger()
    start = time.time()
    while time.time() - start < t:
        running_tasks = get_running_tasks()
        if running_tasks:
            logger.info(f"Running tasks (in Prefect Cloud):\t{len(running_tasks)}")
            logger.info(f"Active tasks (actually doing work):\t{len([x for x in active_tasks if x])}")
        else:
            break
        time.sleep(2)


@task
def sleep_task(task_id, t):
    # global variable is set to true as soon as this starts to run and reset at finish
    active_tasks[task_id] = True
    time.sleep(t)
    active_tasks[task_id] = False


num_tasks = 50
active_tasks = num_tasks * [False]


@flow
def test_concurrency():
    for task_id in range(0, num_tasks):
        sleep_task.submit(task_id, 30)
    monitor_tasks(120)
    # Observe output: fewer active tasks than running tasks


if __name__ == "__main__":
    test_concurrency()
Output:
17:18:30.693 | INFO | prefect.engine - Created flow run 'ingenious-honeybee' for flow 'test-concurrency'
17:18:32.019 | INFO | Flow run 'ingenious-honeybee' - Created task run 'sleep_task-f305a62b-0' for task 'sleep_task'
...
17:18:54.814 | INFO | Flow run 'ingenious-honeybee' - Running tasks (in Prefect Cloud): 50
17:18:54.815 | INFO | Flow run 'ingenious-honeybee' - Active tasks (actually doing work): 39
...
17:19:03.219 | INFO | Task run 'sleep_task-f305a62b-3' - Finished in state Completed()
...
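The ceiling of roughly 40 concurrently working tasks in the output above may simply be AnyIO's default worker-thread limit: sync task functions appear to be executed in AnyIO worker threads, and AnyIO's default thread limiter allows 40 tokens. A minimal sketch for checking that limit (this is the same limiter that is adjusted later in this thread via total_tokens):

from anyio import run, to_thread


async def show_default_thread_limit():
    # AnyIO caps concurrent worker threads at the limiter's total_tokens (40 by default)
    limiter = to_thread.current_default_thread_limiter()
    print(f"Default thread limiter tokens: {limiter.total_tokens}")


if __name__ == "__main__":
    run(show_default_thread_limit)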
Jeff Hale
10/21/2022, 6:30 PM
"Is the concurrent task runner limited in concurrency independent from tags?"
I don't believe so. At least, I don't think it should be. 🙂 @Zanie might have an insight here - I believe this is an area he's looking at.
Zanie
Justin Trautmann
10/22/2022, 4:22 PM
Justin Trautmann
10/25/2022, 9:35 AM
import time

from anyio import sleep, create_task_group, run

num_tasks = 50
active_tasks = num_tasks * [False]


async def sleep_task_async(task_id, t):
    # global variable is set to true as soon as this starts to run and reset at finish
    active_tasks[task_id] = True
    await sleep(t)
    active_tasks[task_id] = False


async def monitor_tasks_async(t):
    start = time.time()
    while time.time() - start < t:
        print(f"Active tasks (actually doing work):\t{len([x for x in active_tasks if x])}")
        await sleep(2)


async def test_task_group_concurrency():
    async with create_task_group() as tg:
        for num in range(num_tasks):
            tg.start_soon(sleep_task_async, num, 5)
        tg.start_soon(monitor_tasks_async, 10)


if __name__ == "__main__":
    run(test_task_group_concurrency)
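The pure-async version above shows no ceiling because coroutines started with start_soon are not throttled. If the same sleeping work is instead pushed through AnyIO's worker-thread pool (which appears to be how sync @task functions are run by the ConcurrentTaskRunner), the ~40-task ceiling reappears without any Prefect involvement. A minimal sketch, assuming nothing beyond AnyIO itself:

import time
from anyio import create_task_group, run, sleep, to_thread

num_tasks = 50
active_tasks = num_tasks * [False]


def sleep_task_sync(task_id, t):
    active_tasks[task_id] = True
    time.sleep(t)
    active_tasks[task_id] = False


async def sleep_task_in_thread(task_id, t):
    # each sync task occupies one token of AnyIO's default thread limiter
    await to_thread.run_sync(sleep_task_sync, task_id, t)


async def monitor_tasks_async(t):
    start = time.time()
    while time.time() - start < t:
        # expect at most ~40 active tasks here even though 50 were started
        print(f"Active tasks (actually doing work):\t{len([x for x in active_tasks if x])} "
              f"(limiter tokens: {to_thread.current_default_thread_limiter().total_tokens})")
        await sleep(2)


async def test_thread_pool_concurrency():
    async with create_task_group() as tg:
        for num in range(num_tasks):
            tg.start_soon(sleep_task_in_thread, num, 5)
        tg.start_soon(monitor_tasks_async, 10)


if __name__ == "__main__":
    run(test_thread_pool_concurrency)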
Zanie
Zanie
Justin Trautmann
10/25/2022, 2:56 PM
Zanie
Zanie
Zanie
Justin Trautmann
10/25/2022, 3:20 PM
Justin Trautmann
10/27/2022, 1:56 PM
from prefect import flow, task
from anyio import run, to_thread


@task
async def dummy_task(input_task=None):
    return


@flow
async def test_one_long_task_chain():
    # to_thread.current_default_thread_limiter().total_tokens = 200  # fixes deadlock
    depth = 100
    future = await dummy_task.submit()
    for _ in range(depth):
        future = await dummy_task.submit(future)


@flow
async def test_multiple_short_task_chains():
    # to_thread.current_default_thread_limiter().total_tokens = 200  # fixes deadlock
    breadth = 100
    level_1 = [await dummy_task.submit() for _ in range(breadth)]
    level_2 = [await dummy_task.submit(level_1[i]) for i in range(breadth)]


if __name__ == "__main__":
    # both of the following calls run into a deadlock
    run(test_one_long_task_chain)
    # run(test_multiple_short_task_chains)
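The commented-out line in both flows above points at the same AnyIO thread limiter; below is a minimal sketch with that workaround applied to the first flow, i.e. raising the limiter before submitting the chained tasks. This is the workaround suggested in this thread, not an official fix.

from anyio import run, to_thread
from prefect import flow, task


@task
async def dummy_task(input_task=None):
    return


@flow
async def test_one_long_task_chain():
    # raise AnyIO's default worker-thread limit so task runs waiting on their
    # upstream inputs do not exhaust the pool (workaround, not an official fix)
    to_thread.current_default_thread_limiter().total_tokens = 200
    future = await dummy_task.submit()
    for _ in range(100):
        future = await dummy_task.submit(future)


if __name__ == "__main__":
    run(test_one_long_task_chain)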
Zanie
async is going to be more efficient and involve fewer threads. I'm aware of this issue with collecting inputs in threads causing deadlocks. My next month or so of work is going to be dedicated to changing the lower-level workings of the engine to fix these problems.
Zanie
Justin Trautmann
10/27/2022, 2:54 PM
Justin Trautmann
10/27/2022, 3:01 PM
Zanie
Justin Trautmann
10/27/2022, 3:29 PM
Zanie
Justin Trautmann
10/27/2022, 4:16 PM
Zanie