Jeff Hale
09/13/2023, 1:47 PMI'm in a code block.
Cool.
Brian Newman
09/13/2023, 1:53 PM@task(log_prints=True)
def fetch_weather(lat: float = 38.9, lon: float = -77.0):
"""Fetches weather data from Open Meteo API for a given lat/lon"""
base_url = "<https://api.open-meteo.com/v1/forecast/>"
params = {
"latitude": lat,
"longitude": lon,
"hourly": "temperature_2m,relativehumidity_2m,precipitation",
}
try:
weather = httpx.get(base_url, params=params)
weather_data = weather.json()
most_recent_temp = float(weather_data["hourly"]["temperature_2m"][0])
most_recent_rel_hum = float(weather_data["hourly"]["relativehumidity_2m"][0])
most_recent_precip = float(weather_data["hourly"]["precipitation"][0])
return most_recent_temp, most_recent_rel_hum, most_recent_precip
except Exception as e:
print(f"An error occurred: {e}")
return None
@task(log_prints=True)
def print_weather(
most_recent_temp: float, most_recent_rel_hum: float, most_recent_precip: float
):
"""Prints weather data to the console"""
print(f"Most recent temp C: {most_recent_temp} degrees")
print(f"Most recent temp F: {most_recent_temp * 9 / 5 + 32} degrees")
print(f"Most recent relative humidity: {most_recent_rel_hum}%")
print(f"Most recent precipitation: {most_recent_precip} mm")
@task(log_prints=True)
def save_weather(
most_recent_temp: float, most_recent_rel_hum: float, most_recent_precip: float
):
"""Saves weather data to a CSV file"""
try:
with open("weather.csv", "w+") as w:
w.write(f"Most recent temp C: {most_recent_temp} degrees\n")
w.write(f"Most recent temp F: {most_recent_temp * 9 / 5 + 32} degrees\n")
w.write(f"Most recent relative humidity: {most_recent_rel_hum}%\n")
w.write(f"Most recent precipitation: {most_recent_precip} mm\n")
return "Successfully wrote temp"
except Exception as e:
return f"An error occurred: {e}"
@flow(log_prints=True)
def prefect_pipeline():
"""A Prefect flow that fetches, prints, and saves weather data"""
most_recent_temp, most_recent_rel_hum, most_recent_precip = fetch_weather()
print_weather(most_recent_temp, most_recent_rel_hum, most_recent_precip)
save_weather(most_recent_temp, most_recent_rel_hum, most_recent_precip)
if __name__ == "__main__":
prefect_pipeline()
Lennert Van de Velde
09/13/2023, 2:02 PMimport httpx
from prefect import flow, get_run_logger, task
@task
def get_temperature_2m(base_url: str, lat: float, lon: float):
logger = get_run_logger()
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
)
most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
<http://logger.info|logger.info>(f"Most recent temp C: {most_recent_temp} degrees")
return most_recent_temp
@task
def get_temperature_180m(base_url: str, lat: float, lon: float):
logger = get_run_logger()
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_180m"),
)
most_recent_temp = float(weather.json()["hourly"]["temperature_180m"][0])
<http://logger.info|logger.info>(f"Most recent temp C: {most_recent_temp} degrees")
return most_recent_temp
@task
def get_difference_in_temp(temp_2m: float, temp_180m: float):
logger = get_run_logger()
difference = temp_2m - temp_180m
<http://logger.info|logger.info>(f"Difference in temp: {difference} degrees")
return difference
@flow()
def fetch_weather(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
two_m_temp = get_temperature_2m(base_url, lat, lon)
one_eighty_m_temp = get_temperature_180m(base_url, lat, lon)
get_difference_in_temp(two_m_temp, one_eighty_m_temp)
if __name__ == "__main__":
fetch_weather.serve(name="depl-101-weather-app")
David Maxson
09/13/2023, 2:09 PMimport csv
import httpx
from prefect import flow, task
METEO_URL = "<https://api.open-meteo.com/v1/forecast/>"
@task
def fetch_metric(lat: float, lon: float, metric_rate: str, metric_name: str):
weather = httpx.get(
METEO_URL,
params=dict(latitude=lat, longitude=lon, **{metric_rate: metric_name}),
)
most_recent_temp = float(weather.json()[metric_rate][metric_name][0])
return most_recent_temp
@task
def save_metrics(filename: str, lat: float, lon: float, metric_results: list[tuple[str, str, float]]):
with open(filename, 'w') as f:
writer = csv.DictWriter(f, fieldnames=['lat', 'lon', 'metric_rate', 'metric_name', 'value'])
writer.writeheader()
for metric_rate, metric_name, value in metric_results:
writer.writerow(dict(lat=lat, lon=lon, metric_rate=metric_rate, metric_name=metric_name, value=value))
@flow
def open_meteo_dataflow(lat: float, lon: float, metrics: list[tuple[str, str]] = (
('hourly', 'apparent_temperature'),
), filename: str = 'lab1.csv'):
metric_values = [fetch_metric(lat, lon, metric_rate, metric_name) for metric_rate, metric_name in metrics]
save_metrics(filename, lat, lon, [(r, n, v) for ((r, n), v) in zip(metrics, metric_values)])
if __name__ == '__main__':
open_meteo_dataflow.serve("lab-1")
Jonas
09/13/2023, 2:09 PMimport httpx
from prefect import Flow, Task
@Flow
def fetch_weather(lat: float, lon: float):
some_temp = get_temperature(lat, lon)
print(f'Some temperature = {some_temp}')
some_windspeed = get_windspeed(lat, lon)
return sum(some_temp, some_windspeed)
@Task
def get_temperature(lat, lon):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
)
most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
# print(f"Most recent temp C: {most_recent_temp} degrees")
return most_recent_temp
@Task
def get_windspeed(lat, lon):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
)
# print(weather.json())
most_recent_windspeed = float(weather.json()["hourly"]["windspeed_10m"][0])
# print(f"Most recent temp C: {most_recent_temp} degrees")
return most_recent_windspeed
@Task
def sum(a,b):
return a+b
if __name__ == "__main__":
fetch_weather.serve(name = 'test-flow')
# fetch_weather(30,60)
# (38.9, -77.0)
Sonny Nguyen
09/13/2023, 2:12 PMimport httpx # requests capability, but can work with async
from prefect import flow, task
@task(log_prints=True)
def fetch_weather(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly=["temperature_2m", "windspeed_10m", "rain"]),
)
most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
most_recent_wind_speed = float(weather.json()["hourly"]["windspeed_10m"][0])
most_recent_rain= float(weather.json()["hourly"]["rain"][0])
return most_recent_temp, most_recent_wind_speed, most_recent_rain
@task(log_prints=True)
def save_weather(temp: float):
with open("weather.csv", "w+") as w:
w.write(str(temp))
return "Successfully wrote temp"
@flow(log_prints=True)
def pipeline(lat=38.9, lon=-77.0):
temp, wind, rain = fetch_weather(lat, lon)
temperature = save_weather(temp)
wind = save_weather(wind)
rain = save_weather(rain)
return temperature, wind, rain
if __name__ == "__main__":
pipeline.serve(name="weather-flow")