    Volker L

    3 months ago
    Hello, I have an async function that fetches some data from a REST API (using aiohttp). The function returns a pandas DataFrame. A second function saves the pandas DataFrame as a parquet file to an S3 bucket. Running these functions in plain Python is really fast (<1s). However, if I wrap both functions in Prefect tasks and place them in a Prefect flow, the flow execution takes up to 5 minutes. I do not understand why the pure Python solution is so much faster than the Prefect one. I have posted some sample code inside the thread.
    Anna Geller

    3 months ago
    Could you move code blocks to the thread?
    Volker L

    3 months ago
    Sorry, I do not understand what you want me to do. Which code blocks and which thread?
    I just figured out that it runs fine when I use the SequentialTaskRunner.
    Anna Geller

    3 months ago
    Thanks for the update. I meant the same as in the thread above yours: can you move the code blocks into the thread instead of the main message?
    Volker L

    3 months ago
    # Plain Python version
    import asyncio
    
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq
    from utils import fetch_async  # custom helper module, not shown in the thread
    
    
    async def save(
        table: pa.Table | pd.DataFrame,
        path: str,
    ):
        # Convert a DataFrame to an Arrow table before writing.
        if isinstance(table, pd.DataFrame):
            table = pa.Table.from_pandas(table, preserve_index=False)
        pq.write_table(table=table, where=path)
    
    
    async def fetch_exchanges():
        headers = {"user-agent": "my_user_agent"}
        url = "https://my_url_to_fetch"
        # Custom async function for fetching data from a REST API. Returns a pandas DataFrame.
        df = await fetch_async(url, params=None, headers=headers)
        return df
    
    
    async def run(path: str):
        df = await fetch_exchanges()
        await save(df, path)
    
    
    if __name__ == "__main__":
        asyncio.run(run("test.parquet"))
    
    # Prefect version
    import asyncio
    
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq
    from prefect import flow, task
    from utils import fetch_async  # custom helper module, not shown in the thread
    
    
    @task
    async def save(
        table: pa.Table | pd.DataFrame,
        path: str,
    ):
        # Convert a DataFrame to an Arrow table before writing.
        if isinstance(table, pd.DataFrame):
            table = pa.Table.from_pandas(table, preserve_index=False)
        pq.write_table(table=table, where=path)
    
    
    @task
    async def fetch_exchanges():
        headers = {"user-agent": "my_user_agent"}
        url = "https://my_url_to_fetch"
        # Custom async function for fetching data from a REST API. Returns a pandas DataFrame.
        df = await fetch_async(url, params=None, headers=headers)
        return df
    
    
    @flow
    async def run(path: str):
        df = await fetch_exchanges()
        await save(df, path)
    
    
    if __name__ == "__main__":
        asyncio.run(run("test.parquet"))
    Anna Geller

    3 months ago
    thank you!
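
To narrow down where the extra time in a comparison like the one above goes, it can help to time the fetch and save steps separately in both variants. The following is only a minimal sketch using the standard library: `timed`, `fetch_stub`, and `save_stub` are hypothetical names that do not appear in the thread, and the stubs merely sleep briefly in place of the real `fetch_exchanges()` and `save()`.

```python
import asyncio
import time


async def timed(label, coro, timings):
    """Await `coro`, store its wall-clock duration under `label`, return its result."""
    start = time.perf_counter()
    result = await coro
    timings[label] = time.perf_counter() - start
    return result


# Hypothetical stand-ins for the thread's fetch_exchanges() and save().
async def fetch_stub():
    await asyncio.sleep(0.01)  # pretend network latency
    return [{"exchange": "demo"}]


async def save_stub(rows, path):
    await asyncio.sleep(0.01)  # pretend write latency
    return len(rows)


async def run(path):
    timings = {}
    rows = await timed("fetch", fetch_stub(), timings)
    written = await timed("save", save_stub(rows, path), timings)
    return written, timings


if __name__ == "__main__":
    written, timings = asyncio.run(run("test.parquet"))
    for label, seconds in timings.items():
        print(f"{label}: {seconds:.3f}s")
```

Swapping the stubs for the real coroutines, once plain and once wrapped in tasks, should show whether the fetch, the save, or the orchestration around them accounts for the difference.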