https://prefect.io logo
j

Jeff Hale

08/29/2023, 11:46 PM
Share your code from 101 in a thread here:
🧵
Copy code
``` (three backticks in a row) for nice formatting
s

Sophia Alice

08/29/2023, 11:56 PM
Copy code
from prefect import flow, task
import requests
import pandas as pd


@task
def get_data(lat: float, lon: float):
    """Fetch the hourly ``temperature_2m`` forecast for one coordinate.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The decoded JSON body of the Open-Meteo response.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
    """
    # Plain URL: wrapping it in "<...>" (chat link markup) leaves requests
    # with a string that has no valid scheme, so the call fails.
    url = "https://api.open-meteo.com/v1/forecast"
    response = requests.get(
        url,
        # Let requests build and encode the query string.
        params={"latitude": lat, "longitude": lon, "hourly": "temperature_2m"},
        # Without a timeout, requests can wait on a stalled server forever.
        timeout=30,
    )
    # Fail here on an error status instead of later on a missing JSON key.
    response.raise_for_status()
    return response.json()

@task
def transform(data: dict):
    """Pick the hourly timestamps and temperatures out of a forecast payload.

    Args:
        data: Mapping whose ``"hourly"`` entry holds ``"time"`` and
            ``"temperature_2m"`` values.

    Returns:
        A dict with the keys ``"time"`` and ``"temp"``.
    """
    hourly = data["hourly"]
    return {"time": hourly["time"], "temp": hourly["temperature_2m"]}

@task
def write_data(data: dict):
    """Write ``data`` to ``temp-test.csv`` as a table, omitting the index.

    Args:
        data: Column data accepted by the ``pandas.DataFrame`` constructor.
    """
    pd.DataFrame(data).to_csv("temp-test.csv", index=False)

@flow
def pipeline(lat, lon):
    """Fetch, reshape and store the forecast for one coordinate.

    Args:
        lat: Latitude forwarded to ``get_data``.
        lon: Longitude forwarded to ``get_data``.
    """
    raw_forecast = get_data(lat=lat, lon=lon)
    # The reshaped payload is only needed by the writer, so hand it straight on.
    write_data(transform(raw_forecast))

if __name__ == "__main__":
    # Numeric coordinates: the values end up in a task parameter annotated as
    # float, so they are passed as floats rather than as strings.
    lat = 34.07
    lon = -118.26
    pipeline(lat=lat, lon=lon)
🙌 2
blob attention gif 1
๐ŸŒก๏ธ 2
gratitude thank you 1
z

Zach Schuster

08/30/2023, 12:02 AM
Copy code
import httpx
from prefect import flow, task
import pandas as pd


@task
def fetch_weather(lat: float, lon: float):
    """Download hourly weather for one coordinate and return it as a table.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        A ``pandas.DataFrame`` with the columns ``latitude``, ``longitude``,
        ``temperature``, ``wind`` and ``cloudcover``. The hourly lists supply
        the rows; the scalar latitude/longitude from the response are repeated
        on every row.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    # Plain URL: wrapping it in "<...>" (chat link markup) makes httpx reject
    # the request because the string has no valid scheme.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(
            latitude=lat,
            longitude=lon,
            hourly="temperature_2m,cloudcover_mid,windspeed_10m,winddirection_10m",
        ),
    )
    # Fail here on an error status instead of later on a missing JSON key.
    response.raise_for_status()

    # Decode the body once and reuse it; each .json() call re-parses the text.
    payload = response.json()
    hourly = payload["hourly"]
    return pd.DataFrame(
        {
            "latitude": payload["latitude"],
            "longitude": payload["longitude"],
            "temperature": hourly["temperature_2m"],
            "wind": hourly["windspeed_10m"],
            "cloudcover": hourly["cloudcover_mid"],
        }
    )


@task
def save_weather_data(data: pd.DataFrame):
    """Write ``data`` to ``weather_data.csv`` and report success.

    Args:
        data: Table to serialise with ``DataFrame.to_csv``.

    Returns:
        The fixed status string ``"successfully wrote file"``.
    """
    status = "successfully wrote file"
    data.to_csv("weather_data.csv")
    return status


@task
def get_latest_value(data: pd.DataFrame, metric: str):
    """Print the value in the last row of the ``metric`` column.

    Args:
        data: Table containing a column named ``metric``.
        metric: Name of the column to read.
    """
    last_value = data[metric].iloc[-1]
    print(f"latest data for {metric}: {last_value}")


@flow
def get_weather_metrics(lat: float, lon: float):
    """Fetch weather for a coordinate, print the last wind value, save the table.

    Args:
        lat: Latitude forwarded to ``fetch_weather``.
        lon: Longitude forwarded to ``fetch_weather``.
    """
    weather_frame = fetch_weather(lat=lat, lon=lon)
    # Report the last row of the "wind" column before persisting everything.
    get_latest_value(weather_frame, "wind")
    save_weather_data(weather_frame)


if __name__ == "__main__":
    # Script entry point: run the flow for a single fixed coordinate.
    get_weather_metrics(lat=38.9, lon=-77.0)
blob attention gif 1
🦜 2
l

Lucero Yanez

08/30/2023, 12:04 AM
Copy code
from prefect import flow, task
import httpx

@task
def fetch_weather(lat: float, lon: float):
    """Return the first value of the hourly ``temperature_2m`` series.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        Element 0 of ``hourly.temperature_2m`` from the response, as a float.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    # Plain URL: wrapping it in "<...>" (chat link markup) makes httpx reject
    # the request because the string has no valid scheme.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Fail here on an error status instead of later on a missing JSON key.
    response.raise_for_status()
    most_recent_temp = float(response.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp

@task
def save_weather(temp: float):
    """Overwrite ``weather.csv`` with the text form of ``temp``.

    Args:
        temp: Value to store; written via ``str(temp)``.

    Returns:
        The fixed status string ``"Successfully wrote temp"``.
    """
    text = str(temp)
    with open("weather.csv", "w+") as out_file:
        out_file.write(text)
    return "Successfully wrote temp"

@task
def get_elevation(lat: float, lon: float):
    """Return the ``elevation`` field of the forecast response for a coordinate.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The top-level ``elevation`` value of the response, as a float.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    # Plain URL: wrapping it in "<...>" (chat link markup) makes httpx reject
    # the request because the string has no valid scheme.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Fail here on an error status instead of later on a missing JSON key.
    response.raise_for_status()
    elevation = float(response.json()["elevation"])
    return elevation


@flow
def pipeline(lat: float, lon: float):
    """Fetch a temperature, save it, and look up the elevation.

    Args:
        lat: Latitude forwarded to the tasks.
        lon: Longitude forwarded to the tasks.

    Returns:
        The status string returned by ``save_weather``.
    """
    temp = fetch_weather(lat, lon)
    result = save_weather(temp)
    # The elevation task still runs as part of the flow, but nothing here
    # consumes its value, so it is not bound to a local name.
    get_elevation(lat, lon)
    return result

if __name__ == "__main__":
    # Script entry point: run the flow for a single fixed coordinate.
    pipeline(lat=52.52, lon=13.41)
🦜 1
wizard2 1
sheepy 1
m

MarcoM

08/30/2023, 12:09 AM
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task


@task
def fetch_weather(lat: float, lon: float):
    """Return the first value of the hourly ``temperature_2m`` series.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        Element 0 of ``hourly.temperature_2m`` from the response, as a float.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    # Plain URL: wrapping it in "<...>" (chat link markup) makes httpx reject
    # the request because the string has no valid scheme.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Fail here on an error status instead of later on a missing JSON key.
    response.raise_for_status()
    most_recent_temp = float(response.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp


@task
def get_hourly_rain(lat: float, lon: float):
    """Return the first value of the hourly ``rain`` series.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        Element 0 of ``hourly.rain`` from the response, as a float.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    # Plain URL: wrapping it in "<...>" (chat link markup) makes httpx reject
    # the request because the string has no valid scheme.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="rain"),
    )
    # Fail here on an error status instead of later on a missing JSON key.
    response.raise_for_status()
    most_recent_rain = float(response.json()["hourly"]["rain"][0])
    return most_recent_rain


@task
def save_weather(temp: float, rain: float):
    """Overwrite ``weather.csv`` with ``temp`` and ``rain`` as one CSV row.

    Args:
        temp: Temperature value, written first.
        rain: Rain value, written second.

    Returns:
        The fixed status string ``"Successfully wrote temp"``.
    """
    with open("weather.csv", "w") as out_file:
        # A comma keeps the two numbers apart; concatenating them directly
        # (e.g. "20.3" + "0.0" -> "20.30.0") makes the file unreadable.
        out_file.write(f"{temp},{rain}")
    return "Successfully wrote temp"


@flow
def pipeline(lat: float, lon: float):
    """Fetch temperature and rain for a coordinate and save both.

    Args:
        lat: Latitude forwarded to the fetch tasks.
        lon: Longitude forwarded to the fetch tasks.

    Returns:
        The status string returned by ``save_weather``.
    """
    temperature = fetch_weather(lat, lon)
    rainfall = get_hourly_rain(lat, lon)
    return save_weather(temperature, rainfall)


if __name__ == "__main__":
    # Script entry point: run the flow for a single fixed coordinate.
    pipeline(lat=38.9, lon=-77.0)
squirtle cool 1
👍 1
party gritty 1