Share your code from Module 101 in a :thread: here...
# pacc-london-2023
j
Share your code from Module 101 in a ๐Ÿงต here:
n
Copy code
import httpx
from prefect import flow, task

@task
def get_temperature(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    print(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp

@task
def get_rain(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="rain"),
    )
    rain_status = float(weather.json()["hourly"]["rain"][0])
    print(f"Rain status: {rain_status}")
    return rain_status

@task
def get_visibility(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="visibility"),
    )
    visibility_status = float(weather.json()["hourly"]["visibility"][0])
    print(f"Visibility status: {visibility_status}")
    return visibility_status

@flow
def fetch_weather_metrics(lat: float, lon: float):
    get_temperature(lat=lat, lon=lon)
    get_rain(lat=lat, lon=lon)
    get_visibility(lat=lat, lon=lon)


if __name__ == "__main__":
    fetch_weather_metrics(44.80401, 20.46513)
๐ŸŽ‰ 2
s
Copy code
import httpx
from prefect import task, flow
import csv
import datetime

LOCATIONS = {
    "sheffield": (53.4, -1.47),
    "paris": (48.9, 2.35),
    "london": (51.5, -0.12),
}

@task
def fetch_rain(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="rain"),
    )
    most_recent_rain = float(weather.json()["hourly"]["rain"][0])
    return most_recent_rain

@task
def fetch_cloud(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="cloudcover"),
    )
    most_recent_cloudcover = float(weather.json()["hourly"]["cloudcover"][0])
    return most_recent_cloudcover

@task
def tell_me_its_crap(loc: str):
    with open("weather.csv", "w+") as w:
        writer = csv.writer(w)
        writer.writerow([loc, datetime.datetime.now()])
    print(f"it's crap weather in {loc}")

@flow
def pipeline(loc: str, lat: float, lon: float):
    rain = fetch_rain(lat, lon)
    cloud = fetch_cloud(lat, lon)
    if rain > 1 or cloud > 90:
        tell_me_its_crap(loc)


if __name__ == "__main__":
    for k, v in LOCATIONS.items():
        pipeline(k, v[0], v[1])
๐Ÿ™Œ 2
a
Copy code
import httpx
from prefect import flow, task


@task
def fetch_temp(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp

@task
def fetch_pp(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="precipitation_probability"),
    )
    most_recent_pp = float(weather.json()["hourly"]["precipitation_probability"][0])
    return most_recent_pp

@task
def fetch_humidity(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="relativehumidity_2m"),
    )
    most_recent_humidity = float(weather.json()["hourly"]["relativehumidity_2m"][0])
    return most_recent_humidity

@task
def print_weather_report(temp, humidity, precipitation):
    print(f"Current temp: {temp}")
    print(f"Current humidity: {humidity}")
    print(f"Current precipitation probability: {precipitation}")

@flow
def pipeline(lat: float, lon: float):
    temp = fetch_temp(lat, lon)
    humidity = fetch_humidity(lat, lon)
    precipitation = fetch_pp(lat, lon)
    print_weather_report(temp, humidity, precipitation)


if __name__ == "__main__":
    pipeline(32.72, -117.16)
๐Ÿฆœ 2
g
Copy code
import httpx
from datetime import datetime, timedelta

from prefect import flow, task

WEATHER_MEASURES = "temperature_2m,relativehumidity_2m,rain,windspeed_10m"


@task
def fetch_hourly_weather(lat: float, lon: float):
    """Query the open-meteo API for the forecast"""
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(
            latitude=lat,
            longitude=lon,
            hourly=WEATHER_MEASURES,
        ),
    )
    return weather.json()["hourly"]


@task
def save_weather(weather: dict):
    """Save the weather data to a csv file"""
    # csv headers
    contents = f"time,{WEATHER_MEASURES}\n"
    # csv contents
    try:
        for i in range(len(weather["time"])):
            contents += weather["time"][i]
            for measure in WEATHER_MEASURES.split(","):
                contents += "," + str(weather[measure][i])

            contents += "\n"
        with open("weather.csv", "w+") as w:
            w.write(contents)
        return "Successfully wrote csv"
    except Exception as e:
        return f"Failed to write csv: {e}"


@task
def log_result(weather: str, lat: float, lon: float):
    """Create a markdown file with the results of the next hour forecast"""
    log = f"# Weather in {lat}, {lon}\n\nThe forecast for the next hour as of {datetime.now()} is...\n\n"
    # Find the next hour
    try:
        next_hour = weather["time"].index(
            (datetime.now() + timedelta(hours=1)).strftime("%Y-%m-%dT%H:00")
        )
    except ValueError:
        # Default to the first hour
        next_hour = 0
    # Log the results
    for measure in WEATHER_MEASURES.split(","):
        log += f"- {measure}: {weather[measure][next_hour]}\n"

    # Save the data
    with open("most_recent_results.md", "w") as f:
        f.write(log)


@flow
def pipeline(lat: float, lon: float):
    """Main pipeline"""
    weather = fetch_hourly_weather(lat, lon)
    result = save_weather(weather)
    log_result(weather, lat, lon)
    return result


if __name__ == "__main__":
    pipeline(50.7913957952127, -1.9014254856972352)
๐Ÿฆœ 1