Bianca Hoch
12/17/2024, 3:24 PM
David Michael Gang
12/17/2024, 3:34 PM
import httpx
import os
from dataclasses import dataclass
from typing import List
from prefect import flow, task
@dataclass
class Location:
    """Coordinate pair identifying the place to fetch a forecast for."""

    lat: float  # sent to the API as the "latitude" query parameter
    lon: float  # sent to the API as the "longitude" query parameter
# Get API key from environment variables.
# NOTE: this check runs at import time, so importing the module (for example
# while deploying the flow) raises unless WEATHER_API_KEY is set.
api_key = os.environ.get('WEATHER_API_KEY')
if not api_key:
    raise ValueError("WEATHER_API_KEY environment variable is not set")

# Shared HTTP client; every request carries the key as a bearer token.
client = httpx.Client(
    headers={
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
)
@task
def fetch_weather(location: Location):
    """Fetch the forecast temperature for ``location``.

    Queries the Open-Meteo forecast endpoint through the module-level
    ``client`` and reads the first entry of the hourly ``temperature_2m``
    series from the JSON response.

    Returns:
        A ``(location, forecasted_temp)`` tuple; ``forecasted_temp`` is a float.

    Raises:
        httpx.HTTPStatusError: if the response has an error status code.
    """
    # Plain URL: the "<...>" wrapper in the pasted snippet was chat link
    # markup, not part of the address.
    base_url = "https://api.open-meteo.com/v1/forecast"
    query = {
        "latitude": location.lat,
        "longitude": location.lon,
        "hourly": "temperature_2m",
    }
    response = client.get(base_url, params=query)
    response.raise_for_status()

    hourly_temps = response.json()["hourly"]["temperature_2m"]
    forecasted_temp = float(hourly_temps[0])
    print(f"Forecasted temp C: {forecasted_temp} degrees for location {location.lat}, {location.lon}")
    return location, forecasted_temp
@task
def save_weather(location: Location, temp: float):
    """Write ``temp`` to a per-location file.

    The file is named ``weather_<lat>_<lon>.csv`` (relative path) and is
    overwritten on each call; it contains only ``str(temp)``.

    Returns:
        A confirmation message naming the location.
    """
    filename = f"weather_{location.lat}_{location.lon}.csv"
    # Write-only mode is sufficient; an explicit encoding keeps the output
    # independent of the platform default.
    with open(filename, "w", encoding="utf-8") as out:
        out.write(str(temp))
    return f"Successfully wrote temp for location {location.lat}, {location.lon}"
@flow(retries=3, log_prints=True)
def pipeline(locations: List[Location]):
    """Fetch and save the forecast for every location, one after another.

    Each location is passed to ``fetch_weather`` and the result to
    ``save_weather``; the message returned by ``save_weather`` is printed.

    Returns:
        A fixed completion message.
    """
    for location in locations:
        # Use a separate name rather than rebinding the loop variable.
        fetched_location, temp = fetch_weather(location)
        result = save_weather(fetched_location, temp)
        print(result)
    return "Completed processing all locations"
if __name__ == "__main__":
    locations = [
        Location(lat=38.9, lon=-77.0),  # Washington DC
        Location(lat=40.7, lon=-74.0),  # New York
        Location(lat=34.0, lon=-118.2),  # Los Angeles
    ]
    try:
        pipeline(locations)
    finally:
        # Close the shared HTTP client whether or not the flow succeeded.
        client.close()
This works, but at the deploy stage I need WEATHER_API_KEY to be set to some value.
The following is more natural, but it doesn't work:
import httpx
import os
from dataclasses import dataclass
from typing import List
from prefect import flow, task
@dataclass
class Location:
    """Coordinate pair identifying the place to fetch a forecast for."""

    lat: float  # sent to the API as the "latitude" query parameter
    lon: float  # sent to the API as the "longitude" query parameter
@task
def fetch_weather(client: httpx.Client, location: Location):
    """Fetch the forecast temperature for ``location`` using ``client``.

    Queries the Open-Meteo forecast endpoint and reads the first entry of
    the hourly ``temperature_2m`` series from the JSON response.

    Returns:
        A ``(location, forecasted_temp)`` tuple; ``forecasted_temp`` is a float.

    Raises:
        httpx.HTTPStatusError: if the response has an error status code.
    """
    # NOTE(review): per the discussion in this thread, passing the
    # httpx.Client as a task argument makes Prefect try to serialize it for
    # input caching, which fails; the reported fix is to set
    # cache_policy=NONE on this task.

    # Plain URL: the "<...>" wrapper in the pasted snippet was chat link
    # markup, not part of the address.
    base_url = "https://api.open-meteo.com/v1/forecast"
    query = {
        "latitude": location.lat,
        "longitude": location.lon,
        "hourly": "temperature_2m",
    }
    response = client.get(base_url, params=query)
    response.raise_for_status()

    hourly_temps = response.json()["hourly"]["temperature_2m"]
    forecasted_temp = float(hourly_temps[0])
    print(f"Forecasted temp C: {forecasted_temp} degrees for location {location.lat}, {location.lon}")
    return location, forecasted_temp
@task
def save_weather(location: Location, temp: float):
    """Write ``temp`` to a per-location file.

    The file is named ``weather_<lat>_<lon>.csv`` (relative path) and is
    overwritten on each call; it contains only ``str(temp)``.

    Returns:
        A confirmation message naming the location.
    """
    filename = f"weather_{location.lat}_{location.lon}.csv"
    # Write-only mode is sufficient; an explicit encoding keeps the output
    # independent of the platform default.
    with open(filename, "w", encoding="utf-8") as out:
        out.write(str(temp))
    return f"Successfully wrote temp for location {location.lat}, {location.lon}"
@flow(retries=3, log_prints=True)
def pipeline(locations: List[Location]):
    """Fetch and save the forecast for every location with one HTTP client.

    The API key is read from the environment when the flow runs, not at
    import time, and the client is closed when the ``with`` block exits.

    Returns:
        A fixed completion message.

    Raises:
        ValueError: if WEATHER_API_KEY is unset or empty.
    """
    api_key = os.environ.get('WEATHER_API_KEY')
    if not api_key:
        raise ValueError("WEATHER_API_KEY environment variable is not set")

    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    with httpx.Client(headers=request_headers) as client:
        for location in locations:
            # Use a separate name rather than rebinding the loop variable.
            fetched_location, temp = fetch_weather(client, location)
            result = save_weather(fetched_location, temp)
            print(result)
    return "Completed processing all locations"
if __name__ == "__main__":
    locations = [
        Location(lat=38.9, lon=-77.0),  # Washington DC
        Location(lat=40.7, lon=-74.0),  # New York
        Location(lat=34.0, lon=-118.2),  # Los Angeles
    ]
    pipeline(locations)
David Michael Gang
12/18/2024, 3:36 PMBianca Hoch
12/18/2024, 3:55 PMDavid Michael Gang
12/18/2024, 6:17 PMDavid Michael Gang
12/18/2024, 6:17 PMDavid Michael Gang
12/18/2024, 6:21 PMBianca Hoch
12/18/2024, 9:31 PM
Set cache_policy=NONE on the fetch_weather task. Prefect was attempting to serialize the httpx.Client to cache the inputs for the task. Setting cache_policy=NONE disables caching and result persistence for the task, so it should mitigate the ValueError (which I believe you were seeing when you attempted to run the flow the first time).
import httpx
import os
from dataclasses import dataclass
from typing import List
from prefect import flow, task
from prefect.cache_policies import NONE
@dataclass
class Location:
    """Coordinate pair identifying the place to fetch a forecast for."""

    lat: float  # sent to the API as the "latitude" query parameter
    lon: float  # sent to the API as the "longitude" query parameter
# Caching is disabled for this task: according to the thread, Prefect
# otherwise tries to serialize the httpx.Client argument to cache the task
# inputs, which fails.
@task(cache_policy=NONE)
def fetch_weather(client: httpx.Client, location: Location):
    """Fetch the forecast temperature for ``location`` using ``client``.

    Queries the Open-Meteo forecast endpoint and reads the first entry of
    the hourly ``temperature_2m`` series from the JSON response.

    Returns:
        A ``(location, forecasted_temp)`` tuple; ``forecasted_temp`` is a float.

    Raises:
        httpx.HTTPStatusError: if the response has an error status code.
    """
    # Plain URL: the "<...>" wrapper in the pasted snippet was chat link
    # markup, not part of the address.
    base_url = "https://api.open-meteo.com/v1/forecast"
    query = {
        "latitude": location.lat,
        "longitude": location.lon,
        "hourly": "temperature_2m",
    }
    response = client.get(base_url, params=query)
    response.raise_for_status()

    hourly_temps = response.json()["hourly"]["temperature_2m"]
    forecasted_temp = float(hourly_temps[0])
    print(f"Forecasted temp C: {forecasted_temp} degrees for location {location.lat}, {location.lon}")
    return location, forecasted_temp
@task
def save_weather(location: Location, temp: float):
    """Write ``temp`` to a per-location file.

    The file is named ``weather_<lat>_<lon>.csv`` (relative path) and is
    overwritten on each call; it contains only ``str(temp)``.

    Returns:
        A confirmation message naming the location.
    """
    filename = f"weather_{location.lat}_{location.lon}.csv"
    # Write-only mode is sufficient; an explicit encoding keeps the output
    # independent of the platform default.
    with open(filename, "w", encoding="utf-8") as out:
        out.write(str(temp))
    return f"Successfully wrote temp for location {location.lat}, {location.lon}"
@flow(retries=3, log_prints=True)
def pipeline(locations: List[Location]):
    """Fetch and save the forecast for every location with one HTTP client.

    The API key is read from the environment when the flow runs, not at
    import time, and the client is closed when the ``with`` block exits.

    Returns:
        A fixed completion message.

    Raises:
        ValueError: if WEATHER_API_KEY is unset or empty.
    """
    api_key = os.environ.get('WEATHER_API_KEY')
    if not api_key:
        raise ValueError("WEATHER_API_KEY environment variable is not set")

    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    with httpx.Client(headers=request_headers) as client:
        for location in locations:
            # Use a separate name rather than rebinding the loop variable.
            fetched_location, temp = fetch_weather(client, location)
            result = save_weather(fetched_location, temp)
            print(result)
    return "Completed processing all locations"
if __name__ == "__main__":
    locations = [
        Location(lat=38.9, lon=-77.0),  # Washington DC
        Location(lat=40.7, lon=-74.0),  # New York
        Location(lat=34.0, lon=-118.2),  # Los Angeles
    ]
    pipeline(locations)