Jeff Hale
07/11/2023, 1:51 PM
Jakub Dvorak
07/11/2023, 1:54 PMimport httpx # requests capability, but can work with async
from prefect import flow, task
@task
def fetch_weather(lat: float, lon: float) -> float:
    """Fetch the first hourly ``temperature_2m`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.temperature_2m`` in the JSON response,
        converted to ``float``.
    """
    # Plain URL: wrapping it in "<...>" (chat link markup) makes it
    # unusable as a request URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    print(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp
@task
def fetch_rain(lat: float, lon: float) -> float:
    """Fetch the first hourly ``rain`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.rain`` in the JSON response, as ``float``.
    """
    # Plain URL: wrapping it in "<...>" (chat link markup) makes it
    # unusable as a request URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="rain"),
    )
    rain_forecast = float(weather.json()["hourly"]["rain"][0])
    print(f"Recent rain forecast: {rain_forecast}")
    return rain_forecast
@task
def fetch_showers(lat: float, lon: float) -> float:
    """Fetch the first hourly ``showers`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.showers`` in the JSON response, as
        ``float``.
    """
    # Plain URL: wrapping it in "<...>" (chat link markup) makes it
    # unusable as a request URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="showers"),
    )
    showers_forecast = float(weather.json()["hourly"]["showers"][0])
    print(f"Recent showers: {showers_forecast}")
    return showers_forecast
@task
def save_weather(temp: float, rain: float, showers: float):
    """Overwrite ``weather.csv`` with one labelled line per reading.

    Args:
        temp: Value written after the ``Temperature: `` label.
        rain: Value written after the ``Rain: `` label.
        showers: Value written after the ``Showers: `` label.

    Returns:
        A fixed status string.
    """
    with open("weather.csv", "w+") as out:
        out.writelines(
            [
                f"Temperature: {temp}\n",
                f"Rain: {rain}\n",
                f"Showers: {showers}\n",
            ]
        )
    return "Successfully wrote data"
@flow
def pipeline(lat: float, lon: float):
    """Fetch temperature, rain and showers for a location and save them.

    Args:
        lat: Latitude forwarded to every fetch task.
        lon: Longitude forwarded to every fetch task.

    Returns:
        Whatever ``save_weather`` returns.
    """
    # Tuple elements are evaluated left to right, so the tasks run in the
    # order temperature -> rain -> showers.
    readings = (
        fetch_weather(lat, lon),
        fetch_rain(lat, lon),
        fetch_showers(lat, lon),
    )
    return save_weather(*readings)
if __name__ == "__main__":
    # Run the flow only when this file is executed as a script.
    pipeline(lat=38.9, lon=-77.0)
John Seippel
07/11/2023, 1:54 PMimport httpx
from prefect import flow, task
def _fetch_weather(lat: float, lon: float, variable: str) -> float:
    """Fetch the first hourly value of ``variable`` from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.
        variable: Name of the hourly variable to request; also used as the
            key to read from the ``hourly`` section of the response.

    Returns:
        The first entry of ``hourly[variable]`` in the JSON response, as
        ``float``.
    """
    # Plain URL: wrapping it in "<...>" (chat link markup) makes it
    # unusable as a request URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly=variable),
    )
    most_recent_var = float(weather.json()["hourly"][variable][0])
    print(f"Most recent {variable}: {most_recent_var}")
    # The wrapper tasks do `return _fetch_weather(...)`, so the value has to
    # be handed back; without this they would all return None.
    return most_recent_var
@task
def fetch_most_recent_surface_pressure(lat: float, lon: float):
    """Delegate to ``_fetch_weather`` for the ``surface_pressure`` variable.

    Returns whatever ``_fetch_weather`` returns.
    """
    return _fetch_weather(lat, lon, variable="surface_pressure")
@task
def fetch_most_recent_windspeed(lat: float, lon: float):
    """Delegate to ``_fetch_weather`` for the ``windspeed_10m`` variable.

    Returns whatever ``_fetch_weather`` returns.
    """
    return _fetch_weather(lat, lon, variable="windspeed_10m")
@task
def fetch_most_recent_temp(lat: float, lon: float):
    """Delegate to ``_fetch_weather`` for the ``temperature_2m`` variable.

    Returns whatever ``_fetch_weather`` returns.
    """
    return _fetch_weather(lat, lon, variable="temperature_2m")
