https://prefect.io logo
j

Jon

08/22/2023, 2:50 PM
room 5
from datetime import timedelta

import httpx  # requests capability, but can work with async
from prefect import flow, logging, task

# Open-Meteo forecast endpoint shared by every fetch task.
_FORECAST_URL = "https://api.open-meteo.com/v1/forecast/"


def _fetch_first_hourly(lat: float, lon: float, variable: str) -> float:
    """Return the first value of the hourly series ``variable`` for a location.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.
        variable: Name of the hourly variable to request and read back,
            e.g. ``"temperature_2m"``.

    Returns:
        The first element of ``response["hourly"][variable]`` as a float.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    response = httpx.get(
        _FORECAST_URL,
        params=dict(latitude=lat, longitude=lon, hourly=variable),
    )
    # Fail loudly on an error response instead of hitting an opaque KeyError
    # when the payload has no "hourly" key; the task-level retries still apply.
    response.raise_for_status()
    return float(response.json()["hourly"][variable][0])


# NOTE(review): in Prefect 2.x `cache_expiration` only takes effect when a
# `cache_key_fn` is also supplied (the original left it as a commented-out
# placeholder) — confirm whether caching is actually wanted here.
@task(
    retries=3,
    persist_result=False,
    cache_expiration=timedelta(minutes=2),
)
def fetch_temp(lat: float, lon: float) -> float:
    """Fetch the first hourly ``temperature_2m`` value for the given location."""
    return _fetch_first_hourly(lat, lon, "temperature_2m")


@task(
    retries=3,
    cache_expiration=timedelta(minutes=10),
)
def fetch_wind(lat: float, lon: float) -> float:
    """Fetch the first hourly ``windspeed_10m`` value for the given location."""
    return _fetch_first_hourly(lat, lon, "windspeed_10m")


@task
def save_weather(temp: float) -> str:
    """Write ``temp`` to ``weather.csv``, replacing any previous content.

    Args:
        temp: The value to store; it is written as ``str(temp)``.

    Returns:
        A fixed confirmation message.
    """
    # "w" truncates the file, so each call overwrites what was there before.
    with open("weather.csv", "w", encoding="utf-8") as out:
        out.write(str(temp))
    return "Successfully wrote temp"


@flow
def pipeline(lat: float, lon: float) -> str:
    """Fetch temperature and wind for a location, log both, and save them.

    Args:
        lat: Latitude of the location.
        lon: Longitude of the location.

    Returns:
        The confirmation message from the last ``save_weather`` call.
    """
    logger = logging.get_run_logger()

    temp = fetch_temp(lat, lon)
    save_weather(temp)

    wind = fetch_wind(lat, lon)
    logger.info(f"temp >>>>>>>>> {temp}")
    logger.info(f"wind >>>>>>>>>> {wind}")

    # NOTE(review): save_weather truncates weather.csv, so this second call
    # replaces the temperature written above with the wind value — confirm
    # that is intended.
    return save_weather(wind)


if __name__ == "__main__":
    pipeline(38.9, -77.0)