# ask-community
w
Hey all, I'm evaluating Prefect as an alternative to our current orchestration (we're using AWS Step Functions, which has a lot of challenges). So far everything looks pretty nice, but I'm noticing that when I try to use the `concurrency` context manager, I either:
1. Don't notice any real difference when I play around with the `occupy` parameter. Granted, this is a simple example.
2. Sometimes my flow will hang randomly for seemingly no reason. On one execution it'll run fine to completion, but on the next it'll just stop right before it gets to the context manager (i.e., no more logs).
I'm noticing both whether I use the `sync` or `async` `concurrency` context managers. I'll share a version of the code in the thread.
from datetime import timedelta
from typing import Dict, List
import asyncio

import httpx
import duckdb
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.concurrency.asyncio import concurrency

API_KEY = <redacted>
DUCKDB_DATABASE = <redacted>

@task(
    name="Execute Weather API Call",
    retries=3,
    retry_delay_seconds=5,
    # cache_key_fn=task_input_hash,
    # cache_expiration=timedelta(days=1), # Weather data changes daily
    log_prints=True
)
async def execute_weather_api(zip_code: str) -> Dict[str, any]:
    url = "https://api.openweathermap.org/data/2.5/weather?zip={zip_code},us&appid={api_key}".format(zip_code=zip_code, api_key=API_KEY)

    print(f"Executing API call for {zip_code}")

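    async with concurrency('execute-weather-api', occupy=1):
        # Await the request on an async client; a blocking httpx.get() here
        # would stall the event loop and serialize every task.
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
        try:
            response.raise_for_status()
        except httpx.HTTPError:
            return None

        return response.json()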


@flow(
    name="Get Weather",
    log_prints=True
)
async def get_weather() -> None:
    print("Getting NY Weather")
    ny_weather = await get_ny_weather(get_ny_zip_codes(count=500))
    print("Materialize Weather")

    create_weather_table()
    write_data_to_duck_db(ny_weather)


@task(
    name="Write Data to DB",
    log_prints=True
)
def write_data_to_duck_db(weather_data: List[Dict[str, any]]) -> None:
    conn = duckdb.connect(DUCKDB_DATABASE)
    zip_codes = []

    for row in weather_data:
        # Bind values as parameters instead of interpolating them into the SQL.
        conn.execute(
            "INSERT INTO weather VALUES (?, ?, ?, ?, ?, ?)",
            [
                row["zip_code"],
                row["lat"],
                row["lng"],
                row["temperature"],
                row["feels_like"],
                row["humidity"],
            ],
        )
        zip_codes.append(row["zip_code"])

    print(f"Zip codes recorded: {zip_codes}")
    print(f"Number of records: {len(zip_codes)}")


@task(
    name="Get NY Zip Codes",
    log_prints=True
)
def get_ny_zip_codes(count: int) -> List[str]:
    url = "https://data.ny.gov/resource/juva-r6g2.json"
    response = httpx.get(url)

    # Let any HTTP error propagate so the task fails visibly.
    response.raise_for_status()

    return [d["zip_code"] for d in response.json()[:count]]


@flow(
    name="Get NY Weather",
    log_prints=True
)
async def get_ny_weather(zip_codes: List[str]) -> List[Dict[str, any]]:
    weather = []
    for zip_code in zip_codes:
        weather.append({
                "zip_code": zip_code,
                "result": await execute_weather_api.submit(zip_code)
            }
        )

    return_value = []
    for data in weather:
        w = await data["result"].result()
        
        if w:
            return_value.append({
                "zip_code": data["zip_code"],
                "lat": w["coord"]["lat"],
                "lng": w["coord"]["lon"],
                "temperature": w["main"]["temp"],
                "feels_like": w["main"]["feels_like"],
                "humidity": w["main"]["humidity"]
            })
    return return_value

@task(
    name="Create Weather Table",
    log_prints=True
)
def create_weather_table() -> None:
    sql = """CREATE TABLE IF NOT EXISTS weather (
        zip_code TEXT, 
        lat REAL, 
        lng REAL, 
        temperature REAL, 
        feels_like REAL, 
        humidity REAL
    )
    """
    duckdb.connect(DUCKDB_DATABASE).execute(sql)

if __name__ == "__main__":
    asyncio.run(get_weather())
On the first point, I would expect changing the `occupy` value from 1 to, say, 100 would give me a noticeable speed-up in execution, but it doesn't.
n
hi @William Hom - `occupy` is just the number of slots that this context manager can occupy from the `limit`, so what's the limit value you have set? you can do `prefect gcl ls`
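To make the `occupy` vs. `limit` relationship concrete, here is a toy model in plain asyncio. It is not Prefect's implementation and makes no Prefect calls; `SlotLimit` and `peak_concurrency` are hypothetical names invented for this sketch. It assumes only what is described above: a named limit has a fixed number of slots, and each holder takes `occupy` of them while it runs.

```python
import asyncio


class SlotLimit:
    """Toy stand-in (hypothetical) for a named limit with a fixed slot count."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.in_use = 0
        self._cond = asyncio.Condition()

    async def acquire(self, occupy: int) -> None:
        if occupy > self.limit:
            raise ValueError("occupy cannot exceed the limit")
        async with self._cond:
            # Wait until enough free slots exist, then take them.
            await self._cond.wait_for(lambda: self.in_use + occupy <= self.limit)
            self.in_use += occupy

    async def release(self, occupy: int) -> None:
        async with self._cond:
            self.in_use -= occupy
            self._cond.notify_all()


async def peak_concurrency(limit: int, occupy: int, n_tasks: int = 20) -> int:
    """Run n_tasks workers and return the most that ever ran at once."""
    slots = SlotLimit(limit)
    running = 0
    peak = 0

    async def worker() -> None:
        nonlocal running, peak
        await slots.acquire(occupy)
        try:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stands in for the API call
        finally:
            running -= 1
            await slots.release(occupy)

    await asyncio.gather(*(worker() for _ in range(n_tasks)))
    return peak


if __name__ == "__main__":
    for occupy in (1, 2, 5, 10):
        peak = asyncio.run(peak_concurrency(limit=10, occupy=occupy))
        print(f"limit=10 occupy={occupy} -> peak concurrent tasks: {peak}")
```

In this model the number of simultaneous holders is at most `limit // occupy`, so raising `occupy` lowers concurrency rather than raising it. Under that assumption, more parallelism would come from a higher `limit`, not a higher `occupy`.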
w
Hey @Nate! Sorry for not getting back to you and hope you enjoyed the weekend. I think what I missed was having to set up the concurrency limit in Prefect first, then use it in the context manager. Once I did that and activated it, it worked.
n
good to hear 👍