@flow
def pipeline():
    """Run the temperature, surface-pressure and windspeed tasks in turn.

    All three tasks are called with the same hard-coded coordinates; their
    results are not used and the flow returns ``None``.
    """
    lat, lon = 38.9, -77.0
    fetch_most_recent_temp(lat, lon)
    fetch_most_recent_surface_pressure(lat, lon)
    fetch_most_recent_windspeed(lat, lon)
if __name__ == "__main__":
    # Run the flow only when this file is executed as a script.
    pipeline()
Philipp Gräbel
07/11/2023, 1:59 PMimport httpx # requests capability, but can work with async
from prefect import flow, task
@task
def fetch_weather(lat: float, lon: float) -> list[float]:
    """Fetch the full hourly ``precipitation`` series from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        Every entry of ``hourly.precipitation`` in the JSON response,
        converted to ``float``, in response order.
    """
    # Plain URL: wrapping it in "<...>" (chat link markup) makes it
    # unusable as a request URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="precipitation"),
    )
    precipitation_hourly = [
        float(x) for x in weather.json()["hourly"]["precipitation"]
    ]
    return precipitation_hourly
@task
def sum_up_precipitation(precipitation_hourly: list[float]):
    """Total the first 24 entries and the whole series.

    Args:
        precipitation_hourly: Sequence of hourly precipitation values.

    Returns:
        A 2-tuple ``(sum of the first 24 entries, sum of all entries)``.
    """
    first_24_total = sum(precipitation_hourly[:24])
    overall_total = sum(precipitation_hourly)
    return first_24_total, overall_total
@task
def save_weather(precipitation: float, description: str):
    """Append one ``description;precipitation`` line to ``weather.csv``.

    Args:
        precipitation: Value written after the semicolon.
        description: Label written before the semicolon and echoed in the
            returned status string.

    Returns:
        A status string that includes ``description``.
    """
    with open("weather.csv", "a") as out:
        row = f'{description};{precipitation}\n'
        out.write(row)
    status = f"Successfully wrote {description}"
    return status
@flow
def pipeline(lat: float, lon: float):
    """Fetch hourly precipitation, total it, and append both totals to disk.

    Args:
        lat: Latitude forwarded to ``fetch_weather``.
        lon: Longitude forwarded to ``fetch_weather``.

    Returns:
        A 2-tuple with the status strings of the two ``save_weather`` calls.
    """
    hourly = fetch_weather(lat, lon)
    first_24_total, overall_total = sum_up_precipitation(hourly)
    # Tuple elements are evaluated left to right, so the first-24-hours row
    # is appended before the whole-series row.
    return (
        save_weather(first_24_total, 'precipitation last day'),
        save_weather(overall_total, 'precipitation last week'),
    )
if __name__ == "__main__":
    # Run the flow only when this file is executed as a script.
    pipeline(lat=38.9, lon=-77.0)
János
07/11/2023, 2:01 PM
Vinayak Nair
07/11/2023, 2:02 PMimport httpx # requests capability, but can work with async
from prefect import flow, task
@task
def fetch_windspeed(lat: float, lon: float) -> float:
    """Fetch the first hourly ``windspeed_10m`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.windspeed_10m`` in the JSON response,
        as ``float``.
    """
    # Plain URL: wrapping it in "<...>" (chat link markup) makes it
    # unusable as a request URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    windspeed = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    most_recent_windspeed = float(windspeed.json()["hourly"]["windspeed_10m"][0])
    return most_recent_windspeed
@task
def fetch_relative_humidity(lat: float, lon: float) -> float:
    """Fetch the first hourly ``relativehumidity_2m`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.relativehumidity_2m`` in the JSON
        response, as ``float``.
    """
    # Plain URL: wrapping it in "<...>" (chat link markup) makes it
    # unusable as a request URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    relative_humidity = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="relativehumidity_2m"),
    )
    most_recent_relative_humidity = float(
        relative_humidity.json()["hourly"]["relativehumidity_2m"][0]
    )
    return most_recent_relative_humidity
@task
def fetch_rain(lat: float, lon: float) -> float:
    """Fetch the first hourly ``rain`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.rain`` in the JSON response, as ``float``.
    """
    # Plain URL: wrapping it in "<...>" (chat link markup) makes it
    # unusable as a request URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    rain = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="rain"),
    )
    most_recent_rain = float(rain.json()["hourly"]["rain"][0])
    return most_recent_rain
