Chris Goddard
05/17/2023, 1:29 AM

Stéphan Taljaard
05/17/2023, 6:38 AM

Chris Goddard
05/17/2023, 6:42 AM

Stéphan Taljaard
05/17/2023, 6:42 AM

Chris Goddard
05/17/2023, 6:44 AM
```python
import asyncio
import time
from faker import Faker
import typing
import json
from prefect import task, flow, get_run_logger
# from prefect.task_runners import SequentialTaskRunner

faker = Faker('en_US')


async def map_unordered(func, iterable, *, limit):
    # map() raises TypeError for async iterables; fall back to an
    # async generator expression in that case.
    try:
        aws = map(func, iterable)
    except TypeError:
        aws = (func(x) async for x in iterable)
    async for task_ in limit_concurrency(aws, limit):
        yield await task_


async def limit_concurrency(aws, limit):
    try:
        aws = aiter(aws)
        is_async = True
    except TypeError:
        aws = iter(aws)
        is_async = False

    aws_ended = False
    pending = set()

    while pending or not aws_ended:
        # Keep the pending set topped up to the concurrency limit.
        while len(pending) < limit and not aws_ended:
            try:
                aw = await anext(aws) if is_async else next(aws)
            except StopAsyncIteration if is_async else StopIteration:
                aws_ended = True
            else:
                pending.add(asyncio.ensure_future(aw))

        if not pending:
            return

        # Yield futures as they complete, in completion order.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        while done:
            yield done.pop()


class Response(typing.NamedTuple):
    url: str
    status_code: int
    data: dict


async def fetch_url(url: str):
    logger = get_run_logger()
    logger.info(f'fetch url: {url}')
    await asyncio.sleep(faker.random_int(1, 30) / 10)
    return Response(
        url=url,
        status_code=200,
        data=faker.profile()
    )


async def write_data(data: dict):
    logger = get_run_logger()
    id = hash(json.dumps(data, default=str))
    logger.info(f'writing data: {id}')
    await asyncio.sleep(faker.random_int(1, 60) / 10)
    logger.info(f'written data: {id}')
    return id


async def get_urls(n: int):
    logger = get_run_logger()
    logger.info(f'getting {n} urls')
    urls = []
    for i in range(n):
        urls.append(faker.uri())
    return urls


async def final(result):
    logger = get_run_logger()
    logger.info(result)


@flow
async def main():
    logger = get_run_logger()
    start = time.monotonic()
    urls = await get_urls(100)
    data = (result async for result in map_unordered(fetch_url, urls, limit=10))
    results = (result async for result in map_unordered(write_data, data, limit=10))
    async for result in map_unordered(final, results, limit=100):
        pass
    logger.info(f'finished in {time.monotonic() - start} seconds')


if __name__ == '__main__':
    with asyncio.Runner() as runner:
        runner.run(main())
```
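As an aside, map_unordered and limit_concurrency above are plain asyncio and work outside Prefect too; a minimal sketch of standalone usage (the double coroutine is illustrative):

```python
async def double(x: int) -> int:
    # Stand-in for real work.
    await asyncio.sleep(0.1)
    return x * 2


async def demo():
    # At most 3 coroutines in flight; results are yielded as they complete.
    async for result in map_unordered(double, range(10), limit=3):
        print(result)

# asyncio.run(demo())
```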
That full example runs fine - it takes about 30-40 seconds. But as soon as I add a @task decorator to one of the functions - say, the fetch_url function - it hangs somewhere between a quarter and three-quarters of the way through.
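For concreteness, the change being described is just decorating the function, along these lines (a sketch assuming Prefect 2.x; the rest of the script stays the same):

```python
from prefect import task


@task
async def fetch_url(url: str):
    logger = get_run_logger()
    logger.info(f'fetch url: {url}')
    await asyncio.sleep(faker.random_int(1, 30) / 10)
    return Response(url=url, status_code=200, data=faker.profile())
```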
Chris Goddard
05/17/2023, 7:07 AM
If I change the lines starting at `data =` under `def main()` to:

```python
data = [result.result async for result in map_unordered(fetch_url.submit, urls, limit=10)]
results = (result async for result in map_unordered(write_data, data, limit=10))
```

it appears to work. So there's something about using `task.submit` vs. calling the task directly?
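For readers following along, a sketch of the two calling conventions as I understand Prefect 2.x's API (the URL is illustrative):

```python
@flow
async def demo():
    # Direct call: in an async flow this returns a coroutine; awaiting
    # it runs the task and yields the task's return value.
    response = await fetch_url('https://example.com')

    # .submit(): sends the run to the task runner (a worker thread under
    # the default ConcurrentTaskRunner) and, once awaited, returns a
    # PrefectFuture; the data comes from awaiting future.result().
    future = await fetch_url.submit('https://example.com')
    response = await future.result()
```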
Chris Goddard
05/17/2023, 7:37 AM
Added `@task` to write_data and final:

```python
@flow
async def main():
    logger = get_run_logger()
    start = time.monotonic()
    urls = await get_urls(100)
    data = [result async for result in map_unordered(fetch_url.submit, urls, limit=10)]
    results = [result async for result in map_unordered(write_data.submit, data, limit=10)]
    # await final.map(results)
    async for result in map_unordered(final.submit, results, limit=5):
        pass
    logger.info(f'finished in {time.monotonic() - start} seconds')
```
Running with debug logging level, I see that I always get stuck at some point at the `Beginning execution...` log line for some final task - but there's no pattern to which one.
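For anyone reproducing this, one way to get those debug-level logs (hedged: assuming Prefect 2.x's standard PREFECT_LOGGING_LEVEL setting; it can also be set as a shell environment variable or via `prefect config set`):

```python
import os

# Must be set before the flow run starts so Prefect's logging picks it up.
os.environ['PREFECT_LOGGING_LEVEL'] = 'DEBUG'
```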
Chris Goddard
05/17/2023, 7:38 AM
```python
import asyncio
import time
from faker import Faker
import typing
import json
from prefect import task, flow, get_run_logger
from prefect.tasks import exponential_backoff
from prefect.futures import resolve_futures_to_data, resolve_futures_to_states
# from prefect.task_runners import SequentialTaskRunner

faker = Faker('en_US')


async def map_unordered(func, iterable, *, limit):
    try:
        aws = map(func, iterable)
    except TypeError:
        aws = (func(x) async for x in iterable)
    async for task_ in limit_concurrency(aws, limit):
        yield await task_


async def limit_concurrency(aws, limit):
    try:
        aws = aiter(aws)
        is_async = True
    except TypeError:
        aws = iter(aws)
        is_async = False

    aws_ended = False
    pending = set()

    while pending or not aws_ended:
        while len(pending) < limit and not aws_ended:
            try:
                aw = await anext(aws) if is_async else next(aws)
            except StopAsyncIteration if is_async else StopIteration:
                aws_ended = True
            else:
                pending.add(asyncio.ensure_future(aw))

        if not pending:
            return

        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        while done:
            yield done.pop()


class Response(typing.NamedTuple):
    url: str
    status_code: int
    data: dict


@task(retries=3, retry_delay_seconds=exponential_backoff(backoff_factor=3))
async def fetch_url(url: str):
    logger = get_run_logger()
    # logger.info(f'fetch url: {url}')
    if faker.random_int(1, 30) == 1:
        raise ValueError('random error')
    await asyncio.sleep(faker.random_int(1, 30) / 10)
    return Response(
        url=url,
        status_code=200,
        data=faker.profile()
    )


@task
async def write_data(data: dict):
    # logger = get_run_logger()
    id = hash(json.dumps(data, default=str))
    # logger.info(f'writing data: {id}')
    await asyncio.sleep(faker.random_int(1, 60) / 10)
    # logger.info(f'written data: {id}')
    return id


async def get_urls(n: int):
    logger = get_run_logger()
    # logger.info(f'getting {n} urls')
    urls = []
    for i in range(n):
        urls.append(faker.uri())
    return urls


@task(timeout_seconds=10, retries=3)
async def final(result):
    logger = get_run_logger()
    logger.info(f'final: {result}')
    return True


@flow
async def main():
    logger = get_run_logger()
    start = time.monotonic()
    urls = await get_urls(100)
    data = [result async for result in map_unordered(fetch_url.submit, urls, limit=10)]
    results = [result async for result in map_unordered(write_data.submit, data, limit=10)]
    # await final.map(results)
    async for result in map_unordered(final.submit, results, limit=5):
        pass
    logger.info(f'finished in {time.monotonic() - start} seconds')


if __name__ == '__main__':
    with asyncio.Runner(debug=True) as runner:
        runner.run(main())
```
added some random errors to make fetch more realistic
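A note on the retry settings in that script: if Prefect's exponential_backoff behaves as documented, it returns a callable mapping a retry count to a list of doubling delays, so retries=3 with backoff_factor=3 should wait roughly 3, 6, and 12 seconds:

```python
from prefect.tasks import exponential_backoff

# Hedged: per Prefect 2.x docs, delays double from the backoff factor.
delays = exponential_backoff(backoff_factor=3)(3)
print(delays)  # expected: [3, 6, 12]
```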
Chris Goddard
05/17/2023, 7:38 AM

scott
05/17/2023, 4:24 PM

scott
05/17/2023, 4:25 PM

scott
05/17/2023, 4:25 PM
Beginning execution ...

Chris Goddard
05/17/2023, 4:26 PM
When I use the `submit` method it's spawning a worker thread for each task run - and it looks like one of the workers is left hanging at the end - maybe an uncaught exception within the worker?
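One generic way to test the hanging-worker theory (stdlib faulthandler, not Prefect-specific; the signal choice is arbitrary and Unix-only):

```python
import faulthandler
import signal

# While the flow is hung, `kill -USR1 <pid>` dumps every thread's
# stack, showing where any leftover worker thread is blocked.
faulthandler.register(signal.SIGUSR1, all_threads=True)
```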
scott
05/17/2023, 4:28 PM

Chris Goddard
05/17/2023, 4:29 PM
It seems to happen when I use `submit` but try to use plain async functions elsewhere - it's super confusing

Chris Goddard
05/17/2023, 4:30 PM

scott
05/17/2023, 4:31 PM

scott
05/17/2023, 4:31 PM

Chris Goddard
05/17/2023, 4:32 PM

Chris Goddard
05/17/2023, 6:54 PM

Chris Goddard
05/17/2023, 7:05 PM

scott
05/17/2023, 8:17 PM
2.10.6

scott
05/17/2023, 8:17 PM

Zanie