
Thomas Fredriksen

12/09/2022, 1:14 PM
Hello everyone. I am experimenting a bit with the Dask task runner, and I am running into some issues related to task priority.
I have the following test flow:

```
from typing import List, Tuple

import dask
from prefect import flow, get_run_logger, task
from prefect.context import get_run_context
from prefect_dask import get_dask_client


def is_prime(number: int) -> Tuple[int, bool]:
    # Plain function (not a Prefect task), submitted directly to Dask below
    if number == 2 or number == 3:
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False
    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False
    return number, True


@task
def get_primes_from_split(min_number, max_number) -> List[int]:
    # Only odd candidates are checked, so start on an odd number
    if min_number % 2 == 0:
        min_number += 1
    # Connect to the Dask cluster from inside the task and fan out
    with get_dask_client() as client:
        futures = [client.submit(is_prime, n) for n in range(min_number, max_number, 2)]
        # Blocks this task until every is_prime future has finished
        maybe_primes = [future.result() for future in futures]
    return [value for value, flag in maybe_primes if flag]


@flow(name="example_prime_number_search")
def main(max_number: int = 1_000_000, split_size=10_000):
    log = get_run_logger()
    context = get_run_context()
    log.info("Task Runner: %s", context.task_runner.name)
    log.info("Searching for primes up to %d", max_number)
    # One Prefect task per split; each one submits its own is_prime futures
    futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
    primes = [value for future in futures for value in future.result()]
    if len(primes) > 10:
        log.info("Found %d primes: %s, ...", len(primes), sorted(primes)[::-1][:10])
    else:
        log.info("Found %d primes: %s", len(primes), sorted(primes)[::-1][:10])
```

When running this with the `DaskTaskRunner`, the task `get_primes_from_split` is scheduled first, then the Dask futures for `is_prime`. Since `get_primes_from_split` is scheduled first, it gets higher priority, which causes the Dask execution to lock up: it waits for that task to complete before executing anything else. `get_primes_from_split`, in turn, is waiting for `is_prime` to complete, which unfortunately will not execute at this point.
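The lock-up described above can be reproduced without Prefect or Dask. The following is a minimal sketch, assuming a standard-library `ThreadPoolExecutor` as a stand-in for a Dask worker's thread pool (the helpers `primes_in_split` and `run_nested` are hypothetical names for illustration): each outer task submits inner tasks to the same fixed-size pool and then blocks waiting for them, so once every thread is held by a blocked outer task, no inner task can start. A timeout turns the hang into a detectable `None` instead of freezing forever.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from typing import List, Optional, Tuple


def is_prime(number: int) -> Tuple[int, bool]:
    """Same trial-division check as in the flow above."""
    if number in (2, 3):
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False
    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False
    return number, True


def primes_in_split(pool: ThreadPoolExecutor, lo: int, hi: int, wait: float) -> List[int]:
    # Outer task (hypothetical stand-in for get_primes_from_split): fans out
    # to the SAME pool it runs on, then blocks its own thread on the results.
    futures = [pool.submit(is_prime, n) for n in range(lo, hi)]
    return [n for n, flag in (f.result(timeout=wait) for f in futures) if flag]


def run_nested(outer_tasks: int, workers: int, split: int = 10, wait: float = 0.5) -> Optional[List[int]]:
    """Return the primes found, or None if blocked outer tasks starved the pool."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        outer = [
            pool.submit(primes_in_split, pool, lo, lo + split, wait)
            for lo in range(0, outer_tasks * split, split)
        ]
        try:
            return sorted(p for f in outer for p in f.result())
        except FutureTimeout:
            # Every thread was held by a waiting outer task, so no inner
            # task could start before the timeout expired.
            return None
```

With a single thread, the outer task holds the only thread and `run_nested(1, 1)` returns `None`; with one spare thread, `run_nested(1, 2)` returns `[2, 3, 5, 7]`.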
Toying around with priorities, I managed to get `is_prime` to execute:
```
from typing import List, Tuple

import dask
from prefect import flow, get_run_logger, task
from prefect.context import get_run_context
from prefect_dask import get_dask_client


def is_prime(number: int) -> Tuple[int, bool]:
    if number == 2 or number == 3:
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False
    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False
    return number, True


@task
def get_primes_from_split(min_number, max_number) -> List[int]:
    if min_number % 2 == 0:
        min_number += 1
    with get_dask_client() as client:
        # Higher priority values run first in Dask, so these should be
        # preferred over queued get_primes_from_split runs
        futures = [client.submit(is_prime, n, priority=100) for n in range(min_number, max_number, 2)]
        maybe_primes = [future.result() for future in futures]
    return [value for value, flag in maybe_primes if flag]


@flow(name="example_prime_number_search")
def main(max_number: int = 1_000_000, split_size=10_000):
    log = get_run_logger()
    context = get_run_context()
    log.info("Task Runner: %s", context.task_runner.name)
    log.info("Searching for primes up to %d", max_number)
    # Annotate the Dask tasks created by these submissions with a low priority
    with dask.annotate(priority=0):
        futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
    primes = [value for future in futures for value in future.result()]
    if len(primes) > 10:
        log.info("Found %d primes: %s, ...", len(primes), sorted(primes)[::-1][:10])
    else:
        log.info("Found %d primes: %s", len(primes), sorted(primes)[::-1][:10])
```

This causes Dask to schedule a few instances of `get_primes_from_split`, which in turn schedule all of their `is_prime` calls. `is_prime` executes properly and starts returning results, but `get_primes_from_split` does not seem to pick up execution again.
I really don't understand what is going on here. Can anyone provide some insight into how to do this kind of execution without reaching a deadlock like the one above?
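One way around this kind of deadlock is to make sure a blocked outer task never occupies a thread that its inner tasks need. In Dask itself, the documented tools for tasks that launch tasks are `distributed.worker_client()` and `secede()` / `rejoin()`, which take the calling task's thread out of the worker's pool while it waits; whether `get_dask_client` does the same in the prefect-dask version used here is not established in this thread. Note that priorities only reorder tasks that are waiting for a thread; they do not free a thread that a blocked outer task is holding. The sketch below swaps in a simpler standard-library technique, a separate `ThreadPoolExecutor` for the inner calls, to show the principle (`run_separated` is a hypothetical helper, not part of Prefect or Dask).

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def is_prime(number: int) -> Tuple[int, bool]:
    """Same trial-division check as in the flow above."""
    if number in (2, 3):
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False
    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False
    return number, True


def primes_in_split(inner_pool: ThreadPoolExecutor, lo: int, hi: int) -> List[int]:
    # Outer task: fans out to a DIFFERENT pool, so blocking here never
    # takes a thread away from the inner is_prime calls.
    futures = [inner_pool.submit(is_prime, n) for n in range(lo, hi)]
    return [n for n, flag in (f.result() for f in futures) if flag]


def run_separated(max_number: int, split: int, outer_workers: int = 1, inner_workers: int = 1) -> List[int]:
    """Find primes in [0, max_number] with outer and inner work on separate pools."""
    with ThreadPoolExecutor(max_workers=inner_workers) as inner_pool:
        with ThreadPoolExecutor(max_workers=outer_workers) as outer_pool:
            outer = [
                outer_pool.submit(primes_in_split, inner_pool, lo, min(lo + split, max_number + 1))
                for lo in range(0, max_number + 1, split)
            ]
            return sorted(p for f in outer for p in f.result())
```

Even with one thread in each pool, every outer task finishes, because its `f.result()` calls wait on a pool it is not occupying: `run_separated(30, 10)` returns the ten primes up to 30.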


Khuyen Tran

12/09/2022, 7:55 PM
Hi **@Thomas Fredriksen**, I don’t see you using `DaskTaskRunner` in the examples above. Am I missing something here?


Thomas Fredriksen

12/09/2022, 8:04 PM
Sorry, I should have mentioned that. I am setting the task runner dynamically when deploying the flow. Some more information here: https://github.com/PrefectHQ/prefect/pull/7546

We have verified that the cluster is created and have successfully run simple flows on Dask.



Khuyen Tran

12/14/2022, 5:10 PM

Thomas Fredriksen

12/19/2022, 8:11 AM

Anna Geller

12/19/2022, 12:58 PM
My advice would be to build a minimal reproducible example and open an issue in the prefect-dask repo. Slack is really only helpful for quick ad hoc questions, and this seems to be a more elaborate issue that we would need to dive deeper into.