# ask-community
Hello! I am attempting to upgrade Prefect to Prefect 3 (specifically `3.1.4`), and some of our large flows (thousands of tasks) are hanging where they weren't before when using the `DaskTaskRunner` and submitting the tasks. I managed to build the following simple working example to demonstrate the issue, and thought I would ask here before submitting a bug report in case I am missing something simple:
```python
import time

from prefect import cache_policies, flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect_dask.task_runners import DaskTaskRunner

N_TASKS = 10000


@task(cache_policy=cache_policies.NONE)
def hello():
    time.sleep(5)
    print("Hello!")


@flow(
    task_runner=DaskTaskRunner(
        cluster_class="dask.distributed.LocalCluster",
        cluster_kwargs={
            "n_workers": 3,
            "threads_per_worker": 1,
            "dashboard_address": None,
        },
    )
)
def theFlow():
    futures = []
    for _ in range(N_TASKS):
        future = hello.submit()
        futures.append(future)

    print("All Submitted")
    return futures


if __name__ == "__main__":
    theFlow()
```
When I run this with `N_TASKS = 100`, the flow successfully executes to completion. However, when I set `N_TASKS = 10000` as above, the first few dozen tasks successfully complete, and then the flow hangs for hours without doing anything more. Indeed, the last logged line (I'll put the last several hundred in a comment below) is the `"All Submitted"` print statement I added, which leads me to believe that tasks are allowed to execute while there are more to submit, but once they have all been submitted and the flow returns, the whole flow freezes. What can I do to get this flow to execute to completion when `N_TASKS = 10000` while still submitting tasks to the `DaskTaskRunner`?
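For reference, the variant that waits on the futures explicitly inside the flow body (instead of only returning them) would look like the sketch below. This is just a sketch of what I mean, not something shown to avoid the hang, and `prefect.futures.wait` is my assumption about the 3.x helper for blocking on a list of futures:
```python
from prefect.futures import wait


@flow(
    task_runner=DaskTaskRunner(
        cluster_class="dask.distributed.LocalCluster",
        cluster_kwargs={
            "n_workers": 3,
            "threads_per_worker": 1,
            "dashboard_address": None,
        },
    )
)
def theFlowExplicitWait():
    # Same submission pattern as above, but block on the futures inside
    # the flow body before returning them.
    futures = [hello.submit() for _ in range(N_TASKS)]
    print("All Submitted")
    wait(futures)  # assumed helper: prefect.futures.wait(list_of_prefect_futures)
    return futures
```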
Last log lines, including some completed tasks and some error messages from `distributed.client` (observe that the final line is the `All Submitted` print message; note also that I had to edit the ):
```
19:01:33.665 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-fff03913-acf1-11ef-9dcc-972606475641
19:01:33.676 | INFO    | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:46488>; closing.
19:01:33.687 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-fff0eb3f-acf1-11ef-9dd0-972606475641
19:01:33.694 | INFO    | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:46494>; closing.
19:01:33.709 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-fff145ca-acf1-11ef-9dd2-972606475641
19:01:33.716 | INFO    | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:46510>; closing.
19:01:33.751 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-fff03913-acf1-11ef-9dcc-972606475641
19:01:33.766 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-fff0eb3f-acf1-11ef-9dd0-972606475641
19:01:33.777 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-fff145ca-acf1-11ef-9dd2-972606475641
19:01:34.511 | INFO    | distributed.scheduler - Close client connection: PrefectDaskClient-fff03913-acf1-11ef-9dcc-972606475641
19:01:34.522 | INFO    | distributed.scheduler - Close client connection: PrefectDaskClient-fff0eb3f-acf1-11ef-9dd0-972606475641
19:01:34.533 | INFO    | distributed.scheduler - Close client connection: PrefectDaskClient-fff145ca-acf1-11ef-9dd2-972606475641
19:01:34.544 | INFO    | distributed.scheduler - Receive client connection: PrefectDaskClient-04868ab2-acf2-11ef-9dcc-972606475641
19:01:34.598 | INFO    | distributed.core - Starting established connection to <tcp://127.0.0.1:35132>
19:01:34.605 | INFO    | distributed.scheduler - Receive client connection: PrefectDaskClient-0487ec17-acf2-11ef-9dd2-972606475641
19:01:34.641 | INFO    | distributed.core - Starting established connection to <tcp://127.0.0.1:35144>
19:01:34.652 | INFO    | distributed.scheduler - Receive client connection: PrefectDaskClient-0487e096-acf2-11ef-9dd0-972606475641
19:01:34.674 | INFO    | distributed.core - Starting established connection to <tcp://127.0.0.1:35148>
Hello!
19:01:40.345 | INFO    | Task run 'hello-e7f' - Finished in state Completed()
Hello!
19:01:40.350 | INFO    | Task run 'hello-a7b' - Finished in state Completed()
Hello!
19:01:40.354 | INFO    | Task run 'hello-d0d' - Finished in state Completed()
19:01:40.551 | INFO    | prefect.task_runner.dask - Connecting to an existing Dask cluster at <tcp://127.0.0.1:33657>
19:01:40.551 | INFO    | prefect.task_runner.dask - Connecting to an existing Dask cluster at <tcp://127.0.0.1:33657>
19:01:40.562 | INFO    | prefect.task_runner.dask - Connecting to an existing Dask cluster at <tcp://127.0.0.1:33657>
19:01:40.597 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-04868ab2-acf2-11ef-9dcc-972606475641
19:01:40.609 | INFO    | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:35132>; closing.
19:01:40.615 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-0487ec17-acf2-11ef-9dd2-972606475641
19:01:40.620 | INFO    | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:35144>; closing.
19:01:40.620 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-0487e096-acf2-11ef-9dd0-972606475641
19:01:40.627 | INFO    | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:35148>; closing.
19:01:40.691 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-04868ab2-acf2-11ef-9dcc-972606475641
19:01:40.696 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-0487ec17-acf2-11ef-9dd2-972606475641
19:01:40.703 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-0487e096-acf2-11ef-9dd0-972606475641
19:01:41.585 | INFO    | distributed.scheduler - Close client connection: PrefectDaskClient-04868ab2-acf2-11ef-9dcc-972606475641
19:01:41.590 | INFO    | distributed.scheduler - Close client connection: PrefectDaskClient-0487ec17-acf2-11ef-9dd2-972606475641
19:01:41.590 | INFO    | distributed.scheduler - Close client connection: PrefectDaskClient-0487e096-acf2-11ef-9dd0-972606475641
19:01:41.880 | INFO    | distributed.scheduler - Receive client connection: PrefectDaskClient-090a70ab-acf2-11ef-9dd2-972606475641
19:01:41.914 | INFO    | distributed.core - Starting established connection to <tcp://127.0.0.1:35190>
19:01:41.921 | INFO    | distributed.scheduler - Receive client connection: PrefectDaskClient-090a8650-acf2-11ef-9dcc-972606475641
19:01:41.936 | INFO    | distributed.core - Starting established connection to <tcp://127.0.0.1:35196>
19:01:41.943 | INFO    | distributed.scheduler - Receive client connection: PrefectDaskClient-090c2bbe-acf2-11ef-9dd0-972606475641
19:01:41.976 | INFO    | distributed.core - Starting established connection to <tcp://127.0.0.1:35208>
19:01:43.234 | INFO    | distributed.core - Connection to <tcp://127.0.0.1:36556> has been closed.
19:01:43.245 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-d4635981-acf1-11ef-9d61-972606475641
19:01:43.805 | INFO    | distributed.scheduler - Close client connection: PrefectDaskClient-d4635981-acf1-11ef-9d61-972606475641
Hello!
19:01:47.885 | INFO    | Task run 'hello-927' - Finished in state Completed()
Hello!
19:01:47.889 | INFO    | Task run 'hello-967' - Finished in state Completed()
Hello!
19:01:47.897 | INFO    | Task run 'hello-e00' - Finished in state Completed()
19:01:48.149 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-090a70ab-acf2-11ef-9dd2-972606475641
19:01:48.149 | INFO    | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:35190>; closing.
19:01:48.156 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-090a8650-acf2-11ef-9dcc-972606475641
19:01:48.161 | INFO    | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:35196>; closing.
19:01:48.171 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-090c2bbe-acf2-11ef-9dd0-972606475641
19:01:48.178 | INFO    | distributed.core - Received 'close-stream' from <tcp://127.0.0.1:35208>; closing.
19:01:48.235 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-090a70ab-acf2-11ef-9dd2-972606475641
19:01:48.242 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-090a8650-acf2-11ef-9dcc-972606475641
19:01:48.246 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-090c2bbe-acf2-11ef-9dd0-972606475641
19:01:49.051 | INFO    | distributed.scheduler - Close client connection: PrefectDaskClient-090a70ab-acf2-11ef-9dd2-972606475641
19:01:49.062 | INFO    | distributed.scheduler - Close client connection: PrefectDaskClient-090a8650-acf2-11ef-9dcc-972606475641
19:01:49.066 | INFO    | distributed.scheduler - Close client connection: PrefectDaskClient-090c2bbe-acf2-11ef-9dd0-972606475641
All Submitted
```
For future reference, I found a workaround by paginating tasks, submitting them in chunks, and waiting for each chunk to finish before moving on to the next. Specifically, I re-implemented `theFlow` as follows:
```python
PAGINATION = 1000  # page size; each page of submissions is waited on before the next


@flow(
    task_runner=DaskTaskRunner(
        cluster_class="dask.distributed.LocalCluster",
        cluster_kwargs={
            "n_workers": 3,
            "threads_per_worker": 1,
            "dashboard_address": None,
        },
    )
)
def theFlow():
    futures = []
    hellos = [hello for _ in range(N_TASKS)]
    paginated = [hellos[i : i + PAGINATION] for i in range(0, len(hellos), PAGINATION)]

    for i, page in enumerate(paginated):
        print(f"Page {i+1} in {len(paginated)} Started")
        currFutures = []
        for tsk in page:
            future = tsk.submit()
            currFutures.append(future)

        print(f"Page {i+1} in {len(paginated)} Submitted")

        for currFuture in currFutures:
            currFuture.wait()

        print(f"Page {i+1} in {len(paginated)} Finished")

        futures += currFutures

    print("All Submitted")

    return futures
```
With this implementation, 10,000 tasks successfully terminate without issue. For me, setting `PAGINATION = 1000` seems to work very well.
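In case it helps anyone else, the same pagination pattern can be factored into a small reusable helper. This is only a sketch of the code above (the `submit_in_pages` name and signature are mine, not anything from Prefect), and it relies on the same `.submit()` and `.wait()` calls used in the workaround; call it from inside a flow body so the submissions go to that flow's task runner:
```python
def submit_in_pages(tsk, n_tasks, page_size):
    """Submit n_tasks runs of `tsk` in pages of page_size, waiting for each
    page to finish before submitting the next."""
    futures = []
    for start in range(0, n_tasks, page_size):
        # Submit one page of task runs ...
        page = [tsk.submit() for _ in range(min(page_size, n_tasks - start))]
        # ... then block until every run in the page has finished.
        for future in page:
            future.wait()
        futures += page
        print(f"Finished {start + len(page)} of {n_tasks} tasks")
    return futures
```
With this helper, the body of `theFlow` reduces to `return submit_in_pages(hello, N_TASKS, PAGINATION)`.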