Jeff Hale
07/11/2023, 2:52 PMLudwig Schmidt-Hackenberg
07/11/2023, 2:57 PMJakub Dvorak
07/11/2023, 3:01 PMimport httpx # requests capability, but can work with async
from prefect import flow, task
from prefect.tasks import task_input_hash
@task(retries=2, retry_delay_seconds = 5)
def fetch_weather(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
)
most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
print(f"Most recent temp C: {most_recent_temp} degrees")
return most_recent_temp
@task(retries=2, retry_delay_seconds = 5)
def fetch_rain(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="rain"),
)
rain_forecast = float(weather.json()["hourly"]["rain"][0])
print(f"Recent rain forecast: {rain_forecast}")
return rain_forecast
@task(retries=2, retry_delay_seconds = 5)
def fetch_showers(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="showers"),
)
showers_forecast = float(weather.json()["hourly"]["showers"][0])
print(f"Recent showers: {showers_forecast}")
return showers_forecast
@task(cache_key_fn=task_input_hash, retries=2, retry_delay_seconds = 5)
def save_weather(temp: float, rain: float, showers: float):
with open("weather.csv", "w+") as w:
w.write("Temperature: " + str(temp) + "\n")
w.write("Rain: " + str(rain) + "\n")
w.write("Showers: " + str(showers) + "\n")
return "Successfully wrote data"
@flow(log_prints=True)
def pipeline(lat: float, lon: float):
temp = fetch_weather(lat, lon)
rain = fetch_rain(lat, lon)
showers = fetch_showers(lat, lon)
result = save_weather(temp, rain, showers)
return result
if __name__ == "__main__":
pipeline(49.68720747651769, 18.384079000116053)
János
07/11/2023, 3:01 PMBlake Stefansen
07/11/2023, 3:03 PMimport httpx
from prefect import flow, task, get_run_logger
from prefect.logging.loggers import task_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(retries=2)
def get_temp(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
)
temp = weather.json()["hourly"]["temperature_2m"][0]
logger = get_run_logger()
<http://logger.info|logger.info>(f"Most recent air temperature at 2m above ground: {temp} C")
return temp
@task(retries=2, cache_key_fn=task_input_hash, cache_expiration=timedelta(seconds=5))
def get_wind(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
)
temp = weather.json()["hourly"]["windspeed_10m"][0]
logger = get_run_logger()
<http://logger.info|logger.info>(f"Most recent wind speed: {temp} km/h")
print("test")
return temp
@task
def save_weather(temp: float, wind: float):
with open("weather.csv", "w+") as w:
w.write("Temperature: " + str(temp) + "\n")
w.write("Wind: " + str(wind) + "\n")
return "Successfully wrote data"
@flow(log_prints=True)
def test_flow():
temp = get_temp(52.52, 13.41)
wind = get_wind(52.52, 13.41)
wind = get_wind(52.52, 13.41)
save_weather(temp, wind)
if __name__ == "__main__":
test_flow()
Andrey Lukashevich
07/11/2023, 3:05 PM# Write flow code that fetches other weather metrics
# Make at least 3 tasks in the flow
# Example: windspeed for the last hour:
# weather.json()["hourly"]["windspeed_10m"][0]
# Docs: <http://open-meteo.com/en/docs|open-meteo.com/en/docs>
import httpx
import random
from datetime import timedelta
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def get_api_url(logger):
url = "<https://api.open-meteo.com/v1/forecast>"
logger.debug(f"API url is: {url}")
return url
@task(
retries=2,
retry_delay_seconds=1,
cache_key_fn=task_input_hash,
cache_expiration=timedelta(hours=1)
)
def get_api_request(logger, api_url, params):
random_num = random.randint(1, 2)
<http://logger.info|logger.info>(f"Random number (for failures simulation): {random_num}")
if random_num == 1:
logger.debug("Going to request to the API...")
res = httpx.request(
method="get",
url=api_url,
params=params
)
return res
else:
err_msg = "Simulation of failure"
logger.error(err_msg)
raise Exception(err_msg)
@task(retries=5, retry_delay_seconds=1)
def parse_result(logger, res):
logger.debug(f"API result: {res}")
if res.status_code == 200:
<http://logger.info|logger.info>("Res code 200 OK")
return res.json()
else:
logger.error(f"Res code isn't 200 - we got {res.status_code}")
raise Exception("API returned wrong result")
@flow(
retries=4,
retry_delay_seconds=5,
name="lab102-flow"
)
def get_recent_windspeed_10m(lat, lon):
logger = get_run_logger()
<http://logger.info|logger.info>("Running get_api_url")
api_url = get_api_url(logger)
<http://logger.info|logger.info>("Running get_api_request")
res = get_api_request(logger, api_url , dict(latitude=lat, longitude=lon, hourly="windspeed_10m"))
<http://logger.info|logger.info>("Running parse_result")
res_json = parse_result(logger, res)
return res_json["hourly"]["windspeed_10m"][-1]
if __name__ == "__main__":
miami = {"lat": 25.761681, "lon": -80.191788}
miami_windspeed = get_recent_windspeed_10m(**miami)
print(miami_windspeed)
Paul Weinmann
07/11/2023, 3:07 PMfrom pathlib import Path
from prefect import flow, get_run_logger
import httpx # requests capability, but can work with async
from prefect import flow, task
@task
def fetch_weather(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
)
most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
return most_recent_temp
@task
def fetch_precipitation(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather_feature = "precipitation_probability"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly=weather_feature),
)
most_recent_prec_prob = float(weather.json()["hourly"][weather_feature][0])
return most_recent_prec_prob
@task
def save_weather(temp: float):
with open("weather.csv", "w+") as w:
w.write(str(temp))
return "Successfully wrote temp"
@task
def save_precipitation(pp: float):
with open("precipitation_probability.csv", "w+") as w:
w.write(str(pp))
return "Successfully wrote precipitation probability"
@task
def fetch_hourly_weather_feature(lat: float, lon: float, weather_feature: str):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly=weather_feature),
)
most_recent_prec_prob = float(weather.json()["hourly"][weather_feature][0])
return most_recent_prec_prob
@task
def calc_sunscreen_level(uv_index: float):
sunscreen_level = 10 * (1 + uv_index) ** 2
return sunscreen_level
@task
def save_sunscreen_level(sunscreen_level: float):
with open("sunscreen_level.csv", "w+") as w:
w.write(str(sunscreen_level))
return "Successfully wrote precipitation probability"
@flow(log_prints=True)
def pipeline(lat: float, lon: float):
uv_index = fetch_hourly_weather_feature(lat, lon, weather_feature="uv_index")
sunscreen_level = calc_sunscreen_level(uv_index)
result = save_sunscreen_level(sunscreen_level)
return result
@flow(
name="log-example-flow",
retries=3,
log_prints=True,
cache_result_in_memory=True,
persist_result=True,
)
def log_it():
logger = get_run_logger()
<http://logger.info|logger.info>("INFO level log message.")
logger.debug("You only see this message if the logging level is set to DEBUG. 🙂")
pipeline(38.9, -77.0)
if __name__ == "__main__":
log_it()
Vinayak Nair
07/11/2023, 3:15 PMimport httpx # requests capability, but can work with async
from prefect import flow, task
from datetime import timedelta
from prefect.tasks import task_input_hash
@task(retries=3, retry_delay_seconds=2)
def fetch_windspeed(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
windspeed = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
)
most_recent_windspeed = float(windspeed.json()["hourly"]["windspeed_10m"][0])
return most_recent_windspeed
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1))
def fetch_relative_humidity(lat:float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
relative_humidity = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="relativehumidity_2m"),
)
most_recent_relative_humidity = float(relative_humidity.json()["hourly"]["relativehumidity_2m"][0])
return most_recent_relative_humidity
@task(cache_key_fn=task_input_hash, retries=2, retry_delay_seconds = 5)
def fetch_rain(lat:float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
rain = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="rain"),
)
most_recent_rain = float(rain.json()["hourly"]["rain"][0])
return most_recent_rain
@task(retries=4, retry_delay_seconds=0.1)
def save_data(windspeed: float, relative_humidity: float, rain:float):
with open("weather_deets.csv", "w+") as w:
w.write(f"{windspeed},{relative_humidity}, {rain}")
return "Successfully wrote data"
@flow(
name="meteo-data",
retries=3,
log_prints=True,
cache_result_in_memory=True,
persist_result=True,
)
def pipeline(lat: float, lon: float):
windspeed = fetch_windspeed(lat, lon)
relative_humidity = fetch_relative_humidity(lat, lon)
rain = fetch_rain(lat, lon)
result = save_data(windspeed, relative_humidity, rain)
return result
if __name__ == "__main__":
pipeline(52.5244, 13.4105)
Derick
07/11/2023, 3:28 PMimport httpx # requests capability, but can work with async
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
from prefect import flow, get_run_logger
from prefect import flow, get_run_logger
# @task
# def fetch_weather(lat: float, lon: float):
# base_url = "<https://api.open-meteo.com/v1/forecast/>"
# weather = httpx.get(
# base_url,
# params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
# )
# most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
# return most_recent_temp
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1),retries=4, retry_delay_seconds=0.1)
def fetch_weather(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
)
most_recent_windspeed = float(weather.json()["hourly"]["windspeed_10m"][-1])
return most_recent_windspeed
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1),retries=4, retry_delay_seconds=0.1)
def save_weather(windspeed: float):
with open("weather.csv", "w+") as w:
w.write(str(windspeed))
return "Successfully wrote temp"
@flow(name="dericks-test-flow")
def pipeline(lat: float, lon: float):
windspeed = fetch_weather(lat, lon)
result = save_weather(windspeed)
return result
if __name__ == "__main__":
pipeline(38.9, -77.0)