# Jeff Hale
# 06/13/2023, 4:56 PM
# Sayantani Bhattacharjee
# 06/13/2023, 5:05 PM
import httpx
from datetime import timedelta
from prefect import flow, task, get_run_logger, variables
from prefect.tasks import task_input_hash
from prefect.artifacts import create_markdown_artifact
@task
def mark_it_down(temp):
    """Publish a Markdown weather report as a Prefect artifact.

    Renders a small Markdown table from ``temp`` and from the Prefect
    variable named ``precipitation``, then hands the text to
    ``create_markdown_artifact`` under the key ``weather-report``.

    Args:
        temp: The current temperature. It must support ``temp + 2``, which
            is shown as the "In 1 hour" row.
    """
    # The literal's lines deliberately start at column 0: leading spaces
    # would become part of the Markdown text and break the table.
    # The value column is labelled "Temperature" because it holds the
    # temperature readings, and every row is closed with a trailing pipe.
    markdown_report = f"""# Weather Report
## Recent weather
| Time | Temperature |
|:--------------|-------:|
| Now | {temp} |
| In 1 hour | {temp + 2} |
| Variable | {variables.get('precipitation')} |
"""
    create_markdown_artifact(
        key="weather-report",
        markdown=markdown_report,
        description="Very scientific weather report",
    )
@task(retries=3, retry_delay_seconds=60, cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1))
def fetch_weather(lat: float, lon: float):
    """Request hourly temperature and precipitation from Open-Meteo.

    The task is declared with 3 retries spaced 60 seconds apart, and with
    input-hash caching that expires after one minute.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.

    Returns:
        The ``httpx`` response object from the GET request; callers read
        its payload with ``.json()``.

    Raises:
        httpx.HTTPStatusError: If the server answers with a non-success
            status code.
    """
    weather = httpx.get(
        # Plain URL: chat-export "<...>" link markup must not be part of
        # the string, otherwise the request cannot be sent.
        "https://api.open-meteo.com/v1/forecast/",
        params=dict(latitude=lat, longitude=lon, hourly=["temperature_2m", "precipitation"]),
    )
    # Fail here on an error status so the task's retries can react to it,
    # instead of surfacing later as a missing key while parsing the body.
    weather.raise_for_status()
    return weather
@flow
def get_temp(weather):
    """Extract the first hourly temperature from a forecast response.

    Args:
        weather: Response object whose ``.json()`` returns a mapping with
            an ``"hourly"`` entry containing a ``"temperature_2m"``
            sequence.

    Returns:
        The first ``temperature_2m`` value, converted to ``float``.
    """
    logger = get_run_logger()
    logger.info("Fetching temperature data...")
    # NOTE(review): index 0 is the first element of the hourly series;
    # confirm that this is the reading meant by "most recent".
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp
@flow
def get_precipitation(weather):
    """Extract the first hourly precipitation value from a forecast response.

    Args:
        weather: Response object whose ``.json()`` returns a mapping with
            an ``"hourly"`` entry containing a ``"precipitation"``
            sequence.

    Returns:
        The first ``precipitation`` value, converted to ``float``.
    """
    logger = get_run_logger()
    logger.info("Fetching precipitation data...")
    # NOTE(review): index 0 is the first element of the hourly series;
    # confirm that this is the reading meant by "most recent".
    most_recent_precipitation = float(weather.json()["hourly"]["precipitation"][0])
    return most_recent_precipitation
@flow(name="WeatherDataLab105", log_prints=True)
def weather_data(lat: float = 38.9, lon: float = -79.0):
    """Fetch a forecast, publish a report artifact, and print a summary.

    Calls ``fetch_weather`` once, passes the same response to ``get_temp``
    and ``get_precipitation``, sends the temperature to ``mark_it_down``,
    and finally prints both values.

    Args:
        lat: Latitude forwarded to ``fetch_weather``.
        lon: Longitude forwarded to ``fetch_weather``.
    """
    # One response is shared by both extraction steps.
    response = fetch_weather(lat, lon)
    temp = get_temp(response)
    precipitation = get_precipitation(response)

    mark_it_down(temp)

    summary = f"Temperature: {temp} \nPrecipitation: {precipitation}"
    print(summary)
# Script entry point: run the flow with its default coordinates.
if __name__ == "__main__":
    weather_data()