Jeff Hale
06/12/2023, 5:52 PM
Paul Reyna
06/12/2023, 6:00 PM
import httpx
import json
from prefect import flow, task
from prefect.tasks import task_input_hash
@task(retries=4, cache_key_fn=task_input_hash)
def hit_meteo_api(
    base_url="https://api.open-meteo.com/v1/forecast",
    latitude: float = 0,
    longitude: float = 0,
    current_weather: bool = True,
):
    """Request a forecast from the weather API and return its current-weather entry.

    Args:
        base_url: Endpoint that receives the GET request.
        latitude: Value sent as the ``latitude`` query parameter.
        longitude: Value sent as the ``longitude`` query parameter.
        current_weather: Value sent as the ``current_weather`` query parameter.

    Returns:
        Whatever the decoded JSON response stores under ``"current_weather"``.

    Raises:
        httpx.HTTPStatusError: If the response status is not a success code.
        KeyError: If the response body has no ``"current_weather"`` entry.
    """
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "current_weather": current_weather,
    }
    r = httpx.get(base_url, params=params)
    # Surface HTTP failures as exceptions so the task's retries are triggered
    # by the real error instead of a later KeyError on an error payload.
    r.raise_for_status()
    results_dict = r.json()
    print(results_dict)
    return results_dict["current_weather"]
# subflow 1
@flow
def grab_data():
    """Call ``hit_meteo_api`` for two fixed coordinates.

    Returns:
        A 2-tuple holding the result for (64, 64) followed by the result
        for (-64, -64).
    """
    # Both points use the same number for latitude and longitude.
    return tuple(
        hit_meteo_api(latitude=coord, longitude=coord) for coord in (64, -64)
    )
@task
def aggregate_dicts(weather_dict1: dict, weather_dict2: dict):
    """Collect two weather dictionaries into a list, print it and return it.

    Args:
        weather_dict1 (dict): First element of the resulting list.
        weather_dict2 (dict): Second element of the resulting list.

    Returns:
        list: ``[weather_dict1, weather_dict2]``.
    """
    combined = [weather_dict1, weather_dict2]
    print(combined)
    return combined
# subflow 2
@flow
def subflow(weather_dict1, weather_dict2):
    """Run ``aggregate_dicts`` on the two inputs.

    The aggregated list is not returned; this flow yields ``None``.
    """
    aggregate_dicts(weather_dict1=weather_dict1, weather_dict2=weather_dict2)
# main flow that runs subflow 1 + 2
@flow(log_prints=True)
def run_everything():
    """Run ``grab_data`` and then ``subflow``, and print both weather results."""
    first_reading, second_reading = grab_data()
    subflow(first_reading, second_reading)

    print("*************")
    for reading in (first_reading, second_reading):
        print(reading)
# Start the main flow only when this file is executed as a script.
if __name__ == "__main__":
    run_everything()
Tim Holdaway
06/12/2023, 6:00 PM
import httpx # requests capability, but can work with async
from prefect import flow, task
import numpy as np
@task(retries=4, retry_delay_seconds=0.1)
def fetch_weather(lat: float, lon: float):
    """Request hourly forecast data for a coordinate and return the decoded JSON.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        The decoded JSON body of the response. The request asks for the
        hourly ``temperature_2m``, ``precipitation`` and ``windspeed_10m``
        variables.

    Raises:
        httpx.HTTPStatusError: If the response status is not a success code.
    """
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(
            latitude=lat,
            longitude=lon,
            hourly=["temperature_2m", "precipitation", "windspeed_10m"],
        ),
    )
    # Raise on HTTP errors so the configured retries apply to failed requests.
    weather.raise_for_status()
    return weather.json()
@task
def extract_temperatures(data):
    """Return ``data["hourly"]["temperature_2m"]`` converted to a NumPy array."""
    hourly = data["hourly"]
    return np.asarray(hourly["temperature_2m"])
@task
def extract_precipitation(data):
    """Return ``data["hourly"]["precipitation"]`` converted to a NumPy array."""
    hourly = data["hourly"]
    return np.asarray(hourly["precipitation"])
