#prefect-community

Volker L

06/12/2022, 11:06 AM
Hello, I have an async function that fetches some data from a REST API (using aiohttp) and returns a pandas DataFrame. A second function saves the DataFrame as a parquet file to an S3 bucket. Running these functions in plain Python is really fast (<1 s). However, if I wrap both functions in Prefect tasks and place them in a Prefect flow, the flow execution takes up to 5 minutes. I do not understand why the pure Python solution is so much faster than the Prefect one. I have posted some sample code inside the thread.

Anna Geller

06/12/2022, 11:17 AM
Could you move code blocks to the thread?

Volker L

06/12/2022, 11:19 AM
Sorry, I do not understand what you want me to do. Which code blocks and which thread?
I just figured out that it runs fine when I use the SequentialTaskRunner.

Anna Geller

06/12/2022, 2:38 PM
Thanks for the update. I meant the same as in the thread above yours: can you move the code blocks into the thread instead of the main message?

Volker L

06/13/2022, 5:35 AM
# Plain asyncio version, without Prefect
import asyncio

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from utils import fetch_async

async def save(
    table: pa.Table | pd.DataFrame,
    path: str,
):
    # Convert a DataFrame to an Arrow table before writing it as parquet.
    if isinstance(table, pd.DataFrame):
        table = pa.Table.from_pandas(table, preserve_index=False)
    pq.write_table(table=table, where=path)



async def fetch_exchanges():
    headers = {"user-agent": "my_user_agent"}
    url = "https://my_url_to_fetch"
    # Custom async function for fetching data from a REST API. Returns a pandas DataFrame.
    df = await fetch_async(url, params=None, headers=headers)

    return df



async def run(path: str):
    df = await fetch_exchanges()
    # Use the path argument instead of a hard-coded file name.
    await save(df, path)

if __name__ == "__main__":
    asyncio.run(run("test.parquet"))
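
# Prefect version: the same functions wrapped as tasks inside a flow
import asyncio

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from prefect import flow, task
from utils import fetch_async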

@task
async def save(
    table: pa.Table | pd.DataFrame,
    path: str,
):
    # Convert a DataFrame to an Arrow table before writing it as parquet.
    if isinstance(table, pd.DataFrame):
        table = pa.Table.from_pandas(table, preserve_index=False)
    pq.write_table(table=table, where=path)


@task
async def fetch_exchanges():
    headers = {"user-agent": "my_user_agent"}
    url = "https://my_url_to_fetch"
    # Custom async function for fetching data from a REST API. Returns a pandas DataFrame.
    df = await fetch_async(url, params=None, headers=headers)

    return df


@flow
async def run(path: str):
    df = await fetch_exchanges()
    # Use the path argument instead of a hard-coded file name.
    await save(df, path)

if __name__ == "__main__":
    asyncio.run(run("test.parquet"))
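
To compare the two versions with numbers rather than impressions, the same coroutine can be wrapped in a small timer. This is only a sketch: `timed` and `fake_fetch` are hypothetical names that do not appear in the thread, and `fake_fetch` sleeps instead of calling a real API.

```python
import asyncio
import time


async def timed(coro_func, *args, **kwargs):
    """Await coro_func(*args, **kwargs) and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = await coro_func(*args, **kwargs)
    return result, time.perf_counter() - start


async def fake_fetch(delay: float) -> str:
    # Hypothetical stand-in for fetch_exchanges(); sleeps instead of calling an API.
    await asyncio.sleep(delay)
    return "data"


if __name__ == "__main__":
    result, elapsed = asyncio.run(timed(fake_fetch, 0.05))
    print(f"fake_fetch returned {result!r} in {elapsed:.3f}s")
```

Timing the fetch step and the save step separately, in both the plain and the Prefect version, would show which of the two accounts for the difference.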

Anna Geller

06/13/2022, 11:11 AM
thank you!
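
A side note on the code above: `save` is declared `async` but never awaits anything, and `pq.write_table` is an ordinary blocking call, so the event loop is blocked while the file is written. Whether this has anything to do with the slowdown reported here is not established in the thread. If it matters, one option is to push the blocking call to a worker thread with `asyncio.to_thread`. The sketch below uses only the standard library; `blocking_write`, `save_in_thread`, and the heartbeat counter are hypothetical stand-ins, not part of the original code.

```python
import asyncio
import tempfile
import time
from pathlib import Path


def blocking_write(path: str, payload: bytes) -> None:
    # Hypothetical stand-in for pq.write_table(): ordinary blocking file I/O.
    time.sleep(0.05)  # simulate a slow write
    Path(path).write_bytes(payload)


async def save_in_thread(path: str, payload: bytes) -> None:
    # Run the blocking call in a worker thread so the event loop stays responsive.
    await asyncio.to_thread(blocking_write, path, payload)


async def main() -> int:
    ticks = 0

    async def heartbeat() -> None:
        # Counts how often the event loop gets control while the write runs.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    beat = asyncio.create_task(heartbeat())
    with tempfile.TemporaryDirectory() as tmp:
        await save_in_thread(str(Path(tmp) / "test.bin"), b"example")
    beat.cancel()
    return ticks


if __name__ == "__main__":
    print("event loop ticks during the write:", asyncio.run(main()))
```

A non-zero tick count means other coroutines kept running during the write, which is what the thread offload is meant to achieve.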