Kalo
05/31/2023, 1:57 PMKalo
05/31/2023, 1:58 PMfrom pathlib import Path
import httpx
from prefect import flow, task
base_url = '<https://api.open-meteo.com/v1/>'
@task
def fetch_weather(lat: float, lon: float):
weather = httpx.get(
f'{base_url}/forecast',
params=dict(
latitude=float(lat),
longitude=float(lon),
hourly="temperature_2m",
)
)
return weather.json()["hourly"]["temperature_2m"][0]
@task
def fetch_soil_data(lat, lon):
response = httpx.get(
f'{base_url}/forecast',
params=dict(
latitude=lat,
longitude=lon,
hourly='soil_temperature_0cm'
)
)
print(response.status_code, response.content)
return response.json()["hourly"]["soil_temperature_0cm"][0]
@task
def save_weather(weather_data, soil_data):
with open(Path('output') / Path('weather.csv'), 'a+') as _outfile:
_outfile.write(str(weather_data))
_outfile.write(str(','))
_outfile.write(str(soil_data))
_outfile.write(str('\n'))
@flow
def get_weather_flow(lat, lon):
weather_data = fetch_weather(lat, lon)
soil_data = fetch_soil_data(lat, lon)
save_weather(weather_data, soil_data)
if __name__ == '__main__':
get_weather_flow(52.52, 13.41)
Kalo
05/31/2023, 1:58 PM