https://prefect.io logo
Title
j

Jeff Hale

05/16/2023, 4:22 PM
102 lab: Share your code here (thread):
s

Stéphan Taljaard

05/16/2023, 4:25 PM
import httpx
from prefect import flow, task


@task(retries=3, retry_delay_seconds=5)
def fetch_temperature(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp


@task(retries=3, retry_delay_seconds=5)
def fetch_humidity(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="relativehumidity_2m"),
    )
    most_recent_hum = float(weather.json()["hourly"]["relativehumidity_2m"][0])
    return most_recent_hum


@flow
def convert_c_to_f(temp: float):
    return temp * 9 / 5 + 32


@task
def save_weather_data(temp: float, variable_name: str):
    with open(f"weather_{variable_name}.csv", "w+") as w:
        w.write(str(temp))
    return f"Successfully wrote {variable_name}"


@flow
def pipeline(lat: float, lon: float):
    temp_c = fetch_temperature(lat, lon)
    temp_f = convert_c_to_f(temp_c)
    print(temp_f)
    temp_result = save_weather_data(temp_f, "temperature")

    humidity = fetch_humidity(lat, lon)
    print(humidity)
    rain_result = save_weather_data(humidity, "humidity")

if __name__ == "__main__":
    pipeline(38.9, -77.0)
🙌 1
f

Francesco Bartoli

05/16/2023, 4:25 PM
Room 2
import httpx  # requests capability, but can work with async
from prefect import flow, task
from prefect import flow, get_run_logger


@task(retries=3, retry_delay_seconds=10)
def geocode(name: str):
    base_url = "<https://geocoding-api.open-meteo.com/v1/search>"
    geocoded = httpx.get(
        base_url,
        params=dict(name=name),
    )
    first_result = geocoded.json()["results"][0]
    return (first_result["latitude"], first_result["longitude"])


@task(retries=3, retry_delay_seconds=10)
def fetch_weather(lat, lon):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp

@task(retries=3, retry_delay_seconds=10)
def fetch_historical_weather(start, end, lat, lon):
    base_url = "<https://archive-api.open-meteo.com/v1/archive>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, start_date=start, end_date=end, hourly="temperature_2m"),
    )
    breakpoint()
    most_recent_temp = weather.json()["hourly"]["temperature_2m"]
    return most_recent_temp

@task(retries=3, retry_delay_seconds=10)
def fetch_elevation(lat, lon):
    base_url = "<https://api.open-meteo.com/v1/elevation>"
    elevation = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon),
    )
    elev = elevation.json()["elevation"]
    return elev

@flow
def current_weather(name: str):
    logger = get_run_logger()
    geoname = geocode(name=name)
    temperature = fetch_weather(*geoname)
    elevation = fetch_elevation(*geoname)
    <http://logger.info|logger.info>(f"temperature={temperature}")
    <http://logger.info|logger.info>(f"elevation={elevation}")
    return temperature, elevation

@flow
def historical_weather(name: str, start_date: str, end_date: str):
    logger = get_run_logger()
    geoname = geocode(name=name)
    lat, lon = geoname
    temperature = fetch_historical_weather(start=start_date, end=end_date, lat=lat, lon=lon)
    <http://logger.info|logger.info>(f"historical temperature={temperature}")
    return temperature


if __name__ == "__main__":
    current_weather("Rome")
    historical_weather("Rome", "2022-11-31", "2022-12-31")
🙌 4