Jeff Hale
08/29/2023, 11:46 PM`` backtick 3 times in a row for nice formatting
Sophia Alice
08/29/2023, 11:56 PMfrom prefect import flow, task
import requests
import pandas as pd
@task
def get_data(lat: float, lon:float):
url = f"<https://api.open-meteo.com/v1/forecast?latitude={lat}&longitude={lon}&hourly=temperature_2m>"
weather = requests.get(url)
return weather.json()
@task
def transform(data: dict):
time = data["hourly"]["time"]
temp = data["hourly"]["temperature_2m"]
return {"time": time, "temp": temp}
@task
def write_data(data: dict):
df = pd.DataFrame(data)
df.to_csv("temp-test.csv", index=False)
@flow
def pipeline(lat, lon):
data = get_data(lat=lat, lon=lon)
transformed_data = transform(data)
write_data(transformed_data)
if __name__ == "__main__":
lat = "34.07"
lon = "-118.26"
pipeline(lat=lat, lon=lon)
Zach Schuster
08/30/2023, 12:02 AMimport httpx
from prefect import flow, task
import pandas as pd
@task
def fetch_weather(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(
latitude=lat,
longitude=lon,
hourly="temperature_2m,cloudcover_mid,windspeed_10m,winddirection_10m",
),
)
weather_json = weather.json()
df = pd.DataFrame(
{
"latitude": weather_json["latitude"],
"longitude": weather_json["longitude"],
"temperature": weather.json()["hourly"]["temperature_2m"],
"wind": weather.json()["hourly"]["windspeed_10m"],
"cloudcover": weather.json()["hourly"]["cloudcover_mid"],
}
)
return df
@task
def save_weather_data(data: pd.DataFrame):
data.to_csv("weather_data.csv")
return "successfully wrote file"
@task
def get_latest_value(data: pd.DataFrame, metric: str):
print(f"latest data for {metric}: {data[metric].iloc[-1]}")
@flow
def get_weather_metrics(lat: float, lon: float):
df = fetch_weather(lat=lat, lon=lon)
# print out latest wind metric
get_latest_value(df, "wind")
save_weather_data(df)
if __name__ == "__main__":
get_weather_metrics(38.9, -77.0)
Lucero Yanez
08/30/2023, 12:04 AMfrom prefect import flow, task
import httpx
@task
def fetch_weather(lat:float, lon:float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat ,longitude=lon, hourly="temperature_2m"),
)
most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
return most_recent_temp
@task
def save_weather(temp:float):
with open("weather.csv", "w+") as w:
w.write(str(temp))
return "Successfully wrote temp"
@task
def get_elevation(lat:float, lon:float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat ,longitude=lon, hourly="temperature_2m"),
)
elevation = float(weather.json()["elevation"])
return elevation
@flow
def pipeline(lat:float, lon:float):
temp = fetch_weather(lat, lon)
result = save_weather(temp)
elevation = get_elevation(lat, lon)
return result
if __name__ == "__main__":
pipeline(52.52, 13.41)
MarcoM
08/30/2023, 12:09 AMimport httpx # requests capability, but can work with async
from prefect import flow, task
@task
def fetch_weather(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
)
most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
return most_recent_temp
@task
def get_hourly_rain(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
rain = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="rain"),
)
most_recent_rain = float(rain.json()["hourly"]["rain"][0])
return most_recent_rain
@task
def save_weather(temp: float, rain: float):
with open("weather.csv", "w+") as w:
w.write(str(temp) + str(rain))
return "Successfully wrote temp"
@flow
def pipeline(lat: float, lon: float):
temp = fetch_weather(lat, lon)
rain = get_hourly_rain(lat, lon)
result = save_weather(temp, rain)
return result
if __name__ == "__main__":
pipeline(38.9, -77.0)