https://prefect.io logo
j

Jeff Hale

08/16/2023, 5:02 PM
Share code for lab 102 here (thread):
🦜 1
s

Sanjeev

08/16/2023, 5:02 PM
import httpx
from prefect import flow, task


@task
def get_url():
    """Return the base URL of the Open-Meteo forecast endpoint."""
    return "https://api.open-meteo.com/v1/forecast/"


@task()
def fetch_weather(lat: float, lon: float, base_url):
    """Request the hourly ``rain`` series and return its first entry.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.
        base_url: Endpoint to query (the flow passes the result of ``get_url``).

    Returns:
        The first value of ``["hourly"]["rain"]`` in the JSON response, as a float.
    """
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="rain"),
    )
    # print(weather.json())
    most_recent_rain = float(weather.json()["hourly"]["rain"][0])
    print(f"Most recent rain C: {most_recent_rain} mm")
    return most_recent_rain


@task(persist_result=True)
def save_rain(rain):
    """Overwrite ``weather.csv`` with ``str(rain)`` and return a status message."""
    with open("weather.csv", "w+") as w:
        w.write(str(rain))
    # NOTE(review): the message says "Windspeed" although the value written is
    # rain; left unchanged so the flow's return value stays the same.
    return "Successfully wrote Windspeed"


@flow(retries=4, retry_delay_seconds=0.1)
def pipeline(lat: float, lon: float):
    """Look up the endpoint, fetch the rain value, save it, and return the status."""
    base_url = get_url()
    rain = fetch_weather(lat, lon, base_url)
    result = save_rain(rain)
    return result


# The pasted snippet read ``if name == "__main__"`` (underscores lost in the
# chat formatting), which raises NameError when the script is executed.
if __name__ == "__main__":
    pipeline(38.9, -77.0)
👀 1
🦜 1
m

Maha

08/16/2023, 5:02 PM
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect import get_run_logger
import csv



@task(retries=2, cache_key_fn=task_input_hash)
def fetch_weather(lat: float, lon: float):
    """Request the hourly ``temperature_2m`` series and return its first entry.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first value of ``["hourly"]["temperature_2m"]`` as a float.

    Raises:
        Exception: If the API answers with HTTP status 400.
    """
    # Plain URL: the pasted version still carried chat link markup ("<...>"),
    # which is not a valid URL for httpx.
    base_url = "https://api.open-meteo.com/v1/forecast/"

    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    if response.status_code == 400:
        raise Exception("Open-Meteo returned HTTP 400 for the forecast request")

    weather = float(response.json()["hourly"]["temperature_2m"][0])
    logger = get_run_logger()
    s = 'the weather is ' + str(weather)
    logger.info(s)
    return weather

@task(cache_key_fn=task_input_hash)
def get_wind(lat: float, lon: float):
    """Request the hourly ``windspeed_10m`` series and return its first entry.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first value of ``["hourly"]["windspeed_10m"]`` as a float.
    """
    # Plain URL: the pasted version still carried chat link markup ("<...>").
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    return float(response.json()["hourly"]["windspeed_10m"][0])


@task(retries=2, cache_key_fn=task_input_hash)
def save_weather(weather):
    """Overwrite ``weather.csv`` with ``str(weather)`` and return a status message."""
    status = "Successfully wrote temp"
    with open("weather.csv", "w+") as out_file:
        out_file.write(str(weather))
    return status








@flow(log_prints=True)
def pipeline(lat: float, lon: float):
    """Fetch temperature and wind speed for a location and save them together.

    The report handed to ``save_weather`` is a dict keyed ``1`` (wind speed)
    and ``2`` (temperature); the task's status message is returned.
    """
    temp = fetch_weather(lat, lon)
    wind_speed = get_wind(lat, lon)
    weather_report = {1: wind_speed, 2: temp}
    return save_weather(weather_report)


if __name__ == "__main__":
    # Run the flow once for a fixed pair of coordinates.
    pipeline(38.9, -77.0)
🦜 1
r

Rahul Viswanath C

08/16/2023, 5:02 PM
Copy code
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
import httpx

@task
def fetch_weather(lat: float, lon: float):
    """Request the hourly ``temperature_2m`` series and return its first entry.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first value of ``["hourly"]["temperature_2m"]`` as a float.
    """
    # Plain URL: the pasted version still carried chat link markup ("<...>").
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp

@task
def fetch_windspeed(lat: float, lon: float):
    """Request the hourly ``windspeed_10m`` series and return its first entry.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first value of ``["hourly"]["windspeed_10m"]`` as a float.
    """
    # Plain URL: the pasted version still carried chat link markup ("<...>").
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    most_recent_windspeed = float(weather.json()["hourly"]["windspeed_10m"][0])
    return most_recent_windspeed

@task
def save_weather(temp: float):
    """Overwrite ``weather.csv`` with ``str(temp)`` and return a status message."""
    status = "Successfully wrote weather"
    with open("weather.csv", "w+") as out_file:
        out_file.write(str(temp))
    return status

# Caching: results are keyed on the task inputs and expire after three minutes.
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=3))
def save_windspeed(temp: float):
    """Overwrite ``windspeed.csv`` with ``str(temp)`` and return a status message."""
    status = "Successfully wrote windspeed"
    with open("windspeed.csv", "w+") as out_file:
        out_file.write(str(temp))
    return status

