my group's code for those who wanted it
# pacc-june-14-2023
c
my group's code for those who wanted it
🔥 1
🙌 2
Copy code
# x Logging
# - States
# x Retries for tasks and flows
# x API server
# x UI
# - Results
# x Caching
# x Subflows


import httpx  # requests capability, but can work with async
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash


@task(cache_key_fn=task_input_hash)
def fetch_weather(lat: float, lon: float) -> float:
    """Fetch the first hourly 2 m temperature from the Open-Meteo forecast API.

    The task is cached on its inputs (``task_input_hash``), so a repeated
    call with the same coordinates can reuse the stored result.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of the ``hourly.temperature_2m`` series as a float.

    Raises:
        httpx.HTTPStatusError: If the API responds with a 4xx/5xx status.
    """
    # Plain URL: the previous "<https://...>" form (chat auto-link markup)
    # is not a valid URL, so the request could never succeed.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Fail loudly on an HTTP error instead of on a confusing KeyError below.
    weather.raise_for_status()
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp


@task(
    retries=4,
    retry_delay_seconds=1,
    log_prints=True,
)
def save_weather(temp: float):
    """Write ``temp`` to ``weather.csv`` and report success.

    The file is opened in ``"w+"`` mode, so any previous content is
    replaced. The task is configured with 4 retries, 1 second apart, and
    ``log_prints=True`` so the ``print`` output goes to the run logs.

    Args:
        temp: Temperature value; its ``str()`` form is written to the file.

    Returns:
        The confirmation message that is also printed.
    """
    message = "Successfully wrote temp"
    with open("weather.csv", "w+") as out_file:
        out_file.write(str(temp))
    print(message)
    return message



@flow
def pipeline(lat: float, lon: float):
    """Fetch the temperature for a location, save it, and log the outcome.

    ``save_weather`` is called with ``return_state=True``, so it hands back
    a Prefect ``State`` object instead of the task's return value.

    Args:
        lat: Latitude forwarded to ``fetch_weather``.
        lon: Longitude forwarded to ``fetch_weather``.

    Returns:
        The ``State`` of the ``save_weather`` task run.
    """
    logger = get_run_logger()
    temp = fetch_weather(lat, lon)
    state = save_weather(temp, return_state=True)
    logger.info(type(state))

    # A State is an object, not a string, so `state == "Completed"` is
    # always False; use the State predicate methods instead.
    if state.is_completed():
        # Only read the result on success: result() raises on a failed state.
        logger.info(state.result())
        logger.info("we did it!")
    elif state.is_failed():
        logger.info("we didn't do it!")
    return state

@flow
def sub_flow_test():
    """Run ``pipeline`` as a subflow for four fixed coordinate pairs.

    The pairs cover each sign combination of latitude 38.9 and
    longitude 77.0, and run in the order listed.
    """
    coordinates = (
        (38.9, -77.0),
        (38.9, 77.0),
        (-38.9, 77.0),
        (-38.9, -77.0),
    )
    # Calling a flow from inside a flow makes each call a subflow run.
    for lat, lon in coordinates:
        pipeline(lat, lon)


# Script entry point: run the parent flow only when executed directly.
if __name__ == "__main__":
    sub_flow_test()
🙌 1
j
Copy code
import httpx
import json
import numpy as np
import random

from datetime import timedelta
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from typing import List, Dict

