Adir Abargil
08/22/2023, 2:54 PM
from datetime import timedelta
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
import httpx
@task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1))
def fetch_temp(lat: float, lon: float) -> float:
    """Fetch an hourly temperature reading from the Open-Meteo forecast API.

    The task is configured with three retries, and its result is cached by
    input hash with a one-minute expiration (see the ``@task`` arguments).

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first value of the ``hourly.temperature_2m`` series in the JSON
        response, converted to ``float``.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
        httpx.HTTPError: On transport-level failures raised by ``httpx.get``.
    """
    # Plain URL: the pasted original carried chat-link angle brackets inside
    # the string literal, which made the request target invalid.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params={"latitude": lat, "longitude": lon, "hourly": "temperature_2m"},
    )
    # Fail on HTTP errors explicitly so the task's retries are triggered by
    # the real cause rather than by a KeyError while indexing an error payload.
    weather.raise_for_status()
    return float(weather.json()["hourly"]["temperature_2m"][0])
@flow
def pipeline(lat, lon):
    """Run the temperature-fetch task for one coordinate and log the result.

    Args:
        lat: Latitude forwarded to ``fetch_temp``.
        lon: Longitude forwarded to ``fetch_temp``.
    """
    logger = get_run_logger()
    result = fetch_temp(lat, lon)
    # The pasted original had chat-link markup around ``logger.info``, which
    # is not valid Python; this is the plain method call.
    logger.info(f"result : {result}")
if __name__ == "__main__":
    # Script entry point: run the flow once for a fixed sample coordinate.
    pipeline(lat=38.9, lon=-77.0)
Rakesh K
08/22/2023, 2:55 PM