https://prefect.io logo
j

Jeff Hale

08/16/2023, 4:37 PM
Lab 102 instructions (also in slides): • Use a flow that grabs weather data from open-meteo • Add retries • Start a Prefect server instance • Run your flow • Inspect in the UI • Stretch: add caching
o

Ovo Ojameruaye

08/16/2023, 4:38 PM
Copy code
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
[SQL: DELETE FROM flow_run_notification_queue WHERE flow_run_notification_queue.id IN (SELECT 1 FROM (SELECT 1) WHERE 1!=1) RETURNING id]
1
@Jeff Hale Maha and I get this when we start the server
1
n

Nathaniel Russell

08/16/2023, 4:57 PM
Copy code
from typing import Dict, List

import httpx
from prefect import flow, task, tags
from prefect.logging import get_run_logger
from prefect.tasks import task_input_hash


def fetch_weather(lat: float, lon: float):
    """Fetch one hourly temperature reading for a coordinate from Open-Meteo.

    Requests the ``temperature_2m`` hourly series for the given coordinate and
    logs and returns its first entry.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first value of ``hourly.temperature_2m`` in the response, as a float.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    # NOTE(review): get_run_logger() presumably needs an active flow/task run
    # context; this helper is only called from inside a task in this script.
    logger = get_run_logger()
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Fail on HTTP errors here rather than with an opaque KeyError when the
    # error payload lacks the "hourly" key.
    weather.raise_for_status()
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    logger.info(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp


@task(name="fetch_many", cache_key_fn=task_input_hash)
def fetch_many_weather(lat: float):
    """Collect temperatures along one latitude for a fixed sweep of longitudes.

    Calls ``fetch_weather`` for longitudes -20, -15, ..., 15 (step 5; 20 is
    excluded) at the given latitude.

    Args:
        lat: Latitude shared by every lookup.

    Returns:
        A dict mapping each integer longitude to the temperature returned by
        ``fetch_weather`` for that longitude.
    """
    # NOTE(review): cache_key_fn=task_input_hash presumably lets Prefect reuse a
    # previous result for the same ``lat`` — confirm against the Prefect docs.
    longitudes = range(-20, 20, 5)
    return {lon: fetch_weather(lat, lon) for lon in longitudes}


@task(name="save_weather_info")
def save_weather(temp: Dict[int, float]):
    """Write the collected temperatures to ``weather.csv`` in the working directory.

    The file receives ``str(temp)`` verbatim, i.e. the Python repr of the
    mapping rather than comma-separated rows, and any existing file is
    overwritten.

    Args:
        temp: Temperatures to persist; the pipeline passes the
            longitude -> temperature mapping built by ``fetch_many_weather``.

    Returns:
        The fixed status string ``"Successfully wrote temp"``.
    """
    # Write-only access is sufficient: the file is never read back here.
    with open("weather.csv", "w", encoding="utf-8") as w:
        w.write(str(temp))
    return "Successfully wrote temp"


@task(name="display")
def display(temp_list):
    """Log the collected temperatures through the Prefect run logger at INFO level.

    Args:
        temp_list: Value to log; the pipeline passes the mapping returned by
            ``fetch_many_weather``.
    """
    logger = get_run_logger()
    logger.info(temp_list)


@flow(name="weather_pipeline", retries=3)
def pipeline(lat: float):
    """Fetch temperatures for one latitude, save them, and log them.

    Runs ``fetch_many_weather``, then ``save_weather`` and ``display`` on its
    result, in that order.

    Args:
        lat: Latitude forwarded to ``fetch_many_weather``.

    Returns:
        The status string returned by ``save_weather``.
    """
    temps_by_lon = fetch_many_weather(lat)
    save_status = save_weather(temps_by_lon)
    display(temps_by_lon)
    return save_status


if __name__ == "__main__":
    # Script entry point: run the weather flow once for latitude 38.9.
    pipeline(lat=38.9)
2 Views