https://prefect.io logo
j

Jeff Hale

07/11/2023, 1:51 PM
Share your code in a 🧵 here:
j

Jakub Dvorak

07/11/2023, 1:54 PM
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task

@task
def fetch_weather(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    print(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp

@task
def fetch_rain(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="rain"),
    )
    rain_forecast = float(weather.json()["hourly"]["rain"][0])
    print(f"Recent rain forecast: {rain_forecast}")
    return rain_forecast

@task
def fetch_showers(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="showers"),
    )
    showers_forecast = float(weather.json()["hourly"]["showers"][0])
    print(f"Recent showers: {showers_forecast}")
    return showers_forecast


@task
def save_weather(temp: float, rain: float, showers: float):
    with open("weather.csv", "w+") as w:
        w.write("Temperature: " + str(temp) + "\n")
        w.write("Rain: " + str(rain) + "\n")
        w.write("Showers: " + str(showers) + "\n")
    return "Successfully wrote data"

@flow
def pipeline(lat: float, lon: float):
    temp = fetch_weather(lat, lon)
    rain = fetch_rain(lat, lon)
    showers = fetch_showers(lat, lon)
    
    
    result = save_weather(temp, rain, showers)
    return result


if __name__ == "__main__":
    pipeline(38.9, -77.0)
🙌 1
j

John Seippel

07/11/2023, 1:54 PM
Copy code
import httpx

from prefect import flow, task


def _fetch_weather(lat: float, lon: float, variable: str):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly=variable),
    )
    most_recent_var = float(weather.json()["hourly"][variable][0])
    print(f"Most recent {variable}: {most_recent_var}")


@task
def fetch_most_recent_surface_pressure(lat: float, lon: float):
    return _fetch_weather(lat, lon, "surface_pressure")


@task
def fetch_most_recent_windspeed(lat: float, lon: float):
    return _fetch_weather(lat, lon, "windspeed_10m")


@task
def fetch_most_recent_temp(lat: float, lon: float):
    return _fetch_weather(lat, lon, "temperature_2m")


@flow
def pipeline():
    coords = (38.9, -77.0)
    fetch_most_recent_temp(*coords)
    fetch_most_recent_surface_pressure(*coords)
    fetch_most_recent_windspeed(*coords)


if __name__ == "__main__":
    pipeline()
🙌 1
🦜 1
p

Philipp Gräbel

07/11/2023, 1:59 PM
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task


@task
def fetch_weather(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="precipitation"),
    )
    precipitation_hourly = [float(x) for x in weather.json()["hourly"]["precipitation"]]
    return precipitation_hourly

@task
def sum_up_precipitation(precipitation_hourly: list[float]):
    return sum(precipitation_hourly[:24]), sum(precipitation_hourly)


@task
def save_weather(precipitation: float, description: str):
    with open("weather.csv", "a") as w:
        w.write(f'{description};{precipitation}\n')
    return f"Successfully wrote {description}"


@flow
def pipeline(lat: float, lon: float):
    precipitation_hourly = fetch_weather(lat, lon)
    precipitation_day, precipitation_weekly = sum_up_precipitation(precipitation_hourly)
    result1 = save_weather(precipitation_day, 'precipitation last day')
    result2 = save_weather(precipitation_weekly, 'precipitation last week')
    return result1, result2


if __name__ == "__main__":
    pipeline(38.9, -77.0)
👀 1
blob attention gif 2
🦜 2
j

János

07/11/2023, 2:01 PM
import httpx # requests capability, but can work with async from prefect import flow, task @task def fetch_wind_speed(lat: float, lon: float): base_url = “https://api.open-meteo.com/v1/forecast/” weather = httpx.get( base_url, params=dict(latitude=lat, longitude=lon, hourly=“windspeed_10m”), ) wind_speed = weather.json()[“hourly”][“windspeed_10m”][0] return wind_speed @task def fetch_weather(lat: float, lon: float): base_url = “https://api.open-meteo.com/v1/forecast/” weather = httpx.get( base_url, params=dict(latitude=lat, longitude=lon, hourly=“temperature_2m”), ) most_recent_temp = float(weather.json()[“hourly”][“temperature_2m”][0]) return most_recent_temp @task def save(speed): with open(“weather.csv”, “w+“) as w: w.write(str(speed)) return “Successfully wrote temp” @flow def pipeline(lat: float, lon: float): speed = fetch_wind_speed(lat, lon) print(f”{speed=}“) result = save(speed) temp = fetch_weather(lat, lon) print(f”{temp=}“) result = save(temp) return result if name == “__main__“: pipeline(30.46, -97.76)
👍 1
🦜 1
sonic 2
v

Vinayak Nair

07/11/2023, 2:02 PM
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task

@task
def fetch_windspeed(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    windspeed = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    most_recent_windspeed = float(windspeed.json()["hourly"]["windspeed_10m"][0])
    return most_recent_windspeed

@task
def fetch_relative_humidity(lat:float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    relative_humidity = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="relativehumidity_2m"),
    )
    most_recent_relative_humidity = float(relative_humidity.json()["hourly"]["relativehumidity_2m"][0])
    return most_recent_relative_humidity

@task
def fetch_rain(lat:float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    rain = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="rain"),
    )
    most_recent_rain = float(rain.json()["hourly"]["rain"][0])
    return most_recent_rain

@task
def save_data(windspeed: float, relative_humidity: float, rain:float):
    with open("weather_deets.csv", "w+") as w:
        w.write(f"{windspeed},{relative_humidity}, {rain}")
    return "Successfully wrote data"


@flow
def pipeline(lat: float, lon: float):
    windspeed = fetch_windspeed(lat, lon)
    relative_humidity = fetch_relative_humidity(lat, lon)
    rain = fetch_rain(lat, lon)
    result = save_data(windspeed, relative_humidity, rain)
    return result


if __name__ == "__main__":
    pipeline(52.5244, 13.4105)
👍 1
🦜 2
blob attention gif 2
l

Lyes Khalfaoui

07/11/2023, 2:11 PM
Copy code
import httpx
from prefect import flow, task


@task
def fetch_weather(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon,
                    hourly="temperature_2m,visibility,windspeed_10m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    print(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp


@task
def fetch_weather_windspeed(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["windspeed_10m"][0])
    print(f"Most recent windspeed {most_recent_temp} degrees")
    return most_recent_temp


@task
def fetch_weather_visibility(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="visibility"),
    )
    most_recent_temp = float(weather.json()["hourly"]["visibility"][0])
    print(f"Most recent visibility: {most_recent_temp} ")
    return most_recent_temp


@task
def save_weather(temp: float, windspeed: float, visibility: float):
    with open("weather.csv", "w+") as w:
        w.write(str(temp))
    return "Successfully wrote weather metrics for paris"


@flow
def pipeline(lat: float, lon: float):
    temp = fetch_weather(lat, lon)
    windspeed = fetch_weather_windspeed(lat, lon)
    visibility = fetch_weather_visibility(lat, lon)
    result = save_weather(temp, windspeed, visibility)
    return result


if __name__ == "__main__":
    # fetch_weather(38.9, -77.0)
    pipeline(48.856614, 2.3522219)
🙌 2
blob attention gif 1