Jeff Hale
04/10/2024, 5:36 PM
Estibaliz Echevarria
04/10/2024, 6:05 PM
import httpx # requests capability, but can work with async
from prefect import flow, task
import os
from prefect.artifacts import create_markdown_artifact
from prefect.tasks import task_input_hash
from datetime import timedelta
# Current working directory captured at import time; save_weather joins it
# with "weather.csv" to decide where the output file is written.
path=os.getcwd()
# Tasks run inside flows.
# Flows are the main unit of execution in Prefect; they can be scheduled.
@task(retries=3)
def inicio():
    """Return the fixed start-up message used as the first pipeline step."""
    greeting = 'Empezandooo'
    return greeting
@task(retries=4, retry_delay_seconds=0.5)
def fetch_weather(lat: float, lon: float) -> float:
    """Fetch the first hourly temperature for a location and publish a report.

    Queries the Open-Meteo forecast endpoint for the ``temperature_2m`` hourly
    series, takes its first entry, publishes it through ``report`` and returns
    it so the calling flow can pass it on.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first value of ``hourly.temperature_2m`` as a float.

    Raises:
        httpx.HTTPStatusError: If the API answers with a non-success status.
        KeyError / IndexError: If the response JSON lacks the expected series.
    """
    # Plain URL: chat-style "<...>" link brackets are not part of a valid URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Surface HTTP failures as themselves so the task retries on the real
    # error instead of on a confusing KeyError from the JSON lookup below.
    weather.raise_for_status()
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    # `report` is itself a task, and tasks can't run inside other tasks;
    # `.fn` calls its underlying function directly within this task run.
    report.fn(most_recent_temp)
    # Return the value: the flow hands it to save_weather.
    return most_recent_temp
# Tasks can't run inside other tasks.
@task
def save_weather(temp: float):
    """Write ``temp`` as text to ``weather.csv`` under the module-level ``path``.

    The file is opened in ``"w+"`` mode, so any previous content is replaced.

    Returns:
        The fixed confirmation string ``"Successfully wrote temp"``.
    """
    target = os.path.join(path, "weather.csv")
    with open(target, "w+") as handle:
        handle.write(str(temp))
    return "Successfully wrote temp"
# Flows CAN run inside other flows.
# Docs: https://docs.prefect.io/latest/concepts/flows/#composing-flows
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1))
def my_subflow():
    """Return the fixed report-status message.

    Despite its name, this is registered with ``@task``; results are cached
    by input hash and the cache entry expires after one minute.
    """
    status = "Creando informe"
    return status
@task
def report(temp):
    """Publish a markdown artifact containing ``temp`` in a one-row table.

    Args:
        temp: Value interpolated into the "Temp Forecast" row.
    """
    rows = (
        "# Weather Report",
        "## Recent weather",
        "| Time | Temperature |",
        "|:--------------|-------:|",
        f"| Temp Forecast | {temp} |",
        "",  # empty last element yields the trailing newline
    )
    body = "\n".join(rows)
    create_markdown_artifact(
        markdown=body,
        key="weather-report",
        description="Very scientific weather report",
    )
@flow
def pipeline(lat: float = 45.9, lon: float = 3.8):
    """Run the weather pipeline for one location.

    Calls ``inicio``, ``fetch_weather``, ``save_weather`` and ``my_subflow``
    in that order.

    Args:
        lat: Latitude forwarded to ``fetch_weather``.
        lon: Longitude forwarded to ``fetch_weather``.

    Returns:
        A dict with the ``save_weather`` result under ``"weather"`` and the
        ``my_subflow`` result under ``"subflow"``.
    """
    inicio()
    temperature = fetch_weather(lat, lon)
    write_status = save_weather(temperature)
    return {"weather": write_status, "subflow": my_subflow()}
if __name__ == "__main__":
    # Script entry point: run the flow with its default coordinates.
    pipeline()