Jeff Hale
07/11/2023, 1:51 PMJakub Dvorak
07/11/2023, 1:54 PMimport httpx  # requests capability, but can work with async
from prefect import flow, task
@task
def fetch_weather(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    print(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp
@task
def fetch_rain(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="rain"),
    )
    rain_forecast = float(weather.json()["hourly"]["rain"][0])
    print(f"Recent rain forecast: {rain_forecast}")
    return rain_forecast
@task
def fetch_showers(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="showers"),
    )
    showers_forecast = float(weather.json()["hourly"]["showers"][0])
    print(f"Recent showers: {showers_forecast}")
    return showers_forecast
@task
def save_weather(temp: float, rain: float, showers: float):
    with open("weather.csv", "w+") as w:
        w.write("Temperature: " + str(temp) + "\n")
        w.write("Rain: " + str(rain) + "\n")
        w.write("Showers: " + str(showers) + "\n")
    return "Successfully wrote data"
@flow
def pipeline(lat: float, lon: float):
    temp = fetch_weather(lat, lon)
    rain = fetch_rain(lat, lon)
    showers = fetch_showers(lat, lon)
    
    
    result = save_weather(temp, rain, showers)
    return result
if __name__ == "__main__":
    pipeline(38.9, -77.0)John Seippel
07/11/2023, 1:54 PMimport httpx
from prefect import flow, task
def _fetch_weather(lat: float, lon: float, variable: str):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly=variable),
    )
    most_recent_var = float(weather.json()["hourly"][variable][0])
    print(f"Most recent {variable}: {most_recent_var}")
@task
def fetch_most_recent_surface_pressure(lat: float, lon: float):
    return _fetch_weather(lat, lon, "surface_pressure")
@task
def fetch_most_recent_windspeed(lat: float, lon: float):
    return _fetch_weather(lat, lon, "windspeed_10m")
@task
def fetch_most_recent_temp(lat: float, lon: float):
    return _fetch_weather(lat, lon, "temperature_2m")
@flow
def pipeline():
    coords = (38.9, -77.0)
    fetch_most_recent_temp(*coords)
    fetch_most_recent_surface_pressure(*coords)
    fetch_most_recent_windspeed(*coords)
if __name__ == "__main__":
    pipeline()Philipp Gräbel
07/11/2023, 1:59 PMimport httpx  # requests capability, but can work with async
from prefect import flow, task
@task
def fetch_weather(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="precipitation"),
    )
    precipitation_hourly = [float(x) for x in weather.json()["hourly"]["precipitation"]]
    return precipitation_hourly
@task
def sum_up_precipitation(precipitation_hourly: list[float]):
    return sum(precipitation_hourly[:24]), sum(precipitation_hourly)
@task
def save_weather(precipitation: float, description: str):
    with open("weather.csv", "a") as w:
        w.write(f'{description};{precipitation}\n')
    return f"Successfully wrote {description}"
@flow
def pipeline(lat: float, lon: float):
    precipitation_hourly = fetch_weather(lat, lon)
    precipitation_day, precipitation_weekly = sum_up_precipitation(precipitation_hourly)
    result1 = save_weather(precipitation_day, 'precipitation last day')
    result2 = save_weather(precipitation_weekly, 'precipitation last week')
    return result1, result2
if __name__ == "__main__":
    pipeline(38.9, -77.0)János
07/11/2023, 2:01 PMVinayak Nair
07/11/2023, 2:02 PMimport httpx  # requests capability, but can work with async
from prefect import flow, task
@task
def fetch_windspeed(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    windspeed = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    most_recent_windspeed = float(windspeed.json()["hourly"]["windspeed_10m"][0])
    return most_recent_windspeed
@task
def fetch_relative_humidity(lat:float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    relative_humidity = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="relativehumidity_2m"),
    )
    most_recent_relative_humidity = float(relative_humidity.json()["hourly"]["relativehumidity_2m"][0])
    return most_recent_relative_humidity
@task
def fetch_rain(lat:float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    rain = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="rain"),
    )
    most_recent_rain = float(rain.json()["hourly"]["rain"][0])
    return most_recent_rain
@task
def save_data(windspeed: float, relative_humidity: float, rain:float):
    with open("weather_deets.csv", "w+") as w:
        w.write(f"{windspeed},{relative_humidity}, {rain}")
    return "Successfully wrote data"
@flow
def pipeline(lat: float, lon: float):
    windspeed = fetch_windspeed(lat, lon)
    relative_humidity = fetch_relative_humidity(lat, lon)
    rain = fetch_rain(lat, lon)
    result = save_data(windspeed, relative_humidity, rain)
    return result
if __name__ == "__main__":
    pipeline(52.5244, 13.4105)Lyes Khalfaoui
07/11/2023, 2:11 PMimport httpx
from prefect import flow, task
@task
def fetch_weather(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon,
                    hourly="temperature_2m,visibility,windspeed_10m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    print(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp
@task
def fetch_weather_windspeed(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["windspeed_10m"][0])
    print(f"Most recent windspeed {most_recent_temp} degrees")
    return most_recent_temp
@task
def fetch_weather_visibility(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="visibility"),
    )
    most_recent_temp = float(weather.json()["hourly"]["visibility"][0])
    print(f"Most recent visibility: {most_recent_temp} ")
    return most_recent_temp
@task
def save_weather(temp: float, windspeed: float, visibility: float):
    with open("weather.csv", "w+") as w:
        w.write(str(temp))
    return "Successfully wrote weather metrics for paris"
@flow
def pipeline(lat: float, lon: float):
    temp = fetch_weather(lat, lon)
    windspeed = fetch_weather_windspeed(lat, lon)
    visibility = fetch_weather_visibility(lat, lon)
    result = save_weather(temp, windspeed, visibility)
    return result
if __name__ == "__main__":
    # fetch_weather(38.9, -77.0)
    pipeline(48.856614, 2.3522219)