102 Lab: - Use a flow that grabs weather data fro...
# pacc-apr-10-11-2024
j
102 Lab:
• Use a flow that grabs weather data from open-meteo
• Add at least three tasks
• Add retries
• Run your flow
• Inspect in the UI
• Stretch: create an artifact
• Stretch: add caching
e
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task
import os
from prefect.artifacts import create_markdown_artifact
from prefect.tasks import task_input_hash
from datetime import timedelta
# Working directory captured at import time; save_weather writes weather.csv under it.
path=os.getcwd()


# Tasks run inside flows.
# Flows are the main unit of execution in Prefect; they can be scheduled.
@task(retries=3)
def inicio():
    """Return the fixed start-up message for the pipeline.

    Declared as a task with ``retries=3``.
    """
    start_message = "Empezandooo"
    return start_message
    
@task(retries=4, retry_delay_seconds=0.5)
def fetch_weather(lat: float, lon: float):
    """Fetch the hourly forecast from Open-Meteo and return its first temperature.

    The value is also published as a markdown artifact via ``report``.

    Args:
        lat: Latitude, sent to the API as ``latitude``.
        lon: Longitude, sent to the API as ``longitude``.

    Returns:
        The first entry of ``hourly.temperature_2m`` in the response, as a float.

    Raises:
        httpx.HTTPStatusError: If the API responds with a 4xx/5xx status.
    """
    # Plain URL: wrapping it in "<...>" (chat-paste markup) makes it an
    # invalid URL and the request can never succeed.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Surface HTTP errors directly so a retry is triggered by the real cause
    # rather than by a KeyError on an error payload.
    weather.raise_for_status()
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    # `report` is itself a task, and tasks can't run inside other tasks, so
    # invoke its undecorated function through `.fn` to still create the artifact.
    report.fn(most_recent_temp)
    # Return the value so the calling flow can hand it to downstream tasks.
    return most_recent_temp
    



# Tasks can't run inside other tasks.
@task
def save_weather(temp: float):
    """Write ``temp`` as text to ``weather.csv`` under the module-level ``path``.

    The file is opened in ``"w+"`` mode, so any existing contents are replaced.

    Args:
        temp: Value to store; it is converted with ``str`` before writing.

    Returns:
        A fixed confirmation string.
    """
    target = os.path.join(path, "weather.csv")
    with open(target, "w+") as out_file:
        out_file.write(str(temp))
    return "Successfully wrote temp"


# Flows CAN run inside other flows.
# Docs: https://docs.prefect.io/latest/concepts/flows/#composing-flows
# Note: `my_subflow` below is declared with @task, so it is a task, not a subflow.
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1))
def my_subflow():
    """Return a fixed status string.

    Declared as a task whose cache key is ``task_input_hash`` and whose cache
    expiration is one minute.
    """
    status = "Creando informe"
    return status
@task
def report(temp):
    """Publish a markdown artifact holding a one-row table with ``temp``.

    Args:
        temp: Value interpolated into the "Temp Forecast" row.
    """
    # The document is assembled line by line and joined with newlines. The
    # whitespace-only second line and the trailing empty entry (which yields
    # the final newline) are deliberate parts of the emitted text.
    report_lines = [
        "# Weather Report",
        "    ",
        "## Recent weather",
        "",
        "| Time        | Temperature |",
        "|:--------------|-------:|",
        f"| Temp Forecast  | {temp} |",
        "",
    ]
    create_markdown_artifact(
        markdown="\n".join(report_lines),
        key="weather-report",
        description="Very scientific weather report",
    )

@flow
def pipeline(lat: float = 45.9, lon: float = 3.8):
    """Run the weather steps in order: start, fetch, save, then the status step.

    Args:
        lat: Latitude forwarded to ``fetch_weather``.
        lon: Longitude forwarded to ``fetch_weather``.

    Returns:
        A dict with the ``save_weather`` result under ``"weather"`` and the
        ``my_subflow`` result under ``"subflow"``.
    """
    inicio()
    fetched_temp = fetch_weather(lat, lon)
    # Dict entries are evaluated top to bottom, so the save runs before the
    # status step.
    return {
        "weather": save_weather(fetched_temp),
        "subflow": my_subflow(),
    }


# Run the flow with its default coordinates when the file is executed as a script.
if __name__ == "__main__":
    pipeline()
🙌 1
🦜 1