Ying
Hi, due to https://prefect-community.slack.com/archives/C048SVCEFF0/p1700639636500989 I tried to add a concurrency limit to my async task. How do I add a concurrency limit in my code? I have tried multiple ways, but it does not seem to register with the concurrency limit: there are no active task runs while they are running. Does this look right syntax-wise (trying to have only 10 slots every time)?
# Imports implied by the snippet (aiohttp, yarl, and Prefect's async concurrency helper)
from aiohttp import client
from yarl import URL
from prefect.concurrency.asyncio import concurrency

async def download_s3_obj(key: str, aiohttp_session: client.ClientSession, s3_client, bucket, local_path):
    request_url = s3_client.generate_presigned_url("get_object", {"Bucket": bucket, "Key": key})

    async with aiohttp_session.get(URL(request_url, encoded=True)) as response:
        async with concurrency("s3-download", occupy=10):
            file_path = local_path + "/" + key

            with open(file_path, "wb") as file:
                file.write(await response.read())

            return file_path
Bianca Hoch
Hey Ying, there are a few ways to add concurrency control to your Prefect scripts. There are task tags for @task concurrency, there's work pool concurrency that you can use for limiting the number of concurrent flow runs, and you can leverage global concurrency limits as well.
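[Editorial note, not part of the thread: with global concurrency limits, `occupy` is how many slots a *single* call holds, so the first snippet's `occupy=10` against a 10-slot limit lets only one download proceed at a time; `occupy=1` would allow 10 concurrent holders. A toy model in plain asyncio illustrates the semantics; the `ConcurrencyLimit` class here is hypothetical, standing in for a named Prefect limit.]

```python
import asyncio
from contextlib import asynccontextmanager

class ConcurrencyLimit:
    """Toy stand-in for a named global concurrency limit with N slots."""

    def __init__(self, limit: int):
        self._sem = asyncio.BoundedSemaphore(limit)
        self._acquire_lock = asyncio.Lock()  # serialize multi-slot acquires to avoid deadlock

    @asynccontextmanager
    async def concurrency(self, occupy: int = 1):
        # Each call holds `occupy` slots for the duration of the block,
        # so occupy == limit means one call monopolizes the whole limit.
        async with self._acquire_lock:
            for _ in range(occupy):
                await self._sem.acquire()
        try:
            yield
        finally:
            for _ in range(occupy):
                self._sem.release()

async def peak_parallelism(limit: ConcurrencyLimit, occupy: int, n_tasks: int = 20) -> int:
    """Run n_tasks workers under the limit and report how many ever ran at once."""
    running = 0
    peak = 0

    async def worker():
        nonlocal running, peak
        async with limit.concurrency(occupy):
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.02)  # simulated download
            running -= 1

    await asyncio.gather(*(worker() for _ in range(n_tasks)))
    return peak
```

[In Prefect terms this corresponds to creating a limit named `s3-download` with 10 slots and wrapping each download in `async with concurrency("s3-download", occupy=1)`.]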
Ying
thanks @Bianca Hoch, is there any example of using task tags on async tasks? I have tried writing it like this, but it still does not register in the concurrency limit:
# Imports implied by the snippet (boto3, aiohttp, and Prefect)
import boto3
from aiohttp import client
from botocore.config import Config
from prefect import task

@task(tags=["download-s3"])
async def download_s3(video_name: str, bucket_name: str, local_path: str, aws_key, aws_secret):
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=aws_key,
        aws_secret_access_key=aws_secret,
        config=Config(signature_version="s3v4", region_name="eu-central-1"),
    )

    aio_session = client.ClientSession()
    filepath_list = await download_s3_obj(video_name, aio_session, s3_client, bucket_name, local_path)
    await aio_session.close()

    return filepath_list
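[Editorial note, not part of the thread: whichever limiter is used, it only bounds what happens inside its block. In the first snippet the guard wraps only the local file write, after the HTTP request has already been issued, so any number of downloads can still be in flight at once. A toy asyncio model (the semaphore stands in for the concurrency limit; all names here are illustrative) shows how guard placement matters:]

```python
import asyncio

async def guarded_write_only(sem, counters):
    # "Download" phase runs before the guard, as in the first snippet above.
    counters["net"] += 1
    counters["net_peak"] = max(counters["net_peak"], counters["net"])
    await asyncio.sleep(0.02)  # simulated HTTP transfer
    counters["net"] -= 1

    async with sem:  # guard wraps only the local file write
        counters["write"] += 1
        counters["write_peak"] = max(counters["write_peak"], counters["write"])
        await asyncio.sleep(0.01)  # simulated disk write
        counters["write"] -= 1

async def run_demo(n_tasks: int = 20, slots: int = 10):
    sem = asyncio.Semaphore(slots)
    counters = {"net": 0, "net_peak": 0, "write": 0, "write_peak": 0}
    await asyncio.gather(*(guarded_write_only(sem, counters) for _ in range(n_tasks)))
    return counters
```

[Moving the guard (or the @task boundary) so it wraps the request itself is what actually caps concurrent downloads.]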