Volker L
06/12/2022, 11:06 AM
Anna Geller
Volker L
06/12/2022, 11:19 AM
SequentialTaskRunner.
Anna Geller
Volker L
06/13/2022, 5:35 AM
import pyarrow as pa
import pyarrow.parquet as pq
from utils import fetch_async
async def save(
    table: pa.Table | pd.DataFrame,
    path: str,
):
    """Write *table* to a Parquet file at *path*.

    A pandas DataFrame is converted to a pyarrow Table first, with
    ``preserve_index=False``; any other value is handed to
    ``pq.write_table`` unchanged.

    Args:
        table: The data to persist, as a pyarrow Table or pandas DataFrame.
        path: Destination passed to ``pq.write_table`` as ``where``.
    """
    # NOTE: the body contains no ``await``; the write runs synchronously
    # inside the coroutine.
    arrow_table = (
        pa.Table.from_pandas(table, preserve_index=False)
        if isinstance(table, pd.DataFrame)
        else table
    )
    pq.write_table(arrow_table, path)
async def fetch_exchanges():
    """Fetch the exchanges payload and return what ``fetch_async`` yields.

    Returns:
        The awaited result of ``fetch_async``. Per the original author's
        note this is a pandas DataFrame; ``fetch_async`` itself is a custom
        helper imported from ``utils`` and is not visible here.
    """
    # NOTE(review): the angle brackets in the URL look like chat link
    # markup left over from the paste; confirm the real endpoint.
    endpoint = "<https://my_url_to_fetch>"
    request_headers = {"user-agent": "my_user_agent"}
    return await fetch_async(endpoint, params=None, headers=request_headers)
async def run(path: str):
    """Fetch the exchanges data and save it as a Parquet file at *path*.

    Args:
        path: Destination file for the Parquet output. It is forwarded to
            ``save``; previously the argument was ignored and the output
            was always written to ``"test.parquet"``.
    """
    exchanges = await fetch_exchanges()
    # Honor the caller-supplied destination instead of a hard-coded name.
    await save(exchanges, path)
if __name__ == "__main__":
    # Script entry point: drive the async pipeline to completion on a new
    # event loop.
    asyncio.run(run("test.parquet"))
import pyarrow as pa
import pyarrow.parquet as pq
from utils import fetch_async
# NOTE(review): ``task`` is not imported in the visible snippet; presumably
# it is Prefect's task decorator. Confirm the import.
@task
async def save(
    table: pa.Table | pd.DataFrame,
    path: str,
):
    """Write *table* to a Parquet file at *path*.

    A pandas DataFrame is converted to a pyarrow Table first, with
    ``preserve_index=False``; any other value is handed to
    ``pq.write_table`` unchanged.

    Args:
        table: The data to persist, as a pyarrow Table or pandas DataFrame.
        path: Destination passed to ``pq.write_table`` as ``where``.
    """
    # NOTE: the body contains no ``await``; the write runs synchronously
    # inside the coroutine.
    arrow_table = (
        pa.Table.from_pandas(table, preserve_index=False)
        if isinstance(table, pd.DataFrame)
        else table
    )
    pq.write_table(arrow_table, path)
# NOTE(review): ``task`` is not imported in the visible snippet; presumably
# it is Prefect's task decorator. Confirm the import.
@task
async def fetch_exchanges():
    """Fetch the exchanges payload and return what ``fetch_async`` yields.

    Returns:
        The awaited result of ``fetch_async``. Per the original author's
        note this is a pandas DataFrame; ``fetch_async`` itself is a custom
        helper imported from ``utils`` and is not visible here.
    """
    # NOTE(review): the angle brackets in the URL look like chat link
    # markup left over from the paste; confirm the real endpoint.
    endpoint = "<https://my_url_to_fetch>"
    request_headers = {"user-agent": "my_user_agent"}
    return await fetch_async(endpoint, params=None, headers=request_headers)
# NOTE(review): ``flow`` is not imported in the visible snippet; presumably
# it is Prefect's flow decorator. Confirm the import.
@flow
async def run(path: str):
    """Fetch the exchanges data and save it as a Parquet file at *path*.

    Args:
        path: Destination file for the Parquet output. It is forwarded to
            ``save``; previously the argument was ignored and the output
            was always written to ``"test.parquet"``.
    """
    exchanges = await fetch_exchanges()
    # Honor the caller-supplied destination instead of a hard-coded name.
    await save(exchanges, path)
if __name__ == "__main__":
    # Script entry point: drive the decorated async ``run`` to completion
    # on a new event loop.
    asyncio.run(run("test.parquet"))
Anna Geller