Amir Volfovich
05/31/2023, 2:09 PMfrom enum import Enum, auto
import httpx # requests capability, but can work with async
from prefect import flow, task
class TimeResolution(Enum):
hourly = auto()
daily = auto()
@task
def fetch_weather_hourly_param(lat: float, lon: float, resolution: TimeResolution, param: str) -> dict:
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params={
"latitude": lat,
"longitude": lon,
resolution.name: param
}
)
return weather.json()
@task
def save_weather(temp: float):
with open("weather.csv", "w+") as w:
w.write(str(temp))
return "Successfully wrote temp"
@task
def extract_data_point(response: dict, resolution: TimeResolution, param: str, hour: int) -> float:
return float(response[resolution.name][param][hour])
@task
def print_float_param(data: float, resolution: TimeResolution, param: str):
print(f"for param {param}, in resoltion {resolution.name} data is {data}")
@flow
def pipeline(lat: float, lon: float, resolution: TimeResolution, item_pos: int, params: list):
params_jsons = fetch_weather_hourly_param.map(lat, lon, resolution, params)
float_data = extract_data_point.map(params_jsons, resolution, params, item_pos)
return print_float_param.map(float_data, resolution, params)
if __name__ == "__main__":
params = ["temperature_2m", "relativehumidity_2m", "dewpoint_2m"]
pipeline(38.9, -77.0, TimeResolution.hourly, 0, params)