# Emiliano
# 05/31/2023, 1:59 PM
import httpx
from prefect import flow, task
def fetch_api(lat: float, lon: float, hourly: str):
    """Query the Open-Meteo forecast endpoint for one hourly variable.

    Sends a GET request with ``latitude``, ``longitude`` and ``hourly`` as
    query parameters, then reads the first entry of the returned hourly
    series for the requested variable.

    Args:
        lat: Latitude passed as the ``latitude`` query parameter.
        lon: Longitude passed as the ``longitude`` query parameter.
        hourly: Name of the hourly variable to request; also used as the
            key under ``"hourly"`` in the JSON response.

    Returns:
        The first value of the requested hourly series, as a float.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    # The URL must be a bare string: the original literal was wrapped in
    # "<...>" (chat auto-link markup), which is not a valid URL for httpx.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly=hourly),
    )
    # Fail loudly on an error response instead of hitting an opaque KeyError
    # when the error payload has no "hourly" key.
    weather.raise_for_status()
    # Index 0 is the first element of the series; the printed label below is
    # kept as the original author wrote it.
    most_recent_data = float(weather.json()["hourly"][hourly][0])
    print(f"Most recent {hourly}: {most_recent_data}")
    return most_recent_data
@task
def fetch_weather(lat: float, lon: float):
    """Return the ``temperature_2m`` reading for the given coordinates.

    Thin task wrapper: delegates to ``fetch_api`` with the hourly variable
    fixed to ``"temperature_2m"`` and returns whatever it returns.
    """
    return fetch_api(lat, lon, hourly="temperature_2m")
@task
def fetch_windspeed(lat: float, lon: float):
    """Return the ``windspeed_10m`` reading for the given coordinates.

    Thin task wrapper: delegates to ``fetch_api`` with the hourly variable
    fixed to ``"windspeed_10m"`` and returns whatever it returns.
    """
    return fetch_api(lat, lon, hourly="windspeed_10m")
@task
def fetch_rain(lat: float, lon: float):
    """Return the ``rain`` reading for the given coordinates.

    Thin task wrapper: delegates to ``fetch_api`` with the hourly variable
    fixed to ``"rain"`` and returns whatever it returns.
    """
    return fetch_api(lat, lon, hourly="rain")
@flow()
def pipeline(lat: float, lon: float):
    """Run the three weather tasks for one location and return their results.

    Calls ``fetch_weather``, ``fetch_windspeed`` and ``fetch_rain`` in that
    order with the same coordinates.

    Args:
        lat: Latitude forwarded to every task.
        lon: Longitude forwarded to every task.

    Returns:
        A dict keyed by the hourly variable name (``"temperature_2m"``,
        ``"windspeed_10m"``, ``"rain"``) holding each task's result.
        Previously the results were computed and then discarded; callers
        that ignore the return value are unaffected.
    """
    latest_temp = fetch_weather(lat, lon)
    latest_windspeed = fetch_windspeed(lat, lon)
    latest_rain = fetch_rain(lat, lon)
    # NOTE(review): assumes direct task calls inside a flow yield plain
    # values rather than futures — confirm for the Prefect version in use.
    return {
        "temperature_2m": latest_temp,
        "windspeed_10m": latest_windspeed,
        "rain": latest_rain,
    }
# Script entry point: run the flow once for a fixed sample location when the
# file is executed directly (not when it is imported).
if __name__ == "__main__":
    pipeline(lat=10, lon=3.5)