Jeff Hale
06/12/2023, 4:41 PM
Paul Reyna
06/12/2023, 4:56 PM
import httpx
import json
from prefect import flow, task
@task
def hit_meteo_api(
    base_url="https://api.open-meteo.com/v1/forecast",
    latitude: float = 0,
    longitude: float = 0,
    current_weather: bool = True,
):
    """Query the forecast endpoint and return its ``current_weather`` section.

    Args:
        base_url: Endpoint to send the GET request to.
        latitude: Sent as the ``latitude`` query parameter.
        longitude: Sent as the ``longitude`` query parameter.
        current_weather: Sent as the ``current_weather`` query parameter.

    Returns:
        The value stored under ``"current_weather"`` in the decoded JSON body.

    Raises:
        KeyError: If the decoded body has no ``"current_weather"`` entry
            (presumably the case when ``current_weather`` is False or the
            API reports an error -- confirm against the API docs).
    """
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "current_weather": current_weather,
    }
    r = httpx.get(base_url, params=params)
    # Let the response object decode its own body instead of json.loads(r.text).
    results_dict = r.json()
    print(results_dict)
    return results_dict["current_weather"]
@task
def get_temp(weather_dict: dict):
    """Return the value stored under the ``"temperature"`` key of *weather_dict*.

    Raises:
        KeyError: If *weather_dict* has no ``"temperature"`` key.
    """
    return weather_dict["temperature"]
@task
def write_to_file(temperature: float):
    """Write ``str(temperature)`` to ``lab_101_temperature.txt`` and print a notice.

    The file is opened in ``"w"`` mode, so any previous content is replaced.
    """
    with open("lab_101_temperature.txt", "w") as out_file:
        out_file.write(str(temperature))
    print("we did it!")
@flow
def run_everything():
    """Fetch the weather, extract the temperature, and write it to a file.

    Calls ``hit_meteo_api`` with its default arguments, passes the result
    through ``get_temp``, and hands the temperature to ``write_to_file``.
    """
    current_weather = hit_meteo_api()
    temperature = get_temp(current_weather)
    write_to_file(temperature)
# Run the flow only when this file is executed as a script.
if __name__ == "__main__":
    run_everything()
Sayantani Bhattacharjee
06/12/2023, 5:01 PM
import httpx # requests capability, but can work with async
from prefect import flow, task
# Forecast endpoint queried by fetch_weather. The value must be a bare URL:
# surrounding "<" / ">" characters would make it unusable as a request target.
BASE_URL = "https://api.open-meteo.com/v1/forecast/"
@task
def fetch_weather(lat: float, lon: float):
    """Send a GET request to ``BASE_URL`` for hourly temperature and precipitation.

    Args:
        lat: Sent as the ``latitude`` query parameter.
        lon: Sent as the ``longitude`` query parameter.

    Returns:
        The object returned by ``httpx.get`` (the undecoded response).
    """
    query = {
        "latitude": lat,
        "longitude": lon,
        "hourly": ["temperature_2m", "precipitation"],
    }
    return httpx.get(BASE_URL, params=query)
@task
def get_weather_data(weather):
    """Extract the first hourly temperature and precipitation values.

    Args:
        weather: Object exposing ``.json()`` whose decoded body has an
            ``"hourly"`` mapping with ``"temperature_2m"`` and
            ``"precipitation"`` sequences.

    Returns:
        A ``(temperature, precipitation)`` tuple of floats taken from index 0
        of each series.

    Raises:
        KeyError: If an expected key is missing from the decoded body.
        IndexError: If either series is empty.
    """
    # Decode the body once and reuse it for both series.
    hourly = weather.json()["hourly"]
    most_recent_temp = float(hourly["temperature_2m"][0])
    most_recent_precipitation = float(hourly["precipitation"][0])
    return most_recent_temp, most_recent_precipitation
@task
def save_weather(temp: float, precipitation: float):
    """Overwrite ``weather.csv`` with ``"<temp>, <precipitation>"``.

    Returns:
        The fixed status string ``"Successfully wrote temp"``.
    """
    with open("weather.csv", "w+") as csv_file:
        csv_file.write(f"{temp!s}, {precipitation!s}")
    return "Successfully wrote temp"
