# ask-community
c
has anyone experienced async flows hanging for no apparent reason? I’m doing a flow/sub-flow structure where the flows can spawn other flows (as sub-flows) and for some reason, at some point in execution, everything just stops - no more logs sent or anything. I feel like I’m missing something obvious with how async flows/tasks work within prefect but curious if anyone has encountered this before?
s
Are you using tags + task run concurrency limits?
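i.e. something along these lines - a tagged task plus a limit created for that tag (e.g. `prefect concurrency-limit create some-tag 5`); task runs over the limit just sit and wait for a free slot, which can look exactly like a hang:
from prefect import task

# hypothetical example, not from your script: any task run carrying this tag
# counts against the "some-tag" concurrency limit configured on the server
@task(tags=["some-tag"])
async def do_work(x):
    ...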
c
no - no concurrency limits
s
hmm
c
Here’s a toy example I just wrote up:
import asyncio
import time
from faker import Faker
import typing
import json
from prefect import task, flow, get_run_logger
# from prefect.task_runners import SequentialTaskRunner

faker = Faker('en_US')


async def map_unordered(func, iterable, *, limit):
    try:
        aws = map(func, iterable)
    except TypeError:
        aws = (func(x) async for x in iterable)

    async for task_ in limit_concurrency(aws, limit):
        yield await task_


async def limit_concurrency(aws, limit):
    try:
        aws = aiter(aws)
        is_async = True
    except TypeError:
        aws = iter(aws)
        is_async = False

    aws_ended = False
    pending = set()

    while pending or not aws_ended:
        while len(pending) < limit and not aws_ended:
            try:
                aw = await anext(aws) if is_async else next(aws)
            except StopAsyncIteration if is_async else StopIteration:
                aws_ended = True
            else:
                pending.add(asyncio.ensure_future(aw))

        if not pending:
            return

        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        while done:
            yield done.pop()


class Response(typing.NamedTuple):
    url: str
    status_code: int
    data: dict


async def fetch_url(url: str):
    logger = get_run_logger()
    logger.info(f'fetch url: {url}')
    await asyncio.sleep(faker.random_int(1, 30)/10)
    return Response(
        url=url,
        status_code=200,
        data=faker.profile()
    )


async def write_data(data: dict):
    logger = get_run_logger()
    id = hash(json.dumps(data, default=str))
    logger.info(f'writing data: {id}')
    await asyncio.sleep(faker.random_int(1, 60)/10)
    logger.info(f'written data: {id}')
    return id


async def get_urls(n: int):
    logger = get_run_logger()
    logger.info(f'getting {n} urls')
    urls = []
    for i in range(n):
        urls.append(faker.uri())
    return urls


async def final(result):
    logger = get_run_logger()
    logger.info(result)


@flow
async def main():
    logger = get_run_logger()
    start = time.monotonic()
    urls = await get_urls(100)
    data = (result async for result in map_unordered(fetch_url, urls, limit=10))
    results = (result async for result in map_unordered(write_data, data, limit=10))
    async for result in map_unordered(final, results, limit=100):
        pass
    logger.info(f'finished in {time.monotonic() - start} seconds')

if __name__ == '__main__':
    with asyncio.Runner() as runner:
        runner.run(main())
That runs fine - takes about 30-40 seconds. But as soon as I add a @task decorator to one of the functions - say, the `fetch_url` function - it hangs somewhere between a quarter and three-quarters of the way through.
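concretely, the only change is the decorator on that one function - everything else stays exactly as above:
from prefect import task  # already imported at the top of the script

@task
async def fetch_url(url: str):
    ...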
so, some progress - this seems to work, but I don’t know why. If I replace the lines from `data =` under `def main()` with:
    data = [result.result async for result in map_unordered(fetch_url.submit, urls, limit=10)]
    results = (result async for result in map_unordered(write_data, data, limit=10))
It appears to work. So there’s something about using `task.submit` vs calling the task directly?
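to spell out the two patterns I’m comparing (simplified sketch based on my reading of the prefect 2 docs, so take it with a grain of salt):
from prefect import flow, task


@task
async def fetch_url(url: str):
    # stand-in for the real task above
    return url


@flow
async def demo():
    # pattern 1: call the task directly - awaiting it runs the task and gives back its return value
    data = await fetch_url('https://example.com')

    # pattern 2: submit it to the task runner - gives back a PrefectFuture that has to be resolved
    future = await fetch_url.submit('https://example.com')
    data = await future.result()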
when I try to update more functions to tasks, though, I invariably run into the same issue - e.g. when I add `@task` to `write_data` and `final`:
@flow
async def main():
    logger = get_run_logger()
    start = time.monotonic()
    urls = await get_urls(100)
    data = [result async for result in map_unordered(fetch_url.submit, urls, limit=10)]
    results = [result async for result in map_unordered(write_data.submit, data, limit=10)]
    # await final.map(results)
    async for result in map_unordered(final.submit, results, limit=5):
        pass
    logger.info(f'finished in {time.monotonic() - start} seconds')
running with the debug logging level, I see that I always get stuck at some point at `Beginning execution...` for some `final` task - but no pattern as to which one
see full example:
import asyncio
import time
from faker import Faker
import typing
import json
from prefect import task, flow, get_run_logger
from prefect.tasks import exponential_backoff
from prefect.futures import resolve_futures_to_data, resolve_futures_to_states

# from prefect.task_runners import SequentialTaskRunner

faker = Faker('en_US')


async def map_unordered(func, iterable, *, limit):
    try:
        aws = map(func, iterable)
    except TypeError:
        aws = (func(x) async for x in iterable)

    async for task_ in limit_concurrency(aws, limit):
        yield await task_


async def limit_concurrency(aws, limit):
    try:
        aws = aiter(aws)
        is_async = True
    except TypeError:
        aws = iter(aws)
        is_async = False

    aws_ended = False
    pending = set()

    while pending or not aws_ended:
        while len(pending) < limit and not aws_ended:
            try:
                aw = await anext(aws) if is_async else next(aws)
            except StopAsyncIteration if is_async else StopIteration:
                aws_ended = True
            else:
                pending.add(asyncio.ensure_future(aw))

        if not pending:
            return

        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        while done:
            yield done.pop()


class Response(typing.NamedTuple):
    url: str
    status_code: int
    data: dict


@task(retries=3, retry_delay_seconds=exponential_backoff(backoff_factor=3))
async def fetch_url(url: str):
    logger = get_run_logger()
    # logger.info(f'fetch url: {url}')

    if faker.random_int(1, 30) == 1:
        raise ValueError('random error')

    await asyncio.sleep(faker.random_int(1, 30)/10)
    return Response(
        url=url,
        status_code=200,
        data=faker.profile()
    )

@task
async def write_data(data: dict):
    # logger = get_run_logger()
    id = hash(json.dumps(data, default=str))
    # logger.info(f'writing data: {id}')
    await asyncio.sleep(faker.random_int(1, 60)/10)
    # logger.info(f'written data: {id}')
    return id


async def get_urls(n: int):
    logger = get_run_logger()
    # logger.info(f'getting {n} urls')
    urls = []
    for i in range(n):
        urls.append(faker.uri())
    return urls

@task(timeout_seconds=10, retries=3)
async def final(result):
    logger = get_run_logger()
    logger.info(f'final: {result}')
    return True

@flow
async def main():
    logger = get_run_logger()
    start = time.monotonic()
    urls = await get_urls(100)
    data = [result async for result in map_unordered(fetch_url.submit, urls, limit=10)]
    results = [result async for result in map_unordered(write_data.submit, data, limit=10)]
    # await final.map(results)
    async for result in map_unordered(final.submit, results, limit=5):
        pass
    logger.info(f'finished in {time.monotonic() - start} seconds')

if __name__ == '__main__':
    with asyncio.Runner(debug=True) as runner:
        runner.run(main())
added some random errors to make `fetch_url` more realistic
would love anyone’s thoughts on this!
s
Seeing the same problem 😢
It’s intermittent though - doesn’t happen every time
Hangs at `Beginning execution...`
c
same! well at least I’m not crazy! so I just ran my script in a debug session in VS Code (haven’t really used that kind of debugging before so I’m still figuring it out) - but it looks like when you use the `submit` method it spawns a worker thread for each task run, and it looks like one of the workers is left hanging at the end - maybe an uncaught exception within the worker?
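fwiw the way I’ve been poking at it once it stalls is dumping every thread and every pending asyncio task to see where the stuck worker is sitting - rough stdlib-only sketch, nothing prefect-specific:
import asyncio
import faulthandler
import signal

# unix only: on SIGQUIT (ctrl-\) print a traceback for every thread,
# which shows where a hung worker thread is blocked
faulthandler.register(signal.SIGQUIT)


async def dump_pending_tasks():
    # call from inside the flow (or a debugger) to list asyncio tasks that
    # never completed, along with their current stacks
    for t in asyncio.all_tasks():
        if not t.done():
            t.print_stack()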
s
Interesting. I’m not using submit
c
hmm - yeah I have definitely had the issue without submit as well. so far the way I can get it working is having the `@task`s use `submit` but trying to use plain async functions elsewhere - it’s super confusing
something that may or may not be related, but have you noticed any pattern based on the amount of logging you are doing within your flow? I have seen issues with rate limit errors on the logging API causing things to hang, but I can’t consistently reproduce it
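one way I’ve been trying to rule that in or out is turning off API logging entirely for a run and seeing whether the hang disappears - roughly this, assuming the prefect 2 setting names (worth double-checking against your version):
import os

# must be set before prefect is imported / the flow run starts
os.environ['PREFECT_LOGGING_TO_API_ENABLED'] = 'false'  # stop shipping logs to the API
os.environ['PREFECT_LOGGING_LEVEL'] = 'DEBUG'           # keep verbose logs locally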
s
TBH I don’t know, I haven’t investigated very much - mostly I just retry the flow again b/c sometimes it does work 🤷
I’ll let you know if I see anything
c
haha - yeah same. likewise
curious - what version are you running on? I just upgraded to the most recent and while it hasn’t completely resolved the issue, it does seem to have changed the behavior a little
so I’m definitely seeing issues caused by logging - I think, in part, because asyncio tasks can create so many log messages concurrently that it maxes out the http connection. have you tested your flows with the prefect logging level set to DEBUG? do you see logging errors at all?
s
2.10.6
haven’t seen logging issues
z
Hm I did fix a logger deadlock in 2.10.7
👍 1