@task(
    retries=5,
    timeout_seconds=1,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=1),
)
def get_temperature(lat: float, lon: float) -> float:
    """Fetch the first hourly 2 m temperature from the Open-Meteo forecast API.

    Task configuration: up to 5 retries, a 1 second timeout per attempt,
    and results cached on the inputs (``task_input_hash``) for one day.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of the ``hourly.temperature_2m`` series as a float.

    Raises:
        httpx.HTTPStatusError: If the API responds with a 4xx/5xx status.
    """
    # Plain URL: the previous "<https://...>" form (chat auto-link markup)
    # is not a valid URL, so the request could never succeed.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Surface HTTP errors directly so a retry is triggered by the real cause.
    weather.raise_for_status()
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    print(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp

@task()
def mean_temp(temps: List[Dict[str, float]]):
    """Return the mean of the ``"temp"`` values, rounded to one decimal.

    Args:
        temps: Records that each carry a ``"temp"`` key.

    Returns:
        ``np.mean`` of the extracted temperatures, rounded to 1 decimal place.
    """
    values = [record["temp"] for record in temps]
    return round(np.mean(values), 1)

@task()
def max_temp(temps: List[Dict[str, float]]):
    """Return the largest ``"temp"`` value, rounded to one decimal.

    Args:
        temps: Records that each carry a ``"temp"`` key.

    Returns:
        ``np.max`` of the extracted temperatures, rounded to 1 decimal place.
    """
    values = [record["temp"] for record in temps]
    highest = np.max(values)
    return round(highest, 1)

@task()
def min_temp(temps: List[Dict[str, float]]):
    """Return the smallest ``"temp"`` value, rounded to one decimal.

    Args:
        temps: Records that each carry a ``"temp"`` key.

    Returns:
        ``np.min`` of the extracted temperatures, rounded to 1 decimal place.
    """
    values = [record["temp"] for record in temps]
    lowest = np.min(values)
    return round(lowest, 1)

@flow()
def aggregate_temps(temps: List[Dict[str, float]]):
    """Summarise the temperature records as mean, max and min.

    Runs the ``mean_temp``, ``max_temp`` and ``min_temp`` tasks in that
    order on the same input.

    Args:
        temps: Records that each carry a ``"temp"`` key.

    Returns:
        A dict with the keys ``"mean"``, ``"max"`` and ``"min"``.
    """
    summary = {}
    summary["mean"] = mean_temp(temps)
    summary["max"] = max_temp(temps)
    summary["min"] = min_temp(temps)
    return summary

@flow()
def fetch_weather():
    """Collect temperatures for one fixed and five random locations.

    Each reading is stored as a ``{"lat", "lon", "temp"}`` record. The
    random readings are logged individually, then all records are passed
    to the ``aggregate_temps`` subflow and the summary is logged.
    """
    logger = get_run_logger()
    temps = []

    # Fixed coordinates: identical inputs on every run, so the cached
    # get_temperature result can be reused.
    static_lat = 38.9
    static_lon = -77.0
    temps.append({
        "lat": static_lat,
        "lon": static_lon,
        "temp": get_temperature(static_lat, static_lon),
    })

    # Then we get some random lats and lons, rounded to one decimal.
    for _ in range(5):
        random_lat = round(random.random() * 90 * random.choice([-1, 1]), 1)
        random_lon = round(random.random() * 180 * random.choice([-1, 1]), 1)
        # `logger.info` restored: the pasted "<http://logger.info|...>"
        # chat auto-link markup was a syntax error.
        logger.info(f"Lat: {random_lat}, Long: {random_lon}")
        temps.append({
            "lat": random_lat,
            "lon": random_lon,
            "temp": get_temperature(random_lat, random_lon),
        })
        logger.info(json.dumps(temps[-1], indent=4))

    agg_temps = aggregate_temps(temps)
    logger.info(json.dumps(agg_temps, indent=4))

# Script entry point: run the weather flow only when executed directly.
if __name__ == "__main__":
    fetch_weather()
🙌 1
j
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task
from typing import List
from prefect.tasks import task_input_hash

@task(retries=3, cache_key_fn=task_input_hash)
def fetch_weather(lat: float, lon: float) -> float:
    """Fetch the first hourly 2 m temperature from the Open-Meteo forecast API.

    The task retries up to 3 times and is cached on its inputs
    (``task_input_hash``).

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of the ``hourly.temperature_2m`` series as a float.

    Raises:
        httpx.HTTPStatusError: If the API responds with a 4xx/5xx status.
    """
    # Plain URL: the previous "<https://...>" form (chat auto-link markup)
    # is not a valid URL, so the request could never succeed.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Surface HTTP errors directly so a retry is triggered by the real cause.
    weather.raise_for_status()
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])

    return most_recent_temp

# No cache_key_fn here: this task exists only for its side effects. With
# input-hash caching, a cache hit would skip the body, leave the files
# untouched and still return the success message.
@task(retries=3)
def save_weather(temp: float, elevation: float):
    """Write the temperature and elevation to their CSV files.

    ``temp`` goes to ``weather.csv`` and ``elevation`` to
    ``elevation.csv``; each file is overwritten with the value's
    ``str()`` form.

    Args:
        temp: Temperature value to store.
        elevation: Elevation value to store.

    Returns:
        A confirmation message.
    """
    with open("weather.csv", "w") as w:
        w.write(str(temp))
    with open("elevation.csv", "w") as w:
        w.write(str(elevation))

    return "Successfully wrote temp and elevation"

@task(retries=3)
def fetch_elevation(lat: float, lon: float) -> float:
    """Fetch the elevation for a coordinate from the Open-Meteo elevation API.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of the response's ``elevation`` list as a float.

    Raises:
        httpx.HTTPStatusError: If the API responds with a 4xx/5xx status.
    """
    # Plain URL: the previous "<https://...>" form (chat auto-link markup)
    # is not a valid URL, so the request could never succeed.
    base_url = "https://api.open-meteo.com/v1/elevation/"
    elevation = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon),
    )
    # Surface HTTP errors directly so a retry is triggered by the real cause.
    elevation.raise_for_status()

    return float(elevation.json()["elevation"][0])

@flow(retries=3)  # called from `pipeline`, so it runs as a subflow
def elev_temp_flow(lat: float, lon: float) -> List:
    """Look up the temperature and the elevation for one coordinate.

    The temperature task runs first, then the elevation task.

    Args:
        lat: Latitude forwarded to both tasks.
        lon: Longitude forwarded to both tasks.

    Returns:
        A two-item list ``[elevation, temp]`` (elevation comes first).
    """
    temperature = fetch_weather(lat, lon)
    height = fetch_elevation(lat, lon)
    return [height, temperature]

@flow(retries=3)
def pipeline(lat: float, lon: float):
    """Fetch elevation and temperature for a coordinate and save both.

    Args:
        lat: Latitude forwarded to ``elev_temp_flow``.
        lon: Longitude forwarded to ``elev_temp_flow``.

    Returns:
        The message returned by ``save_weather``.
    """
    # elev_temp_flow returns [elevation, temp]; save_weather takes
    # (temp, elevation), hence the swapped order below.
    elevation, temp = elev_temp_flow(lat, lon)
    message = save_weather(temp, elevation)
    print(message)

    return message

# Script entry point: run the pipeline for one fixed coordinate pair
# only when executed directly.
if __name__ == "__main__":
    pipeline(38.9, -77.0)
🙌 1