c
Hello. I'm trying to pass an aiohttp client session to a flow/task. Following an error in the functional block of a task, it will ping/wait for an internet connection and then raise the error for the flow or task decorator to handle. I noticed that the session can't be reused, seemingly because the event loop changes for each flow or task run? It's hard for me to copy and paste the stack trace from my Jupyter notebook because of how it prints, but the error is RuntimeError: Event loop is closed. The first time _client is assigned in Session() it works, probably because it is created in the current event loop.
def Session():
    global _client
    if _client is None:
        _client=aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(limit=100,limit_per_host=10,force_close=True,enable_cleanup_closed=True),
            timeout=aiohttp.ClientTimeout(total=30),
        )
    return _client
n
hi @Charles Marks - can you show an example of what you're trying as it relates to prefect? I'm not sure what the above is demonstrating. Generally speaking, subflows are run on the main thread and tasks are run in worker threads.
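The threading point can be seen with plain asyncio, independent of Prefect. The sketch below is only an illustration: `asyncio.to_thread` combined with `asyncio.run` is an assumed stand-in for a coroutine ending up on a worker thread, not a description of Prefect's internals, and the names `current_loop`, `compare_loops`, and `demo` are made up for this example. It shows that the worker thread gets its own event loop, and that scheduling anything on that loop after it has shut down fails with the "Event loop is closed" message.

```python
import asyncio


async def current_loop() -> asyncio.AbstractEventLoop:
    """Return the event loop this coroutine is running on."""
    return asyncio.get_running_loop()


async def compare_loops():
    """Collect the loop of the calling thread and the loop of a worker thread."""
    main_loop = await current_loop()
    # Run the same coroutine in a worker thread. asyncio.run() creates a
    # fresh loop in that thread and closes it once the coroutine finishes.
    worker_loop = await asyncio.to_thread(asyncio.run, current_loop())
    return main_loop, worker_loop


def demo() -> str:
    """Return the error raised when the worker's finished loop is reused."""
    main_loop, worker_loop = asyncio.run(compare_loops())
    assert main_loop is not worker_loop  # two threads, two different loops
    assert worker_loop.is_closed()       # the worker's loop is already gone
    try:
        # Anything still bound to the closed loop fails at this point.
        worker_loop.call_soon(lambda: None)
    except RuntimeError as exc:
        return str(exc)
    return "no error"


if __name__ == "__main__":
    print(demo())
```

In a notebook, where a loop is already running, `await compare_loops()` takes the place of the `asyncio.run(...)` call.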
c
#testing prefect
import aiohttp
import time
import asyncio
from prefect import flow, task, get_run_logger

def waitfor_reconnect(wait_sec=600):
    #timeout_sec = Session().timeout.total
    #print(timeout_sec)
    def _wait(afunc):
        async def _await(*args,**kwargs):
            #logger = get_run_logger()
            try:
                return await afunc(*args,**kwargs)
            except Exception as e:
                ct = time.time() 
                dt = ct+wait_sec
                session = Session()
                cnt=1
                while ct<=dt:
                    at=time.time()
                    try:
                        # Attempt to fetch the URL
                        async with session.get("http://www.google.com") as response:
                            if response.status == 200:
                                #success=True
                                print("Internet connection is live.")
                                break  # Exit the loop if the connection is successful
                            else:
                                nt=time.time()
                                print(f"Internet ping failed attempt #{cnt}")
                                #logger.log(1,f"Internet ping failed attempt #{cnt}")
                                cnt+=1
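                    except (aiohttp.ClientError, asyncio.TimeoutError):
                        # Not bound to a name: "as e" would shadow and then delete the
                        # outer exception. A ClientTimeout expiry raises asyncio.TimeoutError.
                        nt = time.time()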

                    await asyncio.sleep(max(at-nt+30,0)) #+timeout_sec
                    ct =time.time()
                #shouldn't matter if it succeeds or not because prefect should handle the retry.
                raise  # re-raise the original error from the wrapped task
        return _await
    return _wait


_client = None

def Session():
    global _client
    if _client is None:
        _client=aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(limit=100,limit_per_host=10,force_close=True,enable_cleanup_closed=True),
            timeout=aiohttp.ClientTimeout(total=30),
        )
    return _client

#testing flow

@task()
@waitfor_reconnect()
async def testing_task(say:str):
    await asyncio.sleep(.5)
    print(say)
    await asyncio.sleep(.5)
    #get_run_logger().log(0,say)
    raise EnvironmentError()

@flow(retries=0)    
async def testing_flow():
    await testing_task('testing!!')

await testing_flow()
That makes sense then. Is it because the task and flow code is only implemented with synchronous methods, so you are left with sending async functions to run in separate threads? Can tasks and flows inherently handle the async event loop somehow, on one thread?
@Nate
Oh ok, I overcomplicated it. I changed @task to @flow and it worked.
n
good to hear!
> task and flow code is only implemented with synchronous methods
sorry if I'm not understanding the question, but tasks or flows can be implemented as sync or async functions; you just generally need to follow the rules of async python. We run tasks in worker threads behind the scenes for performance reasons, which can cause problems if you instantiate something (like a db connection object) in one thread, pass it into a task, and then try to use it in the task (in the worker thread). By "something" I mean anything that needs to be used in the same thread it was created in. If you're curious about the details, feel free to get into the weeds here 🙂
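One way to live with that constraint is to stop sharing a single global object and keep one instance per event loop instead. The following is a minimal sketch under stated assumptions, not something Prefect or aiohttp provides: `PerLoopCache` and `demo` are hypothetical names, and a plain `object` stands in for the real resource so the example needs only the standard library. In the thread's code the factory would presumably be the `aiohttp.ClientSession(...)` construction from `Session()`.

```python
import asyncio
import threading
from typing import Callable, Dict, Generic, TypeVar

T = TypeVar("T")


class PerLoopCache(Generic[T]):
    """Hypothetical helper: hand out one resource per live event loop."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._by_loop: Dict[asyncio.AbstractEventLoop, T] = {}
        self._lock = threading.Lock()  # loops may live in different threads

    def get(self) -> T:
        """Return the resource for the running loop, creating it if needed."""
        loop = asyncio.get_running_loop()  # must be called from a coroutine
        with self._lock:
            # Forget resources whose loop has closed; they cannot be reused.
            for dead in [lp for lp in self._by_loop if lp.is_closed()]:
                del self._by_loop[dead]
            if loop not in self._by_loop:
                self._by_loop[loop] = self._factory()
            return self._by_loop[loop]


def demo():
    """Fetch the resource twice on each of two separate event loops."""
    cache = PerLoopCache(object)  # `object` is a stand-in for a real session

    async def fetch_twice():
        return cache.get(), cache.get()

    first = asyncio.run(fetch_twice())   # first loop
    second = asyncio.run(fetch_twice())  # a brand-new loop
    return first, second
```

Within one loop both calls return the same object, while the second loop gets a fresh one. The sketch only forgets resources that belong to closed loops; a real session would ideally be closed while its loop is still running.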