Gene

08/04/2023, 12:55 AM
I'm trying to limit concurrency at the task level so that I don't call an API faster than its rate limit allows. I invoke the task with .submit() and then call .result(). The task is tagged, and I set a concurrency limit for that tag from the command line, but when I run the flow the limit is not enforced. Am I missing something? Is there a simple example somewhere that demonstrates this common scenario?
For example, I can't get the following flow to demonstrate the concurrency limit:
```python
import httpx
from datetime import timedelta
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash


# The "mytest" tag ties this task to the concurrency limit created with the CLI.
# Caching: repeat calls with the same inputs within an hour reuse the stored result.
@task(tags=['mytest'], cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def get_url(url: str, params: dict = None):
    response = httpx.get(url, params=params)
    response.raise_for_status()
    return response.json()


def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100):
    issues = []
    # Ceiling division: number of pages needed to cover every open issue.
    pages = range(1, -(open_issues_count // -per_page) + 1)
    for page in pages:
        # .submit() returns a PrefectFuture immediately, so the page requests
        # can run concurrently under the flow's task runner.
        issues.append(
            get_url.submit(
                f"https://api.github.com/repos/{repo_name}/issues",
                params={"page": page, "per_page": per_page, "state": "open"},
            )
        )
    # Wait on each future and flatten the pages into one list of issues.
    return [i for p in issues for i in p.result()]


@flow(retries=3, retry_delay_seconds=5)
def get_repo_info(
    repo_name: str = "PrefectHQ/prefect"
):
    repo = get_url(f"https://api.github.com/repos/{repo_name}")
    issues = get_open_issues(repo_name, repo["open_issues_count"])
    issues_per_user = len(issues) / len(set([i["user"]["id"] for i in issues]))
    logger = get_run_logger()
    logger.info(f"{repo_name} repository statistics 🤓:")
    logger.info(f"Stars 🌠 : {repo['stargazers_count']}")
    logger.info(f"Forks 🍴 : {repo['forks_count']}")
    logger.info(f"Average open issues per user 💌 : {issues_per_user:.2f}")


if __name__ == "__main__":
    get_repo_info()
```
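The page count in get_open_issues above uses the expression -(a // -b), a ceiling-division idiom: a // -b floors the negated quotient, which equals minus the ceiling of a / b, so negating it rounds the original quotient up. Each page becomes one submitted get_url run, so this count is also the number of tagged runs that compete for the limit's 2 slots. A quick check that it matches math.ceil (page_count is a hypothetical helper, and the issue counts are arbitrary, not taken from the GitHub API):

```python
import math


def page_count(open_issues_count: int, per_page: int = 100) -> int:
    # -(a // -b) equals ceil(a / b) for a positive divisor, with no floating point.
    return -(open_issues_count // -per_page)


# Compare the idiom with math.ceil on a few arbitrary counts.
for count in (0, 1, 100, 101, 250):
    assert page_count(count) == math.ceil(count / 100)

print(page_count(250))  # 3, so range(1, 4) requests pages 1, 2 and 3
```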
On the command line I set the limit with:
```shell
prefect concurrency-limit create mytest 2
```
Then I run the flow with:
```shell
python3 ./templates/mytest.py
```
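One way to tell whether the limit of 2 was actually enforced is to record when each tagged get_url run started and finished (for example from the timestamps in the flow run logs, or by logging time.time() at the start and end of the task) and then compute the peak number of runs that overlapped. If the limit is enforced, the peak should not exceed 2. A minimal sketch, assuming the intervals have already been collected as (start, end) pairs in seconds; peak_concurrency is a hypothetical helper, not part of Prefect, and the sample timings are made up:

```python
from typing import Iterable, List, Tuple


def peak_concurrency(intervals: Iterable[Tuple[float, float]]) -> int:
    """Return the maximum number of (start, end) intervals open at the same instant."""
    events: List[Tuple[float, int]] = []
    for start, end in intervals:
        events.append((start, 1))   # a run begins
        events.append((end, -1))    # a run ends
    # Sort by time; at equal timestamps, ends (-1) sort before starts (+1),
    # so a run starting exactly when another finishes does not count as overlap.
    events.sort()
    running = peak = 0
    for _, delta in events:
        running += delta
        peak = max(peak, running)
    return peak


# Made-up timings for four task runs: at most two are running at once.
runs = [(0.0, 1.0), (0.0, 1.2), (1.0, 2.0), (1.2, 2.5)]
print(peak_concurrency(runs))  # 2
```

A sweep over sorted start/end events keeps this O(n log n), which is plenty for the handful of page requests a flow like this submits.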