alvin goh 11/29/2022, 10:23 AM
Khuyen Tran 11/29/2022, 4:47 PM
alvin goh 11/30/2022, 4:43 AM
import asyncio
from prefect import flow, task

@task(retries=3)
async def get_stuff(url):
    # get stuff from a server
    return ...

@task()
async def process_results(result_g):
    # processing on results from server
    data1, data2 = await result_g
    # process result
    ...

@flow()
async def main(urls):
    data1urls = [...]
    data2urls = [...]
    # urls is an array of urls
    all_gathers = []
    for a, b in zip(data1urls, data2urls):
        all_gathers.append(asyncio.gather(get_stuff(a), get_stuff(b)))
    all_process_gathers = asyncio.gather(*[process_results(g) for g in all_gathers])
    all_process_results = await all_process_gathers  # <--- trigger coro cascade here
to be raised in
which is quite dirty imo. Ideally, I would like an upstream failure to result in
not being triggered at all, like the TriggerFailed state in Prefect 1.
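For illustration, here is a rough sketch of the "skip downstream on upstream failure" behaviour in plain asyncio. This is not Prefect API and not necessarily how Prefect should be used here: the `@task`/`@flow` decorators are dropped, `fetch_pair_then_process` is a hypothetical helper, and the body of `get_stuff` is a made-up stand-in that fails for any URL containing "bad". The idea is to gather the upstream calls with `return_exceptions=True` and only call the processing step when no exception came back.

```python
import asyncio


async def get_stuff(url):
    # Hypothetical stand-in for the real server call:
    # any URL containing "bad" simulates an upstream failure.
    if "bad" in url:
        raise RuntimeError(f"failed to fetch {url}")
    return f"data from {url}"


async def process_results(data1, data2):
    # Receives plain results, so it never has to await (or catch) upstream errors.
    return f"processed {data1} + {data2}"


async def fetch_pair_then_process(a, b):
    # return_exceptions=True returns exceptions as values instead of raising them.
    results = await asyncio.gather(get_stuff(a), get_stuff(b), return_exceptions=True)
    if any(isinstance(r, BaseException) for r in results):
        # Upstream failed: the downstream step is never called for this pair.
        return None
    return await process_results(*results)


async def main(pairs):
    # Each pair is fetched and processed independently of the others.
    return await asyncio.gather(*(fetch_pair_then_process(a, b) for a, b in pairs))


if __name__ == "__main__":
    print(asyncio.run(main([("u1", "u2"), ("bad", "u3")])))
```

With this shape the failure stays on the fetching side, and a pair whose fetch failed simply yields `None` rather than an exception surfacing inside the processing step.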