Maria
01/18/2024, 7:12 AMMaria
01/18/2024, 7:12 AMimport httpx
from prefect import flow, task
import time
from prefect.task_runners import ConcurrentTaskRunner
@task(retries=2)
def get_repo_info(repo_owner: str, repo_name: str, delay: 20):
"""Get info about a repo - will retry twice after failing"""
url = f"<https://api.github.com/repos/{repo_owner}/{repo_name}>"
api_response = httpx.get(url)
api_response.raise_for_status()
repo_info = api_response.json()
time.sleep(delay)
return repo_info
@task
def get_contributors(repo_info: dict):
"""Get contributors for a repo"""
contributors_url = repo_info["contributors_url"]
response = httpx.get(contributors_url)
response.raise_for_status()
contributors = response.json()
return contributors
@flow(log_prints=True, task_runner=ConcurrentTaskRunner)
def repo_info(repo_owner: str = "PrefectHQ", repo_name: str = "prefect"):
# I want 1 and 2 to execute concurrently
repo_info_1 = get_repo_info(repo_owner, repo_name, 60)
repo_info_2 = get_repo_info(repo_owner, repo_name, 10)
print(f"Stars 🌠: {repo_info_2['stargazers_count']}")
contributors = get_contributors(repo_info_2)
print(f"Number of contributors 👷: {len(contributors)}")
if __name__ == "__main__":
repo_info()
Maria
01/18/2024, 7:13 AMMaria
01/18/2024, 8:15 AM