Ouail Bendidi

05/04/2023, 12:36 PM
Hello, I'm on latest prefect 2.10.6, and I have a lot of tasks that hang indefinitely. I'm running the tasks in parallel inside a flow with a tag concurrency limit and a timeout of 60 seconds set for each task, but it still hangs indefinitely until I kill it. More details in the thread ⬇️
the flow looks more or less like this:
from prefect import flow, task
import time


# 'db' concurrency limit = 50


@task(tags=["db"], timeout_seconds=60)
def range_task(n: int) -> list[int]:
    return list(range(n))


@task(tags=["db"], timeout_seconds=60)
def pwr_task(index: int) -> int:
    time.sleep(0.5)
    return index**2


@task(tags=["db"], timeout_seconds=60)
def add_task(index: int, to_add: int) -> int:
    time.sleep(0.5)
    return index + to_add


@task(tags=["db"], timeout_seconds=60)
def final_task(pwr: int, added: int) -> int:
    time.sleep(0.5)
    return pwr + added


@flow
async def test_flow(n: int, to_adds: list[int] = [1, 2, 3]):

    indexes = range_task(n)

    pwr_futures = pwr_task.map(index=indexes)

    for to_add in to_adds:
        added_futures = add_task.map(index=pwr_futures, to_add=to_add)

        final_futures = final_task.map(pwr=pwr_futures, added=added_futures)


if __name__ == "__main__":
    import asyncio

    asyncio.run(test_flow(300))
with each task doing a query to a Postgres database
When running the example above some tasks stay stuck in the `Running` state indefinitely:
My theory is that the engine doesn't handle well having 2 tasks that use the same tag concurrency limit and depend on each other 🤔
Keeping the concurrency tag on only the first task (the one that is `mapped`) solves the issue, but it still seems unintuitive that tasks with the same concurrency limit tag don't work well with each other 🤔
Update: it doesn't fix the issue; it is still very flaky, and sometimes flow runs are still stuck in a `Running` state...
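As an aside, the starvation theory above can be reproduced outside Prefect with a plain semaphore. This is a toy sketch with hypothetical task names, not Prefect's actual engine: a downstream task that holds the only concurrency slot while waiting on an upstream task that needs the same slot can never make progress.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Toy model of the theory above (not Prefect internals): one shared
# "tag concurrency limit" slot, and a downstream task that holds the
# slot while waiting on an upstream task that needs a slot too.
limit = threading.Semaphore(1)  # tag limit of 1

def upstream() -> int:
    # Times out instead of deadlocking so the sketch terminates.
    if not limit.acquire(timeout=1.0):
        return -1  # never got a slot: starved by its own consumer
    try:
        return 42
    finally:
        limit.release()

def downstream() -> int:
    limit.acquire()  # takes the only slot...
    try:
        with ThreadPoolExecutor() as pool:
            # ...then blocks waiting on a dependency that also needs one
            return pool.submit(upstream).result()
    finally:
        limit.release()

print(downstream())  # -> -1: the upstream dependency was starved
```

With a real lock instead of the timeout, the two tasks would deadlock outright, which matches the "stuck in Running forever" symptom.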

Yaron Levi

05/04/2023, 3:44 PM
@Ouail Bendidi Have you tried looking for similar issues in Prefect's Discourse?

Ouail Bendidi

05/04/2023, 5:18 PM
Most discussions on Prefect's Discourse are about flows stuck in a `Pending` state. I think this issue probably tackles the same problem: https://github.com/PrefectHQ/prefect/issues/8982
So, another update: ever since the upgrade from prefect 2.10.1 to 2.10.6, even previously deployed flows that didn't use to hang are hanging in a `Running` state indefinitely (tasks are also stuck as `Running`).

Walter Cavinaw

05/09/2023, 5:13 PM
We are also seeing a ton of tasks hanging, using tasks submitted to a concurrent runner with task concurrency limits (I am testing removing the concurrency limits).

Nate

05/09/2023, 5:20 PM
hi @Ouail Bendidi - if you're still running into this, do you see the same issue if you either make everything async or everything sync? historically there has been some weirdness when calling sync tasks from an async flow like that, just wondering if it might be related to what you're seeing
hi @Walter Cavinaw - can you explain more about your setup and what you're seeing?

Walter Cavinaw

05/09/2023, 5:23 PM
sync flow calling async tasks, with a ConcurrentTaskRunner. The tasks had a tag that we originally put concurrency limits on. That worked until recently, when tasks started to hang. We removed the task concurrency limit, and it was still hanging. We removed the tag from the task decorator (just now) and the tasks are not hanging and the flow is working. Just from that information, it seems that the task tag parameter was to blame. But you are suggesting that the flow should be async as well for it to work?
We did have an issue with the task concurrency limit a while ago (too many errant tasks that we cancelled but that were still marked as running and blocking the concurrency limit), so we deleted it and added a new one with the same name. It might also be that somewhere in the db/system that data hasn't been cleaned up, and by removing the tag parameter we are simply bypassing that stale concurrency limit data. (I am just guessing at alternative reasons)

Nate

05/09/2023, 5:28 PM
But you are suggesting that the flow should be async as well for it to work?
no, not necessarily - I just know I've seen some deadlock behavior from calling sync tasks in async flows before, but I agree with your assessment in this case
it seems that the task tag parameter was to blame
hmm 🤔
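The deadlock risk from mixing sync and async mentioned above can be illustrated outside Prefect with plain asyncio. A toy sketch (not Prefect's engine): blocking sync work run directly inside a coroutine serializes the whole event loop, while offloading it to a worker thread lets coroutines overlap.

```python
import asyncio
import time

async def blocking_badly() -> None:
    time.sleep(0.2)  # sync sleep: blocks the entire event loop

async def blocking_politely() -> None:
    # Push the sync work onto a worker thread so the loop stays free;
    # async frameworks do something similar for sync callables.
    await asyncio.to_thread(time.sleep, 0.2)

async def main() -> None:
    # Ten "polite" calls overlap across threads and finish quickly...
    t0 = time.monotonic()
    await asyncio.gather(*(blocking_politely() for _ in range(10)))
    print(f"threaded: {time.monotonic() - t0:.1f}s")

    # ...ten direct calls run one after another and take ~2 s.
    t0 = time.monotonic()
    await asyncio.gather(*(blocking_badly() for _ in range(10)))
    print(f"blocking: {time.monotonic() - t0:.1f}s")

if __name__ == "__main__":
    asyncio.run(main())
```

If the event loop is blocked like this while some other component holds a lock the blocked code needs, the process can hang for good rather than just run slowly.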

Ouail Bendidi

05/09/2023, 5:29 PM
hi @Ouail Bendidi - if you're still running into this, do you see the same issue if you either make everything async or everything sync? historically there has been some weirdness when calling sync tasks from an async flow like that, just wondering if it might be related to what you're seeing
So my default setup is an async flow with sync tasks; I've also tried with a sync flow and sync tasks, and the problem still persists. On another note, I reverted to prefect 2.10.1, and I've started hitting the "task stuck in `Running`" issue way less than before (only when the number of concurrent tasks is very large).
Not sure if it helps, but when I `ctrl^C` the stuck flows, most of them were stuck trying to acquire a logging lock.
sync flow calling async tasks,
we need to define what an async task is 😅 , is it a task called with the `.submit`/`.map` methods, or an `async def` task?
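Where exactly a hung run is blocked (e.g. on that logging lock) can be inspected without killing the process. A minimal stdlib sketch, assuming a POSIX host:

```python
import faulthandler
import signal
import sys

# Register a handler so that `kill -USR1 <pid>` makes the hung process
# dump every Python thread's stack to stderr without terminating it --
# handy for confirming a flow run is blocked acquiring a logging lock.
faulthandler.register(signal.SIGUSR1, file=sys.stderr, all_threads=True)

# The same snapshot can also be taken programmatically, e.g. from a
# watchdog thread:
faulthandler.dump_traceback(file=sys.stderr, all_threads=True)
```

Unlike `ctrl^C`, this leaves the process running, so the same stuck run can be sampled several times to see whether it is truly deadlocked or just slow.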

Walter Cavinaw

05/09/2023, 5:34 PM
in my case it was about a dozen `async def` task functions, called via `task_func.with_options().submit()`

Nate

05/09/2023, 5:39 PM
@Ouail Bendidi submit and map behave according to the sync/async nature of the decorated function, so when I say an async task I just mean any `@task`-decorated function with `async def` (likewise for sync). I will try to dig into this more
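The dispatch rule described above can be modeled in a few lines of plain Python. A toy sketch, not Prefect's actual implementation:

```python
import asyncio
import inspect

# Toy model of the rule above: whether a "task" runs as sync or async
# is decided by the nature of the decorated function itself, not by
# how the caller invokes it. (Not Prefect's real engine.)
def run_task(fn, *args):
    if inspect.iscoroutinefunction(fn):
        # async def task: drive it on an event loop
        return asyncio.run(fn(*args))
    # plain def task: just call it
    return fn(*args)

async def async_double(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2

def sync_increment(x: int) -> int:
    return x + 1

print(run_task(async_double, 21))    # -> 42
print(run_task(sync_increment, 41))  # -> 42
```

The caller uses the same entry point either way; only the `async def` / `def` on the task function changes what happens underneath.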

Andrew Brookins

05/10/2023, 12:38 AM
@Ouail Bendidi Hi! I'm trying to reproduce this locally. Using Python 3.9.16, prefect from main (2.10.8+1.gdb7bfbd35), Postgres, and running `prefect server start` locally, I'm not able to repro yet. Can you share your `prefect version` output?

Ouail Bendidi

05/10/2023, 7:35 AM
The prefect version where I was having the issue is 2.10.6 (Python 3.11); I didn't test the latest yet. Also, if it helps, I'm running a prefect server hosted on premise on ECS with 2 vCPUs and 2 GB of memory, and an RDS Postgres db
Happened again with prefect 2.10.1. These are deployments that usually take less than 1 minute to run, and the flow has a 1h timeout set. They run inside a `DockerContainer` on a prefect agent deployed in ECS (we'll be using ECS workers directly after the flows are more stable ^^); the prefect agent also has 2 vCPUs and 2 GB of memory

Andrew Brookins

05/10/2023, 3:23 PM
Thank you! I'll run this again with Python 3.11 and then align my runtime environment from there until I (hopefully) repro.

Ouail Bendidi

05/10/2023, 3:37 PM
Is there any chance that it is because of memory issues on the prefect agent/server side? Or because of zombie docker containers staying on the agent? (That doesn't explain the failures when running the flows locally while connected to the deployed prefect server.)

Andrew Brookins

05/13/2023, 1:27 AM
Improving our ability to detect flow and task runs in this condition is one of our top priorities at the moment! We had something similar in v1 with Zombie Killer.

Sergei Filkin

05/16/2023, 1:25 PM
@Ouail Bendidi Hi. Check the settings of the swap file. I had a similar problem with OOM because of a disabled swap file on the host where I ran `DockerContainer()`.

Ouail Bendidi

05/17/2023, 9:17 AM
Thanks @Sergei Filkin! I ended up switching to ECSTasks, and the prefect agent is able to correctly pick up flows that crashed/failed due to OOM (or other causes) and release the tag concurrency so it doesn't block other deployments ^^