https://prefect.io logo
j

Jeff Hale

08/22/2023, 2:03 PM
Thread to share code from module 101:
e

Emma Rizzi

08/22/2023, 2:05 PM
Added precipitation probability and a print task:
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task


@task
def fetch_weather(lat: float, lon: float):
    """Fetch the first hourly temperature and precipitation probability.

    Sends a GET request to the Open-Meteo forecast endpoint for the given
    coordinates.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        A ``(temperature, precipitation_probability)`` tuple of floats, each
        taken from the first entry of the matching hourly series.

    Raises:
        httpx.HTTPStatusError: If the API does not answer with a 2xx status.
    """
    # The URL must be a bare string: a "<...>" wrapper (chat link markup)
    # is not a valid URL and makes the request fail.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(
            latitude=lat,
            longitude=lon,
            hourly="temperature_2m,precipitation_probability",
        ),
    )
    # Fail with a clear HTTP error instead of a KeyError on an error payload.
    weather.raise_for_status()
    hourly = weather.json()["hourly"]  # decode the response body only once
    most_recent_temp = float(hourly["temperature_2m"][0])
    precipitations = float(hourly["precipitation_probability"][0])
    return most_recent_temp, precipitations


@task
def save_weather(temp: float):
    """Overwrite ``weather.csv`` with the string form of ``temp``.

    Args:
        temp: Temperature value to store.

    Returns:
        A fixed confirmation message.
    """
    status = "Successfully wrote temp"
    with open("weather.csv", mode="w+") as csv_file:
        csv_file.write(str(temp))
    return status

@task
def print_weather(temp: float, precipitations: float):
    """Print a one-line summary of the temperature and precipitation probability.

    Args:
        temp: Temperature value to display.
        precipitations: Precipitation probability value to display.
    """
    summary = f"Weather for today : {temp} °C and precipitations probablity : {precipitations}"
    print(summary)

@flow
def pipeline(lat: float, lon: float):
    """Fetch the weather for a location, save the temperature, and print both values.

    Args:
        lat: Latitude forwarded to ``fetch_weather``.
        lon: Longitude forwarded to ``fetch_weather``.

    Returns:
        The value returned by ``save_weather``.
    """
    current_temp, rain_probability = fetch_weather(lat, lon)
    save_status = save_weather(current_temp)
    print_weather(current_temp, rain_probability)
    return save_status


# Run the flow once with fixed coordinates when this file is executed as a
# script (nothing runs on import).
if __name__ == "__main__":
    pipeline(43.604652, 1.444209)
🙌 1
a

Alex Litvinov

08/22/2023, 2:05 PM
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task


@task
def fetch_weather(lat: float, lon: float):
    """Fetch the first hourly ``temperature_2m`` value for a location.

    Sends a GET request to the Open-Meteo forecast endpoint for the given
    coordinates.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        The first entry of the hourly ``temperature_2m`` series as a float.

    Raises:
        httpx.HTTPStatusError: If the API does not answer with a 2xx status.
    """
    # The URL must be a bare string: a "<...>" wrapper (chat link markup)
    # is not a valid URL and makes the request fail.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Fail with a clear HTTP error instead of a KeyError on an error payload.
    weather.raise_for_status()
    return float(weather.json()["hourly"]["temperature_2m"][0])


@task
def save_weather(temp: float, windspeed: float):
    """Overwrite ``weather.csv`` with the temperature and wind speed.

    The two values are written on one line, separated by a comma and a space.

    Args:
        temp: Temperature value to store.
        windspeed: Wind speed value to store.

    Returns:
        A fixed confirmation message.
    """
    status = "Successfully wrote temp"
    with open("weather.csv", mode="w+") as csv_file:
        csv_file.write(f'{temp}, {windspeed}')
    return status

@task
def fetch_windspeed(lat: float, lon: float):
    """Fetch the first hourly ``windspeed_10m`` value for a location.

    Sends a GET request to the Open-Meteo forecast endpoint for the given
    coordinates.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        The first entry of the hourly ``windspeed_10m`` series as a float.

    Raises:
        httpx.HTTPStatusError: If the API does not answer with a 2xx status.
    """
    # The URL must be a bare string: a "<...>" wrapper (chat link markup)
    # is not a valid URL and makes the request fail.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    # Fail with a clear HTTP error instead of a KeyError on an error payload.
    weather.raise_for_status()
    return float(weather.json()["hourly"]["windspeed_10m"][0])
    


