https://prefect.io logo
j

Jeff Hale

09/13/2023, 1:47 PM
Share code for 101 lab here (thread):
๐Ÿงต 1
Here. Tip: Three backticks makes code formatted blocks:
Copy code
I'm in a code block.
Cool.
b

Brian Newman

09/13/2023, 1:53 PM
Copy code
@task(log_prints=True)
def fetch_weather(lat: float = 38.9, lon: float = -77.0):
    """Fetches weather data from Open Meteo API for a given lat/lon"""
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    params = {
        "latitude": lat,
        "longitude": lon,
        "hourly": "temperature_2m,relativehumidity_2m,precipitation",
    }

    try:
        weather = httpx.get(base_url, params=params)
        weather_data = weather.json()
        most_recent_temp = float(weather_data["hourly"]["temperature_2m"][0])
        most_recent_rel_hum = float(weather_data["hourly"]["relativehumidity_2m"][0])
        most_recent_precip = float(weather_data["hourly"]["precipitation"][0])

        return most_recent_temp, most_recent_rel_hum, most_recent_precip
    except Exception as e:
        print(f"An error occurred: {e}")
        return None


@task(log_prints=True)
def print_weather(
    most_recent_temp: float, most_recent_rel_hum: float, most_recent_precip: float
):
    """Prints weather data to the console"""
    print(f"Most recent temp C: {most_recent_temp} degrees")
    print(f"Most recent temp F: {most_recent_temp * 9 / 5 + 32} degrees")
    print(f"Most recent relative humidity: {most_recent_rel_hum}%")
    print(f"Most recent precipitation: {most_recent_precip} mm")


@task(log_prints=True)
def save_weather(
    most_recent_temp: float, most_recent_rel_hum: float, most_recent_precip: float
):
    """Saves weather data to a CSV file"""
    try:
        with open("weather.csv", "w+") as w:
            w.write(f"Most recent temp C: {most_recent_temp} degrees\n")
            w.write(f"Most recent temp F: {most_recent_temp * 9 / 5 + 32} degrees\n")
            w.write(f"Most recent relative humidity: {most_recent_rel_hum}%\n")
            w.write(f"Most recent precipitation: {most_recent_precip} mm\n")
        return "Successfully wrote temp"
    except Exception as e:
        return f"An error occurred: {e}"


@flow(log_prints=True)
def prefect_pipeline():
    """A Prefect flow that fetches, prints, and saves weather data"""
    most_recent_temp, most_recent_rel_hum, most_recent_precip = fetch_weather()
    print_weather(most_recent_temp, most_recent_rel_hum, most_recent_precip)
    save_weather(most_recent_temp, most_recent_rel_hum, most_recent_precip)


if __name__ == "__main__":
    prefect_pipeline()
๐Ÿ™Œ 1
l

Lennert Van de Velde

09/13/2023, 2:02 PM
Copy code
import httpx
from prefect import flow, get_run_logger, task


@task
def get_temperature_2m(base_url: str, lat: float, lon: float):
    logger = get_run_logger()
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    <http://logger.info|logger.info>(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp

@task
def get_temperature_180m(base_url: str, lat: float, lon: float):
    logger = get_run_logger()
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_180m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_180m"][0])
    <http://logger.info|logger.info>(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp

@task
def get_difference_in_temp(temp_2m: float, temp_180m: float):
    logger = get_run_logger()
    difference = temp_2m - temp_180m
    <http://logger.info|logger.info>(f"Difference in temp: {difference} degrees")
    return difference

@flow()
def fetch_weather(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    two_m_temp = get_temperature_2m(base_url, lat, lon)
    one_eighty_m_temp = get_temperature_180m(base_url, lat, lon)
    get_difference_in_temp(two_m_temp, one_eighty_m_temp)
    


if __name__ == "__main__":
    fetch_weather.serve(name="depl-101-weather-app")
๐Ÿ™Œ 3
d

David Maxson

09/13/2023, 2:09 PM
Copy code
import csv
import httpx

from prefect import flow, task

METEO_URL = "<https://api.open-meteo.com/v1/forecast/>"


@task
def fetch_metric(lat: float, lon: float, metric_rate: str, metric_name: str):
    weather = httpx.get(
        METEO_URL,
        params=dict(latitude=lat, longitude=lon, **{metric_rate: metric_name}),
    )
    most_recent_temp = float(weather.json()[metric_rate][metric_name][0])
    return most_recent_temp


@task
def save_metrics(filename: str, lat: float, lon: float, metric_results: list[tuple[str, str, float]]):
    with open(filename, 'w') as f:
        writer = csv.DictWriter(f, fieldnames=['lat', 'lon', 'metric_rate', 'metric_name', 'value'])
        writer.writeheader()
        for metric_rate, metric_name, value in metric_results:
            writer.writerow(dict(lat=lat, lon=lon, metric_rate=metric_rate, metric_name=metric_name, value=value))


@flow
def open_meteo_dataflow(lat: float, lon: float, metrics: list[tuple[str, str]] = (
    ('hourly', 'apparent_temperature'),
), filename: str = 'lab1.csv'):
    metric_values = [fetch_metric(lat, lon, metric_rate, metric_name) for metric_rate, metric_name in metrics]
    save_metrics(filename, lat, lon, [(r, n, v) for ((r, n), v) in zip(metrics, metric_values)])


if __name__ == '__main__':
    open_meteo_dataflow.serve("lab-1")
๐Ÿฆœ 1
j

Jonas

09/13/2023, 2:09 PM
Copy code
import httpx
from prefect import Flow, Task

@Flow
def fetch_weather(lat: float, lon: float):
    some_temp = get_temperature(lat, lon)
    print(f'Some temperature = {some_temp}')
    some_windspeed = get_windspeed(lat, lon)
    return sum(some_temp, some_windspeed)

@Task
def get_temperature(lat, lon): 
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    # print(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp

@Task
def get_windspeed(lat, lon): 
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    # print(weather.json())
    most_recent_windspeed = float(weather.json()["hourly"]["windspeed_10m"][0])
    # print(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_windspeed
@Task 
def sum(a,b): 
    return a+b

if __name__ == "__main__":
    fetch_weather.serve(name = 'test-flow')
    # fetch_weather(30,60)
    # (38.9, -77.0)
๐Ÿ‘ 1
๐Ÿฆœ 1
s

Sonny Nguyen

09/13/2023, 2:12 PM
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task


@task(log_prints=True)
def fetch_weather(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly=["temperature_2m", "windspeed_10m", "rain"]),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    most_recent_wind_speed = float(weather.json()["hourly"]["windspeed_10m"][0])
    most_recent_rain= float(weather.json()["hourly"]["rain"][0])
    return most_recent_temp, most_recent_wind_speed, most_recent_rain


@task(log_prints=True)
def save_weather(temp: float):
    with open("weather.csv", "w+") as w:
        w.write(str(temp))
    return "Successfully wrote temp"


@flow(log_prints=True)
def pipeline(lat=38.9, lon=-77.0):
    temp, wind, rain = fetch_weather(lat, lon)
    temperature = save_weather(temp)
    wind = save_weather(wind)
    rain = save_weather(rain)
    return temperature, wind, rain


if __name__ == "__main__":
    pipeline.serve(name="weather-flow")
๐Ÿ™Œ 1