Charles Marks
03/24/2024, 8:25 PMdef Session():
global _client
if _client is None:
_client=aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=100,limit_per_host=10,force_close=True,enable_cleanup_closed=True),timeout=aiohttp.ClientTimeout(total=30))
return _client
Nate
03/24/2024, 10:20 PMCharles Marks
03/24/2024, 10:28 PM#testing prefect
import aiohttp
import time
import asyncio
from prefect import flow, task, get_run_logger
def waitfor_reconnect(wait_sec=600):
#timeout_sec = Session().timeout.total
#print(timeout_sec)
def _wait(afunc):
async def _await(*args,**kwargs):
#logger = get_run_logger()
try:
return await afunc(*args,**kwargs)
except Exception as e:
ct = time.time()
dt = ct+wait_sec
session = Session()
cnt=1
while ct<=dt:
at=time.time()
try:
# Attempt to fetch the URL
async with await session.get("<http://www.google.com>") as response:
if response.status == 200:
#succss=True
print(f"Internet connection is live.")
break # Exit the loop if the connection is successful
else:
nt=time.time()
print(f"Internet ping failed attempt #{cnt}")
#logger.log(1,f"Internet ping failed attempt #{cnt}")
cnt+=1
except aiohttp.ClientError as e:
nt = time.time()
await asyncio.sleep(max(at-nt+30,0)) #+timeout_sec
ct =time.time()
#shouldn't matter if it succeeds or not because prefect should handle the retry.
raise e
return _await
return _wait
_client = None
def Session():
global _client
if _client is None:
_client=aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=100,limit_per_host=10,force_close=True,enable_cleanup_closed=True),timeout=aiohttp.ClientTimeout(total=30))
return _client
#testing flow
@task()
@waitfor_reconnect()
async def testing_task(say:str):
await asyncio.sleep(.5)
print(say)
await asyncio.sleep(.5)
#get_run_logger().log(0,say)
raise EnvironmentError()
@flow(retries=0)
async def testing_flow():
await testing_task('testing!!')
await testing_flow()
Charles Marks
03/24/2024, 10:33 PMCharles Marks
03/24/2024, 10:34 PMCharles Marks
03/24/2024, 10:38 PMNate
03/24/2024, 10:48 PMBring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by