# ask-community
o
Hello, I'm on latest prefect 2.10.6, and I have a lot of tasks that hang indefinitely. I'm running the tasks in parallel inside a flow, with a tag concurrency limit and a timeout of 60 seconds set for each task, but they still hang indefinitely until I kill the run. More details in the thread ⬇️
👀 1
the flow looks more or less like this:
from prefect import flow, task
import time


# 'db' concurrency limit = 50


@task(tags=["db"], timeout_seconds=60)
def range_task(n: int) -> list[int]:
    return list(range(n))


@task(tags=["db"], timeout_seconds=60)
def pwr_task(index: int) -> int:
    time.sleep(0.5)
    return index**2


@task(tags=["db"], timeout_seconds=60)
def add_task(index: int, to_add: int) -> int:
    time.sleep(0.5)
    return index + to_add


@task(tags=["db"], timeout_seconds=60)
def final_task(pwr: int, added: int) -> int:
    time.sleep(0.5)
    return pwr + added


@flow
async def test_flow(n: int, to_adds: list[int] = [1, 2, 3]):

    indexes = range_task(n)

    pwr_futures = pwr_task.map(index=indexes)

    for to_add in to_adds:
        added_futures = add_task.map(index=pwr_futures, to_add=to_add)

        final_futures = final_task.map(pwr=pwr_futures, added=added_futures)


if __name__ == "__main__":
    import asyncio

    asyncio.run(test_flow(300))
In the real flow, each task does a query to a Postgres database.
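For reference, the `# 'db' concurrency limit = 50` comment in the snippet refers to a tag concurrency limit that is created separately (e.g. `prefect concurrency-limit create db 50`). A rough equivalent using the Prefect client, with the tag name and limit simply mirroring that comment:

import asyncio

from prefect.client.orchestration import get_client


async def create_db_limit() -> None:
    async with get_client() as client:
        # Allow at most 50 concurrent task runs carrying the "db" tag.
        await client.create_concurrency_limit(tag="db", concurrency_limit=50)


if __name__ == "__main__":
    asyncio.run(create_db_limit())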
When running the example above, some tasks stay stuck in the `Running` state indefinitely.
My theory is that the engine doesn't handle well having 2 tasks that use the same tag concurrency limit and depend on each other 🤔
Keeping the concurrency tag only on the first task (the one that is `mapped`) solves the issue, but it still seems unintuitive that tasks with the same concurrency limit tag don't work well with each other 🤔
Update: that doesn't fix the issue; it is still very flaky, and sometimes flow runs are still stuck in a Running state...
y
@Ouail Bendidi Have you tried looking for similar issues in Prefect's Discourse?
o
Most discussions on Prefect's Discourse are about flows stuck in a Pending state. I think this issue probably tackles the same problem: https://github.com/PrefectHQ/prefect/issues/8982
So, another update: ever since the update from prefect 2.10.1 to 2.10.6, even previously deployed flows that didn't use to hang are hanging in a `Running` state indefinitely (tasks are also stuck as `Running`).
w
We are also seeing a ton of tasks hanging, using tasks submitted to a concurrent runner with task concurrency limits (I am testing removing the concurrency limits).
n
Hi @Ouail Bendidi - if you're still running into this, do you see the same issue if you make everything async or everything sync? Historically there has been some weirdness when calling sync tasks from an async flow like that; just wondering if it might be related to what you're seeing.
Hi @Walter Cavinaw - can you explain more about your setup and what you're seeing?
w
A sync flow calling async tasks, with a ConcurrentTaskRunner. The tasks had a tag that we originally put concurrency limits on. That worked until recently, when tasks started to hang. We removed the task concurrency limit, and it was still hanging. We removed the tag from the task decorator (just now) and the tasks are not hanging and the flow is working. Just from that information, it seems that the task tag parameter was to blame. But are you suggesting that the flow should be async as well for it to work?
We did have an issue with the task concurrency limit a while ago (too many errant tasks that we cancelled but that were still marked as running and blocking the concurrency limit), and we deleted it and added a new one with the same name. It might also be that somewhere in the db/system that data hasn't been cleaned up, and by removing the tag parameter we are simply bypassing that stale concurrency limit data. (I am just guessing at alternative reasons.)
n
> But you are suggesting that the flow should be async as well for it to work?
no, not necessarily - I just know I've seen some deadlock behavior from calling sync tasks in async flows before, but I agree with your assessment in this case
> it seems that the task tag parameter was to blame
hmm 🤔
o
> Hi @Ouail Bendidi - if you're still running into this, do you see the same issue if you make everything async or everything sync? Historically there has been some weirdness when calling sync tasks from an async flow like that; just wondering if it might be related to what you're seeing.
So my default setup is an async flow with sync tasks; I've also tried a sync flow with sync tasks, and the problem still persists. On another note, I reverted to prefect 2.10.1, and I've been hitting the "task stuck in RUNNING" issue way less than before (only when the number of concurrent tasks is very large).
Not sure if it helps, but when I `Ctrl+C`'d the stuck flows, most of them were stuck trying to acquire a logging lock.
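(A general way to see where a hung run is blocked, independent of Prefect: register a stdlib `faulthandler` dump so the process prints every thread's stack on demand. The signal choice below is just an example.)

# Debugging sketch (not Prefect-specific): have the hung process dump every
# thread's stack on SIGUSR1, so a hang on e.g. a logging handler lock is
# visible without killing the run.
import faulthandler
import signal

faulthandler.register(signal.SIGUSR1, all_threads=True)

# Then, from another shell: kill -USR1 <pid of the hung flow process>
# (tracebacks are written to this process's stderr).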
> sync flow calling async tasks
We need to define what an async task is 😅: is it a task called with the `.submit`/`.map` methods, or an `async def` task?
w
In my case it was about a dozen `async def` task functions, called via `task_func.with_options().submit()`.
👌 1
n
@Ouail Bendidi `submit` and `map` behave according to the sync/async nature of the decorated function, so when I say an async task I just mean any `@task`-decorated function defined with `async def` (likewise for sync). I will try to dig into this more.
👌 1
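A minimal sketch of that distinction, as I understand it (the task and flow names here are made up):

from prefect import flow, task


@task
def sync_task(x: int) -> int:  # a "sync task": plain def
    return x + 1


@task
async def async_task(x: int) -> int:  # an "async task": async def
    return x + 1


@flow
async def demo_flow() -> None:
    # .submit follows the nature of the decorated function:
    # submitting a sync task returns a future directly...
    sync_future = sync_task.submit(1)
    # ...while submitting an async task returns an awaitable
    # that resolves to a future.
    async_future = await async_task.submit(1)
    print(sync_future, async_future)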
a
@Ouail Bendidi Hi! I'm trying to reproduce this locally. Using Python 3.9.16, prefect from main (2.10.8+1.gdb7bfbd35), Postgres, and running `prefect server start` locally, I'm not able to repro yet. Can you share your `prefect version` output?
o
The prefect version where I was having the issue is `2.10.6` (Python 3.11); I haven't tested the latest yet. Also, if it helps, I'm running a Prefect server hosted on premise on ECS with 2 vCPUs and 2 GB of memory, and an RDS Postgres DB.
It happened again with prefect 2.10.1. These are deployments that usually take less than 1 minute to run, and the flow has a 1h timeout set. They run inside a `DockerContainer` on a Prefect agent deployed in ECS (we'll be using ECS workers directly once the flows are more stable ^^); the Prefect agent also has 2 vCPUs and 2 GB of memory.
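For reference, the 1h flow timeout mentioned above is the sort of thing set on the flow decorator; a minimal sketch with a made-up flow name:

from prefect import flow


# timeout_seconds makes Prefect fail the flow run if it exceeds the limit;
# 3600 seconds mirrors the 1h timeout mentioned above.
@flow(timeout_seconds=3600)
def deployed_flow() -> None:
    ...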
a
Thank you! I'll run this again with Python 3.11 and then align my runtime environment from there until I (hopefully) repro.
o
Is there any chance that it is because of memory issues on the Prefect agent/server side? Or because of zombie Docker containers staying on the agent? (That doesn't explain the failures when running the flows locally while connected to the deployed Prefect server.)
a
@Ouail Bendidi First, a caveat: I don't work on this code every day anymore. 🙂

That said, if a container running an agent exceeds its memory constraints and the kernel's OOM Killer kills it during a flow run, that might cause irregular behavior, but it depends on your agent/infrastructure or worker setup (details below). Note that if an OOM kill is happening, the container runtime should leave behind some indication, like an OOMKilled event on the pod in k8s ("last state: terminated, reason: OOMKilled" and such).

If you use an agent with a deployment that uses the `Process` infrastructure type, or you use a `process` worker (instead of an agent), getting OOM-killed might leave behind data about the in-progress flow run. That can leave flow runs stuck in a Running state. This isn't the case if flow runs execute out-of-process, like in a k8s Job; there, the agent or worker can get OOM'd and the flow run will continue running. (Also, the agent/worker is monitoring the flow run, so if the flow run itself gets OOM'd, the worker will know that and record it correctly.)

Similarly, if your flow run uses the concurrent or sequential task runner, tasks run in-process. So if your worker or agent runs the flow in-process, and the flow runs its tasks in-process, then the worker getting OOM'd might leave task runs in a Running state. This isn't the case if the flow run uses a task runner that distributes tasks somewhere else, out-of-process, like a Ray or Dask cluster; there, the tasks run separately and update Prefect about their state independently.

Tasks stuck in a Running state can cause problems like the ones Walter and you described. That's because the conditions that prevent us from "knowing" that a task run died (due to the OOM) also mean we don't free its in-use concurrency slot, so the task run ID can stick around in the "slots" we use to track your concurrency and apply limits.

So, one theory about what's happening to you in particular, @Ouail Bendidi, is that your concurrency slots are all taken by dead task run IDs. When you run a flow, even locally, your task runs wait for a concurrency slot to open, one never does, and so nothing happens. You can verify this by running the following command in your terminal:
prefect concurrency-limit ls
Assuming you don't have a flow run in progress, if you see a non-zero count under Active Task Runs, what I just described is the problem. You can reset a concurrency limit like this:
prefect concurrency-limit reset <tag>
⬆️ 2
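The same check and reset can also be scripted; a rough sketch using the Prefect client, assuming the `read_concurrency_limit_by_tag` / `reset_concurrency_limit_by_tag` client methods and the `db` tag from the example above:

import asyncio

from prefect.client.orchestration import get_client


async def clear_stale_slots(tag: str = "db") -> None:
    async with get_client() as client:
        limit = await client.read_concurrency_limit_by_tag(tag)
        # active_slots holds the task run IDs currently occupying the limit;
        # with no flow runs in progress it should be empty.
        print(f"{tag}: {len(limit.active_slots)} active slot(s)")
        if limit.active_slots:
            # Equivalent to `prefect concurrency-limit reset <tag>`.
            await client.reset_concurrency_limit_by_tag(tag)


if __name__ == "__main__":
    asyncio.run(clear_stale_slots())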
Improving our ability to detect flow and task runs in this condition is one of our top priorities at the moment! We had something similar in v1 with Zombie Killer.
s
@Ouail Bendidi Hi. Check the settings of the swap file. I had a similar problem with OOM because of a disabled swap file on the host where I ran `DockerContainer()`.
o
Thanks @Sergei Filkin. I ended up switching to `ECSTask`s, and the Prefect agent is able to correctly pick up flows that crashed/failed due to OOM (or other reasons) and release the tag concurrency so it doesn't block other deployments ^^
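For anyone landing here later, a rough sketch of registering an `ECSTask` infrastructure block from the prefect-aws collection (the image and block name are made up, and field names may vary by prefect-aws version):

from prefect_aws.ecs import ECSTask

ecs = ECSTask(
    image="my-registry/my-flow:latest",  # hypothetical image
    cpu=2048,      # 2 vCPU, in ECS CPU units
    memory=2048,   # 2 GB, in MiB
    stream_output=True,
)
# Save the block, then reference it as the deployment's infrastructure.
ecs.save("db-flows", overwrite=True)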