Emil Christensen
08/31/2023, 12:41 AMElizabeth Bing
08/31/2023, 12:41 AMEmil Christensen
08/31/2023, 12:44 AMdocker run -it --rm -v $PWD:/opt/prefect/flows/ prefecthq/prefect:2-latest bash
Elizabeth Bing
08/31/2023, 12:47 AMimport httpx
import datetime

import pandas as pd
from prefect import flow, task
from prefect.tasks import task_input_hash
# latitude 26.1 North, longitude 84.8 West
# TASKS
# cache_key_fn computes the cache key from the task's inputs, so repeat calls with the same inputs reuse the cached result
@task(
    cache_key_fn=task_input_hash,
    # NOTE(review): the original expiration was truncated
    # ("datetime.timedeltas="); one hour is a placeholder -- confirm the
    # intended cache window.
    cache_expiration=datetime.timedelta(hours=1),
)
def fetch_weather(lat: float, lon: float):
    """Fetch the hourly Open-Meteo forecast for a coordinate as a DataFrame.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        A DataFrame with ``latitude`` and ``longitude`` (as echoed back in
        the response) plus ``temperature``, ``wind`` and ``cloudcover``
        columns taken from the response's ``hourly`` section.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(
            latitude=lat,
            longitude=lon,
            # winddirection_10m is requested but not copied into the frame.
            hourly="temperature_2m,cloudcover_mid,windspeed_10m,winddirection_10m",
        ),
    )
    # Surface HTTP failures here instead of as a KeyError on the lookups below.
    weather.raise_for_status()

    # Parse the body once and reuse it; the original re-parsed it per column.
    weather_json = weather.json()
    hourly = weather_json["hourly"]
    df = pd.DataFrame(
        {
            "latitude": weather_json["latitude"],
            "longitude": weather_json["longitude"],
            "temperature": hourly["temperature_2m"],
            "wind": hourly["windspeed_10m"],
            "cloudcover": hourly["cloudcover_mid"],
        }
    )
    return df
@task
def save_weather_data(data: pd.DataFrame):
    """Write ``data`` to ``weather_data.csv`` and return a status message.

    The path is relative, so the file lands in the current working directory.
    """
    output_path = "weather_data.csv"
    data.to_csv(output_path)
    return "successfully wrote file"
@task
def get_latest_value(data: pd.DataFrame, metric: str):
    """Print the value in the last row of the ``metric`` column of ``data``."""
    latest = data[metric].iloc[-1]
    print(f"latest data for {metric} {latest}")
# FLOW
# Determines the order in which tasks are called
@flow(retries=4)
def get_weather_metrics(lat: float, lon: float):
    """Fetch the forecast for ``lat``/``lon``, report the wind, and save it.

    Calls ``fetch_weather``, then ``get_latest_value`` for the ``"wind"``
    column, then ``save_weather_data``, in that order.
    """
    weather_frame = fetch_weather(lat=lat, lon=lon)
    # Report the most recent wind reading before persisting the frame.
    get_latest_value(weather_frame, "wind")
    save_weather_data(weather_frame)
# Start the flow
if __name__ == "__main__":
    get_weather_metrics(38.9, -77.0)
Emil Christensen
08/31/2023, 12:55 AM@task(cache_key_fn=task_input_hash, cache_expiration=datetime.timedeltas=)
Elizabeth Bing
08/31/2023, 1:05 AMEmil Christensen
08/31/2023, 1:24 AMprefect server start
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
Elizabeth Bing
08/31/2023, 3:38 AM