Jeff Hale
08/22/2023, 2:03 PMEmma Rizzi
08/22/2023, 2:05 PMimport httpx # requests capability, but can work with async
from prefect import flow, task
@task
def fetch_weather(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m,precipitation_probability"),
)
most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
precipitations = float(weather.json()["hourly"]["precipitation_probability"][0])
return most_recent_temp, precipitations
@task
def save_weather(temp: float):
with open("weather.csv", "w+") as w:
w.write(str(temp))
return "Successfully wrote temp"
@task
def print_weather(temp: float, precipitations: float):
print(f"Weather for today : {temp} °C and precipitations probablity : {precipitations}")
@flow
def pipeline(lat: float, lon: float):
temp, precipitations = fetch_weather(lat, lon)
result = save_weather(temp)
print_weather(temp, precipitations)
return result
if __name__ == "__main__":
pipeline(43.604652, 1.444209)
Alex Litvinov
08/22/2023, 2:05 PMimport httpx # requests capability, but can work with async
from prefect import flow, task
@task
def fetch_weather(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
)
return float(weather.json()["hourly"]["temperature_2m"][0])
@task
def save_weather(temp: float, windspeed: float):
with open("weather.csv", "w+") as w:
w.write(f'{temp}, {windspeed}')
return "Successfully wrote temp"
@task
def fetch_windspeed(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
)
return float(weather.json()["hourly"]["windspeed_10m"][0])
@flow
def pipeline(lat: float, lon: float):
temp = fetch_weather(lat, lon)
windspeed = fetch_windspeed(lat, lon)
return save_weather(temp, windspeed)
if __name__ == "__main__":
pipeline(38.9, -77.0)
Sylvain Hazard
08/22/2023, 2:06 PMimport httpx
from prefect import flow, task
@task
def fetch_weather(lat: float, lon: float, variable: str = "temperature_2m"):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly=variable),
)
most_recent = float(weather.json()["hourly"][variable][0])
print(most_recent)
return most_recent
@flow
def get_variables(lat: float, lon: float):
temp = fetch_weather(lat, lon, "temperature_2m")
wind_speed = fetch_weather(lat, lon, "windspeed_10m")
rain = fetch_weather(lat, lon, "rain")
return temp, wind_speed, rain
if __name__ == "__main__":
get_variables(44.837789, -0.57918)
UD
08/22/2023, 2:24 PMimport httpx # requests capability, but can work with async
from prefect import flow, task
@task
def fetch_weather(lat: float, lon: float):
base_url = "https://api.open-meteo.com/v1/forecast/"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
)
most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
return most_recent_temp
@task
def fetch_rain(lat: float, lon: float):
base_url = "https://api.open-meteo.com/v1/forecast/"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="rain"),
)
rain = float(weather.json()["hourly"]["rain"][0])
return rain
@task
def save_weather(temp: float):
with open("weather.csv", "w+") as w:
w.write(str(temp))
return "Successfully wrote temp"
@task
def save_rain(rain: float):
with open("rain.csv", "w+") as w:
w.write(str(rain))
return "Successfully wrote rain"
@flow
def pipeline(lat: float, lon: float):
temp = fetch_weather(lat, lon)
rain = fetch_rain(lat, lon)
result = save_rain(rain)
result1 = save_weather(temp)
return result
if name == "__main__":
pipeline(38.9, -77.0)