Nathan Woodbury
11/27/2024, 8:23 PM
… 3.1.4 …), and some of our large flows (thousands of tasks) are hanging where they weren't before when using the DaskTaskRunner and submitting the tasks.
I managed to build the following minimal working example to demonstrate the issue, and thought I would ask here before submitting a bug report in case I am missing something simple:
```python
import time

from prefect import cache_policies, flow, task
from prefect.task_runners import ThreadPoolTaskRunner  # imported but not used below
from prefect_dask.task_runners import DaskTaskRunner

N_TASKS = 10000


@task(cache_policy=cache_policies.NONE)
def hello():
    time.sleep(5)
    print("Hello!")


@flow(
    task_runner=DaskTaskRunner(
        cluster_class="dask.distributed.LocalCluster",
        cluster_kwargs={
            "n_workers": 3,
            "threads_per_worker": 1,
            "dashboard_address": None,
        },
    )
)
def theFlow():
    futures = []
    for _ in range(N_TASKS):
        # submit() returns a future immediately; the task itself runs on a Dask worker
        future = hello.submit()
        futures.append(future)
    print("All Submitted")
    # The futures are returned without being waited on
    return futures


if __name__ == "__main__":
    theFlow()
```
When I run this with N_TASKS = 100, the flow executes to completion. However, when I set N_TASKS = 10000 as above, the first few dozen tasks complete successfully, and then the flow hangs for hours without doing anything more. The last logged line (I'll put the last several hundred in a comment below) is the "All Submitted" print statement I added. This leads me to believe that tasks are allowed to execute while there are still more to submit, but once they have all been submitted and the flow returns, the whole flow freezes.
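One variable worth isolating is that theFlow returns a list of still-unresolved futures. The sketch below is a stdlib analogue only: `concurrent.futures.ThreadPoolExecutor` stands in for Prefect and Dask (an assumption for illustration, not how Prefect resolves futures internally), and `hello` and `the_flow` are hypothetical stand-ins. It shows the alternative pattern of submitting everything, then explicitly blocking on every future before returning plain results. Whether the same change avoids the Dask hang has not been verified here; in Prefect terms it would roughly correspond to calling `.wait()` or `.result()` on each future before `return`.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def hello(i: int) -> int:
    # Hypothetical stand-in for the Prefect task body (no 5-second sleep here)
    return i


def the_flow(n_tasks: int) -> list:
    # 3 worker threads, loosely mirroring n_workers=3 / threads_per_worker=1 above
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(hello, i) for i in range(n_tasks)]
        print("All Submitted")
        # Block until every future has resolved (wait() defaults to ALL_COMPLETED)...
        wait(futures)
        # ...then return plain results instead of still-pending futures
        return [f.result() for f in futures]


if __name__ == "__main__":
    print(len(the_flow(10_000)))  # prints 10000
```

The explicit `wait` is partly redundant here, since leaving the `with` block already shuts the executor down and waits; it is kept to make the "resolve before returning" step visible.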
What can I do to get this flow to execute to completion when N_TASKS = 10000 while still submitting the tasks to the DaskTaskRunner?
Nathan Woodbury
11/27/2024, 8:26 PM
… distributed.client …
(Observe that the final line is the "All Submitted" print message; note also that I had to edit the …):
19:01:33.665 | INFO | distributed.scheduler - Remove client PrefectDaskClient-fff03913-acf1-11ef-9dcc-972606475641
19:01:33.676 | INFO | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:46488>; closing.
19:01:33.687 | INFO | distributed.scheduler - Remove client PrefectDaskClient-fff0eb3f-acf1-11ef-9dd0-972606475641
19:01:33.694 | INFO | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:46494>; closing.
19:01:33.709 | INFO | distributed.scheduler - Remove client PrefectDaskClient-fff145ca-acf1-11ef-9dd2-972606475641
19:01:33.716 | INFO | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:46510>; closing.
19:01:33.751 | INFO | distributed.scheduler - Remove client PrefectDaskClient-fff03913-acf1-11ef-9dcc-972606475641
19:01:33.766 | INFO | distributed.scheduler - Remove client PrefectDaskClient-fff0eb3f-acf1-11ef-9dd0-972606475641
19:01:33.777 | INFO | distributed.scheduler - Remove client PrefectDaskClient-fff145ca-acf1-11ef-9dd2-972606475641
19:01:34.511 | INFO | distributed.scheduler - Close client connection: PrefectDaskClient-fff03913-acf1-11ef-9dcc-972606475641
19:01:34.522 | INFO | distributed.scheduler - Close client connection: PrefectDaskClient-fff0eb3f-acf1-11ef-9dd0-972606475641
19:01:34.533 | INFO | distributed.scheduler - Close client connection: PrefectDaskClient-fff145ca-acf1-11ef-9dd2-972606475641
19:01:34.544 | INFO | distributed.scheduler - Receive client connection: PrefectDaskClient-04868ab2-acf2-11ef-9dcc-972606475641
19:01:34.598 | INFO | distributed.core - Starting established connection to <tcp://127.0.0.1:35132>
19:01:34.605 | INFO | distributed.scheduler - Receive client connection: PrefectDaskClient-0487ec17-acf2-11ef-9dd2-972606475641
19:01:34.641 | INFO | distributed.core - Starting established connection to <tcp://127.0.0.1:35144>
19:01:34.652 | INFO | distributed.scheduler - Receive client connection: PrefectDaskClient-0487e096-acf2-11ef-9dd0-972606475641
19:01:34.674 | INFO | distributed.core - Starting established connection to <tcp://127.0.0.1:35148>
Hello!
19:01:40.345 | INFO | Task run 'hello-e7f' - Finished in state Completed()
Hello!
19:01:40.350 | INFO | Task run 'hello-a7b' - Finished in state Completed()
Hello!
19:01:40.354 | INFO | Task run 'hello-d0d' - Finished in state Completed()
19:01:40.551 | INFO | prefect.task_runner.dask - Connecting to an existing Dask cluster at <tcp://127.0.0.1:33657>
19:01:40.551 | INFO | prefect.task_runner.dask - Connecting to an existing Dask cluster at <tcp://127.0.0.1:33657>
19:01:40.562 | INFO | prefect.task_runner.dask - Connecting to an existing Dask cluster at <tcp://127.0.0.1:33657>
19:01:40.597 | INFO | distributed.scheduler - Remove client PrefectDaskClient-04868ab2-acf2-11ef-9dcc-972606475641
19:01:40.609 | INFO | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:35132>; closing.
19:01:40.615 | INFO | distributed.scheduler - Remove client PrefectDaskClient-0487ec17-acf2-11ef-9dd2-972606475641
19:01:40.620 | INFO | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:35144>; closing.
19:01:40.620 | INFO | distributed.scheduler - Remove client PrefectDaskClient-0487e096-acf2-11ef-9dd0-972606475641
19:01:40.627 | INFO | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:35148>; closing.
19:01:40.691 | INFO | distributed.scheduler - Remove client PrefectDaskClient-04868ab2-acf2-11ef-9dcc-972606475641
19:01:40.696 | INFO | distributed.scheduler - Remove client PrefectDaskClient-0487ec17-acf2-11ef-9dd2-972606475641
19:01:40.703 | INFO | distributed.scheduler - Remove client PrefectDaskClient-0487e096-acf2-11ef-9dd0-972606475641
19:01:41.585 | INFO | distributed.scheduler - Close client connection: PrefectDaskClient-04868ab2-acf2-11ef-9dcc-972606475641
19:01:41.590 | INFO | distributed.scheduler - Close client connection: PrefectDaskClient-0487ec17-acf2-11ef-9dd2-972606475641
19:01:41.590 | INFO | distributed.scheduler - Close client connection: PrefectDaskClient-0487e096-acf2-11ef-9dd0-972606475641
19:01:41.880 | INFO | distributed.scheduler - Receive client connection: PrefectDaskClient-090a70ab-acf2-11ef-9dd2-972606475641
19:01:41.914 | INFO | distributed.core - Starting established connection to <tcp://127.0.0.1:35190>
19:01:41.921 | INFO | distributed.scheduler - Receive client connection: PrefectDaskClient-090a8650-acf2-11ef-9dcc-972606475641
19:01:41.936 | INFO | distributed.core - Starting established connection to <tcp://127.0.0.1:35196>
19:01:41.943 | INFO | distributed.scheduler - Receive client connection: PrefectDaskClient-090c2bbe-acf2-11ef-9dd0-972606475641
19:01:41.976 | INFO | distributed.core - Starting established connection to <tcp://127.0.0.1:35208>
19:01:43.234 | INFO | distributed.core - Connection to <tcp://127.0.0.1:36556> has been closed.
19:01:43.245 | INFO | distributed.scheduler - Remove client PrefectDaskClient-d4635981-acf1-11ef-9d61-972606475641
19:01:43.805 | INFO | distributed.scheduler - Close client connection: PrefectDaskClient-d4635981-acf1-11ef-9d61-972606475641
Hello!
19:01:47.885 | INFO | Task run 'hello-927' - Finished in state Completed()
Hello!
19:01:47.889 | INFO | Task run 'hello-967' - Finished in state Completed()
Hello!
19:01:47.897 | INFO | Task run 'hello-e00' - Finished in state Completed()
19:01:48.149 | INFO | distributed.scheduler - Remove client PrefectDaskClient-090a70ab-acf2-11ef-9dd2-972606475641
19:01:48.149 | INFO | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:35190>; closing.
19:01:48.156 | INFO | distributed.scheduler - Remove client PrefectDaskClient-090a8650-acf2-11ef-9dcc-972606475641
19:01:48.161 | INFO | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:35196>; closing.
19:01:48.171 | INFO | distributed.scheduler - Remove client PrefectDaskClient-090c2bbe-acf2-11ef-9dd0-972606475641
19:01:48.178 | INFO | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:35208>; closing.
19:01:48.235 | INFO | distributed.scheduler - Remove client PrefectDaskClient-090a70ab-acf2-11ef-9dd2-972606475641
19:01:48.242 | INFO | distributed.scheduler - Remove client PrefectDaskClient-090a8650-acf2-11ef-9dcc-972606475641
19:01:48.246 | INFO | distributed.scheduler - Remove client PrefectDaskClient-090c2bbe-acf2-11ef-9dd0-972606475641
19:01:49.051 | INFO | distributed.scheduler - Close client connection: PrefectDaskClient-090a70ab-acf2-11ef-9dd2-972606475641
19:01:49.062 | INFO | distributed.scheduler - Close client connection: PrefectDaskClient-090a8650-acf2-11ef-9dcc-972606475641
19:01:49.066 | INFO | distributed.scheduler - Close client connection: PrefectDaskClient-090c2bbe-acf2-11ef-9dd0-972606475641
All Submitted
Nathan Woodbury
12/04/2024, 7:45 PM
… theFlow as follows:
```python
# Reuses the imports, N_TASKS and the hello task from the first example.
PAGINATION = 1000  # tasks per page; 1000 worked well here


@flow(
    task_runner=DaskTaskRunner(
        cluster_class="dask.distributed.LocalCluster",
        cluster_kwargs={
            "n_workers": 3,
            "threads_per_worker": 1,
            "dashboard_address": None,
        },
    )
)
def theFlow():
    futures = []
    hellos = [hello for _ in range(N_TASKS)]
    # Split the N_TASKS submissions into pages of PAGINATION tasks each
    paginated = [hellos[i : i + PAGINATION] for i in range(0, len(hellos), PAGINATION)]
    for i, page in enumerate(paginated):
        print(f"Page {i+1} in {len(paginated)} Started")
        currFutures = []
        for tsk in page:
            future = tsk.submit()
            currFutures.append(future)
        print(f"Page {i+1} in {len(paginated)} Submitted")
        # Block until every task in this page has finished before submitting the next page
        for currFuture in currFutures:
            currFuture.wait()
        print(f"Page {i+1} in {len(paginated)} Finished")
        futures += currFutures
    print("All Submitted")
    return futures
```
With this implementation, all 10,000 tasks complete without issue. For me, setting PAGINATION = 1000 seems to work very well.
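The pagination idea can be sketched independently of Prefect. The version below is a stdlib analogue under stated assumptions: `ThreadPoolExecutor` replaces the DaskTaskRunner, and `hello` and `run_paginated` are hypothetical names. It submits tasks in pages of `page_size`, waits for each page to finish (the same barrier as `currFuture.wait()` above), and only then submits the next page, so no more than `page_size` futures are ever outstanding.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def hello(i: int) -> int:
    # Hypothetical stand-in for the real task body
    return i


def run_paginated(n_tasks: int, page_size: int, max_workers: int = 3):
    """Submit n_tasks in pages of page_size, waiting for each page to finish
    before submitting the next. Returns (results in order, number of pages)."""
    results = []
    pages = 0
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # range(0, n_tasks, page_size) gives the same page starts as the slicing above
        for start in range(0, n_tasks, page_size):
            stop = min(start + page_size, n_tasks)
            futures = [pool.submit(hello, i) for i in range(start, stop)]
            wait(futures)  # page barrier: nothing new is submitted until this page is done
            results.extend(f.result() for f in futures)
            pages += 1
    return results, pages


if __name__ == "__main__":
    results, pages = run_paginated(10_000, 1000)
    print(pages)  # prints 10
```

With 10,000 tasks and a page size of 1,000 the loop runs 10 pages. One trade-off of a hard page barrier is that a single slow task holds up the start of the next page; a sliding window (submitting a new task whenever one finishes, e.g. via `wait(..., return_when=FIRST_COMPLETED)`) is a different technique that caps in-flight work without that stall.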