alvin goh
11/29/2022, 10:23 AMKhuyen Tran
11/29/2022, 4:47 PMalvin goh
11/30/2022, 4:43 AM@task(retries=3)
async def get_stuff(url):
# get stuff from a server
return ...
@task()
async def process_results(result_g):
# processing on results from server
data1,data2 = await result_g
# process result
...
@flow()
async def main(urls):
data1urls = [...]
data2urls = [...]
# urls is an array of urls
all_gathers = []
for a,b in zip(data1urls, data2urls):
all_gathers.append(asyncio.gather(get_stuff(a), get_stuff(b)))
all_process_gathers = asyncio.gather(*[process_results(g) for g in all_gathers])
all_process_results = await all_process_gathers # <--- trigger coro cascade here
get_stuff
to be raised in process_results
which is quite dirty imo. Ideally, i would like an upstream failure to result in process_results
not be triggered at all, like the TriggerFailed state in prefect 1.