Ouail Bendidi
05/04/2023, 12:36 PM
from prefect import flow, task
import time

# 'db' concurrency limit = 50
@task(tags=["db"], timeout_seconds=60)
def range_task(n: int) -> list[int]:
    return list(range(n))

@task(tags=["db"], timeout_seconds=60)
def pwr_task(index: int) -> int:
    time.sleep(0.5)
    return index**2

@task(tags=["db"], timeout_seconds=60)
def add_task(index: int, to_add: int) -> int:
    time.sleep(0.5)
    return index + to_add

@task(tags=["db"], timeout_seconds=60)
def final_task(pwr: int, added: int) -> int:
    time.sleep(0.5)
    return pwr + added

@flow
async def test_flow(n: int, to_adds: list[int] = [1, 2, 3]):
    indexes = range_task(n)
    pwr_futures = pwr_task.map(index=indexes)
    for to_add in to_adds:
        added_futures = add_task.map(index=pwr_futures, to_add=to_add)
        final_futures = final_task.map(pwr=pwr_futures, added=added_futures)

if __name__ == "__main__":
    import asyncio
    asyncio.run(test_flow(300))
with each task doing a query to a postgres database
Running state indefinitely:
mapped) solves the issue, but it still seems unintuitive that tasks with the same concurrency limit tag don't work well with each other
Yaron Levi
05/04/2023, 3:44 PM
Ouail Bendidi
05/04/2023, 5:18 PM
RUNNING state indefinitely (tasks also are stuck as RUNNING).
Walter Cavinaw
05/09/2023, 5:13 PM
Nate
05/09/2023, 5:20 PM
Walter Cavinaw
05/09/2023, 5:23 PM
Nate
05/09/2023, 5:28 PM
But you are suggesting that the flow should be async as well for it to work?
no, not necessarily - I just know I've seen some deadlock behavior from calling sync tasks in async flows before, but I agree with your assessment in this case
it seems that the task tag parameter was to blame
hmm
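For reference, a minimal sketch of how a shared tag limit interacts with a mapped pipeline shaped like the one above, assuming the limit behaves like one semaphore shared by every tagged task. This is plain asyncio, not Prefect's implementation; `run_stage` and `simulate` are hypothetical names, and the short sleep stands in for the database query.

```python
import asyncio


async def run_stage(sem, stats, fn, *upstream):
    """Resolve upstream results first, then run fn under the shared limit."""
    # Wait for dependencies without holding a slot of the limit.
    args = [await u for u in upstream]
    async with sem:
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        await asyncio.sleep(0.001)  # stand-in for the database query
        stats["running"] -= 1
        return fn(*args)


async def simulate(n=300, to_adds=(1, 2, 3), limit=50):
    """Model pwr -> add -> final stages that all share one limit."""
    sem = asyncio.Semaphore(limit)  # assumed model of the 'db' tag limit
    stats = {"running": 0, "peak": 0}
    spawn = asyncio.create_task

    pwr = [spawn(run_stage(sem, stats, lambda i=i: i ** 2)) for i in range(n)]
    finals = []
    for to_add in to_adds:
        added = [
            spawn(run_stage(sem, stats, lambda p, a=to_add: p + a, p))
            for p in pwr
        ]
        finals += [
            spawn(run_stage(sem, stats, lambda p, a: p + a, p, a))
            for p, a in zip(pwr, added)
        ]
    results = await asyncio.gather(*finals)
    return results, stats["peak"]
```

Calling `asyncio.run(simulate())` returns the final values together with the highest number of stages that ran at once. In this model a stage waits for its upstream results before taking a slot, so the pipeline always drains; whether Prefect 2.10.x orders things this way is not something this thread establishes.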
Ouail Bendidi
05/09/2023, 5:29 PM
hi @Ouail Bendidi - if you're still running into this, do you see the same issue if you either make everything async or everything sync? historically there has been some weirdness when calling sync tasks from an async flow like that, just wondering if it might be related to what you're seeing
So my default setup is an async flow with sync tasks. I've also tried with a sync flow and sync tasks and the problem still persists. On another note, I reverted to prefect 2.10.1, and I'm hitting the "task stuck in RUNNING" issue far less often than before (only when the number of concurrent tasks is very large)
ctrl^C the stuck flows, most of them were stuck trying to acquire a logging lock
sync flow calling async tasks,
we need to define what an async task is: is it a task called with the `.submit`/`.map` methods or an async def task?
Walter Cavinaw
05/09/2023, 5:34 PM
Andrew Brookins
05/10/2023, 12:38 AM
prefect server start locally, I'm not able to repro yet.
Can you share prefect version?
Ouail Bendidi
05/10/2023, 7:35 AM
2.10.6 (python3.11), didn't test the latest yet
Also if it helps I'm running a prefect server hosted on premise on ECS with 2 vCPUs and 2 GB of memory and an RDS postgres db
DockerContainer on prefect agent deployed in ECS (We'll be using ECS workers directly after the flows are more stable^^), the prefect agent also has 2 vCPUs and 2 GB of memory
Andrew Brookins
05/10/2023, 3:23 PM
Ouail Bendidi
05/10/2023, 3:37 PM
Andrew Brookins
05/13/2023, 1:27 AM
Sergei Filkin
05/16/2023, 1:25 PM
Ouail Bendidi
05/17/2023, 9:17 AM