Hector GODZIEN
03/12/2024, 3:50 PMimport httpx
from prefect import task, flow, get_run_logger
from prefect.artifacts import create_markdown_artifact
from datetime import timedelta
@task()
def save_data(input_data: dict):
tmp = f"""# Let's say it's a report
Temperature {input_data.get('temperature')}
Windspeed {input_data.get('windspeed')}
"""
create_markdown_artifact(markdown=tmp)
@task(retries=3, cache_expiration=timedelta(minutes=5))
def fetch_temperature(lat: float = 88.9, lon: float =-77):
logger = get_run_logger()
temps = httpx.get(
"<https://api.open-meteo.com/v1/forecast>",
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
)
temps.raise_for_status()
logger.info(f"Forecast temperature: {temps.json().get('hourly').get('temperature_2m')[0]} degrees")
return temps.json().get('hourly').get('temperature_2m')[0]
@task(retries=3, cache_expiration=timedelta(minutes=5))
def fetch_windspeed(lat: float = 88.9, lon: float = -77):
logger = get_run_logger()
speeds = httpx.get("<https://api.open-meteo.com/v1/forecast>", params=dict(latitude=lat, longitude=lon, hourly="wind_speed_10m"))
speeds.raise_for_status()
logger.info(f"Forecast windspeed: {speeds.json().get('hourly').get('wind_speed_10m')[0]}")
return speeds.json().get('hourly').get('wind_speed_10m')[0]
@flow
def pipeline():
temp = fetch_temperature(lat=48)
windspeed = fetch_windspeed(lon=2)
save_data(dict(temperature=temp, windspeed=windspeed))
if __name__ == "__main__":
pipeline.serve(name="lab-102", cron="* * * * *")