# prefect-community
Hi, I'm working on a barebones example of mapped async HTTP GET requests. Would you mind having a look at my code in the thread? Any recommendations? I trust that futures.wait() is the correct way to force all mapped task runs to finish before closing the AsyncClient?
```python
import httpx
import pandas as pd
from prefect import task, flow, unmapped

base_url = "http://localhost:3000"


# TODO: Implement task run concurrency limit to protect the API
@task
async def perform_get_request(category_id: str, async_client: httpx.AsyncClient) -> dict:
    # category_id looks like "materials_3": split it into category and id
    category, id_ = category_id.split("_")
    endpoint = f"/inventory/{category}?id={id_}"
    url = base_url + endpoint
    response = await async_client.get(url)
    return response.json()  # response.status_code handling would be nice


@flow
async def main_httpx(num_requests=50):
    category_id = [f"{category}_{id_}" for category in ("materials", "products") for id_ in range(num_requests)]

    client = httpx.AsyncClient()
    request_futures = await perform_get_request.map(category_id, unmapped(client))
    request_states = [await f.wait() for f in request_futures]  # Wait for all requests to finish before closing
    await client.aclose()

    dfs = [await f.result() for f in request_futures]
    return pd.DataFrame(dfs)


if __name__ == "__main__":
    import asyncio

    # Run the flow (assumed; the pasted snippet ends at the import)
    asyncio.run(main_httpx())
```
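On the TODO about protecting the API: Prefect has its own tag-based task run concurrency limits, but for a quick local experiment a plain `asyncio.Semaphore` can cap how many requests are in flight. The following is only a minimal sketch of that idea using the standard library; `run_limited`, `limited_call`, `demo`, and the fake request are hypothetical names for illustration, and nothing here uses Prefect or httpx.

```python
import asyncio


async def limited_call(item, worker, semaphore):
    """Run worker(item) while holding one slot of the semaphore."""
    async with semaphore:
        return await worker(item)


async def run_limited(items, worker, max_concurrent=5):
    """Run worker over all items with at most max_concurrent calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)
    return await asyncio.gather(
        *(limited_call(item, worker, semaphore) for item in items)
    )


async def demo(num_requests=20, max_concurrent=5):
    """Hypothetical demo: a fake request that records peak concurrency."""
    in_flight = {"current": 0, "peak": 0}

    async def fake_request(i):
        in_flight["current"] += 1
        in_flight["peak"] = max(in_flight["peak"], in_flight["current"])
        await asyncio.sleep(0.01)  # stands in for the network round trip
        in_flight["current"] -= 1
        return {"id": i}

    results = await run_limited(range(num_requests), fake_request, max_concurrent)
    return results, in_flight["peak"]
```

Calling `asyncio.run(demo())` returns the results in input order together with the observed peak, which should never exceed `max_concurrent`.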
I used this Mockoon setup
```handlebars
"id": "{{queryParam 'id'}}",
{{# switch (urlParam 'type')}}
{{# case 'products'}}
"name": "{{faker 'commerce.product'}}",
"price": "{{faker 'commerce.price'}} EUR",
"quantity": "{{faker 'datatype.number' min=0 max=50}}"
{{/ case}}
{{# case 'materials'}}
"name": "{{faker 'commerce.productMaterial'}}",
"price": "{{faker 'commerce.price'}} EUR",
"quantity": "{{faker 'datatype.number' min=0 max=50}}"
{{/ case}}
{{/ switch}}
```
`result()` will already wait until the future completes, so if you don't need to use the states returned by `wait()`, you might find this approach useful instead:
```python
from prefect.utilities.asyncutils import gather

# ...task and beginning of flow code...

    dfs = await gather(*[rf.result for rf in request_futures])
    await client.aclose()
    return pd.DataFrame(dfs)
```
As a note, `httpx.AsyncClient` can be used as a context manager to avoid worrying about closing it yourself, e.g.
```python
async with httpx.AsyncClient() as client:
    request_futures = await perform_get_request.map(category_id, unmapped(client))

    dfs = await gather(*[rf.result for rf in request_futures])
    return pd.DataFrame(dfs)
```
Good thinking, Nate! I think it needs to be `rf.result` instead of `rf.result()` though, because Prefect's `gather` is looking for callables instead of awaitables.
ooops yep nice catch
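To make the callables-versus-awaitables point concrete, here is a small standard-library sketch. `gather_calls` is a hypothetical helper that only mimics the calling convention described above (it takes zero-argument callables and calls them itself); it is not Prefect's implementation, and `FakeFuture` is a stand-in for a task future.

```python
import asyncio


async def gather_calls(*calls):
    """Hypothetical helper: call each zero-argument callable, then await
    the resulting coroutines concurrently and return their results in order."""
    return await asyncio.gather(*(call() for call in calls))


class FakeFuture:
    """Stand-in for a task future with an async result() method."""

    def __init__(self, value):
        self._value = value

    async def result(self):
        await asyncio.sleep(0)  # pretend to wait for the task run
        return self._value


async def demo():
    futures = [FakeFuture(n) for n in range(3)]
    # Pass the bound method itself (a callable), not the coroutine
    # object produced by calling it.
    return await gather_calls(*[f.result for f in futures])
```

With this convention, passing `f.result()` would hand the helper coroutine objects, which it would then try to call and fail on, whereas `asyncio.gather` from the standard library expects the awaitables directly.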
Hi. Thanks!
`gather` seems to be what I was missing! I had the context manager at first, but the tasks complained about the client already being stopped. Gather solves that!
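A simplified model of why this can happen, using only the standard library: if work is scheduled inside the `async with` block but its results are awaited after the block exits, the client is already closed when the work actually runs. `FakeClient` and both functions below are hypothetical stand-ins for illustration, not httpx or Prefect code.

```python
import asyncio


class FakeClient:
    """Stand-in for an HTTP client that refuses requests once closed."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        self.closed = True

    async def get(self, i):
        await asyncio.sleep(0.01)  # the request takes a moment to run
        if self.closed:
            raise RuntimeError("client already closed")
        return i


async def results_outside_block():
    async with FakeClient() as client:
        tasks = [asyncio.create_task(client.get(i)) for i in range(3)]
    # The client is closed here, but the requests have not finished yet.
    return await asyncio.gather(*tasks, return_exceptions=True)


async def results_inside_block():
    async with FakeClient() as client:
        tasks = [asyncio.create_task(client.get(i)) for i in range(3)]
        # Awaiting inside the block keeps the client open until all are done.
        return await asyncio.gather(*tasks)
```

In this model the first function collects three `RuntimeError` instances, while the second returns the three values, which mirrors gathering the results before the client goes out of scope.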