@flow(log_prints=True)
def pipeline(lat: float, lon: float):
    """Fetch weather for a coordinate, print the two metrics, and save them.

    Args:
        lat: Latitude forwarded to ``fetch_weather``.
        lon: Longitude forwarded to ``fetch_weather``.

    Returns:
        Whatever ``save_weather`` returns.
    """
    response = fetch_weather(lat, lon)
    temperature, precipitation = get_weather_data(response)
    print(temperature, precipitation)
    return save_weather(temperature, precipitation)
# Run the flow with fixed coordinates only when executed as a script.
if __name__ == "__main__":
    pipeline(38.9, -77.0)
Lucas Muzynoski
06/12/2023, 5:02 PM
import httpx # requests capability, but can work with async
from prefect import flow, task
@task
def fetch_temperature(lat: float, lon: float):
    """Request the hourly ``temperature_2m`` series and return its first value.

    Args:
        lat: Sent as the ``latitude`` query parameter.
        lon: Sent as the ``longitude`` query parameter.

    Returns:
        ``float`` of the entry at index 0 of ``["hourly"]["temperature_2m"]``
        in the decoded JSON body.
    """
    # Must be a bare URL; "<" / ">" around it would break the request.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp
@task
def save_weather(temp: float, windspeed: float):
    """Overwrite ``weather.csv`` with one line each for temperature and windspeed.

    Args:
        temp: Value written after the ``"temp is "`` label.
        windspeed: Value written after the ``"windspeed is "`` label.

    Returns:
        The fixed status string ``"Successfully wrote weather"``.
    """
    with open("weather.csv", "w+") as w:
        # writelines() inserts no separators, so each entry carries its own
        # newline; otherwise the two readings run together on one line.
        w.writelines([f"temp is {temp}\n", f"windspeed is {windspeed}\n"])
    return "Successfully wrote weather"
@task
def fetch_windspeed(lat: float, lon: float):
    """Request the hourly ``windspeed_10m`` series and return its first value.

    Args:
        lat: Sent as the ``latitude`` query parameter.
        lon: Sent as the ``longitude`` query parameter.

    Returns:
        ``float`` of the entry at index 0 of ``["hourly"]["windspeed_10m"]``
        in the decoded JSON body.
    """
    # Must be a bare URL; "<" / ">" around it would break the request.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    most_recent_windspeed = float(weather.json()["hourly"]["windspeed_10m"][0])
    return most_recent_windspeed
@flow
def pipeline(lat: float, lon: float):
    """Fetch temperature and windspeed for a coordinate and save both.

    Args:
        lat: Latitude forwarded to both fetch tasks.
        lon: Longitude forwarded to both fetch tasks.

    Returns:
        Whatever ``save_weather`` returns.
    """
    temperature = fetch_temperature(lat, lon)
    wind = fetch_windspeed(lat, lon)
    return save_weather(temperature, wind)
# Run the flow with fixed coordinates only when executed as a script.
if __name__ == "__main__":
    pipeline(38.9, -77.0)
Mark McDonald
06/12/2023, 5:03 PM
import json
from prefect import flow, task
import httpx
@task
def fetch_weather(lat: float, lon: float) -> dict:
    """Request the hourly ``temperature_2m`` forecast and return the decoded JSON.

    Args:
        lat: Sent as the ``latitude`` query parameter.
        lon: Sent as the ``longitude`` query parameter.

    Returns:
        The response body as decoded by ``.json()``.
    """
    # Must be a bare URL; "<" / ">" around it would break the request.
    base_url = "https://api.open-meteo.com/v1/forecast"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    return weather.json()
@task
def print_data(data: dict):
    """Print *data* to stdout and hand the same object back to the caller."""
    print(data)
    return data
@task
def save_data(data: dict):
    """Serialize *data* as 4-space-indented JSON into ``data.json``.

    The file is opened in ``"w"`` mode, so existing content is replaced.

    Returns:
        The same *data* object that was passed in.
    """
    # Serialize before opening so a failure here leaves the file untouched.
    serialized = json.dumps(data, indent=4)
    with open("data.json", "w") as out_file:
        out_file.write(serialized)
    return data
@flow
def get_data():
    """Fetch the forecast for (52.52, 13.40), print it, and save it to disk."""
    payload = fetch_weather(52.52, 13.40)
    print_data(payload)
    save_data(payload)
# Run the flow only when this file is executed as a script.
if __name__ == "__main__":
    get_data()
Dorian Vanunu
06/12/2023, 5:03 PM