https://prefect.io logo
#prefect-community
Title
# prefect-community
t

Thomas Fredriksen

12/09/2022, 1:14 PM
Hello everyone. I am experimenting a bit with the dask taskrunner, and I am running into some issues related to priority. I have the following test-flow:
Copy code
from typing import List, Tuple

import dask
from prefect import flow, get_run_logger, task
from prefect.context import get_run_context
from prefect_dask import get_dask_client


def is_prime(number: int) -> Tuple[int, bool]:
    if number == 2 or number == 3:
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False

    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False

    return number, True


@task
def get_primes_from_split(min_number, max_number) -> List[int]:

    if min_number % 2 == 0:
        min_number += 1

    with get_dask_client() as client:
        futures = [client.submit(is_prime, n) for n in range(min_number, max_number, 2)]

        maybe_primes = [future.result() for future in futures]

    return [value for value, flag in maybe_primes if flag]


@flow(name="example_prime_number_search")
def main(max_number: int = 1_000_000, split_size=10_000):
    log = get_run_logger()
    context = get_run_context()

    <http://log.info|log.info>("Task Runner: %s", context.task_runner.name)
    <http://log.info|log.info>("Searching for primes from up to %d", max_number)

    futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
    primes = [value for future in futures for value in future.result()]

    if len(primes) > 10:
        <http://log.info|log.info>("Found %d primes: %s, ...", sorted(primes)[::-1][:10])
    else:
        <http://log.info|log.info>("Found %d primes: %s", sorted(primes)[::-1][:10]
When running this with the
DaskTaskrunner
, the task
get_primes_from_split
is scheduled first, then the dask-future
is_prime
. Since
get_primes_from_split
is scheduled first it gets higher priority, which causes the dask-execution to lock up, as it is waiting for the task to complete before executing anything else.
get_primes_from_split
naturally is waiting for
is_prime
to complete, which unfortunately will not execute at this point. Toying around with priorities, I managed to get
is_prime
to execute:
Copy code
from typing import List, Tuple

import dask
from prefect import flow, get_run_logger, task
from prefect.context import get_run_context
from prefect_dask import get_dask_client


def is_prime(number: int) -> Tuple[int, bool]:
    if number == 2 or number == 3:
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False

    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False

    return number, True


@task
def get_primes_from_split(min_number, max_number) -> List[int]:

    if min_number % 2 == 0:
        min_number += 1

    with get_dask_client() as client:
        futures = [client.submit(is_prime, n, priority=100) for n in range(min_number, max_number, 2)]

        maybe_primes = [future.result() for future in futures]

    return [value for value, flag in maybe_primes if flag]


@flow(name="example_prime_number_search")
def main(max_number: int = 1_000_000, split_size=10_000):
    log = get_run_logger()
    context = get_run_context()

    <http://log.info|log.info>("Task Runner: %s", context.task_runner.name)
    <http://log.info|log.info>("Searching for primes from up to %d", max_number)

    with dask.annotate(priority=0):
        futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
        primes = [value for future in futures for value in future.result()]

    if len(primes) > 10:
        <http://log.info|log.info>("Found %d primes: %s, ...", sorted(primes)[::-1][:10])
    else:
        <http://log.info|log.info>("Found %d primes: %s", sorted(primes)[::-1][:10])
This causes dask to schedule a few instances of
get_primes_from_split
, which in turn schedules all its instances of
is_prime
.
is_prime
executes properly and starts returning its results, but it doesn't seem like
get_primes_from_split
picks up execution. I really don't understand what is going on here. Can anyone provide some insight into how do this kind of execution without reaching a deadlock like above?
1
k

Khuyen Tran

12/09/2022, 7:55 PM
Hi @Thomas Fredriksen, I don’t see you using DaskTaskRunner in the examples above. Am I missing something here?
t

Thomas Fredriksen

12/09/2022, 8:04 PM
Sorry, i should have mentioned. I am setting the taskrunner dynamically when deploying the flow. Some more information here: https://github.com/PrefectHQ/prefect/pull/7546
We have verified that the cluster is created and have successfully run simple flows on dask
🎉 1
@Khuyen Tran - sorry that I was not clear. We have indeed verified that the cluster is created and have successfully run simple flows. Running more complex flows such as the example above is still a problem. Do you have any idea how we can submit Dask tasks inside of Prefect tasks in order to avoid the deadlock-state described above?
k

Khuyen Tran

12/14/2022, 5:10 PM
@Ryan Peden @Mason Menges Do you have any insights in this?
t

Thomas Fredriksen

12/19/2022, 8:11 AM
@Anna Geller Do you have any advice on this one?
a

Anna Geller

12/19/2022, 12:58 PM
My advice would be to build a minimal reproducible example and open an issue in the prefect-dask repo. Slack is really only helpful for quick ad hoc questions and this seems to be a bit more elaborate issue we would need to dive deeper
8 Views