Aaron Gonzalez
02/20/2023, 7:04 PMimport asyncio
import random
import time
from prefect import flow, task, get_run_logger
async def async_func():
if random.random() > 0.3:
time.sleep(1)
await asyncio.sleep(random.randint(1, 5))
return 1
@task
async def my_task():
result = await async_func()
return result
@flow()
async def my_flow():
tasks = []
for _ in range(500):
t = my_task()
tasks.append(t)
await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == "__main__":
asyncio.run(my_flow())
How can one create a deployment for this? Would you still be able to use Deployment.build_from_flow()
with this? And if so, would this still work with deployment parameters?Nate
02/20/2023, 8:04 PMprefect deployment build --name testasync testing-prefect/testasync.py:my_flow -a
and i assume Deployment.build_from_flow()
would work just the sameAaron Gonzalez
02/20/2023, 8:09 PMNate
02/20/2023, 8:41 PMAaron Gonzalez
02/20/2023, 8:44 PMNate
02/20/2023, 8:47 PM(total number of tasks) / N
worker flowsAaron Gonzalez
02/20/2023, 8:55 PMNate
02/21/2023, 1:27 AM