#prefect-community

Thomas Fredriksen

12/09/2022, 1:14 PM
Hello everyone. I am experimenting a bit with the dask taskrunner, and I am running into some issues related to priority. I have the following test-flow:
```python
from typing import List, Tuple

from dask.distributed import get_client
from prefect import flow, get_run_logger, task
from prefect.context import get_run_context


def is_prime(number: int) -> Tuple[int, bool]:
    """Return (number, True) if number is prime, else (number, False)."""
    if number == 2 or number == 3:
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False

    # Trial division by odd candidates up to sqrt(number).
    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False

    return number, True


@task  # assumed: the flow calls .submit() on this function, so it must be a task
def get_primes_from_split(min_number, max_number) -> List[int]:
    # Assumed: `client` was not defined in the original post; here it is
    # taken from the Dask worker that is running this task.
    client = get_client()

    # Only odd candidates are tested, so 2 is never reported.
    if min_number % 2 == 0:
        min_number += 1

    futures = [client.submit(is_prime, n) for n in range(min_number, max_number, 2)]

    # Blocks this task until every is_prime future has finished.
    maybe_primes = [future.result() for future in futures]

    return [value for value, flag in maybe_primes if flag]


@flow(name="example_prime_number_search")
def main(max_number: int = 1_000_000, split_size=10_000):
    log = get_run_logger()
    context = get_run_context()

    log.info("Searching for primes up to %d", max_number)

    futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
    primes = [value for future in futures for value in future.result()]

    # Log the ten largest primes found.
    if len(primes) > 10:
        log.info("Found %d primes: %s, ...", len(primes), sorted(primes)[::-1][:10])
    else:
        log.info("Found %d primes: %s", len(primes), sorted(primes)[::-1][:10])
```
When running this with the ``DaskTaskRunner``, ``get_primes_from_split`` is scheduled first, then the Dask future ``is_prime``. Since ``get_primes_from_split`` is scheduled first, it gets higher priority, which causes the Dask execution to lock up, as it is waiting for the task to complete before executing anything else. ``get_primes_from_split`` is naturally waiting for ``is_prime`` to complete, which unfortunately will not execute at this point. Toying around with priorities, I managed to get ``is_prime`` to execute:
```python
from typing import List, Tuple

from dask.distributed import get_client
from prefect import flow, get_run_logger, task
from prefect.context import get_run_context


def is_prime(number: int) -> Tuple[int, bool]:
    """Return (number, True) if number is prime, else (number, False)."""
    if number == 2 or number == 3:
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False

    # Trial division by odd candidates up to sqrt(number).
    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False

    return number, True


@task  # assumed: the flow calls .submit() on this function, so it must be a task
def get_primes_from_split(min_number, max_number) -> List[int]:
    # Assumed: `client` was not defined in the original post; here it is
    # taken from the Dask worker that is running this task.
    client = get_client()

    # Only odd candidates are tested, so 2 is never reported.
    if min_number % 2 == 0:
        min_number += 1

    # The only change from the first version: an explicit priority on the inner futures.
    futures = [client.submit(is_prime, n, priority=100) for n in range(min_number, max_number, 2)]

    # Blocks this task until every is_prime future has finished.
    maybe_primes = [future.result() for future in futures]

    return [value for value, flag in maybe_primes if flag]


@flow(name="example_prime_number_search")
def main(max_number: int = 1_000_000, split_size=10_000):
    log = get_run_logger()
    context = get_run_context()

    log.info("Searching for primes up to %d", max_number)

    futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
    primes = [value for future in futures for value in future.result()]

    # Log the ten largest primes found.
    if len(primes) > 10:
        log.info("Found %d primes: %s, ...", len(primes), sorted(primes)[::-1][:10])
    else:
        log.info("Found %d primes: %s", len(primes), sorted(primes)[::-1][:10])
```
This causes Dask to schedule a few instances of ``get_primes_from_split``, which in turn schedule all their instances of ``is_prime``. ``is_prime`` executes properly and starts returning its results, but ``get_primes_from_split`` does not seem to resume execution. I really don't understand what is going on here. Can anyone provide some insight into how to do this kind of execution without reaching a deadlock like the one above?
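The lock-up described above can be reproduced in miniature without Dask or Prefect. The following is only an analogy, a minimal sketch using the standard-library `concurrent.futures` pool: `split_on_shared_pool` and `starved` are hypothetical names, and the 0.5 s timeout exists only so the demonstration fails fast instead of hanging. An outer task that blocks on inner tasks submitted to the same pool starves them when no free worker slot remains.

```python
import concurrent.futures as cf


def is_prime(number: int) -> bool:
    if number < 2:
        return False
    if number < 4:
        return True  # 2 and 3
    if number % 2 == 0:
        return False
    return all(number % i for i in range(3, int(number ** 0.5) + 1, 2))


def split_on_shared_pool(pool, lo, hi, wait=0.5):
    """Outer task: submits inner tasks to the SAME pool, then blocks on them."""
    candidates = list(range(lo, hi))
    inner = [pool.submit(is_prime, n) for n in candidates]
    # While this loop waits, the outer task keeps holding its worker slot.
    return [n for n, fut in zip(candidates, inner) if fut.result(timeout=wait)]


def starved(max_workers: int) -> bool:
    """Return True if the outer task timed out waiting for its inner tasks."""
    with cf.ThreadPoolExecutor(max_workers=max_workers) as pool:
        outer = pool.submit(split_on_shared_pool, pool, 0, 10)
        try:
            outer.result()
        except cf.TimeoutError:
            return True
    return False


if __name__ == "__main__":
    print(starved(max_workers=1), starved(max_workers=2))
```

With a single worker, the outer task occupies the only slot and its inner tasks never start before the timeout; with two workers, the second slot runs the inner tasks and the outer task completes.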

Khuyen Tran

12/09/2022, 7:55 PM
Hi @Thomas Fredriksen, I don’t see you using DaskTaskRunner in the examples above. Am I missing something here?

Thomas Fredriksen

12/09/2022, 8:04 PM
Sorry, I should have mentioned: I am setting the task runner dynamically when deploying the flow. Some more information here: https://github.com/PrefectHQ/prefect/pull/7546
We have verified that the cluster is created and have successfully run simple flows on dask
🎉 1
@Khuyen Tran - sorry that I was not clear. We have indeed verified that the cluster is created and have successfully run simple flows. Running more complex flows such as the example above is still a problem. Do you have any idea how we can submit Dask tasks inside Prefect tasks while avoiding the deadlock state described above?
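For plain Dask, one documented way to launch tasks from inside a task is `distributed.worker_client`, which secedes from the worker's thread pool while the block runs so that the inner tasks can use the freed slot. The sketch below assumes plain Dask without Prefect; `primes_in_split` and `find_primes` are hypothetical names, and whether this composes with Prefect's `DaskTaskRunner` is not confirmed anywhere in this thread.

```python
from typing import List

from dask.distributed import Client, worker_client


def is_prime(number: int) -> bool:
    if number < 2:
        return False
    if number < 4:
        return True  # 2 and 3
    if number % 2 == 0:
        return False
    return all(number % i for i in range(3, int(number ** 0.5) + 1, 2))


def primes_in_split(lo: int, hi: int) -> List[int]:
    """Outer Dask task that fans out one inner task per candidate."""
    candidates = list(range(lo, hi))
    # worker_client() secedes from the worker's thread pool for the duration
    # of the block, so waiting here does not hold a worker slot.
    with worker_client() as client:
        flags = client.gather(client.map(is_prime, candidates))
    return [n for n, flag in zip(candidates, flags) if flag]


def find_primes(max_number: int, split_size: int, client: Client) -> List[int]:
    """Submit one outer task per split and merge the results."""
    splits = [
        client.submit(primes_in_split, lo, min(lo + split_size, max_number + 1))
        for lo in range(0, max_number + 1, split_size)
    ]
    return sorted(p for chunk in client.gather(splits) for p in chunk)


if __name__ == "__main__":
    # A single one-thread, in-process worker: the case where a blocking
    # outer task would otherwise leave no slot for its inner tasks.
    with Client(processes=False, n_workers=1, threads_per_worker=1) as client:
        print(find_primes(50, 10, client))
```

The design choice here is that the outer task gives up its slot while it waits, rather than relying on priorities to let inner tasks overtake it.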

Khuyen Tran

12/14/2022, 5:10 PM
@Ryan Peden @Mason Menges Do you have any insights on this?

Thomas Fredriksen

12/19/2022, 8:11 AM
@Anna Geller Do you have any advice on this one?

Anna Geller

12/19/2022, 12:58 PM
My advice would be to build a minimal reproducible example and open an issue in the prefect-dask repo. Slack is really only helpful for quick ad hoc questions, and this seems to be a more elaborate issue that we would need to dive deeper into.