Hi! I have a simple script that uses asyncio to ru...
# ask-community
r
Hi! I have a simple script that uses asyncio to run tasks concurrently with a limit set using semaphore. The script is below. When running the script using the prefect declarators @flow and @task the script fails with: Error encountered when computing cache key - result will not be persisted. However, it works as expected without the prefect declarators. What am I doing wrong? Script:
Copy code
import asyncio
from prefect import flow, task, get_run_logger
from prefect.concurrency.sync import concurrency
from random import randint, randrange

@task
async def my_coroutine(sem):
    async with sem:
        # critical section of code
        sleeptime = randint(5,15)
        print(f"Acquired - {sleeptime} sec")
        await asyncio.sleep(sleeptime)
        print(f"Released - {sleeptime} sec")

@flow
async def main():
    sem = asyncio.Semaphore(3)
    tasks = [asyncio.create_task(my_coroutine(sem)) for _ in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())
r
It looks like Prefect is trying to create a unique cache key for your tasks based on its inputs, but it can't calculate a cache key from an asyncio Semaphore. A quick option to make it work if you don't care about caching or result persistence might be setting a cache policy for your task, like:
Copy code
from prefect.cache_policies import NO_CACHE

@task(cache_policy=NO_CACHE)
async def my_coroutine(sem):
    #rest of your code
Another option is to let Prefect manage the maximum concurrency by giving your task a tag and then seting a concurrency limit on the tag. You'd use the
tags
arg in the task decorator to add a tag, something like:
Copy code
@task(tags=["my_async_task"])
async def my_important_task():
    #rest of your code
There are a few ways you can set the concurrency limit on a tag: https://docs.prefect.io/v3/develop/task-run-limits In this case, you could run something like this in your terminal:
Copy code
prefect concurrency-limit create my_async_task 3
and then alter your flow and task to get rid of the manual concurrency handling. This might be a more "Prefecty" way to do it - let the orchestrator handle the complexity of concurrency management so your code doesn't need to deal with it.
upvote 1
👍 2