https://prefect.io logo
j

Jeff Hale

08/16/2023, 3:44 PM
Please share your code from lab 101 in a thread here 🧵. Folks always report finding value in seeing how other people tackle the questions. 🙂
🦜 1
Copy code
I'm in a triple backtick code block ``
m

morgan

08/16/2023, 4:02 PM
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task
import csv


@task
def fetch_weather(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    weather = float(weather.json()["hourly"]["temperature_2m"][0])
    return weather

@task
def get_wind(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    weather = float(weather.json()["hourly"]["windspeed_10m"][0])
    return weather   


@task
def save_weather(weather):
    with open("weather.csv", "w+") as w:
        w.write(str(weather))
    return "Successfully wrote temp"








@flow
def pipeline(lat: float, lon: float):
    temp = fetch_weather(lat, lon)
    
    wind_speed = get_wind(lat, lon)

    weather_report = {
        1:wind_speed,
        2:temp
    }

    result = save_weather(weather_report)
    return result


if __name__ == "__main__":
    pipeline(38.9, -77.0)
🙌 1
m

Mike Larsson

08/16/2023, 4:03 PM
Our group’s code, just prior to trying to make it async:
🦜 1
a

Alex Rodriguez

08/16/2023, 4:04 PM
import httpx
Copy code
from prefect import flow, task                @task
def get_rain(lat: float, lon: float) -> float:
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="precipitation"),
    )
    return float(weather.json()["hourly"]["precipitation"][0])

@task
def get_humidity(lat: float, lon: float) -> float:
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="relativehumidity_2m"),
    )
    return float(weather.json()["hourly"]["relativehumidity_2m"][0])

@task
def get_temp(lat: float, lon: float) -> float:
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    return float(weather.json()["hourly"]["temperature_2m"][0])

@task
def print_results(temp: float, humidity: float, rain: float):
    print(f"Temperature : {temp}, Relative Humitidy : {humidity}, Rain : {rain}%")

@flow
def test_flow(lat: float, lon: float):
    temp = get_temp(lat, lon)
    humidity = get_humidity(lat, lon)
    rain = get_rain(lat, lon)
    print_results(temp, humidity, rain)


if __name__ == "__main__":
    test_flow(1, 1)
blob attention gif 1
s

sfrasier

08/16/2023, 4:04 PM
Copy code
import httpx
from prefect import task, flow


@task
def fetch_weather(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    print(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp

@task
def fetch_dewpoint(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="dewpoint_2m"),
    )
    most_recent_dewpoint = float(weather.json()["hourly"]["dewpoint_2m"][0])
    print(f"Most recent dewpoint C: {most_recent_dewpoint} degrees")
    return most_recent_dewpoint
@task
def fetch_windspeed(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    most_recent_windspeed = float(weather.json()["hourly"]["windspeed_10m"][0])
    print(f"Most recent windspeed C: {most_recent_windspeed} degrees")
    return most_recent_windspeed

@flow
def weather_pipeline(lat: float, lon: float):
    fetch_weather(lat, lon)

    fetch_dewpoint(lat, lon)
    
    fetch_windspeed(lat, lon)


if __name__ == "__main__":
    weather_pipeline(38.9, -77.0)
sonic 1
n

Nathaniel Russell

08/16/2023, 4:05 PM
Copy code
from prefect import flow, task, tags
from prefect.logging import get_run_logger
import httpx
from typing import List


def fetch_weather(lat: float, lon: float):
    logger = get_run_logger()
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    <http://logger.info|logger.info>(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp


@task(name="fetch_many")
def fetch_many_weather(lat:float):
    lon = -175
    temp_list = {}
    while lon < 180:
        temp = fetch_weather(lat, lon)
        temp_list[lon] = temp
        lon += 5
    return temp_list


@task(name="save_weather_info")
def save_weather(temp: List[float]):
    with open("weather.csv", "w+") as w:
        w.write(str(temp))
    return "Successfully wrote temp"


@task(name="display")
def display(temp_list):
    logger = get_run_logger()
    <http://logger.info|logger.info>(temp_list)


@flow(name="weather_pipeline")
def pipeline(lat: float):
    temp = fetch_many_weather(lat)
    result = save_weather(temp)
    display(temp)
    return result


if __name__ == "__main__":
    pipeline(38.9)
squirtle cool 1
r

Rahul Viswanath C

08/16/2023, 4:05 PM
Copy code
from prefect import flow, task
import httpx

@task
def fetch_weather(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat , longitude=lon , hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp

@task
def fetch_windspeed(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat , longitude=lon , hourly="windspeed_10m"),
    )
    most_recent_windspeed = float(weather.json()["hourly"]["windspeed_10m"][0])
    return most_recent_windspeed

@task
def save_weather(temp: float):
    with open("weather.csv","w+") as w:
        w.write(str(temp))
    return "Successfully wrote temp"

@task
def save_windspeed(temp: float):
    with open("windspeed.csv","w+") as w:
        w.write(str(temp))
    return "Successfully wrote temp"


@flow
def pipeline(lat: float, lon: float):
    temp = fetch_weather(lat,lon)
    weather_result = save_weather(temp)
    windspeed = fetch_windspeed(lat,lon)
    windspeed_result = save_windspeed(windspeed)
    print(f"{weather_result} , {windspeed_result}")



if __name__ == "__main__":
    pipeline(38.9,-77.0)
marvin 2
s

Sanjeev

08/16/2023, 4:35 PM
import httpx from prefect import flow, task @task def get_url(): return "https://api.open-meteo.com/v1/forecast/" @task() def fetch_weather(lat: float, lon: float, base_url): weather = httpx.get(base_url, params=dict(latitude=lat, longitude=lon, hourly="rain"), ) #print(weather.json()) most_recent_rain = float(weather.json()["hourly"]["rain"][0]) print(f"Most recent rain C: {most_recent_rain} mm") return most_recent_rain @task def save_rain(rain): with open("weather.csv", "w+") as w: w.write(str(rain)) return "Successfully wrote Windspeed" @flow def pipeline(lat: float, lon: float): base_url = get_url() rain = fetch_weather(lat, lon, base_url) result = save_rain(rain) return result if name == "_main_": pipeline(38.9, -77.0)
🙌 1