Apostolos Papafragkakis
06/09/2022, 12:58 PM
Anna Geller
ConcurrentTaskRunner (which is the default one in Prefect 2.0) is super lightweight and perfectly capable of solving the use cases you would otherwise have to delegate to Dask.
Apostolos Papafragkakis
06/09/2022, 1:16 PM
Anna Geller
ConcurrentTaskRunner first, then for benchmarking and comparison, perhaps try running DaskTaskRunner in various configurations using threads, e.g.
@flow(task_runner=DaskTaskRunner(cluster_kwargs=dict(n_workers=4, processes=False)))
Anna Geller
Apostolos Papafragkakis
06/09/2022, 1:19 PM
Anna Geller
Apostolos Papafragkakis
06/09/2022, 1:20 PM
Anna Geller
Anna Geller
Apostolos Papafragkakis
06/09/2022, 1:23 PM
Anna Geller
DaskTaskRunner
Apostolos Papafragkakis
06/09/2022, 1:50 PM
Anna Geller
Zanie
Apostolos Papafragkakis
06/10/2022, 9:22 AM
Apostolos Papafragkakis
06/10/2022, 11:03 AM
Apostolos Papafragkakis
06/10/2022, 11:24 AM
import asyncio
from prefect import flow, task

@task()
async def mockup(idx):
    print(f"Doing some work {idx=}")
    await asyncio.sleep(25)
    print(f"Finished work {idx=}")

@flow()
async def mockup_flow():
    print("Starting flow")
    futures = [mockup(i) for i in range(20)]
    await asyncio.gather(*futures)

if __name__ == "__main__":
    asyncio.run(mockup_flow())
Apostolos Papafragkakis
06/10/2022, 11:26 AM
import asyncio
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

@task()
async def mockup(idx):
    print(f"Doing some work {idx=}")
    await asyncio.sleep(25)
    print(f"Finished work {idx=}")

@flow(task_runner=DaskTaskRunner())
async def mockup_flow():
    print("Starting flow")
    futures = [mockup(i) for i in range(20)]
    await asyncio.gather(*futures)

if __name__ == "__main__":
    asyncio.run(mockup_flow())
Anna Geller
Anna Geller
Apostolos Papafragkakis
06/10/2022, 12:10 PM
Zanie
mockup as that is awaiting submission to the worker, not the result of your task.
Zanie
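To illustrate the distinction between awaiting submission and awaiting a result, here is a rough sketch that uses only the standard library's asyncio. It is an analogy, not Prefect's API: FakeFuture, submit, and work are hypothetical names made up for this example, and the short sleep stands in for real task work.

```python
import asyncio


class FakeFuture:
    """Hypothetical stand-in for the handle you get back on submission."""

    def __init__(self, task: asyncio.Task):
        self._task = task

    async def result(self):
        # Awaiting here waits until the underlying work has actually finished.
        return await self._task


async def work(idx: int) -> int:
    # Placeholder for real task work.
    await asyncio.sleep(0.01)
    return idx * 2


async def submit(idx: int) -> FakeFuture:
    # Awaiting submit() only schedules the work and returns a handle;
    # it does not wait for the work to complete.
    return FakeFuture(asyncio.create_task(work(idx)))


async def main():
    # Step 1: gathering the submissions yields handles, not results.
    handles = await asyncio.gather(*(submit(i) for i in range(3)))
    # Step 2: gathering the handles' results waits for the work itself.
    results = await asyncio.gather(*(h.result() for h in handles))
    return handles, results


if __name__ == "__main__":
    handles, results = asyncio.run(main())
    print(type(handles[0]).__name__, results)
```

In this sketch the first gather returns as soon as all the work has been scheduled, so treating its return value as the task output would be a mistake; only the second gather waits for completion.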
Zanie
Apostolos Papafragkakis
06/14/2022, 7:04 AM
Anna Geller