Jeff Hale
07/06/2023, 2:40 PM
Alyssa Harris
07/06/2023, 2:42 PM
import httpx # requests capability, but can work with async
from prefect import flow, task
from prefect.tasks import task_input_hash
@task
def fetch_air_quality(lat: float, lon: float, item: str):
    """Fetch one hourly air-quality variable and return its first value.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.
        item: Name of the hourly variable to request; it is also the key
            used to read the series out of the JSON response.

    Returns:
        The first entry of the ``hourly[item]`` series in the response,
        converted to ``float``.

    Raises:
        httpx.HTTPStatusError: If the API answers with a non-success status.
    """
    # The URL must be a bare string: the "<...>" wrapper that was around it
    # is chat link markup, and httpx cannot parse it as a URL.
    base_url = "https://air-quality-api.open-meteo.com/v1/air-quality/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly=item),
    )
    print(weather)
    # Fail on the HTTP error itself rather than on a confusing KeyError
    # when the error payload has no "hourly" section.
    weather.raise_for_status()
    most_recent_air_q = float(weather.json()["hourly"][item][0])
    return most_recent_air_q
@task(retries=2, retry_delay_seconds=0.1, cache_key_fn=task_input_hash)
def fetch_weather_item(lat: float, lon: float, item: str):
    """Fetch one hourly forecast variable and return its first value.

    The task is declared with two retries (0.1 s apart) and caches its
    result keyed on the hash of its inputs.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.
        item: Name of the hourly variable to request; it is also the key
            used to read the series out of the JSON response.

    Returns:
        The first entry of the ``hourly[item]`` series in the response,
        converted to ``float``.

    Raises:
        httpx.HTTPStatusError: If the API answers with a non-success status.
    """
    # The URL must be a bare string: the "<...>" wrapper that was around it
    # is chat link markup, and httpx cannot parse it as a URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly=item),
    )
    # Raise on HTTP errors so the configured retries react to the real
    # failure instead of a KeyError from a payload without "hourly".
    weather.raise_for_status()
    most_recent_item = float(weather.json()["hourly"][item][0])
    return most_recent_item
@task
def save_weather(item: float):
    """Append a single reading to ``weather.csv`` in the working directory.

    Args:
        item: The value to record; it is written using its ``str`` form.

    Returns:
        The fixed status message ``"Successfully wrote weather item"``.
    """
    with open("weather.csv", "a", encoding="utf-8") as f:
        # Terminate each value with a newline: without a delimiter,
        # consecutive appends fuse into one unparseable token
        # (e.g. "20.10.012.5").
        f.write(f"{item}\n")
    return "Successfully wrote weather item"
@flow
def pipeline(lat: float, lon: float):
    """Fetch three readings for a location and save each one.

    Temperature and precipitation come from ``fetch_weather_item`` and
    PM10 from ``fetch_air_quality``. All three fetches run first, then
    every value (precipitation included) is passed to ``save_weather``
    in the same order.

    Args:
        lat: Latitude forwarded to every fetch task.
        lon: Longitude forwarded to every fetch task.

    Returns:
        The three ``save_weather`` status messages concatenated with no
        separator.
    """
    # Tuple elements are evaluated left to right, so the fetch order is
    # temperature, precipitation, air quality.
    readings = (
        fetch_weather_item(lat, lon, "temperature_2m"),
        fetch_weather_item(lat, lon, "precipitation"),
        fetch_air_quality(lat, lon, "pm10"),
    )
    statuses = [save_weather(reading) for reading in readings]
    return "".join(statuses)
if __name__ == "__main__":
    # Run the flow once for a fixed sample location when executed as a script.
    pipeline(lat=38.9, lon=-77.0)