def average(data_points):
    """Return the unweighted average of *data_points* over all elements.

    The input is converted with ``np.asarray`` first, so any array-like
    value (list, tuple, ndarray) is accepted.
    """
    return np.average(np.asarray(data_points))
@task
def save_weather(temp: float, precipitation: float):
    """Write the two values to ``weather.csv`` as one comma-separated line.

    The file is opened in ``"w+"`` mode, so existing content is replaced.

    Returns:
        str: A fixed confirmation message.
    """
    with open("weather.csv", "w+") as out_file:
        row = ",".join((str(temp), str(precipitation)))
        out_file.write(row)
    return "Successfully wrote temp"
@flow
def getAverageTemp(data):
    """Extract the temperature series from *data*, print it and return its average."""
    temperature_series = extract_temperatures(data)
    print(temperature_series)
    return average(temperature_series)
@flow
def getAveragePrecip(data):
    """Extract the precipitation series from *data*, print it and return its average."""
    precipitation_series = extract_precipitation(data)
    print(precipitation_series)
    return average(precipitation_series)
@flow(log_prints=True)
def pipeline(lat: float, lon: float):
    """Fetch weather for a coordinate, average it and save the averages.

    Args:
        lat: Latitude passed to ``fetch_weather``.
        lon: Longitude passed to ``fetch_weather``.

    Returns:
        The value returned by ``save_weather``.
    """
    weather_payload = fetch_weather(lat, lon)
    mean_temp = getAverageTemp(weather_payload)
    mean_precip = getAveragePrecip(weather_payload)
    return save_weather(mean_temp, mean_precip)
# Run the pipeline for a fixed coordinate when executed as a script.
if __name__ == "__main__":
    pipeline(38.9, -77.0)
Dorian Vanunu
06/12/2023, 6:02 PM
Sayantani Bhattacharjee
06/12/2023, 6:09 PM
import httpx
from datetime import timedelta
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
@task(retries=3, retry_delay_seconds=60, cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1))
def fetch_weather(lat: float, lon: float):
    """Request hourly temperature and precipitation data for a coordinate.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        The ``httpx`` response object itself (not the decoded body); callers
        decode it with ``.json()``.

    Raises:
        httpx.HTTPStatusError: If the response status is not a success code.
    """
    weather = httpx.get(
        "https://api.open-meteo.com/v1/forecast/",
        params=dict(latitude=lat, longitude=lon, hourly=["temperature_2m", "precipitation"]),
    )
    # Raise on HTTP errors so the configured retries apply to failed requests.
    weather.raise_for_status()
    return weather
@flow
def get_temp(weather):
    """Return the first hourly ``temperature_2m`` value of a weather response.

    Args:
        weather: Response object whose ``.json()`` body contains
            ``["hourly"]["temperature_2m"]``.

    Returns:
        float: Element 0 of the hourly temperature series.
    """
    logger = get_run_logger()
    logger.info("Fetching temperature data...")
    # NOTE(review): index 0 is the first entry of the series; whether that is
    # the most recent reading depends on the API's ordering — confirm.
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp
@flow
def get_precipitation(weather):
    """Return the first hourly ``precipitation`` value of a weather response.

    Args:
        weather: Response object whose ``.json()`` body contains
            ``["hourly"]["precipitation"]``.

    Returns:
        float: Element 0 of the hourly precipitation series.
    """
    logger = get_run_logger()
    logger.info("Fetching precipitation data...")
    # NOTE(review): index 0 is the first entry of the series; whether that is
    # the most recent reading depends on the API's ordering — confirm.
    most_recent_precipitation = float(weather.json()["hourly"]["precipitation"][0])
    return most_recent_precipitation
@flow(name="GetWeatherDataLab102", log_prints=True)
def get_all_weather_data():
    """Fetch weather for a fixed coordinate and print its temperature and precipitation."""
    api_response = fetch_weather(38.9, -79.0)

    # Both sub-flows read from the same fetched response.
    temp = get_temp(api_response)
    precipitation = get_precipitation(api_response)

    print(f"Temperature: {temp} \nPrecipitation: {precipitation}")
# Start the weather flow only when this file is executed as a script.
if __name__ == "__main__":
    get_all_weather_data()