# ---- Snippet shared by Charlie Henry, 06/14/2023, 5:13 PM ----
# x Logging
# - States
# x Retries for tasks and flows
# x API server
# x UI
# - Results
# x Caching
# x Subflows
import httpx # requests capability, but can work with async
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
@task(cache_key_fn=task_input_hash)
def fetch_weather(lat: float, lon: float):
    """Return the first hourly 2 m temperature Open-Meteo reports for (lat, lon).

    The task is cached by Prefect using ``task_input_hash``. Repeated calls
    with the same coordinates therefore reuse the stored value instead of
    calling the API again.

    Raises:
        httpx.HTTPStatusError: if the API answers with a 4xx/5xx status.
    """
    # Bare URL: the Slack-style "<...>" wrapping is not something httpx can parse.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Surface a rejected request as an HTTP error rather than a KeyError below.
    weather.raise_for_status()
    # Index 0 is the first entry of the hourly series. It is presumably the
    # start of the forecast window, not the current hour; confirm if "most
    # recent" matters.
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp
@task(
    log_prints=True,
    retries=4,
    retry_delay_seconds=1,
)
def save_weather(temp: float):
    """Overwrite ``weather.csv`` with *temp* and return a success message.

    ``log_prints=True`` sends the ``print`` below to the Prefect run logs.
    If the task raises, it is retried up to 4 times, 1 second apart.
    """
    message = "Successfully wrote temp"
    # "w" is sufficient: the file is only written here, never read back.
    with open("weather.csv", "w", encoding="utf-8") as w:
        w.write(str(temp))
    print(message)
    return message
@flow
def pipeline(lat: float, lon: float):
    """Fetch the temperature at (lat, lon), save it, and log how saving went.

    ``save_weather`` is called with ``return_state=True``, so it returns a
    Prefect ``State`` object rather than its return value. A ``State`` never
    equals a plain string such as ``"Completed"``. The outcome is therefore
    checked with ``is_completed()`` / ``is_failed()``.

    Returns:
        The ``State`` of the ``save_weather`` run. Per Prefect's docs, a flow
        that returns a failed state is itself marked failed.
    """
    logger = get_run_logger()
    temp = fetch_weather(lat, lon)
    state = save_weather(temp, return_state=True)
    logger.info(type(state))
    if state.is_completed():
        # Only unwrap on success: result() re-raises the task's exception for
        # a failed state, which would skip the failure branch entirely.
        logger.info(state.result())
        logger.info("we did it!")
    elif state.is_failed():
        logger.info("we didn't do it!")
    return state
@flow
def sub_flow_test():
    """Run ``pipeline`` as a subflow once for each of four fixed coordinates.

    The points reflect (38.9, -77.0) into the other lat/lon sign combinations.
    They run one after another, in the order listed.
    """
    coordinates = (
        (38.9, -77.0),
        (38.9, 77.0),
        (-38.9, 77.0),
        (-38.9, -77.0),
    )
    for latitude, longitude in coordinates:
        pipeline(latitude, longitude)  # each call is its own subflow run
if __name__ == "__main__":
    # Script entry point: start the parent flow, which launches the subflows.
    sub_flow_test()
