Jeff Hale
05/31/2023, 2:27 PMKalo
05/31/2023, 2:30 PMimport csv
import random
from pathlib import Path
import httpx
from prefect import flow, task
base_url = '<https://api.open-meteo.com/v1/>'
@task(retries=4, persist_result=True, retry_delay_seconds=0.3)
def fetch_weather(lat: float, lon: float):
weather = httpx.get(
f'{base_url}/forecast',
params=dict(
latitude=float(lat),
longitude=float(lon),
hourly="temperature_2m",
)
)
if random.randint(0, 10) % 2:
1 / 0
return weather.json()["hourly"]["temperature_2m"][0]
@task
def fetch_soil_data(lat, lon):
response = httpx.get(
f'{base_url}/forecast',
params=dict(
latitude=lat,
longitude=lon,
hourly='soil_temperature_0cm'
)
)
return response.json()["hourly"]["soil_temperature_0cm"][0]
@flow
def transform_weather_data(weather, soil):
return f'{weather}-{soil}'
@task
def save_weather(to_write):
dest = Path('output') / Path('weather.csv')
with open(dest, 'a+') as _outfile:
_writer = csv.writer(_outfile, delimiter=';', quotechar="\"")
_writer.writerow([to_write])
@flow
def get_weather_flow(lat, lon):
weather_data = fetch_weather(lat, lon)
soil_data = fetch_soil_data(lat, lon)
to_write = transform_weather_data(weather_data, soil_data)
save_weather(to_write)
if __name__ == '__main__':
get_weather_flow(52.52, 13.41)
Emiliano
05/31/2023, 2:32 PMimport httpx
from prefect import flow, task, get_run_logger
def fetch_api(lat: float, lon: float, hourly: str):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly=hourly),
)
most_recent_data = float(weather.json()["hourly"][hourly][0])
print(f"Most recent {hourly}: {most_recent_data}")
return most_recent_data
@task(retries=3, retry_delay_seconds=0.1)
def fetch_weather(lat: float, lon: float):
results = fetch_api(lat, lon, "temperature_2m")
return results
@task(retries=3, retry_delay_seconds=0.1)
def fetch_windspeed(lat: float, lon: float):
results = fetch_api(lat, lon, "windspeed_10m")
return results
@task(retries=3, retry_delay_seconds=0.1)
def fetch_rain(lat: float, lon: float):
results = fetch_api(lat, lon, "rain")
return results
@flow()
def pipeline(lat: float, lon: float):
latest_temp = fetch_weather(lat, lon)
latest_windspeed = fetch_windspeed(lat, lon)
latest_rain = fetch_rain(lat, lon)
return latest_temp, latest_windspeed, latest_rain
@flow()
def log_data_fetched(lat: float, lon: float):
logger = get_run_logger()
data = pipeline(lat, lon)
for metric in data:
<http://logger.info|logger.info>(f'Data fetched from the API: {metric}')
if __name__ == "__main__":
log_data_fetched(lat=10, lon=3.5)
Sam Cook
05/31/2023, 2:33 PMimport httpx # requests capability, but can work with async
from prefect import flow, task, get_run_logger
base_url = "<https://api.open-meteo.com/v1/forecast/>"
@task(retries=3, retry_delay_seconds=0.1)
def get_temps(lat:float, lon:float):
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="apparent_temperature"),
)
return weather.json()["hourly"]["apparent_temperature"]
@task
def average(vals:list[str]):
return sum([float(v) for v in vals]) / len(vals)
@flow
def average_temp(lat:float, lon:float):
temps = get_temps(lat, lon)
avg = average(temps)
return avg
@flow
def pipeline(lat: float, lon: float):
result = average_temp(lat, lon)
get_run_logger().info(f"Average apparent temp is {result}")
return result
if __name__ == "__main__":
pipeline(38.9, -77.0)
Li McCarthy
05/31/2023, 2:48 PMTim Lin
05/31/2023, 2:49 PMimport httpx
from prefect import flow, task
import statistics
@task()
def fetch_air_quality(lat, lon):
base_url = "https://air-quality-api.open-meteo.com/v1/air-quality"
air_quality = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="carbon_monoxide,dust")
)
return(air_quality.json())
@flow(retries=2)
def air_quality_flow(lat: float, lon: float):
air_quality_data = fetch_air_quality(lat, lon)
air_quality_stats(air_quality_data)
return None
@flow(log_prints=True)
def air_quality_stats(data):
cm_mean = statistics.mean(data['hourly']['carbon_monoxide'])
print(f"Mean Carbon Monoxide over L24h: {cm_mean}μg/m³")
if name == "__main__":
air_quality_flow(51.51, -0.13)Li McCarthy
05/31/2023, 2:49 PMTim Lin
05/31/2023, 2:50 PMimport httpx
from prefect import flow, task
import statistics
@task()
def fetch_air_quality(lat, lon):
base_url = "<https://air-quality-api.open-meteo.com/v1/air-quality>"
air_quality = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="carbon_monoxide,dust")
)
return(air_quality.json())
@flow(retries=2)
def air_quality_flow(lat: float, lon: float):
air_quality_data = fetch_air_quality(lat, lon)
air_quality_stats(air_quality_data)
return None
@flow(log_prints=True)
def air_quality_stats(data):
cm_mean = statistics.mean(data['hourly']['carbon_monoxide'])
print(f"Mean Carbon Monoxide over L24h: {cm_mean}μg/m³")
if __name__ == "__main__":
air_quality_flow(51.51, -0.13)
Tim Lin
05/31/2023, 2:50 PM