https://prefect.io logo
j

Jon

08/22/2023, 1:47 PM
room 8 thread
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task


@task
def fetch_temp(lat: float, lon: float) -> float:
    """Fetch the first hourly ``temperature_2m`` value for a location.

    Args:
        lat: Latitude sent to the API as the ``latitude`` query parameter.
        lon: Longitude sent to the API as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.temperature_2m`` in the JSON response,
        converted to ``float``.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    # Plain URL: the pasted version was wrapped in chat link markup
    # ("<...>"), which is not a valid URL for httpx.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Fail loudly on an HTTP error instead of hitting a KeyError on the
    # missing "hourly" key further down.
    weather.raise_for_status()
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp


@task
def fetch_wind(lat: float, lon: float) -> float:
    """Fetch the first hourly ``windspeed_10m`` value for a location.

    Args:
        lat: Latitude sent to the API as the ``latitude`` query parameter.
        lon: Longitude sent to the API as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.windspeed_10m`` in the JSON response,
        converted to ``float``.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    # Plain URL: the pasted version was wrapped in chat link markup
    # ("<...>"), which is not a valid URL for httpx.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    # Fail loudly on an HTTP error instead of hitting a KeyError on the
    # missing "hourly" key further down.
    weather.raise_for_status()
    most_recent_wind = float(weather.json()["hourly"]["windspeed_10m"][0])
    return most_recent_wind


@task
def save_weather(temp: float):
    """Write ``str(temp)`` to ``weather.csv`` and return a status message.

    The file is opened in ``"w+"`` mode, so any previous content is replaced.
    """
    status = "Successfully wrote temp"
    with open("weather.csv", "w+") as csv_file:
        csv_file.write(str(temp))
    return status


@flow
def pipeline(lat: float, lon: float):
    """Fetch temperature then wind for a location, saving each in turn.

    Returns the status message from the final ``save_weather`` call.
    """
    # Temperature first; its save result is superseded by the wind save below.
    temperature = fetch_temp(lat, lon)
    save_weather(temperature)

    # Wind second; this save's status is what the flow returns.
    wind = fetch_wind(lat, lon)
    return save_weather(wind)


if __name__ == "__main__":
    # Run the flow once with fixed coordinates when executed as a script.
    pipeline(lat=38.9, lon=-77.0)
j

Jon

08/22/2023, 2:02 PM
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task, logging


@task
def fetch_temp(lat: float, lon: float) -> float:
    """Request hourly ``temperature_2m`` data and return its first value.

    Args:
        lat: Value for the ``latitude`` query parameter.
        lon: Value for the ``longitude`` query parameter.

    Returns:
        ``hourly.temperature_2m[0]`` from the JSON response, as a ``float``.

    Raises:
        httpx.HTTPStatusError: If the response has a 4xx/5xx status.
    """
    # The chat paste wrapped this URL in "<...>" link markup; httpx needs
    # the bare URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(
            latitude=lat,
            longitude=lon,
            hourly="temperature_2m",
        ),
    )
    # Surface HTTP errors directly rather than as a KeyError when the
    # "hourly" key is absent from an error payload.
    weather.raise_for_status()
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp


@task
def fetch_wind(lat: float, lon: float) -> float:
    """Request hourly ``windspeed_10m`` data and return its first value.

    Args:
        lat: Value for the ``latitude`` query parameter.
        lon: Value for the ``longitude`` query parameter.

    Returns:
        ``hourly.windspeed_10m[0]`` from the JSON response, as a ``float``.

    Raises:
        httpx.HTTPStatusError: If the response has a 4xx/5xx status.
    """
    # The chat paste wrapped this URL in "<...>" link markup; httpx needs
    # the bare URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(
            latitude=lat,
            longitude=lon,
            hourly="windspeed_10m",
        ),
    )
    # Surface HTTP errors directly rather than as a KeyError when the
    # "hourly" key is absent from an error payload.
    weather.raise_for_status()
    most_recent_wind = float(weather.json()["hourly"]["windspeed_10m"][0])
    return most_recent_wind


@task
def save_weather(temp: float):
    """Replace the contents of ``weather.csv`` with ``str(temp)``.

    Returns a fixed confirmation string once the write has completed.
    """
    with open("weather.csv", "w+") as out:
        out.write(str(temp))
    confirmation = "Successfully wrote temp"
    return confirmation


@flow
def pipeline(lat: float, lon: float):
    """Fetch temperature and wind for a location, log both, and save them.

    ``save_weather`` is called once per value; the status message of the
    final (wind) call is returned.

    Args:
        lat: Latitude forwarded to the fetch tasks.
        lon: Longitude forwarded to the fetch tasks.
    """
    logger = logging.get_run_logger()
    temp = fetch_temp(lat, lon)

    result = save_weather(temp)

    x = fetch_wind(lat, lon)

    # Plain method calls: the chat paste had turned "logger.info" into
    # "<http://logger.info|logger.info>" link markup, which is a syntax error.
    logger.info(f"temp >>>>>>>>> {temp }")
    logger.info(f"wind >>>>>>>>>> {x}")
    result = save_weather(x)

    return result


if __name__ == "__main__":
    # Script entry point: run the flow once with fixed coordinates.
    pipeline(lat=38.9, lon=-77.0)
code ^
s

Sanz Al

08/22/2023, 2:06 PM
Thanks @Isabel for pushing the room forward
🙏 2
2