Jeff Hale
06/14/2023, 3:58 PMbrittany bennett
06/14/2023, 4:08 PMfrom prefect import flow, task
import requests as re
url = '<https://api.open-meteo.com/v1/forecast>'
@task
def fetch_weather_data(url, params):
response = re.get(url, params)
return response
@flow
def main():
# Get some temperature data for a place @ 13.41 Lon 52.52 Lat
temperature_params = {
'longitude' : 13.41,
'latitude' : 52.52
}
temperature_data = fetch_weather_data(url, temperature_params)
# <https://api.open-meteo.com/v1/forecast?latitude=52.52&longitude=13.41>
print(temperature_datsa.text)
Carlos Baldellou
06/14/2023, 4:11 PMimport httpx # requests capability, but can work with async
from prefect import flow, task
@task
def fetch_weather(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
)
most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
return most_recent_temp
@task
def save_weather(temp: float, elevation: float):
with open("weather.csv", "w+") as w:
w.write(str(temp))
with open("elevation.csv", "w+") as w:
w.write(str(elevation))
return "Successfully wrote temp and elevation"
@task
def fetch_elevation(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/elevation/>"
elevation = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon),
)
elevation = float(elevation.json()["elevation"][0])
return elevation
@flow
def pipeline(lat: float, lon: float):
temp = fetch_weather(lat, lon)
elevation = fetch_elevation(lat,lon)
result = save_weather(temp,elevation)
return result
if __name__ == "__main__":
pipeline(38.9, -77.0)
brittany bennett
06/14/2023, 4:15 PMfrom prefect import flow, task
import requests as re
import json
url = '<https://api.open-meteo.com/v1/forecast>'
@task
def fetch_weather_data(url, params):
response = re.get(url, params)
return response
@task
def get_elevation(json_object):
elevation = json.loads(json_object.text)['elevation']
return elevation
@flow
def main():
# Get some temperature data for a place @ 13.41 Lon 52.52 Lat
temperature_params = {
'longitude' : 13.41,
'latitude' : 52.52
}
temperature_data = fetch_weather_data(url, temperature_params)
# <https://api.open-meteo.com/v1/forecast?latitude=52.52&longitude=13.41>
print(temperature_data.text)
yemen_elevation = get_elevation(temperature_data)
print(f"The elevation off the coast of Yemen is {yemen_elevation}")
if __name__ == '__main__':
# do stuff
print("We are going to fetch some weather data")
main()
print("Have a nice day (: ")
Cody
06/14/2023, 4:16 PMfrom prefect import task, flow
import requests
@task
def get_data():
res = requests.get("<https://api.open-meteo.com/v1/forecast?latitude=52.52&longitude=13.41&hourly=temperature_2m>")
print('ran get_data')
return res.json()
@task
def parse_data(data):
lat = data['latitude']
lon = data['longitude']
elev = data['elevation']
out = {"lat": lat,
"lon": lon,
"elev": elev}
print('parsed data')
return out
@task
def print_data(data):
print('printing data')
print(data)
@flow
def my_flow():
data = get_data()
parsed_data = parse_data(data)
print_data(parsed_data)
if __name__ == "__main__":
my_flow()
Joey Allison
06/14/2023, 4:16 PMimport httpx
from prefect import flow, task
@task
def fetch_temp(lat: float, lon: float) -> float:
metric = {"daily": "temperature_2m_max", "timezone": "UTC"}
return fetch_weather_from_api(lat, lon, metric)
@task
def fetch_humidity(lat: float, lon: float) -> float:
metric = {"hourly": "relativehumidity_2m"}
return fetch_weather_from_api(lat, lon, metric)
@task
def fetch_dewpoint(lat: float, lon: float) -> float:
metric = {"hourly": "dewpoint_2m"}
return fetch_weather_from_api(lat, lon, metric)
def fetch_weather_from_api(lat: float, lon: float, metric: str) -> float:
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, **metric),
)
print(weather)
return float(weather.json()[list(metric.keys())[0]][list(metric.values())[0]][0])
@flow
def fetch_weather(lat: float, lon: float):
temp = fetch_temp(lat, lon)
humidity = fetch_humidity(lat, lon)
dewpoint = fetch_dewpoint(lat, lon)
print(f"Temp (Max Daily for LA): {temp}")
print(f"Humidity: {humidity}")
print(f"Dewpoint: {dewpoint}")
if __name__ == "__main__":
fetch_weather(38.9, -77.0)
Jeff Moszuti
06/14/2023, 4:17 PMimport httpx
from prefect import task, flow
@task
def fetch_weather(lat: float, lon:float):
base_url = "<https://api.open-meteo.com/v1/forecast>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m")
)
most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
return most_recent_temp
# fetch_weather(51.5072,0.1276)
@task
def save_weather(temp: float):
with open("weather.csv", "w+") as w:
w.write(str(temp))
return "Successfully wrote temp"
@flow
def pipeline(lat: float, lon:float):
temp = fetch_weather(lat, lon)
result = save_weather(temp)
return result
if __name__ == "__main__":
pipeline(51.5072,0.1276)
Santhosh Solomon (Fluffy)
06/14/2023, 4:32 PMfrom prefect import flow, task
from requests import get
@task(name='air-quality-data')
def get_air_quality_information(latitude: str, longitude: str) -> list:
api_response = get(f'<https://air-quality-api.open-meteo.com/v1/air-quality?latitude={latitude}&longitude={longitude}&hourly=pm10,pm2_5>')
air_quality_data = []
if api_response.status_code == 200:
time_data = api_response.json().get('hourly').get('time')
pm10 = api_response.json().get('hourly').get('pm_10')
pm2_5 = api_response.json().get('hourly').get('pm2_5')
for idx, data in enumerate(time_data):
air_quality_data.append(
{
'hour': data,
'pm10': pm10[idx],
'pm2_5': pm2_5[idx]
}
)
return air_quality_data
@task(name='co-ordinates-api')
def get_co_ordinates(city_name: str) -> tuple | None:
api_response = get(f'<https://geocoding-api.open-meteo.com/v1/search?name={city_name}&count=10&language=en&format=json>')
if api_response.status_code == 200:
city = api_response.json().get('results')[0]
latitude = city.get('latitude')
longitude = city.get('longitude')
return (latitude, longitude)
else:
return None
@flow(name='parent workflow')
def get_meteo_data(city: str):
co_ordinates = get_co_ordinates(city)
air_quality_data = get_air_quality_information(co_ordinates[0], co_ordinates[1])
print(air_quality_data)
if __name__ == '__main__':
get_meteo_data('Berlin')