Jeff Hale
08/16/2023, 5:02 PMSanjeev
08/16/2023, 5:02 PMMaha
08/16/2023, 5:02 PM
import httpx # requests capability, but can work with async
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect import get_run_logger
import csv
@task(retries=2, cache_key_fn=task_input_hash)
def fetch_weather(lat: float, lon: float):
    """Fetch the first hourly ``temperature_2m`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.temperature_2m`` in the response,
        converted to ``float``.

    Raises:
        Exception: If the API answers with HTTP 400.
    """
    # Plain endpoint URL: chat-style "<...>" link markup around the address
    # is not part of the URL and makes the request fail.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    if response.status_code == 400:
        # Include the response body so a failed run says why it was rejected.
        raise Exception(
            f"Open-Meteo rejected the request (HTTP 400): {response.text}"
        )
    temperature = float(response.json()["hourly"]["temperature_2m"][0])
    logger = get_run_logger()
    logger.info(f"the weather is {temperature}")
    return temperature
@task(cache_key_fn=task_input_hash)
def get_wind(lat: float, lon: float):
    """Fetch the first hourly ``windspeed_10m`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.windspeed_10m`` in the response,
        converted to ``float``.
    """
    # Plain endpoint URL: chat-style "<...>" link markup around the address
    # is not part of the URL and makes the request fail.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    wind_speed = float(response.json()["hourly"]["windspeed_10m"][0])
    return wind_speed
@task(retries=2, cache_key_fn=task_input_hash)
def save_weather(weather):
    """Write ``str(weather)`` to ``weather.csv``, replacing any old content.

    Args:
        weather: Any object; its ``str()`` form is what gets written.

    Returns:
        The fixed status string ``"Successfully wrote temp"``.
    """
    with open("weather.csv", "w+") as out_file:
        contents = str(weather)
        out_file.write(contents)
    return "Successfully wrote temp"
@flow(log_prints=True)
def pipeline(lat: float, lon: float):
    """Fetch temperature and wind speed for a location and save both.

    Args:
        lat: Latitude passed to ``fetch_weather`` and ``get_wind``.
        lon: Longitude passed to ``fetch_weather`` and ``get_wind``.

    Returns:
        Whatever ``save_weather`` returns for the assembled report.
    """
    temperature = fetch_weather(lat, lon)
    wind = get_wind(lat, lon)
    # The report is keyed by integers: 1 holds wind speed, 2 holds temperature.
    return save_weather({1: wind, 2: temperature})
# Run the flow for a fixed location when executed as a script.
if __name__ == "__main__":
    pipeline(lat=38.9, lon=-77.0)
Rahul Viswanath C
08/16/2023, 5:02 PM
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
import httpx
@task
def fetch_weather(lat: float, lon: float):
    """Fetch the first hourly ``temperature_2m`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.temperature_2m`` in the response,
        converted to ``float``.
    """
    # Plain endpoint URL: chat-style "<...>" link markup around the address
    # is not part of the URL and makes the request fail.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    first_temp = float(response.json()["hourly"]["temperature_2m"][0])
    return first_temp
@task
def fetch_windspeed(lat: float, lon: float):
    """Fetch the first hourly ``windspeed_10m`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.windspeed_10m`` in the response,
        converted to ``float``.
    """
    # Plain endpoint URL: chat-style "<...>" link markup around the address
    # is not part of the URL and makes the request fail.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    first_windspeed = float(response.json()["hourly"]["windspeed_10m"][0])
    return first_windspeed
@task
def save_weather(temp: float):
    """Write ``str(temp)`` to ``weather.csv``, replacing any old content.

    Args:
        temp: Value to persist; its ``str()`` form is what gets written.

    Returns:
        The fixed status string ``"Successfully wrote weather"``.
    """
    with open("weather.csv", "w+") as out_file:
        contents = str(temp)
        out_file.write(contents)
    return "Successfully wrote weather"
# caching
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=3))
def save_windspeed(temp: float):
    """Write ``str(temp)`` to ``windspeed.csv``, replacing any old content.

    The task is declared with an input-hash cache key and a three-minute
    cache expiration.

    Args:
        temp: Value to persist; its ``str()`` form is what gets written.
            The flow in this script passes the wind speed here.

    Returns:
        The fixed status string ``"Successfully wrote windspeed"``.
    """
    with open("windspeed.csv", "w+") as out_file:
        contents = str(temp)
        out_file.write(contents)
    return "Successfully wrote windspeed"
