https://prefect.io logo
#prefect-community
Title
# prefect-community
s

Stéphan Taljaard

11/14/2022, 1:28 PM
Hi I'm working on a barebones example of mapped async HTTP GET requests Would you mind having a look at my code in the thread - any recommendations? I trust the futures.wait() is the correct way to force all mapped task runs to finish before closing the AsyncClient?
2
Copy code
import httpx
import pandas as pd
from prefect import task, flow, unmapped

base_url = "<http://localhost:3000>"

# TODO: Implement task run concurrency limit to protect the API
@task
async def perform_get_request(category_id: str, async_client: httpx.AsyncClient) -> httpx.Response:
    category, id_ = category_id.split("_")
    endpoint = f"/inventory/{category}?id={id_}"
    url = base_url + endpoint
    response = await async_client.get(url)
    return response.json()  # response.status_code handling would be nice


@flow
async def main_httpx(num_requests=50):
    category_id = [f"{category}_{id_}" for category in ("materials", "products") for id_ in range(num_requests)]

    client = httpx.AsyncClient()
    request_futures = await perform_get_request.map(category_id, unmapped(client))
    request_states = [await f.wait() for f in request_futures]  # Wait for all requests to finish before closing
    await client.aclose()

    dfs = [await f.result() for f in request_futures]
    return pd.DataFrame(dfs)


if __name__ == "__main__":
    import asyncio
    print(asyncio.run(main_httpx()))
I used this Mockoon setup
Copy code
{
"id": "{{queryParam 'id'}}",
{{# switch (urlParam 'type')}}
{{# case 'products'}}
"name": "{{faker 'commerce.product'}}",
"price": "{{faker 'commerce.price'}} EUR",
"quantity": "{{faker 'datatype.number' min=0 max=50}}"
{{/ case}}
{{# case 'materials'}}
"name": "{{faker 'commerce.productMaterial'}}",
"price": "{{faker 'commerce.price'}} EUR",
"quantity": "{{faker 'datatype.number' min=0 max=50}}"
{{/ case}}
{{/ switch}}
}
r

Ryan Peden

11/14/2022, 3:13 PM
Both
.wait
and
.result
wait until the future completes, so if you don't need to use
request_states
, you might find this approach useful instead:
Copy code
from prefect.utilities.asyncutils import gather

...task and beginning of flow code...

   dfs = await gather(*[rf.result for rf in request_futures])
   await client.aclose()
   return pd.DataFrame(dfs)
❤️ 1
n

Nate

11/14/2022, 3:18 PM
as a note,
httpx.AsyncClient
can be used as a context manager to avoid worrying about closing it yourself, e.g.
Copy code
async with httpx.AsyncClient() as client:
    request_futures = await perform_get_request.map(category_id, unmapped(client))

    dfs = await gather(*[rf.result for rf in request_futures])
    return pd.DataFrame(dfs)
👍 3
❤️ 1
r

Ryan Peden

11/14/2022, 3:53 PM
Good thinking, Nate! I think it needs to be
rf.result
instead of
rf.result()
though, because Prefect's
gather
is looking for callables instead of awaitables.
n

Nate

11/14/2022, 3:54 PM
ooops yep nice catch
s

Stéphan Taljaard

11/14/2022, 4:01 PM
Hi. Thanks!
gather
seems to be what I was missing! I had the context manager at first, but the tasks complained about the client already begin stopped. Gather solves that!
3 Views