Jeff Hale
06/21/2023, 9:47 AMNemanja T
06/21/2023, 9:56 AMimport httpx
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
@task
def get_temperature(lat: float, lon: float):
logger = get_run_logger()
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
)
most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
<http://logger.info|logger.info>(f"Most recent temp C: {most_recent_temp} degrees")
return most_recent_temp
@task(retries=2, retry_delay_seconds=5)
def get_rain(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="rain"),
)
rain_status = float(weather.json()["hourly"]["rain"][0])
print(f"Rain status: {rain_status}")
return rain_status
@task(retries=2, retry_delay_seconds=5, cache_key_fn=task_input_hash, cache_expiration=timedelta(seconds=20))
def get_visibility(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="visibility"),
)
visibility_status = float(weather.json()["hourly"]["visibility"][0])
print(f"Visibility status: {visibility_status}")
return visibility_status
@flow(log_prints=True)
def fetch_weather_metrics(lat: float, lon: float):
get_temperature(lat=lat, lon=lon)
get_rain(lat=lat, lon=lon)
get_visibility(lat=lat, lon=lon)
if __name__ == "__main__":
fetch_weather_metrics(44.80401, 21.46513)
Gemma
06/21/2023, 10:01 AMimport httpx
from datetime import datetime, timedelta
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
WEATHER_MEASURES = "temperature_2m,relativehumidity_2m,rain,windspeed_10m"
@task(name="Fetch weather", retries=3, retry_delay_seconds=1)
def fetch_hourly_weather(lat: float, lon: float):
"""Query the open-meteo API for the forecast"""
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(
latitude=lat,
longitude=lon,
hourly=WEATHER_MEASURES,
),
)
# Raise an exception if the request failed
if weather.status_code != 200:
raise Exception(f"Failed to fetch weather: {weather.text}")
return weather.json()["hourly"]
@task(
name="Save weather",
persist_result=True,
cache_key_fn=task_input_hash,
cache_expiration=timedelta(days=1),
)
def save_weather(weather: dict):
"""Save the weather data to a csv file"""
logger = get_run_logger()
# csv headers
contents = f"time,{WEATHER_MEASURES}\n"
# csv contents
try:
logger.debug("Parsing JSON contents to csv")
for i in range(len(weather["time"])):
contents += weather["time"][i]
for measure in WEATHER_MEASURES.split(","):
contents += "," + str(weather[measure][i])
contents += "\n"
logger.debug("Writing csv")
with open("weather.csv", "w+") as w:
w.write(contents)
return "Successfully wrote csv"
except Exception as e:
return f"Failed to write csv: {e}"
@task(
name="Log next forecast",
cache_key_fn=task_input_hash,
cache_expiration=timedelta(hours=1),
)
def log_forecast(weather: str, lat: float, lon: float):
"""Create a markdown file with the results of the next hour forecast"""
log = f"# Weather in {lat}, {lon}\n\nThe forecast for the next hour as of {datetime.now()} is...\n\n"
# Find the next hour
try:
next_hour = weather["time"].index(
(datetime.now() + timedelta(hours=1)).strftime("%Y-%m-%dT%H:00")
)
except ValueError:
# Default to the first hour
next_hour = 0
# Log the results
for measure in WEATHER_MEASURES.split(","):
log += f"- {measure}: {weather[measure][next_hour]}\n"
# Save the data
with open("most_recent_results.md", "w") as f:
f.write(log)
@flow
def pipeline(lat: float, lon: float):
"""Main pipeline"""
weather = fetch_hourly_weather(lat, lon)
result = save_weather(weather)
log_forecast(weather, lat, lon)
return result
if __name__ == "__main__":
pipeline(50.7913957952127, -1.9014254856972352)
Cong CHen
06/21/2023, 10:05 AM# Core code borrowed from Simon
import httpx
import pandas as pd
from prefect import task, flow
from prefect.tasks import task_input_hash
import csv
import random
LOCATIONS = {
"sheffield": (53.4, -1.47),
"paris": (48.9, 2.35),
"london": (51.5, -0.12),
}
@task(retries = 5, cache_key_fn=task_input_hash)
def fetch_rain_may_fail(lat: float, lon: float, current_minute):
"""New data is retrieved every new calendar minute."""
assert(random.randint(0, 99) < 40)
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="rain"),
)
most_recent_rain = float(weather.json()["hourly"]["rain"][0])
return most_recent_rain
@task
def current_minute():
return pd.Timestamp.now().floor('60S')
@flow(retries = 2)
def rain_state(locs):
print({key: fetch_rain_may_fail(value[0], value[1], current_minute()) for key, value in locs.items()})
rain_state(LOCATIONS)