@task
def save_data(windspeed: float, relative_humidity: float, rain: float):
    """Overwrite ``weather_deets.csv`` with a single comma-separated row.

    The row is ``windspeed,relative_humidity,rain`` followed by a newline.

    Args:
        windspeed: First field of the row.
        relative_humidity: Second field of the row.
        rain: Third field of the row.

    Returns:
        A fixed status string.
    """
    with open("weather_deets.csv", "w+") as w:
        # No space after any comma (a stray one would become part of the
        # third field) and a trailing newline so the row is terminated.
        w.write(f"{windspeed},{relative_humidity},{rain}\n")
    return "Successfully wrote data"
@flow
def pipeline(lat: float, lon: float):
    """Fetch windspeed, relative humidity and rain, then save them.

    Args:
        lat: Latitude forwarded to every fetch task.
        lon: Longitude forwarded to every fetch task.

    Returns:
        Whatever ``save_data`` returns.
    """
    # Tuple elements are evaluated left to right, so the tasks run in the
    # order windspeed -> relative humidity -> rain.
    measurements = (
        fetch_windspeed(lat, lon),
        fetch_relative_humidity(lat, lon),
        fetch_rain(lat, lon),
    )
    return save_data(*measurements)
if __name__ == "__main__":
    # Run the flow only when this file is executed as a script.
    pipeline(lat=52.5244, lon=13.4105)
Lyes Khalfaoui
07/11/2023, 2:11 PMimport httpx
from prefect import flow, task
@task
def fetch_weather(lat: float, lon: float) -> float:
    """Fetch the first hourly ``temperature_2m`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.temperature_2m`` in the JSON response,
        converted to ``float``.
    """
    # Plain URL: wrapping it in "<...>" (chat link markup) makes it
    # unusable as a request URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    # Only temperature is read below, so only temperature is requested.
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    print(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp
@task
def fetch_weather_windspeed(lat: float, lon: float) -> float:
    """Fetch the first hourly ``windspeed_10m`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.windspeed_10m`` in the JSON response,
        as ``float``.
    """
    # Plain URL: wrapping it in "<...>" (chat link markup) makes it
    # unusable as a request URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    most_recent_windspeed = float(weather.json()["hourly"]["windspeed_10m"][0])
    # A wind speed is not measured in degrees, so the message carries no
    # "degrees" suffix.
    print(f"Most recent windspeed: {most_recent_windspeed}")
    return most_recent_windspeed
@task
def fetch_weather_visibility(lat: float, lon: float) -> float:
    """Fetch the first hourly ``visibility`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.visibility`` in the JSON response, as
        ``float``.
    """
    # Plain URL: wrapping it in "<...>" (chat link markup) makes it
    # unusable as a request URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="visibility"),
    )
    most_recent_visibility = float(weather.json()["hourly"]["visibility"][0])
    print(f"Most recent visibility: {most_recent_visibility} ")
    return most_recent_visibility
@task
def save_weather(temp: float, windspeed: float, visibility: float):
    """Overwrite ``weather.csv`` with a single comma-separated row.

    The row is ``temp,windspeed,visibility`` followed by a newline.

    Args:
        temp: First field of the row.
        windspeed: Second field of the row.
        visibility: Third field of the row.

    Returns:
        A fixed status string.
    """
    with open("weather.csv", "w+") as w:
        # Persist every metric the task receives; writing only `temp`
        # would silently discard windspeed and visibility.
        w.write(f"{temp},{windspeed},{visibility}\n")
    return "Successfully wrote weather metrics for paris"
@flow
def pipeline(lat: float, lon: float):
    """Fetch temperature, windspeed and visibility, then save them.

    Args:
        lat: Latitude forwarded to every fetch task.
        lon: Longitude forwarded to every fetch task.

    Returns:
        Whatever ``save_weather`` returns.
    """
    # Tuple elements are evaluated left to right, so the tasks run in the
    # order temperature -> windspeed -> visibility.
    metrics = (
        fetch_weather(lat, lon),
        fetch_weather_windspeed(lat, lon),
        fetch_weather_visibility(lat, lon),
    )
    return save_weather(*metrics)
if __name__ == "__main__":
    # Run the flow only when this file is executed as a script.
    # NOTE(review): coordinates are presumably Paris, going by the status
    # message returned from save_weather -- confirm.
    pipeline(lat=48.856614, lon=2.3522219)