Stéphan Taljaard
11/14/2022, 1:28 PM
import httpx
import pandas as pd
from prefect import task, flow, unmapped
base_url = "http://localhost:3000"
# TODO: Implement task run concurrency limit to protect the API
@task
async def perform_get_request(category_id: str, async_client: httpx.AsyncClient) -> dict:
    category, id_ = category_id.split("_")
    endpoint = f"/inventory/{category}?id={id_}"
    url = base_url + endpoint
    response = await async_client.get(url)
    return response.json()  # response.status_code handling would be nice


@flow
async def main_httpx(num_requests=50):
    category_id = [f"{category}_{id_}" for category in ("materials", "products") for id_ in range(num_requests)]
    client = httpx.AsyncClient()
    request_futures = await perform_get_request.map(category_id, unmapped(client))
    request_states = [await f.wait() for f in request_futures]  # Wait for all requests to finish before closing
    await client.aclose()
    dfs = [await f.result() for f in request_futures]
    return pd.DataFrame(dfs)


if __name__ == "__main__":
    import asyncio

    print(asyncio.run(main_httpx()))
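On the TODO about protecting the API: a minimal sketch of capping the number of in-flight requests with a plain asyncio.Semaphore. This is not Prefect's own concurrency-limit feature, only a standard-library analog of the idea. `fake_get`, `run_limited`, and `MAX_CONCURRENT` are hypothetical names, and the sleep stands in for the real HTTP call.

```python
import asyncio

MAX_CONCURRENT = 5  # assumed limit; tune for the real API


async def fake_get(item: str, limiter: asyncio.Semaphore, stats: dict) -> str:
    """Hypothetical stand-in for an HTTP request; `stats` records peak concurrency."""
    async with limiter:  # at most MAX_CONCURRENT bodies run at the same time
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # simulate network latency
        stats["active"] -= 1
        return f"done:{item}"


async def run_limited(items: list) -> tuple:
    """Run all requests concurrently, but never more than MAX_CONCURRENT at once."""
    limiter = asyncio.Semaphore(MAX_CONCURRENT)
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(*(fake_get(i, limiter, stats) for i in items))
    return list(results), stats["peak"]
```

Calling `asyncio.run(run_limited([f"materials_{n}" for n in range(20)]))` returns the 20 results in input order together with the observed peak concurrency, which should never exceed the limit.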
Stéphan Taljaard
11/14/2022, 1:29 PM
{
  "id": "{{queryParam 'id'}}",
  {{# switch (urlParam 'type')}}
    {{# case 'products'}}
      "name": "{{faker 'commerce.product'}}",
      "price": "{{faker 'commerce.price'}} EUR",
      "quantity": "{{faker 'datatype.number' min=0 max=50}}"
    {{/ case}}
    {{# case 'materials'}}
      "name": "{{faker 'commerce.productMaterial'}}",
      "price": "{{faker 'commerce.price'}} EUR",
      "quantity": "{{faker 'datatype.number' min=0 max=50}}"
    {{/ case}}
  {{/ switch}}
}
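To connect the template to the flow: a small sketch of how a `category_id` such as `materials_3` turns into the URL the task requests, and which parts of that URL the template's `urlParam 'type'` and `queryParam 'id'` helpers would read. It assumes the mock route is defined as `/inventory/:type`, which the thread does not show. `build_url` and `template_inputs` are hypothetical helper names.

```python
from urllib.parse import parse_qs, urlsplit

BASE_URL = "http://localhost:3000"  # same local mock server as in the flow


def build_url(category_id: str) -> str:
    """Mirror the task's URL construction: 'materials_3' -> /inventory/materials?id=3."""
    category, id_ = category_id.split("_")
    return f"{BASE_URL}/inventory/{category}?id={id_}"


def template_inputs(url: str) -> dict:
    """Extract the values the template would see, assuming a route of /inventory/:type."""
    parts = urlsplit(url)
    return {
        "type": parts.path.rsplit("/", 1)[-1],  # last path segment
        "id": parse_qs(parts.query)["id"][0],   # first 'id' query parameter
    }
```

Note that the id arrives as a string, which matches the quoted `"id"` value in the template.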
Ryan Peden
11/14/2022, 3:13 PM
.wait and .result wait until the future completes, so if you don't need to use request_states, you might find this approach useful instead:
from prefect.utilities.asyncutils import gather

# ...task and beginning of flow code...
    dfs = await gather(*[rf.result for rf in request_futures])
    await client.aclose()
    return pd.DataFrame(dfs)
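For readers unfamiliar with a gather that takes callables: a rough standard-library sketch of the pattern, not Prefect's implementation. `gather_calls` and `FakeFuture` are hypothetical stand-ins. The point is that each argument is a zero-argument callable (here the bound method `f.result`), and the helper is what calls and awaits it.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def gather_calls(*calls: Callable[[], Awaitable[Any]]) -> list:
    """Hypothetical stand-in: call each zero-argument callable, await all, keep input order."""
    return list(await asyncio.gather(*(call() for call in calls)))


class FakeFuture:
    """Hypothetical stand-in for a task future with an async result() method."""

    def __init__(self, value: Any) -> None:
        self._value = value

    async def result(self) -> Any:
        await asyncio.sleep(0)  # yield to the event loop, as a real wait would
        return self._value


async def demo() -> list:
    futures = [FakeFuture(n * n) for n in range(4)]
    # Pass the bound method itself (f.result), not the coroutine it returns (f.result()).
    return await gather_calls(*[f.result for f in futures])
```

Running `asyncio.run(demo())` collects the four values in the same order as the futures were created.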
Nate
11/14/2022, 3:18 PM
httpx.AsyncClient can be used as a context manager to avoid worrying about closing it yourself, e.g.
async with httpx.AsyncClient() as client:
    request_futures = await perform_get_request.map(category_id, unmapped(client))
    dfs = await gather(*[rf.result for rf in request_futures])
return pd.DataFrame(dfs)
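Why the results need to be collected before the client closes: a minimal sketch of the ordering issue using only asyncio. `FakeClient` is a hypothetical stand-in that refuses work once closed. It does not reproduce httpx's actual error, only the timing of submitting work versus leaving the `async with` block.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for an async HTTP client that refuses work once closed."""

    def __init__(self) -> None:
        self.closed = False

    async def __aenter__(self) -> "FakeClient":
        return self

    async def __aexit__(self, *exc_info) -> None:
        self.closed = True

    async def get(self, url: str) -> str:
        await asyncio.sleep(0)  # give up control once, like real network I/O
        if self.closed:
            raise RuntimeError("client has been closed")
        return f"ok:{url}"


async def results_inside() -> list:
    async with FakeClient() as client:
        pending = [asyncio.create_task(client.get(f"/item/{n}")) for n in range(3)]
        return list(await asyncio.gather(*pending))  # collected before the block exits


async def results_outside() -> list:
    async with FakeClient() as client:
        pending = [asyncio.create_task(client.get(f"/item/{n}")) for n in range(3)]
    # The client is closed here, but the scheduled requests have not run yet.
    return list(await asyncio.gather(*pending, return_exceptions=True))
```

`results_inside()` returns the three responses, while `results_outside()` returns three `RuntimeError` instances because the work only starts after the block has exited.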
Ryan Peden
11/14/2022, 3:53 PM
Use rf.result instead of rf.result() though, because Prefect's gather is looking for callables instead of awaitables.
Nate
11/14/2022, 3:54 PM
Stéphan Taljaard
11/14/2022, 4:01 PM
gather seems to be what I was missing!
I had the context manager at first, but the tasks complained about the client already being stopped. Gather solves that!