# prefect-community
Hello everyone. I am experimenting a bit with the dask taskrunner, and I am running into some issues related to priority. I have the following test-flow:
from typing import List, Tuple

import dask
from prefect import flow, get_run_logger, task
from prefect.context import get_run_context
from prefect_dask import get_dask_client

def is_prime(number: int) -> Tuple[int, bool]:
    if number == 2 or number == 3:
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False

    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False

    return number, True

@task
def get_primes_from_split(min_number, max_number) -> List[int]:

    if min_number % 2 == 0:
        min_number += 1

    with get_dask_client() as client:
        futures = [client.submit(is_prime, n) for n in range(min_number, max_number, 2)]

        maybe_primes = [future.result() for future in futures]

    return [value for value, flag in maybe_primes if flag]

@flow
def main(max_number: int = 1_000_000, split_size=10_000):
    log = get_run_logger()
    context = get_run_context()

    log.info("Task Runner: %s", context.task_runner.name)
    log.info("Searching for primes from up to %d", max_number)

    futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
    primes = [value for future in futures for value in future.result()]

    if len(primes) > 10:
        log.info("Found %d primes: %s, ...", len(primes), sorted(primes)[::-1][:10])
    else:
        log.info("Found %d primes: %s", len(primes), sorted(primes)[::-1][:10])
When running this with the `DaskTaskRunner`, the task `get_primes_from_split` is scheduled first, then the dask-future `is_prime`. Since `get_primes_from_split` is scheduled first it gets higher priority, which causes the dask-execution to lock up, as it is waiting for the task to complete before executing anything else. `get_primes_from_split` naturally is waiting for `is_prime` to complete, which unfortunately will not execute at this point. Toying around with priorities, I managed to get `is_prime` to execute:
from typing import List, Tuple

import dask
from prefect import flow, get_run_logger, task
from prefect.context import get_run_context
from prefect_dask import get_dask_client

def is_prime(number: int) -> Tuple[int, bool]:
    if number == 2 or number == 3:
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False

    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False

    return number, True

@task
def get_primes_from_split(min_number, max_number) -> List[int]:

    if min_number % 2 == 0:
        min_number += 1

    with get_dask_client() as client:
        futures = [client.submit(is_prime, n, priority=100) for n in range(min_number, max_number, 2)]

        maybe_primes = [future.result() for future in futures]

    return [value for value, flag in maybe_primes if flag]

@flow
def main(max_number: int = 1_000_000, split_size=10_000):
    log = get_run_logger()
    context = get_run_context()

    log.info("Task Runner: %s", context.task_runner.name)
    log.info("Searching for primes from up to %d", max_number)

    with dask.annotate(priority=0):
        futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
        primes = [value for future in futures for value in future.result()]

    if len(primes) > 10:
        log.info("Found %d primes: %s, ...", len(primes), sorted(primes)[::-1][:10])
    else:
        log.info("Found %d primes: %s", len(primes), sorted(primes)[::-1][:10])
This causes dask to schedule a few instances of `get_primes_from_split`, which in turn schedule all their instances of `is_prime`. `is_prime` executes properly and starts returning its results, but it doesn't seem like `get_primes_from_split` picks up execution. I really don't understand what is going on here. Can anyone provide some insight into how to do this kind of execution without reaching a deadlock like above?
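For readers following along: the lockup described here resembles worker starvation, where a task holds a worker slot while blocking on futures that need that same slot. Below is a minimal sketch of that mechanism using only the standard library's `ThreadPoolExecutor` as a stand-in for the Dask workers. It is an analogy under that assumption, not Prefect or Dask code, and `primes_in_split`, `run_shared_pool`, and `run_separate_pools` are hypothetical names made up for the illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from typing import List, Optional


def is_prime(n: int) -> bool:
    """Trial division, same idea as is_prime in the flow above."""
    if n < 2:
        return False
    if n < 4:
        return True
    if n % 2 == 0:
        return False
    i = 3
    while i * i <= n:
        if n % i == 0:
            return False
        i += 2
    return True


def primes_in_split(inner_pool: ThreadPoolExecutor, lo: int, hi: int,
                    timeout: float) -> List[int]:
    """Outer task: fans out to inner_pool, then blocks on every result."""
    numbers = range(lo, hi)
    futures = [inner_pool.submit(is_prime, n) for n in numbers]
    return [n for n, f in zip(numbers, futures) if f.result(timeout=timeout)]


def run_shared_pool(timeout: float = 0.5) -> Optional[List[int]]:
    """Outer and inner work share one single-slot pool.

    The outer task occupies the only worker while it waits, so its own
    inner futures cannot start. The timeout turns the hang into None.
    """
    with ThreadPoolExecutor(max_workers=1) as pool:
        outer = pool.submit(primes_in_split, pool, 0, 20, timeout)
        try:
            return outer.result()
        except FutureTimeout:
            return None


def run_separate_pools(timeout: float = 5.0) -> List[int]:
    """Inner work gets its own slot, so the blocked outer task cannot starve it."""
    with ThreadPoolExecutor(max_workers=1) as outer_pool, \
            ThreadPoolExecutor(max_workers=1) as inner_pool:
        outer = outer_pool.submit(primes_in_split, inner_pool, 0, 20, timeout)
        return outer.result()


if __name__ == "__main__":
    print("shared pool:", run_shared_pool())
    print("separate pools:", run_separate_pools())
```

In Dask itself, the documented pattern for waiting on futures from inside a task is `distributed.worker_client()` (or `secede()` / `rejoin()`), which gives up the worker slot while the task waits. Whether `get_dask_client` behaves the same way inside a Prefect task is not established in this thread, so treat that as something to verify rather than a confirmed fix.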
Hi @Thomas Fredriksen, I don’t see you using DaskTaskRunner in the examples above. Am I missing something here?
Sorry, I should have mentioned. I am setting the task runner dynamically when deploying the flow. Some more information here: https://github.com/PrefectHQ/prefect/pull/7546
We have verified that the cluster is created and have successfully run simple flows on dask
@Khuyen Tran - sorry that I was not clear. We have indeed verified that the cluster is created and have successfully run simple flows. Running more complex flows such as the example above is still a problem. Do you have any idea how we can submit Dask tasks inside of Prefect tasks in order to avoid the deadlock-state described above?
@Ryan Peden @Mason Menges Do you have any insights into this?
@Anna Geller Do you have any advice on this one?
My advice would be to build a minimal reproducible example and open an issue in the prefect-dask repo. Slack is really only helpful for quick ad hoc questions, and this seems to be a more elaborate issue that we would need to dive deeper into.