Jeff Hale
08/30/2023, 12:45 AMjulius
08/30/2023, 12:53 AMElizabeth Bing
08/30/2023, 1:00 AMimport httpx
from prefect import flow, task
import pandas as pd
from prefect.tasks import task_input_hash
# latitude 26.1 North, longitude 84.8 West
# cache_key determines the inputs
@task(cache_key_fn=task_input_hash, cache_expiration=datetime.timedeltas=)
def fetch_weather(lat: float, lon: float):
base_url = "https://api.open-meteo.com/v1/forecast/"
weather = httpx.get(
base_url,
params=dict(
latitude=lat,
longitude=lon,
hourly="temperature_2m,cloudcover_mid,windspeed_10m,winddirection_10m",
),
)
weather_json = weather.json()
df = pd.DataFrame(
{
"latitude": weather_json["latitude"],
"longitude": weather_json["longitude"],
"temperature": weather.json()["hourly"]["temperature_2m"],
"wind": weather.json()["hourly"]["windspeed_10m"],
"cloudcover": weather.json()["hourly"]["cloudcover_mid"],
}
)
return df
@task
def save_weather_data(data: pd.DataFrame):
data.to_csv("weather_data.csv")
return "successfully wrote file"
@task
def get_latest_value(data: pd.DataFrame, metric: str):
print(f"latest data for {metric} {data[metric].iloc[-1]}")
@flow(retries=4)
def get_weather_metrics(lat: float, lon: float):
df = fetch_weather(lat=lat, lon=lon)
# print out latest wind metric
get_latest_value(df, "wind")
save_weather_data(df)
if name == "__main__":
get_weather_metrics(38.9, -77.0)Lucero Yanez
08/30/2023, 1:01 AMfrom prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
import httpx
@task(cache_key_fn=task_input_hash, retries=3)
def fetch_weather(lat:float, lon:float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat ,longitude=lon, hourly="temperature_2m"),
)
most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
return most_recent_temp
@task(cache_key_fn=task_input_hash, retries=2, persist_result=True)
def save_weather(temp:float):
with open("weather.csv", "w+") as w:
w.write(str(temp))
return "Successfully wrote temp"
@task(cache_key_fn=task_input_hash, retries=3, persist_result=True)
def get_elevation(lat:float, lon:float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat ,longitude=lon, hourly="temperature_2m"),
)
elevation = float(weather.json()["elevation"])
return elevation
@flow(log_prints=True)
def pipeline(lat:float, lon:float):
temp = fetch_weather(lat, lon)
result = save_weather(temp)
elevation = get_elevation(lat, lon)
print("The elevation is: ",elevation)
return result
if __name__ == "__main__":
pipeline(52.52, 13.41)
Anthony Kippa
08/30/2023, 1:04 AMimport httpx
from prefect import task, flow
from prefect.tasks import task_input_hash
@task(cache_key_fn=task_input_hash, retries=2, retry_delay_seconds=0.5)
def get_hourly_weather_temparature(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
)
most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
return most_recent_temp
@task(retries=2, retry_delay_seconds=0.5)
def get_hourly_rain(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
rain = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="rain"),
)
most_recent_rain = float(rain.json()["hourly"]["rain"][0])
return most_recent_rain
@task(retries=2, retry_delay_seconds=0.5)
def save_weather(temp: float, rain: float):
with open("weather.csv", "w+") as w:
temp = "Temparature:{}\n".format(temp)
rain = "Rainfall:{}\n".format(rain)
w.writelines([temp,rain])
return "Successfully wrote temp and rainfall"
@flow
def pipeline(lat: float, lon: float):
temp = get_hourly_weather_temparature(lat, lon)
rain = get_hourly_rain(lat, lon)
result = save_weather(temp, rain)
return result
if __name__ == "__main__":
pipeline(38.9, -77.0)