# Retries: up to two retries, 0.3 seconds apart.
@task(retries=2, retry_delay_seconds=0.3)
def fetch():
    """Call an endpoint that answers with either 200 or 500 and print the body.

    Raises:
        Exception: If the response status code is 400 or higher, so that the
            task's retry settings apply.
    """
    # Plain URL: the pasted version still carried chat link markup ("<...>").
    cat_fact = httpx.get("https://httpstat.us/Random/200,500")
    if cat_fact.status_code >= 400:
        raise Exception
    # httpx responses expose the decoded body as ``.text``; ``.txt`` does not
    # exist and raised AttributeError on every successful call.
    print(cat_fact.text)


@flow
def pipeline(lat: float, lon: float):
    """Fetch and save temperature and wind speed, then log both status messages.

    Args:
        lat: Latitude passed to the fetch tasks.
        lon: Longitude passed to the fetch tasks.
    """
    logger = get_run_logger()

    temp = fetch_weather(lat, lon)
    weather_result = save_weather(temp)
    windspeed = fetch_windspeed(lat, lon)
    windspeed_result = save_windspeed(windspeed)
    print(f"{weather_result} , {windspeed_result}")

    # retries
    fetch()

    # logging: the pasted version had chat link markup wrapped around
    # ``logger.info``, which is not valid Python.
    logger.info("The weather result is " + weather_result)
    logger.info("The windspeed result is " + windspeed_result)


if __name__ == "__main__":
    pipeline(38.9, -77.0)
blob attention gif 1
m

Mike Larsson

08/16/2023, 5:02 PM
weather.py
🙌 1
a

Alex Rodriguez

08/16/2023, 5:03 PM
Copy code
from datetime import timedelta

import httpx
from prefect import flow, task
from prefect.tasks import task_input_hash


@task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def get_rain(lat: float, lon: float) -> float:
    """Request the hourly ``precipitation`` series and return its first entry.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first value of ``["hourly"]["precipitation"]`` as a float.
    """
    # Plain URL: the pasted version still carried chat link markup ("<...>").
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="precipitation"),
    )
    return float(weather.json()["hourly"]["precipitation"][0])


@task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def get_humidity(lat: float, lon: float) -> float:
    """Request the hourly ``relativehumidity_2m`` series and return its first entry.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first value of ``["hourly"]["relativehumidity_2m"]`` as a float.
    """
    # Plain URL: the pasted version still carried chat link markup ("<...>").
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="relativehumidity_2m"),
    )
    return float(weather.json()["hourly"]["relativehumidity_2m"][0])


@task(retries=3, retry_delay_seconds=5, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def get_temp(lat: float, lon: float) -> float:
    """Request the hourly ``temperature_2m`` series and return its first entry.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first value of ``["hourly"]["temperature_2m"]`` as a float.
    """
    # Plain URL: the pasted version still carried chat link markup ("<...>").
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # raise ValueError("This is a test")
    return float(weather.json()["hourly"]["temperature_2m"][0])


@task
def print_results(temp: float, humidity: float, rain: float) -> None:
    """Print the three readings on a single line.

    Args:
        temp: Temperature reading.
        humidity: Relative-humidity reading.
        rain: Precipitation reading.
    """
    # Fixes the "Humitidy" typo and moves the percent sign to relative
    # humidity. NOTE(review): "mm" assumes the API's default precipitation
    # unit — confirm against the Open-Meteo documentation.
    print(f"Temperature : {temp}, Relative Humidity : {humidity}%, Rain : {rain} mm")


@flow
def test_flow(lat: float, lon: float):
    """Fetch temperature, humidity and rain for a location, then print them."""
    # Arguments are evaluated left to right, so the tasks still run in the
    # order temperature, humidity, rain.
    print_results(
        get_temp(lat, lon),
        get_humidity(lat, lon),
        get_rain(lat, lon),
    )


if __name__ == "__main__":
    # Run the flow once for a fixed pair of coordinates.
    test_flow(2, 3)
sonic 1
wizard2 1
a

Abhilash Agarwal

08/16/2023, 5:03 PM
import httpx
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta


@task(retries=2)
def fetch_weather(lat: float, lon: float):
    """Request the hourly ``temperature_2m`` series and return its first entry.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first value of ``["hourly"]["temperature_2m"]`` as a float.
    """
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    print(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=5))
def save_weather(temp: float):
    """Overwrite ``weather.csv`` with ``str(temp)`` and return a status message."""
    with open("weather.csv", "w+") as w:
        w.write(str(temp))
    return "Succefully wrote temp"


@flow(retries=3)
def pipeline(lat: float, lon: float):
    """Fetch the temperature for a location, save it, and return the status."""
    temp = fetch_weather(lat, lon)
    result = save_weather(temp)
    return result


# The pasted snippet read ``if name == "__main__"`` (underscores lost in the
# chat formatting), which raises NameError when the script is executed.
if __name__ == "__main__":
    pipeline(38.9, -77.0)
🙌 1
s

sfrasier

08/16/2023, 5:06 PM
Is persist_result universal within a flow? I noticed I couldn't specify persist result and cache result within the same flow (different tasks)
image.png,image.png