Apostolos Papafragkakis
06/09/2022, 12:58 PM

Anna Geller
06/09/2022, 1:14 PM
`ConcurrentTaskRunner` (which is the default one in Prefect 2.0) is super lightweight and perfectly capable of solving the use cases you would otherwise have to delegate to Dask.

Apostolos Papafragkakis
06/09/2022, 1:16 PM

Anna Geller
06/09/2022, 1:17 PM
`ConcurrentTaskRunner` first, then, for benchmarking and comparison, perhaps try running `DaskTaskRunner` in various configurations using threads, e.g.:
```python
@flow(task_runner=DaskTaskRunner(cluster_kwargs=dict(n_workers=4, processes=False)))
```
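The reason to try `ConcurrentTaskRunner` before Dask is that I/O-bound work (like the sleep-based mock-ups later in this thread) can overlap inside a single process without extra worker processes. As a rough, Prefect-free analogy of thread-based execution (loosely what `processes=False` asks of the local Dask cluster), the sketch below uses the standard library's `ThreadPoolExecutor`. The helpers `blocking_work`, `run_sequentially`, and `run_in_threads` are hypothetical, and this is not a description of how either Prefect task runner is implemented.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_work(idx: int, delay: float = 0.1) -> int:
    """Hypothetical stand-in for an I/O-bound task: sleeps instead of doing real I/O."""
    time.sleep(delay)
    return idx


def run_sequentially(n: int, delay: float = 0.1) -> float:
    """Run n tasks one after another and return the elapsed wall-clock time."""
    start = time.perf_counter()
    for i in range(n):
        blocking_work(i, delay)
    return time.perf_counter() - start


def run_in_threads(n: int, workers: int = 4, delay: float = 0.1) -> tuple[list[int], float]:
    """Run n tasks on a pool of `workers` threads; return results and elapsed time."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map yields results in input order, regardless of completion order
        results = list(pool.map(blocking_work, range(n), [delay] * n))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    seq = run_sequentially(8)
    results, threaded = run_in_threads(8, workers=4)
    print(f"sequential: {seq:.2f}s, 4 threads: {threaded:.2f}s")
```

With 8 tasks of 0.1 s each, the sequential run should take about 0.8 s while 4 threads finish in roughly 0.2 s, because the threads spend almost all their time waiting. CPU-bound work would not speed up this way under CPython's GIL, which is where process-based workers become relevant.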
Apostolos Papafragkakis
06/09/2022, 1:19 PM

Anna Geller
06/09/2022, 1:20 PM

Apostolos Papafragkakis
06/09/2022, 1:20 PM

Anna Geller
06/09/2022, 1:22 PM

Apostolos Papafragkakis
06/09/2022, 1:23 PM

Anna Geller
06/09/2022, 1:28 PM
`DaskTaskRunner`

Apostolos Papafragkakis
06/09/2022, 1:50 PM

Anna Geller
06/09/2022, 2:04 PM

Zanie
06/09/2022, 2:55 PM

Apostolos Papafragkakis
06/10/2022, 9:22 AM
```python
import asyncio

from prefect import flow, task


@task()
async def mockup(idx):
    print(f"Doing some work {idx=}")
    await asyncio.sleep(25)  # simulate 25 s of work
    print(f"Finished work {idx=}")


@flow()
async def mockup_flow():
    print("Starting flow")
    futures = [mockup(i) for i in range(20)]
    await asyncio.gather(*futures)


if __name__ == "__main__":
    asyncio.run(mockup_flow())
```

```python
import asyncio

from prefect import flow, task
from prefect.task_runners import DaskTaskRunner


@task()
async def mockup(idx):
    print(f"Doing some work {idx=}")
    await asyncio.sleep(25)  # simulate 25 s of work
    print(f"Finished work {idx=}")


# Same flow as above, but run with DaskTaskRunner
@flow(task_runner=DaskTaskRunner())
async def mockup_flow():
    print("Starting flow")
    futures = [mockup(i) for i in range(20)]
    await asyncio.gather(*futures)


if __name__ == "__main__":
    asyncio.run(mockup_flow())
```
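For a baseline, the same mock-up can be run with plain `asyncio` and no Prefect at all: if the 20 coroutines really run concurrently, the whole batch should finish in roughly one sleep interval rather than 20 of them. This sketch shortens the 25-second sleep to a `delay` parameter and makes `mockup` return its index so the result can be checked; both changes are assumptions made for illustration.

```python
import asyncio
import time


async def mockup(idx: int, delay: float = 0.1) -> int:
    # Same shape as the mock-up task above, without the @task decorator
    print(f"Doing some work {idx=}")
    await asyncio.sleep(delay)
    print(f"Finished work {idx=}")
    return idx


async def mockup_flow(n: int = 20, delay: float = 0.1) -> list[int]:
    print("Starting flow")
    # gather runs all coroutines on one event loop and returns their results in order
    return await asyncio.gather(*(mockup(i, delay) for i in range(n)))


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(mockup_flow())
    # Concurrent sleeps: elapsed time is close to one delay, not n * delay
    print(f"{len(results)} results in {time.perf_counter() - start:.2f}s")
```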
Anna Geller
06/10/2022, 12:07 PM

Apostolos Papafragkakis
06/10/2022, 12:10 PM

Zanie
06/10/2022, 3:48 PM
`mockup` as that is awaiting submission to the worker, not the result of your task.

Apostolos Papafragkakis
06/14/2022, 7:04 AM

Anna Geller
06/14/2022, 12:02 PM
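Zanie's (partially preserved) message notes that awaiting `mockup` in the flow is awaiting submission to the worker, not the result of the task. A plain-`asyncio` analogy of that distinction follows. `submit` and `work` are hypothetical names, `asyncio.create_task` stands in for "submission", and none of this is Prefect's API.

```python
import asyncio


async def work(idx: int) -> int:
    # Hypothetical task body: some waiting, then a result
    await asyncio.sleep(0.05)
    return idx * 10


async def submit(idx: int) -> asyncio.Task:
    # Hypothetical stand-in for handing work to a runner: awaiting this returns
    # as soon as the work is scheduled, yielding a handle rather than a result
    return asyncio.create_task(work(idx))


async def main() -> list[int]:
    # Awaiting the submissions only gives back handles...
    handles = await asyncio.gather(*(submit(i) for i in range(5)))
    assert all(isinstance(h, asyncio.Task) for h in handles)
    # ...the results require awaiting the handles themselves
    return await asyncio.gather(*handles)


if __name__ == "__main__":
    print(asyncio.run(main()))  # [0, 10, 20, 30, 40]
```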