Tzahi Ben Arzi
11/30/2023, 8:49 PM
I am using `with concurrency("database", occupy=1)`
and I am expecting other flow runs that reach this `with` block to be paused after the first one has entered, but they aren't. Can you help?
Full code:
import httpx
from prefect import flow, task
from prefect.concurrency.sync import concurrency
import time
@task(retries=2)
def get_repo_info(repo_owner: str, repo_name: str):
    """Get info about a repo - will retry twice after failing.

    Args:
        repo_owner: Owner segment of the GitHub repository path.
        repo_name: Name segment of the GitHub repository path.

    Returns:
        The decoded JSON body of the GitHub API response.

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status.
    """
    # Plain URL only: chat-style "<...>" link markup around the address
    # is not part of a valid URL and makes the request fail.
    url = f"https://api.github.com/repos/{repo_owner}/{repo_name}"
    api_response = httpx.get(url)
    # Surface 4xx/5xx as exceptions so the task-level retries can kick in.
    api_response.raise_for_status()
    return api_response.json()
@task
def get_contributors(repo_info: dict):
    """Fetch the contributor list for a repository.

    Reads the ``contributors_url`` entry of ``repo_info``, requests it and
    returns the decoded JSON body. The work is done while holding one slot
    of the "database" concurrency limit.
    """
    # NOTE(review): the pasted snippet lost its indentation, so the exact
    # extent of the ``with`` block is assumed here — confirm.
    # NOTE(review): presumably the "database" limit must already exist and be
    # active on the Prefect server for slots to be enforced — verify.
    with concurrency("database", occupy=1):
        # Hold the slot for a while so overlapping runs are easy to observe.
        time.sleep(10)
        reply = httpx.get(repo_info["contributors_url"])
        reply.raise_for_status()
        people = reply.json()
    return people
@flow(name="Repo Info", log_prints=True)
def repo_info(repo_owner: str = "PrefectHQ", repo_name: str = "prefect"):
    """
    Given a GitHub repository, logs the number of stargazers
    and contributors for that repo.
    """
    # Use a distinct local name so the flow's own name is not shadowed.
    details = get_repo_info(repo_owner, repo_name)
    star_count = details["stargazers_count"]
    print(f"Stars 🌠 : {star_count}")

    # The contributor lookup runs only after the star count has been printed.
    contributor_count = len(get_contributors(details))
    print(f"Number of contributors 👷: {contributor_count}")
if __name__ == "__main__":
    # Running this file as a script serves the flow as a deployment.
    repo_info.serve(name="my-first-deployment")