# ---- Snippet shared by Joey Allison, 06/14/2023, 5:14 PM ----
import httpx
import json
import numpy as np
import random
from datetime import timedelta
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from typing import List, Dict
@task(
    retries=5,
    timeout_seconds=1,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=1),
)
def get_temperature(lat: float, lon: float):
    """Return the first hourly 2 m temperature (deg C) Open-Meteo reports for (lat, lon).

    Each attempt is limited to 1 second and retried up to 5 times. Results
    are cached per input coordinates for one day.

    Raises:
        httpx.HTTPStatusError: if the API answers with a 4xx/5xx status.
    """
    # Bare URL: the Slack-style "<...>" wrapping is not something httpx can parse.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Surface a rejected request as an HTTP error rather than a KeyError below.
    weather.raise_for_status()
    # Index 0 is the first entry of the hourly series (presumably the start
    # of the forecast window; confirm if the current hour is wanted).
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    print(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp
def _extract_temps(temps: List[Dict[str, float]]) -> List[float]:
    """Pull the ``"temp"`` value out of each reading; reject an empty batch."""
    values = [entry["temp"] for entry in temps]
    if not values:
        # np.max/np.min would raise an opaque "zero-size array" error and
        # np.mean would return nan with a warning; fail uniformly instead.
        raise ValueError("no temperature readings to aggregate")
    return values


@task()
def mean_temp(temps: List[Dict[str, float]]) -> float:
    """Return the mean ``"temp"`` across *temps*, rounded to one decimal."""
    return round(float(np.mean(_extract_temps(temps))), 1)


@task()
def max_temp(temps: List[Dict[str, float]]) -> float:
    """Return the highest ``"temp"`` across *temps*, rounded to one decimal."""
    return round(float(np.max(_extract_temps(temps))), 1)


@task()
def min_temp(temps: List[Dict[str, float]]) -> float:
    """Return the lowest ``"temp"`` across *temps*, rounded to one decimal."""
    return round(float(np.min(_extract_temps(temps))), 1)
@flow()
def aggregate_temps(temps: List[Dict[str, float]]):
    """Summarise readings as a dict with ``"mean"``, ``"max"`` and ``"min"`` keys.

    Each statistic comes from its own task, evaluated in the order mean,
    max, min.
    """
    reducers = (("mean", mean_temp), ("max", max_temp), ("min", min_temp))
    return {label: reducer(temps) for label, reducer in reducers}
@flow()
def fetch_weather():
    """Collect temperatures for one fixed point and five random points, then aggregate.

    Each reading is stored as ``{"lat", "lon", "temp"}``. The list is passed
    to ``aggregate_temps``, and the summary is logged as JSON.
    """
    logger = get_run_logger()
    temps = []

    # Same coordinates every run, so get_temperature's cache is hit after
    # the first run.
    static_lat = 38.9
    static_lon = -77.0
    temps.append({
        "lat": static_lat,
        "lon": static_lon,
        "temp": get_temperature(static_lat, static_lon),
    })

    # Random coordinates: latitude between -90 and 90, longitude between
    # -180 and 180, each rounded to one decimal place.
    for _ in range(5):
        random_lat = round(random.random() * 90 * random.choice([-1, 1]), 1)
        random_lon = round(random.random() * 180 * random.choice([-1, 1]), 1)
        logger.info(f"Lat: {random_lat}, Long: {random_lon}")
        temps.append({
            "lat": random_lat,
            "lon": random_lon,
            "temp": get_temperature(random_lat, random_lon),
        })
        logger.info(json.dumps(temps[-1], indent=4))

    agg_temps = aggregate_temps(temps)
    logger.info(json.dumps(agg_temps, indent=4))
if __name__ == "__main__":
    # Script entry point: run the weather-sampling flow once.
    fetch_weather()
# ---- Snippet shared by Jennifer Lauv, 06/14/2023, 5:16 PM ----
import httpx  # requests capability, but can work with async
from prefect import flow, task
from typing import List
from prefect.tasks import task_input_hash
@task(retries=3, cache_key_fn=task_input_hash)
def fetch_weather(lat: float, lon: float):
    """Return the first hourly 2 m temperature Open-Meteo reports for (lat, lon).

    Retried up to 3 times and cached per input coordinates via
    ``task_input_hash``.

    Raises:
        httpx.HTTPStatusError: if the API answers with a 4xx/5xx status.
    """
    # Bare URL: the Slack-style "<...>" wrapping is not something httpx can parse.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Surface a rejected request as an HTTP error rather than a KeyError below.
    weather.raise_for_status()
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp
@task(retries=3)
def save_weather(temp: float, elevation: float):
    """Write *temp* to ``weather.csv`` and *elevation* to ``elevation.csv``.

    Both files are overwritten on every call. This task is deliberately not
    cached. With ``cache_key_fn=task_input_hash``, a repeat call with the same
    values would return the cached message without running. Deleted or
    overwritten files would then never be rewritten.
    """
    # "w" is sufficient: the files are only written here, never read back.
    with open("weather.csv", "w", encoding="utf-8") as w:
        w.write(str(temp))
    with open("elevation.csv", "w", encoding="utf-8") as w:
        w.write(str(elevation))
    return "Successfully wrote temp and elevation"
@task(retries=3)
def fetch_elevation(lat: float, lon: float):
    """Return the first elevation value Open-Meteo reports for (lat, lon).

    Retried up to 3 times.

    Raises:
        httpx.HTTPStatusError: if the API answers with a 4xx/5xx status.
    """
    # Bare URL: the Slack-style "<...>" wrapping is not something httpx can parse.
    base_url = "https://api.open-meteo.com/v1/elevation/"
    elevation = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon),
    )
    # Surface a rejected request as an HTTP error rather than a KeyError below.
    elevation.raise_for_status()
    return float(elevation.json()["elevation"][0])
@flow(retries=3)  # this is a subflow
def elev_temp_flow(lat: float, lon: float) -> List:
    """Fetch temperature, then elevation, for (lat, lon); return ``[elevation, temp]``.

    NOTE(review): the flow-level ``retries=3`` stacks on top of any retries
    configured on the tasks it calls. A persistently failing request may
    therefore be attempted well over three times.
    """
    temperature = fetch_weather(lat, lon)
    height = fetch_elevation(lat, lon)
    # Position matters: callers read index 0 as elevation, index 1 as temperature.
    return [height, temperature]
@flow(retries=3)
def pipeline(lat: float, lon: float):
    """Gather elevation and temperature for (lat, lon) via a subflow, then save both.

    Prints and returns the status message produced by ``save_weather``.
    """
    readings = elev_temp_flow(lat, lon)
    elevation, temp = readings[0], readings[1]
    message = save_weather(temp, elevation)
    print(message)
    return message
if __name__ == "__main__":
    # Script entry point: run the pipeline once for a fixed location.
    pipeline(38.9, -77.0)