Jeff Hale
07/06/2023, 1:43 PM
here :)
Dima Anoshin
07/06/2023, 1:58 PM
import httpx # requests capability, but can work with async
from prefect import flow, task
@task
def fetch_weather(lat: float, lon: float) -> float:
    """Fetch the first hourly ``temperature_2m`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of the ``hourly.temperature_2m`` series as a float.

    Raises:
        httpx.HTTPStatusError: If the API responds with a 4xx/5xx status.
    """
    # The pasted snippet wrapped the URL in chat-style angle brackets
    # ("<https://...>"), which is not a valid URL for httpx.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Surface HTTP failures directly instead of an opaque KeyError below.
    weather.raise_for_status()
    # NOTE(review): index 0 is the first element of the returned series;
    # confirm that this is the intended "most recent" reading.
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp
@task
def fetch_wind(lat: float, lon: float) -> float:
    """Fetch the first hourly ``windspeed_10m`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of the ``hourly.windspeed_10m`` series as a float.

    Raises:
        httpx.HTTPStatusError: If the API responds with a 4xx/5xx status.
    """
    # The pasted snippet wrapped the URL in chat-style angle brackets
    # ("<https://...>"), which is not a valid URL for httpx.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    # Surface HTTP failures directly instead of an opaque KeyError below.
    weather.raise_for_status()
    most_recent_windspeed = float(weather.json()["hourly"]["windspeed_10m"][0])
    return most_recent_windspeed
@task
def fetch_rain(lat: float, lon: float) -> float:
    """Fetch the first hourly ``rain`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of the ``hourly.rain`` series as a float.

    Raises:
        httpx.HTTPStatusError: If the API responds with a 4xx/5xx status.
    """
    # The pasted snippet wrapped the URL in chat-style angle brackets
    # ("<https://...>"), which is not a valid URL for httpx.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="rain"),
    )
    # Surface HTTP failures directly instead of an opaque KeyError below.
    weather.raise_for_status()
    most_recent_rain = float(weather.json()["hourly"]["rain"][0])
    return most_recent_rain
@task
def save_weather(temp, windspeed, rain):
    """Write one comma-separated row of readings to ``weather.csv``.

    The file is opened in ``w+`` mode, so any previous content is replaced.
    Returns a fixed confirmation message.
    """
    row = ", ".join(map(str, (temp, windspeed, rain))) + "\n"
    with open("weather.csv", "w+") as csv_file:
        csv_file.write(row)
    return "Successfully wrote temp, windspeed, and rain"
@flow
def pipeline(lat: float, lon: float):
    """Fetch temperature, wind speed and rain for a location, then save them.

    Returns whatever ``save_weather`` returns.
    """
    # Tuple elements are evaluated left to right, so the fetch order is
    # temperature, wind, rain.
    readings = (
        fetch_weather(lat, lon),
        fetch_wind(lat, lon),
        fetch_rain(lat, lon),
    )
    return save_weather(*readings)
if __name__ == "__main__":
    # Run the flow once for latitude 38.9, longitude -77.0.
    pipeline(lat=38.9, lon=-77.0)
Angel Alvizu
07/06/2023, 2:00 PM
import httpx
from prefect import flow, task
def make_request(lat, lon, metric):
    """Fetch the first hourly value of ``metric`` from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.
        metric: Name of the hourly variable to request; also the key used to
            read the series out of the ``hourly`` section of the response.

    Returns:
        The first entry of the ``hourly[metric]`` series as a float.

    Raises:
        httpx.HTTPStatusError: If the API responds with a 4xx/5xx status.
    """
    # The pasted snippet wrapped the URL in chat-style angle brackets
    # ("<https://...>"), which is not a valid URL for httpx.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly=metric),
    )
    # Surface HTTP failures directly instead of an opaque KeyError below.
    weather.raise_for_status()
    most_recent_metric = float(weather.json()["hourly"][metric][0])
    return most_recent_metric
