<@U07QHB8T7HT> feel free to post your question here!
# pacc-dec-17-18-2024
b
@David Michael Gang, feel free to post your question here!
d
This is an artificial example, but it should make the problem clear:
Copy code
import httpx
import os
from dataclasses import dataclass
from typing import List
from prefect import flow, task

@dataclass
class Location:
    """A latitude/longitude coordinate pair, e.g. ``Location(lat=38.9, lon=-77.0)``."""

    lat: float  # latitude, sent to the API as "latitude"
    lon: float  # longitude, sent to the API as "longitude"

# The API key is read and validated at import time. Because this runs at module
# level, merely importing this file fails unless WEATHER_API_KEY is set.
api_key = os.getenv('WEATHER_API_KEY')
if not api_key:
    raise ValueError("WEATHER_API_KEY environment variable is not set")

# A single client for the whole process: every fetch_weather call reuses it
# (and its connection pool). Whoever runs the flow is responsible for closing it.
client = httpx.Client(
    headers={
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
)

@task
def fetch_weather(location: Location):
    """Fetch the first hourly ``temperature_2m`` value for ``location``.

    Uses the module-level ``client`` so all task calls share one httpx
    connection pool.

    Returns:
        ``(location, forecasted_temp)``, where ``forecasted_temp`` is element 0
        of the response's ``hourly.temperature_2m`` series converted to float.

    Raises:
        httpx.HTTPStatusError: if the API answers with an error status.
    """
    # Must be a bare URL: wrapping it in "<...>" makes it unusable by httpx.
    base_url = "https://api.open-meteo.com/v1/forecast"
    response = client.get(
        base_url,
        params={
            "latitude": location.lat,
            "longitude": location.lon,
            "hourly": "temperature_2m"
        }
    )
    response.raise_for_status()
    forecasted_temp = float(response.json()["hourly"]["temperature_2m"][0])
    print(f"Forecasted temp C: {forecasted_temp} degrees for location {location.lat}, {location.lon}")
    return location, forecasted_temp

@task
def save_weather(location: Location, temp: float):
    """Write ``temp`` to ``weather_<lat>_<lon>.csv`` in the working directory.

    The file is overwritten on every call and holds only ``str(temp)``
    (no header, no trailing newline).

    Returns:
        A success message naming the location.
    """
    filename = f"weather_{location.lat}_{location.lon}.csv"
    # Plain "w": the file is only written here, never read back, so "w+" is unneeded.
    with open(filename, "w", encoding="utf-8") as out:
        out.write(str(temp))
    return f"Successfully wrote temp for location {location.lat}, {location.lon}"

@flow(retries=3, log_prints=True)
def pipeline(locations: List[Location]):
    """Fetch, then save, the weather for each location in order.

    ``fetch_weather`` talks to the API through the shared module-level client;
    this flow itself creates no client.
    """
    for loc in locations:
        fetched_location, temp = fetch_weather(loc)
        print(save_weather(fetched_location, temp))
    return "Completed processing all locations"

if __name__ == "__main__":
    # (latitude, longitude) of the sample cities.
    sample_coords = [
        (38.9, -77.0),  # Washington DC
        (40.7, -74.0),  # New York
        (34.0, -118.2),  # Los Angeles
    ]
    locations = [Location(lat=lat, lon=lon) for lat, lon in sample_coords]

    try:
        pipeline(locations)
    finally:
        # Close the shared module-level client even if the flow raises.
        client.close()
This works, but at the deploy stage I need to have some value set for WEATHER_API_KEY, because the key is read and checked at module import time. This version is more natural, but it doesn't work:
Copy code
import httpx
import os
from dataclasses import dataclass
from typing import List
from prefect import flow, task

@dataclass
class Location:
    """A latitude/longitude coordinate pair, e.g. ``Location(lat=38.9, lon=-77.0)``."""

    lat: float  # latitude, sent to the API as "latitude"
    lon: float  # longitude, sent to the API as "longitude"

@task
def fetch_weather(client: httpx.Client, location: Location):
    """Fetch the first hourly ``temperature_2m`` value for ``location``.

    Args:
        client: HTTP client supplied by the flow, so calls share one pool.
        location: Point whose forecast is requested.

    Returns:
        ``(location, forecasted_temp)``, where ``forecasted_temp`` is element 0
        of the response's ``hourly.temperature_2m`` series converted to float.

    Raises:
        httpx.HTTPStatusError: if the API answers with an error status.

    NOTE(review): ``client`` is a task input, so with Prefect's default cache
    policy Prefect tries to serialize it to build a cache key, and that fails
    for an ``httpx.Client``. Declaring the task with ``cache_policy=NONE``
    avoids this.
    """
    # Must be a bare URL: wrapping it in "<...>" makes it unusable by httpx.
    base_url = "https://api.open-meteo.com/v1/forecast"
    response = client.get(
        base_url,
        params={
            "latitude": location.lat,
            "longitude": location.lon,
            "hourly": "temperature_2m"
        }
    )
    response.raise_for_status()
    forecasted_temp = float(response.json()["hourly"]["temperature_2m"][0])
    print(f"Forecasted temp C: {forecasted_temp} degrees for location {location.lat}, {location.lon}")
    return location, forecasted_temp

@task
def save_weather(location: Location, temp: float):
    """Write ``temp`` to ``weather_<lat>_<lon>.csv`` in the working directory.

    The file is overwritten on every call and holds only ``str(temp)``
    (no header, no trailing newline).

    Returns:
        A success message naming the location.
    """
    filename = f"weather_{location.lat}_{location.lon}.csv"
    # Plain "w": the file is only written here, never read back, so "w+" is unneeded.
    with open(filename, "w", encoding="utf-8") as out:
        out.write(str(temp))
    return f"Successfully wrote temp for location {location.lat}, {location.lon}"

@flow(retries=3, log_prints=True)
def pipeline(locations: List[Location]):
    """Fetch and save the weather for each location using one shared client.

    The API key is read when the flow runs, not at import time. One
    ``httpx.Client`` is opened for the run and passed to every
    ``fetch_weather`` call, then closed when the ``with`` block exits.

    Raises:
        ValueError: if ``WEATHER_API_KEY`` is unset or empty.
    """
    api_key = os.getenv('WEATHER_API_KEY')
    if not api_key:
        raise ValueError("WEATHER_API_KEY environment variable is not set")

    shared_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    with httpx.Client(headers=shared_headers) as client:
        for loc in locations:
            fetched_location, temp = fetch_weather(client, loc)
            print(save_weather(fetched_location, temp))

    return "Completed processing all locations"

if __name__ == "__main__":
    # (latitude, longitude) of the sample cities.
    sample_coords = [
        (38.9, -77.0),  # Washington DC
        (40.7, -74.0),  # New York
        (34.0, -118.2),  # Los Angeles
    ]
    locations = [Location(lat=lat, lon=lon) for lat, lon in sample_coords]

    pipeline(locations)
@Bianca Hoch, do you have an update here? Thanks!
b
Hi David! Ah, sorry, and thank you for the reminder. Could you remind me what your question from the Zoom call was, i.e. what pattern you'd like to implement?
d
I just want to know how to create a shared object for the flow. In other frameworks this would be done either by passing it in as a function argument or via dependency injection (DI).
This example uses an HTTP client. I don't want to create it inside the task, because then I would not benefit from the connection pool.
My use case is a flow with tens of thousands of tasks, so the per-task overhead adds up, especially HTTP connection creation. That's why I want to share the connection pool.
👍 1
b
I was able to get your second example to work by setting `cache_policy=NONE` on the `fetch_weather` task. Prefect was attempting to serialize the `httpx.Client` in order to cache the task's inputs. Setting `cache_policy=NONE` disables caching and result persistence for the task, so it should mitigate the `ValueError` (which I believe you were seeing when you attempted to run the flow the first time).
Copy code
import httpx
import os
from dataclasses import dataclass
from typing import List
from prefect import flow, task
from prefect.cache_policies import NONE

@dataclass
class Location:
    """A latitude/longitude coordinate pair, e.g. ``Location(lat=38.9, lon=-77.0)``."""

    lat: float  # latitude, sent to the API as "latitude"
    lon: float  # longitude, sent to the API as "longitude"

@task(cache_policy=NONE)
def fetch_weather(client: httpx.Client, location: Location):
    """Fetch the first hourly ``temperature_2m`` value for ``location``.

    ``cache_policy=NONE`` is required because ``client`` is a task input. With
    the default policy, Prefect would try to serialize the ``httpx.Client`` to
    cache the task's inputs, and that fails.

    Args:
        client: HTTP client supplied by the flow, so calls share one pool.
        location: Point whose forecast is requested.

    Returns:
        ``(location, forecasted_temp)``, where ``forecasted_temp`` is element 0
        of the response's ``hourly.temperature_2m`` series converted to float.

    Raises:
        httpx.HTTPStatusError: if the API answers with an error status.
    """
    # Must be a bare URL: wrapping it in "<...>" makes it unusable by httpx.
    base_url = "https://api.open-meteo.com/v1/forecast"
    response = client.get(
        base_url,
        params={
            "latitude": location.lat,
            "longitude": location.lon,
            "hourly": "temperature_2m"
        }
    )
    response.raise_for_status()
    forecasted_temp = float(response.json()["hourly"]["temperature_2m"][0])
    print(f"Forecasted temp C: {forecasted_temp} degrees for location {location.lat}, {location.lon}")
    return location, forecasted_temp

@task
def save_weather(location: Location, temp: float):
    """Write ``temp`` to ``weather_<lat>_<lon>.csv`` in the working directory.

    The file is overwritten on every call and holds only ``str(temp)``
    (no header, no trailing newline).

    Returns:
        A success message naming the location.
    """
    filename = f"weather_{location.lat}_{location.lon}.csv"
    # Plain "w": the file is only written here, never read back, so "w+" is unneeded.
    with open(filename, "w", encoding="utf-8") as out:
        out.write(str(temp))
    return f"Successfully wrote temp for location {location.lat}, {location.lon}"

@flow(retries=3, log_prints=True)
def pipeline(locations: List[Location]):
    """Fetch and save the weather for each location using one shared client.

    The API key is read when the flow runs, not at import time. One
    ``httpx.Client`` is opened for the run and passed to every
    ``fetch_weather`` call, then closed when the ``with`` block exits.

    Raises:
        ValueError: if ``WEATHER_API_KEY`` is unset or empty.
    """
    api_key = os.getenv('WEATHER_API_KEY')
    if not api_key:
        raise ValueError("WEATHER_API_KEY environment variable is not set")

    shared_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    with httpx.Client(headers=shared_headers) as client:
        for loc in locations:
            fetched_location, temp = fetch_weather(client, loc)
            print(save_weather(fetched_location, temp))

    return "Completed processing all locations"

if __name__ == "__main__":
    # (latitude, longitude) of the sample cities.
    sample_coords = [
        (38.9, -77.0),  # Washington DC
        (40.7, -74.0),  # New York
        (34.0, -118.2),  # Los Angeles
    ]
    locations = [Location(lat=lat, lon=lon) for lat, lon in sample_coords]

    pipeline(locations)