# https://prefect.io logo
# n
#
# Nihal Desai
#
# 08/22/2023, 2:53 PM
# room 7
# Copy code
from datetime import timedelta
import httpx  # requests capability, but can work with async
from prefect import flow, task
from prefect.tasks import task_input_hash


@task
def fetch_temp(lat: float, lon: float):
    """Fetch the first hourly temperature value for a coordinate.

    Queries the Open-Meteo forecast endpoint for the ``temperature_2m``
    hourly series and returns its first entry.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        The first element of ``hourly.temperature_2m`` from the JSON
        response, converted to ``float`` and rendered as a string.

    Raises:
        httpx.HTTPStatusError: If the API responds with an error status.
    """
    # Plain URL: the "<...>" wrapper previously stored here is chat markup,
    # not part of the address, and makes the request URL invalid.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Fail with a clear HTTP error rather than a KeyError on the payload.
    response.raise_for_status()
    most_recent_temp = float(response.json()["hourly"]["temperature_2m"][0])
    # Returned as text because the caller concatenates it into a report.
    return str(most_recent_temp)

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1))
def fetch_windspeed(lat: float, lon: float):
    """Fetch the first hourly wind speed value for a coordinate.

    Queries the Open-Meteo forecast endpoint for the ``windspeed_10m``
    hourly series and returns its first entry. The task is registered with
    ``task_input_hash`` as its cache key function and a one-minute cache
    expiration.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        The first element of ``hourly.windspeed_10m`` from the JSON
        response, converted to ``float`` and rendered as a string.

    Raises:
        httpx.HTTPStatusError: If the API responds with an error status.
    """
    # Plain URL: the "<...>" wrapper previously stored here is chat markup,
    # not part of the address, and makes the request URL invalid.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    # Fail with a clear HTTP error rather than a KeyError on the payload.
    response.raise_for_status()
    most_recent_wind_speed = float(response.json()["hourly"]["windspeed_10m"][0])
    # Returned as text because the caller concatenates it into a report.
    return str(most_recent_wind_speed)


@task(retries=4, retry_delay_seconds=0.5)
def save_weather(temp: str):
    """Write the weather report to ``weather.txt``, replacing its contents.

    The task is registered with 4 retries and a 0.5 second retry delay.

    Args:
        temp: Report text to store. The value is passed through ``str()``
            before writing, so non-string values are still accepted.

    Returns:
        The fixed confirmation string ``"Successfully wrote temp"``.
    """
    # Write-only mode is enough (the file is never read back here); an
    # explicit encoding keeps the output independent of the platform default.
    with open("weather.txt", "w", encoding="utf-8") as report_file:
        report_file.write(str(temp))
    return "Successfully wrote temp"

@flow(retries=4)
def pipeline(lat: float, lon: float):
    """Fetch temperature and wind speed for a coordinate and save them.

    Calls ``fetch_temp`` and then ``fetch_windspeed`` with the same
    coordinates, builds a two-line report from their results, and hands it
    to ``save_weather``. The flow is registered with 4 retries.

    Args:
        lat: Latitude forwarded to both fetch tasks.
        lon: Longitude forwarded to both fetch tasks.

    Returns:
        Whatever ``save_weather`` returns for the assembled report.
    """
    # Temperature is fetched before wind speed, as in the report order.
    temperature_line = "Temperature " + fetch_temp(lat, lon)
    windspeed_line = "Windspeed " + fetch_windspeed(lat, lon)
    report = "\n".join((temperature_line, windspeed_line))
    return save_weather(report)


if __name__ == "__main__":
    # Run the flow once with fixed example coordinates when executed as a script.
    pipeline(lat=38.9, lon=-77.0)