@flow
def pipeline(lat: float, lon: float):
    """Fetch temperature and wind speed for a location and save both.

    Args:
        lat: Latitude forwarded to both fetch tasks.
        lon: Longitude forwarded to both fetch tasks.

    Returns:
        The value returned by ``save_weather``.
    """
    current_temp = fetch_weather(lat, lon)
    current_wind = fetch_windspeed(lat, lon)
    save_status = save_weather(current_temp, current_wind)
    return save_status


# Run the flow once with fixed coordinates when this file is executed as a
# script (nothing runs on import).
if __name__ == "__main__":
    pipeline(38.9, -77.0)
blob attention gif 1
s

Sylvain Hazard

08/22/2023, 2:06 PM
Added a parameter to get different weather variables:
Copy code
import httpx
from prefect import flow, task


@task
def fetch_weather(lat: float, lon: float, variable: str = "temperature_2m"):
    """Fetch and print the first hourly value of one weather variable.

    Sends a GET request to the Open-Meteo forecast endpoint for the given
    coordinates, asking for a single hourly variable.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.
        variable: Name sent as the ``hourly`` query parameter and used as the
            key to read the series from the response.

    Returns:
        The first entry of the requested hourly series as a float.

    Raises:
        httpx.HTTPStatusError: If the API does not answer with a 2xx status.
    """
    # The URL must be a bare string: a "<...>" wrapper (chat link markup)
    # is not a valid URL and makes the request fail.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly=variable),
    )
    # Fail with a clear HTTP error instead of a KeyError on an error payload.
    weather.raise_for_status()
    most_recent = float(weather.json()["hourly"][variable][0])
    print(most_recent)
    return most_recent


@flow
def get_variables(lat: float, lon: float):
    """Fetch temperature, wind speed, and rain for a location.

    Calls ``fetch_weather`` once per variable, in that order.

    Args:
        lat: Latitude forwarded to ``fetch_weather``.
        lon: Longitude forwarded to ``fetch_weather``.

    Returns:
        A ``(temp, wind_speed, rain)`` tuple of the three fetched values.
    """
    variable_names = ("temperature_2m", "windspeed_10m", "rain")
    return tuple(fetch_weather(lat, lon, name) for name in variable_names)


# Run the flow once with fixed coordinates when this file is executed as a
# script (nothing runs on import).
if __name__ == "__main__":
    get_variables(44.837789, -0.57918)
wizard2 1
u

UD

08/22/2023, 2:24 PM
import httpx  # requests capability, but can work with async
from prefect import flow, task


@task
def fetch_weather(lat: float, lon: float):
    """Fetch the first hourly ``temperature_2m`` value for a location.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        The first entry of the hourly ``temperature_2m`` series as a float.

    Raises:
        httpx.HTTPStatusError: If the API does not answer with a 2xx status.
    """
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Fail with a clear HTTP error instead of a KeyError on an error payload.
    weather.raise_for_status()
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp


@task
def fetch_rain(lat: float, lon: float):
    """Fetch the first hourly ``rain`` value for a location.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        The first entry of the hourly ``rain`` series as a float.

    Raises:
        httpx.HTTPStatusError: If the API does not answer with a 2xx status.
    """
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="rain"),
    )
    # Fail with a clear HTTP error instead of a KeyError on an error payload.
    weather.raise_for_status()
    rain = float(weather.json()["hourly"]["rain"][0])
    return rain


@task
def save_weather(temp: float):
    """Overwrite ``weather.csv`` with the string form of ``temp``.

    Returns:
        A fixed confirmation message.
    """
    with open("weather.csv", "w+") as w:
        w.write(str(temp))
    return "Successfully wrote temp"


@task
def save_rain(rain: float):
    """Overwrite ``rain.csv`` with the string form of ``rain``.

    Returns:
        A fixed confirmation message.
    """
    with open("rain.csv", "w+") as w:
        w.write(str(rain))
    return "Successfully wrote rain"


@flow
def pipeline(lat: float, lon: float):
    """Fetch temperature and rain for a location and save each to its own file.

    Args:
        lat: Latitude forwarded to both fetch tasks.
        lon: Longitude forwarded to both fetch tasks.

    Returns:
        The value returned by ``save_rain``.
    """
    temp = fetch_weather(lat, lon)
    rain = fetch_rain(lat, lon)
    result = save_rain(rain)
    # The temperature is still saved; its confirmation message is not used.
    save_weather(temp)
    return result


# The guard must compare the dunder ``__name__``; a bare ``name`` is undefined
# and raises NameError when the script runs.
if __name__ == "__main__":
    pipeline(38.9, -77.0)
squirtle cool 1