https://prefect.io logo
j

Jeff Hale

07/06/2023, 2:40 PM
Share Module 102 code here (thread):
a

Alyssa Harris

07/06/2023, 2:42 PM
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task
from prefect.tasks import task_input_hash

@task
def fetch_air_quality(lat: float, lon: float, item: str) -> float:
    """Fetch one hourly air-quality series and return its first value.

    Args:
        lat: Latitude sent to the API as the ``latitude`` query parameter.
        lon: Longitude sent to the API as the ``longitude`` query parameter.
        item: Name of the hourly variable to request (the flow in this
            script passes ``"pm10"``); also used as the key into the
            ``"hourly"`` section of the JSON response.

    Returns:
        The first entry of the requested hourly series, converted to float.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    # Plain URL: surrounding "<...>" is chat link markup, not part of the
    # address, and makes the request fail before it is ever sent.
    base_url = "https://air-quality-api.open-meteo.com/v1/air-quality/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly=item),
    )
    print(weather)
    # Surface HTTP failures as such instead of a KeyError on "hourly" below.
    weather.raise_for_status()
    most_recent_air_q = float(weather.json()["hourly"][item][0])
    return most_recent_air_q


# Retried up to 2 times with a 0.1 s delay; cache_key_fn=task_input_hash keys
# the task's cache on its inputs (see Prefect docs for expiry semantics).
@task(retries=2, retry_delay_seconds=0.1, cache_key_fn=task_input_hash)
def fetch_weather_item(lat: float, lon: float, item: str) -> float:
    """Fetch one hourly forecast series and return its first value.

    Args:
        lat: Latitude sent to the API as the ``latitude`` query parameter.
        lon: Longitude sent to the API as the ``longitude`` query parameter.
        item: Name of the hourly variable to request (the flow in this
            script passes ``"temperature_2m"`` and ``"precipitation"``);
            also used as the key into the ``"hourly"`` section of the
            JSON response.

    Returns:
        The first entry of the requested hourly series, converted to float.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    # Plain URL: surrounding "<...>" is chat link markup, not part of the
    # address, and makes the request fail before it is ever sent.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly=item),
    )
    # Raise on HTTP errors so the configured retries react to the real
    # failure rather than to a KeyError on the missing "hourly" key.
    weather.raise_for_status()
    most_recent_item = float(weather.json()["hourly"][item][0])
    return most_recent_item


@task
def save_weather(item: float) -> str:
    """Append a single reading to ``weather.csv`` on its own line.

    Args:
        item: The value to record; written using its ``str`` form.

    Returns:
        A fixed confirmation message.
    """
    with open("weather.csv", "a", encoding="utf-8") as f:
        # Terminate every record: the file is opened in append mode, so
        # without a separator consecutive values fuse into one token.
        f.write(f"{item}\n")
    return "Successfully wrote weather item"


@flow
def pipeline(lat: float, lon: float):
    """Fetch temperature, precipitation and PM10 for a location and save each.

    All three readings are fetched first, then saved one after another in
    the same order. The return value is the save results added together.
    """
    # Tuple elements are evaluated left to right, so the fetch order is
    # temperature, precipitation, air quality.
    readings = (
        fetch_weather_item(lat, lon, "temperature_2m"),
        fetch_weather_item(lat, lon, "precipitation"),
        fetch_air_quality(lat, lon, "pm10"),
    )
    outcome = save_weather(readings[0])
    for reading in readings[1:]:
        outcome += save_weather(reading)
    return outcome


if __name__ == "__main__":
    # Run the flow once for a fixed location
    # (presumably Washington, DC — confirm).
    pipeline(lat=38.9, lon=-77.0)
👍 1
wizard2 1
🙏 1