# prefect-community

Thomas Fredriksen

12/09/2022, 1:14 PM

Hello everyone. I am experimenting a bit with the Dask task runner, and I am running into some issues related to priority.
I have the following test flow:

```
from typing import List, Tuple

import dask
from prefect import flow, get_run_logger, task
from prefect.context import get_run_context
from prefect_dask import get_dask_client


def is_prime(number: int) -> Tuple[int, bool]:
    if number == 2 or number == 3:
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False
    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False
    return number, True


@task
def get_primes_from_split(min_number, max_number) -> List[int]:
    if min_number % 2 == 0:
        min_number += 1
    # Fan out one Dask future per odd candidate, then block on the results.
    with get_dask_client() as client:
        futures = [client.submit(is_prime, n) for n in range(min_number, max_number, 2)]
        maybe_primes = [future.result() for future in futures]
    return [value for value, flag in maybe_primes if flag]


@flow(name="example_prime_number_search")
def main(max_number: int = 1_000_000, split_size=10_000):
    log = get_run_logger()
    context = get_run_context()
    log.info("Task Runner: %s", context.task_runner.name)
    log.info("Searching for primes from up to %d", max_number)
    futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
    primes = [value for future in futures for value in future.result()]
    # Log the count plus the ten largest primes found.
    if len(primes) > 10:
        log.info("Found %d primes: %s, ...", len(primes), sorted(primes)[::-1][:10])
    else:
        log.info("Found %d primes: %s", len(primes), sorted(primes)[::-1][:10])
```

When running this with the `DaskTaskRunner`, the task `get_primes_from_split` is scheduled first, then the Dask future `is_prime`. Since `get_primes_from_split` is scheduled first, it gets higher priority, which causes the Dask execution to lock up, as it is waiting for the task to complete before executing anything else. `get_primes_from_split` is naturally waiting for `is_prime` to complete, which unfortunately will not execute at this point.
Toying around with priorities, I managed to get `is_prime` to execute:

```
from typing import List, Tuple

import dask
from prefect import flow, get_run_logger, task
from prefect.context import get_run_context
from prefect_dask import get_dask_client


def is_prime(number: int) -> Tuple[int, bool]:
    if number == 2 or number == 3:
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False
    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False
    return number, True


@task
def get_primes_from_split(min_number, max_number) -> List[int]:
    if min_number % 2 == 0:
        min_number += 1
    with get_dask_client() as client:
        # Inner futures are submitted with a high priority (100).
        futures = [client.submit(is_prime, n, priority=100) for n in range(min_number, max_number, 2)]
        maybe_primes = [future.result() for future in futures]
    return [value for value, flag in maybe_primes if flag]


@flow(name="example_prime_number_search")
def main(max_number: int = 1_000_000, split_size=10_000):
    log = get_run_logger()
    context = get_run_context()
    log.info("Task Runner: %s", context.task_runner.name)
    log.info("Searching for primes from up to %d", max_number)
    # Outer Prefect tasks are submitted under a low priority annotation (0).
    with dask.annotate(priority=0):
        futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
    primes = [value for future in futures for value in future.result()]
    # Log the count plus the ten largest primes found.
    if len(primes) > 10:
        log.info("Found %d primes: %s, ...", len(primes), sorted(primes)[::-1][:10])
    else:
        log.info("Found %d primes: %s", len(primes), sorted(primes)[::-1][:10])
```

This causes Dask to schedule a few instances of `get_primes_from_split`, which in turn schedule all their instances of `is_prime`. `is_prime` executes properly and starts returning its results, but `get_primes_from_split` does not seem to resume execution.
I really don't understand what is going on here. Can anyone provide some insight into how to do this kind of execution without reaching a deadlock like the one above?
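
For illustration only, the lock-up pattern described above (an outer task holding an execution slot while it blocks on work it submitted to the same pool) can be sketched with the standard library, without Prefect or Dask. This is an analogy under the assumption that the hang is slot starvation; it has not been confirmed as the cause here. `run_nested`, the pool sizes, and the timeout are hypothetical names and values chosen for the sketch.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


def is_prime(number: int) -> bool:
    if number < 2:
        return False
    if number in (2, 3):
        return True
    if number % 2 == 0:
        return False
    return all(number % i for i in range(3, int(number ** 0.5) + 1, 2))


def run_nested(pool_size: int, timeout: float = 0.5) -> str:
    """Submit an outer task that blocks on an inner task in the same pool."""
    pool = ThreadPoolExecutor(max_workers=pool_size)

    def outer(n: int) -> str:
        # The outer task occupies one worker slot while it waits here.
        inner = pool.submit(is_prime, n)
        try:
            inner.result(timeout=timeout)
            return "completed"
        except FutureTimeout:
            # No free slot ever picked up the inner task; without the
            # timeout this wait would never return.
            inner.cancel()
            return "starved"

    try:
        return pool.submit(outer, 97).result()
    finally:
        pool.shutdown(wait=True)


if __name__ == "__main__":
    print(run_nested(pool_size=1))  # every slot is held by the outer task
    print(run_nested(pool_size=2))  # one slot is left for the inner task
```

With a single slot the inner call can never start, so the outer call only returns because of the timeout; with a spare slot it finishes normally. Whether the Dask scheduler behaves the same way in the flow above is exactly the open question.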

Khuyen Tran

12/09/2022, 7:55 PM

Hi **@Thomas Fredriksen**, I don’t see you using `DaskTaskRunner` in the examples above. Am I missing something here?

Thomas Fredriksen

12/09/2022, 8:04 PM

Sorry, I should have mentioned: I am setting the task runner dynamically when deploying the flow. Some more information here: https://github.com/PrefectHQ/prefect/pull/7546

We have verified that the cluster is created and have successfully run simple flows on Dask.

Khuyen Tran

12/14/2022, 5:10 PM

Thomas Fredriksen

12/19/2022, 8:11 AM

Anna Geller

12/19/2022, 12:58 PM

My advice would be to build a minimal reproducible example and open an issue in the prefect-dask repo. Slack is really only helpful for quick ad hoc questions, and this seems to be a more elaborate issue that we would need to dive deeper into.
