One person from each group: share your code in the...
# pacc-feb-13-14-2024
j
One person from each group: share your code in the ๐Ÿงต:
๐Ÿ”ฅ 1
f
Copy code
import httpx
from prefect import flow


@flow
def fetch_weather(lat: float = 38.9, lon: float = -77.0):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    temps = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    forecasted_temp = float(temps.json()["hourly"]["temperature_2m"][0])
    print(f"Forecasted temp C: {forecasted_temp} degrees")
    return forecasted_temp


if __name__ == "__main__":
    fetch_weather.serve(name="meteo-deployment", parameters={"lat": 38.8, "lon": -77.0})
๐Ÿ”ฅ 1
s
Copy code
import httpx
from prefect import flow


@flow()
def fetch_weather(lat: float = 38.9, lon: float = -77.0):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    temps = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    forecasted_temp = float(temps.json()["hourly"]["temperature_2m"][0])
    print(f"Forecasted temp C: {forecasted_temp} degrees")
    return forecasted_temp


if __name__ == "__main__":
    # run every hour
    fetch_weather.serve(name='weather1-flow', cron='0 * * * *', parameters={'lat': 38.9, 'lon': -75.0})
๐Ÿ™Œ 1
i
Copy code
from typing import Literal
from prefect import flow, get_run_logger
import httpx

SCHEDULE_CRON = "* * * * *"
Variables = Literal["temperature_2m", "relative_humidity_2m", "wind_speed_10m"]


@flow
def fetch_weather (lat: float = 38.9, lon: float = -77.0, target_variable: Variables = "temperature_2m"):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Input params: lat: {lat}, lon {lon}")
    request_params = {
        "latitude": lat,
        "longitude": lon,
        "hourly": target_variable
    }

    temps = httpx.get(
        base_url,
        params=request_params
    )

    forecasted_temp = float(temps.json()["hourly"][target_variable][0])
    print(temps.json()) # print json schema only in the worker logs
    if target_variable == "temperature_2m":
        <http://logger.info|logger.info>(f"Forecasted ๐ŸŒก๏ธ temp C: {forecasted_temp} degrees")
    elif target_variable == "relative_humidity_2m":
        <http://logger.info|logger.info>(f"Forecasted ๐Ÿ’ง relative humidity: {forecasted_temp} %")
    elif target_variable == "wind_speed_10m":
        <http://logger.info|logger.info>(f"Forecasted ๐Ÿ’จ wind speed: {forecasted_temp} m/s")
    return forecasted_temp

if __name__ == "__main__":
    fetch_weather.serve(name="deployment_1", cron=SCHEDULE_CRON, parameters=dict(lat=50, lon=77.0))
๐Ÿฆœ 1