
alvin goh

11/29/2022, 10:23 AM
Prefect 2.6.9. Hi all, for Prefect 2, how can I use asyncio with upstream task triggers/dependencies? I have an async task that takes as a param an array of upstream async task outputs (an array of coros which I have bundled together with asyncio.gather to feed the param). Does Prefect help ensure the task doesn't run if any upstream coroutine fails?

Khuyen Tran

11/29/2022, 4:47 PM
Can you show an example of your code so we can understand the question better?

alvin goh

11/30/2022, 4:43 AM
import asyncio

from prefect import flow, task


@task(retries=3)
async def get_stuff(url):
    # get stuff from a server
    return ...

@task()
async def process_results(result_g):
    # processing on results from server
    data1, data2 = await result_g
    # process result
    ...

@flow()
async def main(urls):
    data1urls = [...]
    data2urls = [...]

    # urls is an array of urls
    all_gathers = []
    for a, b in zip(data1urls, data2urls):
        all_gathers.append(asyncio.gather(get_stuff(a), get_stuff(b)))

    all_process_gathers = asyncio.gather(*[process_results(g) for g in all_gathers])
    all_process_results = await all_process_gathers # <--- trigger coro cascade here
So I have a bunch of URLs to fetch data from, and I have to group them two at a time and feed each pair to a downstream processor. The current design uses nested asyncio.gather calls.
1. Futures from upstream tasks are passed as params into downstream async tasks, to be awaited within the downstream task. This ensures lazy execution instead of blocking when the async tasks are created.
2. As a result, an upstream failure in get_stuff is raised inside process_results, which is quite dirty imo.
Ideally, I would like an upstream failure to result in process_results not being triggered at all, like the TriggerFailed state in Prefect 1.
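
To make the behaviour I'm after concrete, here is a minimal sketch in plain asyncio, with no Prefect decorators, so it does not reproduce retries or a real TriggerFailed state. The names UpstreamFailed, fetch_then_process and calls are hypothetical, and get_stuff is a stand-in that fails for any URL containing "bad". Each pair is gathered with return_exceptions=True, and process_results is only called when both fetches succeeded.

```python
import asyncio


class UpstreamFailed(Exception):
    """Hypothetical marker: the downstream step was skipped for this pair."""


async def get_stuff(url):
    # Stand-in for the real fetch; URLs containing "bad" simulate a failure.
    await asyncio.sleep(0)
    if "bad" in url:
        raise RuntimeError(f"fetch failed: {url}")
    return f"data from {url}"


calls = []  # records which pairs actually reached process_results


async def process_results(data1, data2):
    # Receives plain values, never an un-awaited gather.
    calls.append((data1, data2))
    return f"{data1} + {data2}"


async def fetch_then_process(a, b):
    # return_exceptions=True hands exceptions back as values instead of raising.
    results = await asyncio.gather(
        get_stuff(a), get_stuff(b), return_exceptions=True
    )
    failures = [r for r in results if isinstance(r, BaseException)]
    if failures:
        # Skip the downstream step entirely and report why.
        return UpstreamFailed(f"skipped: {failures[0]}")
    return await process_results(*results)


async def main(data1urls, data2urls):
    pairs = zip(data1urls, data2urls)
    # All pairs still run concurrently; each one gates its own downstream step.
    return await asyncio.gather(*(fetch_then_process(a, b) for a, b in pairs))


outcomes = asyncio.run(main(["u1", "bad1"], ["u2", "u3"]))
```

Here the first pair is processed normally, while the second pair never reaches process_results and comes back as an UpstreamFailed value instead. What I'd like is for Prefect to do this gating for me at the task-state level.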