Stéphan Taljaard
11/14/2022, 1:28 PMimport httpx
import pandas as pd
from prefect import task, flow, unmapped
base_url = "<http://localhost:3000>"
# TODO: Implement task run concurrency limit to protect the API
@task
async def perform_get_request(category_id: str, async_client: httpx.AsyncClient) -> httpx.Response:
category, id_ = category_id.split("_")
endpoint = f"/inventory/{category}?id={id_}"
url = base_url + endpoint
response = await async_client.get(url)
return response.json() # response.status_code handling would be nice
@flow
async def main_httpx(num_requests=50):
category_id = [f"{category}_{id_}" for category in ("materials", "products") for id_ in range(num_requests)]
client = httpx.AsyncClient()
request_futures = await perform_get_request.map(category_id, unmapped(client))
request_states = [await f.wait() for f in request_futures] # Wait for all requests to finish before closing
await client.aclose()
dfs = [await f.result() for f in request_futures]
return pd.DataFrame(dfs)
if __name__ == "__main__":
import asyncio
print(asyncio.run(main_httpx()))
{
"id": "{{queryParam 'id'}}",
{{# switch (urlParam 'type')}}
{{# case 'products'}}
"name": "{{faker 'commerce.product'}}",
"price": "{{faker 'commerce.price'}} EUR",
"quantity": "{{faker 'datatype.number' min=0 max=50}}"
{{/ case}}
{{# case 'materials'}}
"name": "{{faker 'commerce.productMaterial'}}",
"price": "{{faker 'commerce.price'}} EUR",
"quantity": "{{faker 'datatype.number' min=0 max=50}}"
{{/ case}}
{{/ switch}}
}
Ryan Peden
11/14/2022, 3:13 PM.wait
and .result
wait until the future completes, so if you don't need to use request_states
, you might find this approach useful instead:
from prefect.utilities.asyncutils import gather
...task and beginning of flow code...
dfs = await gather(*[rf.result for rf in request_futures])
await client.aclose()
return pd.DataFrame(dfs)
Nate
11/14/2022, 3:18 PMhttpx.AsyncClient
can be used as a context manager to avoid worrying about closing it yourself, e.g.
async with httpx.AsyncClient() as client:
request_futures = await perform_get_request.map(category_id, unmapped(client))
dfs = await gather(*[rf.result for rf in request_futures])
return pd.DataFrame(dfs)
Ryan Peden
11/14/2022, 3:53 PMrf.result
instead of rf.result()
though, because Prefect's gather
is looking for callables instead of awaitables.Nate
11/14/2022, 3:54 PMStéphan Taljaard
11/14/2022, 4:01 PMgather
seems to be what I was missing!
I had the context manager at first, but the tasks complained about the client already begin stopped. Gather solves that!