# retries
@task(retries=2, retry_delay_seconds=0.3)
def fetch():
    """Request a URL that answers with either 200 or 500 and print the body.

    The task is declared with two retries and a 0.3 second retry delay, so
    the exception raised on an error status exercises that retry setting.

    Raises:
        Exception: If the response status code is 400 or higher.
    """
    # Plain URL: chat-style "<...>" link markup around the address is not
    # part of the URL and makes the request fail.
    response = httpx.get("https://httpstat.us/Random/200,500")
    if response.status_code >= 400:
        raise Exception(f"Request failed with HTTP {response.status_code}")
    # httpx exposes the decoded body as ``.text``; ``.txt`` does not exist.
    print(response.text)
@flow
def pipeline(lat: float, lon: float):
    """Fetch and save temperature and wind speed, then run the retry demo.

    Args:
        lat: Latitude passed to ``fetch_weather`` and ``fetch_windspeed``.
        lon: Longitude passed to ``fetch_weather`` and ``fetch_windspeed``.
    """
    logger = get_run_logger()
    temp = fetch_weather(lat, lon)
    weather_result = save_weather(temp)
    windspeed = fetch_windspeed(lat, lon)
    windspeed_result = save_windspeed(windspeed)
    print(f"{weather_result} , {windspeed_result}")
    # retries
    fetch()
    # logging
    # Call the logger method directly; chat-style link markup around
    # "logger.info" is not valid Python.
    logger.info("The weather result is " + weather_result)
    logger.info("The windspeed result is " + windspeed_result)
# Run the flow for a fixed location when executed as a script.
if __name__ == "__main__":
    pipeline(lat=38.9, lon=-77.0)
Mike Larsson
08/16/2023, 5:02 PMAlex Rodriguez
08/16/2023, 5:03 PM
from datetime import timedelta
import httpx
from prefect import flow, task
from prefect.tasks import task_input_hash
@task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def get_rain(lat: float, lon: float) -> float:
    """Fetch the first hourly ``precipitation`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.precipitation`` in the response.
    """
    # Plain endpoint URL: chat-style "<...>" link markup around the address
    # is not part of the URL and makes the request fail.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="precipitation"),
    )
    return float(response.json()["hourly"]["precipitation"][0])
@task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def get_humidity(lat: float, lon: float) -> float:
    """Fetch the first hourly ``relativehumidity_2m`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.relativehumidity_2m`` in the response.
    """
    # Plain endpoint URL: chat-style "<...>" link markup around the address
    # is not part of the URL and makes the request fail.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="relativehumidity_2m"),
    )
    return float(response.json()["hourly"]["relativehumidity_2m"][0])
@task(retries=3, retry_delay_seconds=5, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def get_temp(lat: float, lon: float) -> float:
    """Fetch the first hourly ``temperature_2m`` value from Open-Meteo.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.temperature_2m`` in the response.
    """
    # Plain endpoint URL: chat-style "<...>" link markup around the address
    # is not part of the URL and makes the request fail.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # To try out the retry settings, raise here, e.g.
    # ValueError("This is a test").
    return float(response.json()["hourly"]["temperature_2m"][0])
@task
def print_results(temp: float, humidity: float, rain: float) -> None:
    """Print the three readings on one line.

    Args:
        temp: Temperature reading.
        humidity: Relative humidity reading.
        rain: Precipitation reading.
    """
    # "Humidity" was misspelled. Open-Meteo reports ``precipitation`` in
    # millimetres by default, not as a percentage, so the unit is "mm".
    print(f"Temperature : {temp}, Relative Humidity : {humidity}, Rain : {rain} mm")
@flow
def test_flow(lat: float, lon: float):
    """Fetch temperature, humidity and rain for a location and print them.

    Args:
        lat: Latitude passed to each fetch task.
        lon: Longitude passed to each fetch task.
    """
    # Tuple elements are evaluated left to right, so the tasks run in the
    # order temperature, humidity, rain.
    readings = (
        get_temp(lat, lon),
        get_humidity(lat, lon),
        get_rain(lat, lon),
    )
    print_results(*readings)
# Run the flow for a fixed location when executed as a script.
if __name__ == "__main__":
    test_flow(lat=2, lon=3)
Abhilash Agarwal
08/16/2023, 5:03 PMsfrasier
08/16/2023, 5:06 PM