William Hom
08/30/2024, 8:10 PM
When I use the concurrency context manager, I either:
1. Don't notice any real difference when I play around with the occupy parameter. Granted, this is a simple example.
2. Sometimes my Flow will hang randomly for seemingly no reason. On one run it'll execute fine to completion, but on the next it'll just stop right before it gets to the context manager (i.e., no more logs).
I'm noticing both whether I use the sync or async concurrency context managers.
I'll share a version of the code in the thread.

William Hom
08/30/2024, 8:10 PM
import asyncio
from datetime import timedelta
from typing import Any, Dict, List

import duckdb
import httpx
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
from prefect.tasks import task_input_hash

API_KEY = "<redacted>"
DUCKDB_DATABASE = "<redacted>"

@task(
    name="Execute Weather API Call",
    retries=3,
    retry_delay_seconds=5,
    # cache_key_fn=task_input_hash,
    # cache_expiration=timedelta(days=1),  # Weather data changes daily
    log_prints=True
)
async def execute_weather_api(zip_code: str) -> Dict[str, Any]:
    url = "https://api.openweathermap.org/data/2.5/weather?zip={zip_code},us&appid={api_key}".format(zip_code=zip_code, api_key=API_KEY)
    print(f"Executing API call for {zip_code}")
    async with concurrency("execute-weather-api", occupy=1):
        response = httpx.get(url)
        try:
            response.raise_for_status()
        except httpx.HTTPError:
            return None
        return response.json()

@flow(
    name="Get Weather",
    log_prints=True
)
async def get_weather() -> None:
    print("Getting NY Weather")
    ny_weather = await get_ny_weather(get_ny_zip_codes(count=500))
    print("Materialize Weather")
    create_weather_table()
    write_data_to_duck_db(ny_weather)

@task(
    name="Write Data to DB",
    log_prints=True
)
def write_data_to_duck_db(weather_data: List[Dict[str, Any]]) -> None:
    conn = duckdb.connect(DUCKDB_DATABASE)
    zip_codes = []
    for row in weather_data:
        query = f"""INSERT INTO weather
            VALUES (
                '{row["zip_code"]}',
                '{row["lat"]}',
                '{row["lng"]}',
                '{row["temperature"]}',
                '{row["feels_like"]}',
                '{row["humidity"]}'
            )
        """
        conn.execute(query)
        zip_codes.append(row["zip_code"])
    print(f"Zip codes recorded: {zip_codes}")
    print(f"Number of records: {len(zip_codes)}")

@task(
    name="Get NY Zip Codes",
    log_prints=True
)
def get_ny_zip_codes(count: int) -> List[str]:
    url = "https://data.ny.gov/resource/juva-r6g2.json"
    response = httpx.get(url)
    try:
        response.raise_for_status()
    except httpx.HTTPError:
        raise
    return [d["zip_code"] for d in response.json()[:count]]

@flow(
    name="Get NY Weather",
    log_prints=True
)
async def get_ny_weather(zip_codes: List[str]) -> List[Dict[str, Any]]:
    weather = []
    for zip_code in zip_codes:
        weather.append({
            "zip_code": zip_code,
            "result": await execute_weather_api.submit(zip_code)
        })
    return_value = []
    for data in weather:
        w = await data["result"].result()
        if w:
            return_value.append({
                "zip_code": data["zip_code"],
                "lat": w["coord"]["lat"],
                "lng": w["coord"]["lon"],
                "temperature": w["main"]["temp"],
                "feels_like": w["main"]["feels_like"],
                "humidity": w["main"]["humidity"]
            })
    return return_value

@task(
    name="Create Weather Table",
    log_prints=True
)
def create_weather_table() -> None:
    sql = """CREATE TABLE IF NOT EXISTS weather (
        zip_code TEXT,
        lat REAL,
        lng REAL,
        temperature REAL,
        feels_like REAL,
        humidity REAL
    )
    """
    duckdb.connect(DUCKDB_DATABASE).execute(sql)

if __name__ == "__main__":
    asyncio.run(get_weather())
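
For comparison, a minimal sketch of the same concurrency pattern with a fully non-blocking request, so the event loop isn't tied up while a slot is held. The example.com URL and the demo task/flow names are placeholders (not from the thread), and it assumes a global concurrency limit named "execute-weather-api" already exists:

import asyncio

import httpx
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency

@task
async def fetch_status(url: str) -> int:
    # Hold one slot of the 'execute-weather-api' limit only while the request is in flight.
    async with concurrency("execute-weather-api", occupy=1):
        async with httpx.AsyncClient() as client:
            response = await client.get(url)  # awaited, unlike the blocking httpx.get()
    return response.status_code

@flow(log_prints=True)
async def fetch_demo() -> None:
    # Submit a handful of tasks; the configured limit (not occupy) caps how many run at once.
    futures = [await fetch_status.submit("https://example.com") for _ in range(5)]
    print([await f.result() for f in futures])

if __name__ == "__main__":
    asyncio.run(fetch_demo())
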
William Hom
08/30/2024, 8:11 PM
I figured that bumping the occupy value from 1 to, say, 100 would give me a noticeable speed-up in execution, but it doesn't.

Nate
08/30/2024, 9:11 PM
occupy is just the number of slots that this context manager can occupy from the limit
so what's the limit value you have set? you can do prefect gcl ls
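
To make the occupy/limit relationship concrete: prefect gcl ls lists the server-side global concurrency limits, and each concurrency(...) call takes occupy slots out of the named limit's total while the block runs, so raising occupy makes each call hold more of the limit rather than running more work in parallel. A hedged sketch, assuming a limit named "execute-weather-api" with 10 slots has been created on the server (e.g. via the prefect gcl CLI; exact flags vary by Prefect version):

from prefect.concurrency.asyncio import concurrency

async def rate_limited_work() -> None:
    # With a 10-slot limit and occupy=1, up to 10 of these blocks can run at once.
    # With occupy=10, a single call holds the entire limit, so calls run one at a time.
    # occupy is how much of the limit each call consumes, not how many calls may run.
    async with concurrency("execute-weather-api", occupy=1):
        ...  # the rate-limited section goes here
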
William Hom
09/03/2024, 7:00 PM

Nate
09/03/2024, 7:03 PM