j

Justin Trautmann

10/17/2022, 2:35 PM
Hello community, hello Prefect team, one of our prefect 2 flows contains a large number of parallel tasks that access a database with limited capacity. in order to avoid too much load on the db, i tried to apply tag based task concurrency limits, observed some unexpected behavior and would love to get some feedback on whether i am using concurrency limits correctly.
1. when submitting a batch of tasks in a loop, the specified concurrency limits do not seem to apply immediately: there are more running tasks than active slots for the tag. the limit only takes effect if i introduce an artificial delay between submit calls. i assume this is due to a race condition when accessing the orion db and was wondering if there is a preferred way of submitting tasks so that limits are respected.
2. when increasing the task concurrency limit while a flow is executing (this would be very useful for long running flows and a dynamically scalable db), the number of "Active Task Runs" doesn't return to zero after all flows are completed. instead a certain number of tasks are still listed as active by the `prefect concurrency-limit inspect` command but are in fact in status completed. this effectively lowers the concurrency limit for all subsequent tasks as these pseudo-active tasks keep occupying a slot for the tag. are we at all supposed to adapt the concurrency limit while tagged tasks are being executed or is this considered bad practice?
Please find in the thread a self-contained example flow that can be used to reproduce the behavior. Any advice is much appreciated. Thank you.
👀 1
🚀 2
import time
from prefect import flow, task, tags, context, get_run_logger
from prefect.utilities.asyncutils import sync_compatible


tag = "test-limit"


@sync_compatible
async def create_limit(n):
    # set concurrency limit
    await context.get_run_context().client.create_concurrency_limit(tag=tag, concurrency_limit=n)


@sync_compatible
async def get_active_tasks():
    # list all "active" tasks for the tag
    return (await context.get_run_context().client.read_concurrency_limit_by_tag(tag=tag)).active_slots


@sync_compatible
async def get_running_tasks():
    # list all tasks ids in state "Running"
    running_tasks = []
    for future in context.get_run_context().task_run_futures:
        if (await future._get_state()).name == "Running":
            running_tasks.append(future.task_run.id)
    return running_tasks


def monitor_tasks(t):
    # print every two seconds a list of "active" and "Running" tasks for t seconds
    logger = get_run_logger()
    start = time.time()
    while time.time() - start < t:
        running_tasks = get_running_tasks()
        active_tasks = get_active_tasks()
        if running_tasks or active_tasks:
            logger.info(f"Running tasks ({len(running_tasks)}): {running_tasks}")
            logger.info(f"Active tasks ({len(active_tasks)}): {active_tasks}")
            time.sleep(2)
        else:
            break


@task
def sleep_task(t):
    # sleep for t seconds
    time.sleep(t)


@flow
def test_limits():
    logger = get_run_logger()

    # setting concurrency limit 5 for tag
    create_limit(5)

    # submit 20 parallel tagged tasks that sleep for 20s each
    with tags(tag):
        for _ in range(0, 20):
            sleep_task.submit(20)
            # time.sleep(0.2) # This leads to expected behaviour

    # monitor "Running" and "active" tasks for 30s
    # see issue 1 in output: more tasks "Running" than "active"
    monitor_tasks(30)

    # increase concurrency limit while tasks are still running
    logger.info("Updating limits")
    create_limit(10)

    # continue monitoring
    # see issue 2 in output: No tasks in "Running" but still tasks in "active"
    monitor_tasks(120)

    # After the flow exits, check limits via `prefect concurrency-limit inspect test-limit`
    # see that there are still "Active Task Runs" even though the flow "Finished in state Completed('All states completed.')"


if __name__ == "__main__":
    test_limits()
j

Jeff Hale

10/17/2022, 11:25 PM
Thank you for the bug report - looks like folks think it should be fixed in the expected next release on Thurs.
j

Justin Trautmann

10/18/2022, 7:18 AM
Thank you jeff for looking into this. the bug report is mainly related to point 2 from my initial message. will also point 1 be fixed with the next release?
j

Jeff Hale

10/18/2022, 6:42 PM
Looks like 2.6.2, which was just released, fixed the bug with point 1 - more tasks “Running” than “active”. Point 2 - No tasks in “Running” but still tasks in “active”, remains for me: Full traceback added to the issue linked above.
🙌 1
👀 1
j

Justin Trautmann

10/19/2022, 8:22 AM
Hello Jeff, on 2.6.3 i still occasionally see more running tasks than active. but the number of running tasks is consistently less than or equal to the concurrency limit, which solves the initial problem 👍🏻
👍 1
🚀 1
hello Jeff, sorry to bother you again with a very similar issue. When submitting larger numbers of parallel tasks using the concurrent task runner, we observed that there's a large number of tasks "Running" in the Prefect Cloud UI as expected, however they don't actually seem to do any work as they don't produce log output or any load on systems that they should be interacting with. The agent has enough idle cpu capacity to cater for more concurrent tasks. Is the concurrent task runner limited in concurrency independent from tags? I have created a small script to reproduce the issue. Any advice on how to use the concurrent tasks runner correctly to avoid this issue would be awesome.
import time

from prefect import flow, task, context, get_run_logger
from prefect.utilities.asyncutils import sync_compatible


@sync_compatible
async def get_running_tasks():
    # list all tasks ids in state "Running"
    running_tasks = []
    for future in context.get_run_context().task_run_futures:
        if (await future._get_state()).name == "Running":
            running_tasks.append(future.task_run.id)
    return running_tasks


def monitor_tasks(t):
    logger = get_run_logger()
    start = time.time()
    while time.time() - start < t:
        running_tasks = get_running_tasks()

        if running_tasks:
            logger.info(f"Running tasks (in Prefect Cloud):\t{len(running_tasks)}")
            logger.info(f"Active tasks (actually doing work):\t{len([x for x in active_tasks if x])}")
        else:
            break
        time.sleep(2)


@task
def sleep_task(task_id, t):
    # global variable is set to true as soon as this starts to run and reset at finish
    active_tasks[task_id] = True
    time.sleep(t)
    active_tasks[task_id] = False


num_tasks = 50
active_tasks = num_tasks * [False]


@flow
def test_concurrency():
    for task_id in range(0, num_tasks):
        sleep_task.submit(task_id, 30)

    monitor_tasks(120)
    # Observe output: Less active tasks than running tasks


if __name__ == "__main__":
    test_concurrency()
Output:
17:18:30.693 | INFO    | prefect.engine - Created flow run 'ingenious-honeybee' for flow 'test-concurrency'
17:18:32.019 | INFO    | Flow run 'ingenious-honeybee' - Created task run 'sleep_task-f305a62b-0' for task 'sleep_task'
...
17:18:54.814 | INFO    | Flow run 'ingenious-honeybee' - Running tasks (in Prefect Cloud):      50
17:18:54.815 | INFO    | Flow run 'ingenious-honeybee' - Active tasks (actually doing work):    39
...
17:19:03.219 | INFO    | Task run 'sleep_task-f305a62b-3' - Finished in state Completed()
...
j

Jeff Hale

10/21/2022, 6:30 PM
Hi Justin.
"Is the concurrent task runner limited in concurrency independent from tags?"
I don’t believe so. At least, I don’t think it should be. 🙂 @Zanie might have an insight here - I believe this is an area he’s looking at.
z

Zanie

10/22/2022, 4:10 PM
The concurrent task runner uses the async event loop / threads so it's limited to a single process / CPU core. We have plans to add a multiprocessing based task runner as well later.
j

Justin Trautmann

10/22/2022, 4:22 PM
Thanks Michael for your input. I see how this limits concurrency for tasks that perform actual work, however our use case is very close to the example that I posted, where tasks spend most time waiting (submitting work to an external service and polling rather infrequently for status) and don't consume very much CPU time. If I don't use prefect and add the task functions to my own anyio task group, all of them start executing right away.
just for completeness: an example of how a large number of waiting tasks run concurrently using the anyio task_group. from my understanding, this is about the same approach that prefect uses to run tasks, so i would expect this to scale similarly.
import time
from anyio import sleep, create_task_group, run

num_tasks = 50
active_tasks = num_tasks * [False]


async def sleep_task_async(task_id, t):
    # global variable is set to true as soon as this starts to run and reset at finish
    active_tasks[task_id] = True
    await sleep(t)
    active_tasks[task_id] = False


async def monitor_tasks_async(t):
    start = time.time()
    while time.time() - start < t:
        print(f"Active tasks (actually doing work):\t{len([x for x in active_tasks if x])}")
        await sleep(2)


async def test_task_group_concurrency():
    async with create_task_group() as tg:
        for num in range(num_tasks):
            tg.start_soon(sleep_task_async, num, 5)

        tg.start_soon(monitor_tasks_async, 10)


if __name__ == "__main__":
    run(test_task_group_concurrency)
z

Zanie

10/25/2022, 2:47 PM
👍 Thanks. I’ll be investigating this.
Hey you’re seeing a limit of ~40 active tasks yes?
j

Justin Trautmann

10/25/2022, 2:56 PM
thanks. yes, 40 seems to be somehow a magic limit, although a bit fuzzy. when running more than 40 tasks, the number of actually running tasks varies but is always <40, when running less than 40 tasks concurrently, all tasks run as expected.
z

Zanie

10/25/2022, 2:57 PM
AnyIO has a default thread capacity limit of 40
💡 1
I’m going to be exploring some more low-level fixes, but you can only have so many threads on a single process or Python will get pretty slow.
🙌 2
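Editor's note: the effect of a fixed thread capacity can be reproduced without Prefect or AnyIO. The sketch below is a standard-library analogy, assuming a `ThreadPoolExecutor` with 40 workers behaves like a capacity limit of 40 for blocking tasks; it does not use AnyIO's limiter itself, and `peak_running` is a hypothetical helper name.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_running(num_tasks, capacity, t=0.2):
    """Submit num_tasks blocking tasks to a pool of `capacity` threads
    and return the highest number that were executing at the same time."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def blocking_task():
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(t)  # the task "runs" but only waits
        with lock:
            running -= 1

    # Leaving the with-block waits for all submitted tasks to finish
    with ThreadPoolExecutor(max_workers=capacity) as pool:
        for _ in range(num_tasks):
            pool.submit(blocking_task)
    return peak


if __name__ == "__main__":
    # 50 submitted tasks, 40 worker slots: the surplus tasks have to queue
    print("peak running with capacity 40:", peak_running(50, 40))
```

The peak can never exceed the capacity, which matches the observation above that all 50 task runs are reported as "Running" while fewer than that are actually executing.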
j

Justin Trautmann

10/25/2022, 3:20 PM
this makes very much sense💡 let's see if adapting current_default_thread_limiter is a feasible solution for our workloads. i guess it all depends on how computationally intensive our individual tasks actually are.
Hello Michael, i saw that the thread limit is mainly a problem for sync flows, as the begin_run() function is directly awaited in async flows in engine.enter_task_run_engine and only executed in a worker thread if the flow is sync. increasing the thread limit would add quite some overhead for context switches, so i refactored our flow to async in the hope that this would run all tasks directly in the event loop without hitting any thread limits. this works fine for tasks without dependencies, however as soon as we have a certain number of tasks with dependencies, this runs into a deadlock. i tried to pinpoint the issue and think that engine.resolve_inputs or engine.collect_task_run_inputs are the root cause. they resolve/collect all inputs in worker threads, which could lead to a situation where a child task needs to resolve some inputs but the function never starts because the thread limit is reached, and no thread limiter token will ever become available because all tokens are held by parent tasks that only return and release them once the current resolution finishes. please find attached an example that triggers the described behavior. maybe this helps in the process of finding a scalable solution for the concurrent task runner. are there any other options besides increasing the thread limit or using the ray/dask task runner? thank you very much.
from prefect import flow, task
from anyio import run, to_thread


@task
async def dummy_task(input_task=None):
    return


@flow
async def test_one_long_task_chain():
    # to_thread.current_default_thread_limiter().total_tokens = 200 # fixes deadlock

    depth = 100
    future = await dummy_task.submit()
    for _ in range(depth):
        future = await dummy_task.submit(future)


@flow
async def test_multiple_short_task_chains():
    # to_thread.current_default_thread_limiter().total_tokens = 200  # fixes deadlock

    breadth = 100

    level_1 = [await dummy_task.submit() for _ in range(breadth)]
    level_2 = [await dummy_task.submit(level_1[i]) for i in range(breadth)]

if __name__ == "__main__":
    # both of the following calls run into a deadlock
    run(test_one_long_task_chain)
    # run(test_multiple_short_task_chains)
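Editor's note: the starvation pattern described above (every worker slot held by a parent that waits for work needing a slot from the same pool) can be shown in isolation. This is a standard-library analogy under stated assumptions, not Prefect's engine code: `run_chain`, `parent`, and `child` are hypothetical names, a barrier forces all parents to hold their slots at the same time, and a timeout is used so the demo reports starvation instead of hanging.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def run_chain(pool_size, num_parents, timeout_s=0.5):
    """Return True if every parent gets its child result, False if parents starve.

    Assumes num_parents <= pool_size so that all parents can start."""
    barrier = threading.Barrier(num_parents)

    with ThreadPoolExecutor(max_workers=pool_size) as pool:

        def child():
            return "resolved"

        def parent():
            # Wait until every parent occupies a worker slot
            barrier.wait()
            try:
                # The child needs a slot from the same pool the parent is blocking
                return pool.submit(child).result(timeout=timeout_s)
            except FutureTimeout:
                return None

        parents = [pool.submit(parent) for _ in range(num_parents)]
        results = [p.result() for p in parents]

    return all(r == "resolved" for r in results)


if __name__ == "__main__":
    print("2 slots, 2 parents:", run_chain(2, 2))  # no slot left for the children
    print("4 slots, 2 parents:", run_chain(4, 2))  # spare slots let the children run
```

Without the timeout the first call would block forever, which is the analogue of the deadlock in the flows above; raising the capacity (as in the commented-out `total_tokens` line) only moves the threshold.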
z

Zanie

10/27/2022, 2:31 PM
Thanks for digging into this further! You’re going in the right direction, in that implementing things async is going to be more efficient and involve fewer threads. I’m aware of this issue with collecting inputs in threads causing deadlocks. My next month or so of work is going to be dedicated to changing the lower level workings of the engine to fix these problems.
I’ll also be writing benchmarks so we can determine how much adjusting thread limits affects performance and such.
j

Justin Trautmann

10/27/2022, 2:54 PM
great to hear that this is being worked on. for the time being, what would you consider the best approach for our workload? we have a lot of parallel low cpu/high IO tasks that run on one CPU without any issues when using plain anyio task groups. async flows with any of the task runners will potentially run into the deadlock issues and sync flows will become very slow with large numbers of threads on one cpu. from my understanding, this would be the same for the concurrent tasks runner or local dask/ray cluster when using only one cpu. currently, the only viable option that i can see here is using ray or dask on a machine with more cores and less threads per cpu, which would scale but waste a lot of compute power. is this about the right direction or am i missing something here?
or maybe async flows with sleep statements in between submit calls in order to avoid said deadlocks?
z

Zanie

10/27/2022, 3:24 PM
I don’t have a great recommendation right now, sorry. I think the best fix is going to come from the upcoming work. Do you need full orchestration for all of these jobs?
j

Justin Trautmann

10/27/2022, 3:29 PM
what kind of partial orchestration do you have in mind?
z

Zanie

10/27/2022, 3:57 PM
Well, if you aren’t actually using our orchestration features for all the tasks you could just submit them to task groups as you’ve noted.
j

Justin Trautmann

10/27/2022, 4:16 PM
true, that would indeed be possible but much less convenient as we would need to take care of all the dependencies and correct execution order by ourselves.
z

Zanie

10/27/2022, 4:30 PM
Ah that makes sense. Well, I’m hoping to begin delivering improvements to this in the next couple weeks. Perhaps increasing the thread count is the first move? Have you actually measured context switching performance hits?
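Editor's note: a rough way to start answering that question is to time the same batch of waiting tasks under different thread counts. The sketch below uses only the standard library and assumes sleep-bound tasks like those in the examples above; `io_task` and `wall_time` are hypothetical names, and the thread counts are arbitrary.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def io_task(wait_s):
    # Stand-in for a task that submits work elsewhere and waits for it
    time.sleep(wait_s)


def wall_time(num_tasks, num_threads, wait_s=0.05):
    """Run num_tasks waiting tasks on num_threads threads; return elapsed seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        for _ in range(num_tasks):
            pool.submit(io_task, wait_s)
    # The with-block has waited for every task by the time we get here
    return time.perf_counter() - start


if __name__ == "__main__":
    for n in (40, 100, 200):
        print(f"{n} threads: {wall_time(200, n):.3f}s")
```

This only captures wall time for purely waiting work. Tasks that do CPU work between waits would expose more of the switching cost, so a realistic measurement should use the actual task bodies.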