@task
def get_temperature(lat: float, lon: float):
    """Return the ``temperature_2m`` reading obtained via ``make_request``."""
    metric_name = "temperature_2m"
    reading = make_request(lat, lon, metric_name)
    return reading
@task
def get_apparent_temperature(lat: float, lon: float):
    """Return the ``apparent_temperature`` reading obtained via ``make_request``."""
    metric_name = "apparent_temperature"
    reading = make_request(lat, lon, metric_name)
    return reading
@task
def get_relative_humidity(lat: float, lon: float):
    """Return the ``relativehumidity_2m`` reading obtained via ``make_request``."""
    metric_name = "relativehumidity_2m"
    reading = make_request(lat, lon, metric_name)
    return reading
@flow
def fetch_weather(lat: float, lon: float):
    """Fetch and print temperature, apparent temperature and relative humidity.

    Args:
        lat: Latitude passed to each fetching task.
        lon: Longitude passed to each fetching task.

    Returns:
        None; the readings are only printed.
    """
    most_recent_temp = get_temperature(lat, lon)
    print(f"Most recent temp C: {most_recent_temp} degrees")
    most_recent_apparent_temperature = get_apparent_temperature(lat, lon)
    # This line previously reused the plain "temp" label, so the two printed
    # readings could not be told apart.
    print(f"Most recent apparent temp C: {most_recent_apparent_temperature} degrees")
    most_recent_humidity = get_relative_humidity(lat, lon)
    print(f"Most recent relative humidity: {most_recent_humidity} %")
if __name__ == "__main__":
    # Run the flow once for latitude 38.9, longitude -77.0.
    fetch_weather(lat=38.9, lon=-77.0)
dainslie
07/06/2023, 2:00 PM
import httpx # requests capability, but can work with async
from prefect import flow, task
@task
def fetch_weather(lat: float, lon: float) -> float:
    """Fetch the first hourly ``temperature_2m`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of the ``hourly.temperature_2m`` series as a float.

    Raises:
        httpx.HTTPStatusError: If the API responds with a 4xx/5xx status.
    """
    # The pasted snippet wrapped the URL in chat-style angle brackets
    # ("<https://...>"), which is not a valid URL for httpx.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Surface HTTP failures directly instead of an opaque KeyError below.
    weather.raise_for_status()
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp
@task
def fetch_wind_speed(lat: float, lon: float) -> float:
    """Fetch the first hourly ``windspeed_10m`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of the ``hourly.windspeed_10m`` series as a float.

    Raises:
        httpx.HTTPStatusError: If the API responds with a 4xx/5xx status.
    """
    # The pasted snippet wrapped the URL in chat-style angle brackets
    # ("<https://...>"), which is not a valid URL for httpx.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    # Surface HTTP failures directly instead of an opaque KeyError below.
    weather.raise_for_status()
    wind_speed = float(weather.json()["hourly"]["windspeed_10m"][0])
    return wind_speed
@task
def save_weather(temp: float, wind_speed: float):
    """Append one comma-separated row of readings to ``weather.csv``.

    The file is opened in ``a+`` mode, so earlier rows are kept.
    Returns a fixed confirmation message.
    """
    row = ", ".join(map(str, (temp, wind_speed))) + "\n"
    with open("weather.csv", "a+") as csv_file:
        csv_file.write(row)
    return "Successfully wrote temp and wind speed"
@flow
def pipeline(lat: float, lon: float):
    """Fetch temperature and wind speed for a location, then save them.

    Returns whatever ``save_weather`` returns.
    """
    current_temp = fetch_weather(lat, lon)
    current_wind = fetch_wind_speed(lat, lon)
    return save_weather(current_temp, current_wind)
if __name__ == "__main__":
    # Run the flow once for latitude 38.9, longitude -77.0.
    pipeline(lat=38.9, lon=-77.0)
Kiani
07/06/2023, 2:50 PM