Thomas Fredriksen
12/09/2022, 1:14 PM
from typing import List, Tuple

import dask
from prefect import flow, get_run_logger, task
from prefect.context import get_run_context
from prefect_dask import get_dask_client


def is_prime(number: int) -> Tuple[int, bool]:
    # Returns the number together with a flag so results can be filtered later.
    if number == 2 or number == 3:
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False
    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False
    return number, True


@task
def get_primes_from_split(min_number, max_number) -> List[int]:
    # Only odd candidates are checked.
    if min_number % 2 == 0:
        min_number += 1
    with get_dask_client() as client:
        futures = [client.submit(is_prime, n) for n in range(min_number, max_number, 2)]
        # Blocks inside the task until every is_prime future has finished.
        maybe_primes = [future.result() for future in futures]
    return [value for value, flag in maybe_primes if flag]


@flow(name="example_prime_number_search")
def main(max_number: int = 1_000_000, split_size=10_000):
    log = get_run_logger()
    context = get_run_context()
    log.info("Task Runner: %s", context.task_runner.name)
    log.info("Searching for primes up to %d", max_number)
    futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
    primes = [value for future in futures for value in future.result()]
    if len(primes) > 10:
        log.info("Found %d primes: %s, ...", len(primes), sorted(primes)[::-1][:10])
    else:
        log.info("Found %d primes: %s", len(primes), sorted(primes)[::-1][:10])
When running this with the DaskTaskRunner, the task get_primes_from_split is scheduled first, then the Dask future is_prime. Since get_primes_from_split is scheduled first, it gets higher priority, which causes the Dask execution to lock up, as it is waiting for the task to complete before executing anything else. get_primes_from_split is naturally waiting for is_prime to complete, which unfortunately will not execute at this point.
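The lock-up described above looks like worker-slot starvation: an outer task holds a slot while it waits on inner work that needs a slot from the same pool. The sketch below reproduces that pattern with only the standard library. ThreadPoolExecutor is a stand-in for the Dask workers, and outer_task, run, slots and the 0.5 second timeout are hypothetical names and values chosen for illustration. It is an analogy under those assumptions, not a trace of what Dask or Prefect actually do internally.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from typing import List, Optional


def is_prime(number: int) -> bool:
    if number in (2, 3):
        return True
    if number < 2 or number % 2 == 0:
        return False
    return all(number % i for i in range(3, int(number ** 0.5) + 1, 2))


def outer_task(pool: ThreadPoolExecutor, numbers: List[int], timeout: float) -> Optional[List[bool]]:
    """Holds one worker slot while waiting on futures submitted to the same pool."""
    futures = [pool.submit(is_prime, n) for n in numbers]
    try:
        return [f.result(timeout=timeout) for f in futures]
    except FutureTimeout:
        # The inner work never got a slot. Cancel it so the pool can shut down.
        for f in futures:
            f.cancel()
        return None


def run(slots: int) -> Optional[List[bool]]:
    with ThreadPoolExecutor(max_workers=slots) as pool:
        return pool.submit(outer_task, pool, [2, 3, 4, 5], 0.5).result()


starved = run(slots=1)  # the outer task occupies the only slot
healthy = run(slots=2)  # one slot is left over for the inner work
print(starved, healthy)
```

With a single slot the inner futures never start and the outer task gives up after the timeout. With a spare slot the same code completes. Without the timeout, the single-slot case would wait forever, which matches the symptom in the flow above.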
Toying around with priorities, I managed to get is_prime to execute:
from typing import List, Tuple

import dask
from prefect import flow, get_run_logger, task
from prefect.context import get_run_context
from prefect_dask import get_dask_client


def is_prime(number: int) -> Tuple[int, bool]:
    if number == 2 or number == 3:
        return number, True
    if number % 2 == 0 or number < 2:
        return number, False
    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False
    return number, True


@task
def get_primes_from_split(min_number, max_number) -> List[int]:
    if min_number % 2 == 0:
        min_number += 1
    with get_dask_client() as client:
        # Inner futures get a higher priority than the tasks that submit them.
        futures = [client.submit(is_prime, n, priority=100) for n in range(min_number, max_number, 2)]
        maybe_primes = [future.result() for future in futures]
    return [value for value, flag in maybe_primes if flag]


@flow(name="example_prime_number_search")
def main(max_number: int = 1_000_000, split_size=10_000):
    log = get_run_logger()
    context = get_run_context()
    log.info("Task Runner: %s", context.task_runner.name)
    log.info("Searching for primes up to %d", max_number)
    # Outer tasks are submitted with the lowest priority.
    with dask.annotate(priority=0):
        futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
    primes = [value for future in futures for value in future.result()]
    if len(primes) > 10:
        log.info("Found %d primes: %s, ...", len(primes), sorted(primes)[::-1][:10])
    else:
        log.info("Found %d primes: %s", len(primes), sorted(primes)[::-1][:10])
This causes dask to schedule a few instances of get_primes_from_split, which in turn schedule all their instances of is_prime. is_prime executes properly and starts returning its results, but it doesn't seem like get_primes_from_split picks up execution.
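If the hang does come from tasks blocking on futures they submitted themselves, one possible way around it is to avoid nesting altogether: each split checks its own candidates in-process, and the parallelism comes only from running many splits at once. The sketch below shows that shape with the standard library. ThreadPoolExecutor stands in for the task runner, find_primes and slots are hypothetical names, and in the flow above get_primes_from_split would presumably keep its @task decorator. Unlike the original, it checks every number rather than only odd ones. Where nested submission is really needed, Dask's distributed package offers worker_client, secede and rejoin for that purpose; whether they fit this particular setup is not something this thread establishes.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def is_prime(number: int) -> bool:
    if number in (2, 3):
        return True
    if number < 2 or number % 2 == 0:
        return False
    return all(number % i for i in range(3, int(number ** 0.5) + 1, 2))


def get_primes_from_split(min_number: int, max_number: int) -> List[int]:
    # No futures are created here, so this function never waits on the pool.
    return [n for n in range(min_number, max_number) if is_prime(n)]


def find_primes(max_number: int, split_size: int, slots: int = 4) -> List[int]:
    # One unit of work per split; the pool is a stand-in for the task runner.
    with ThreadPoolExecutor(max_workers=slots) as pool:
        futures = [
            pool.submit(get_primes_from_split, x, min(x + split_size, max_number + 1))
            for x in range(0, max_number + 1, split_size)
        ]
        # Results are gathered in submission order, so the output stays sorted.
        return [value for future in futures for value in future.result()]


primes = find_primes(max_number=100, split_size=10)
print(len(primes), primes[:5], primes[-1])
```

The trade-off is coarser granularity: a split is the smallest unit that can run in parallel, so split_size controls how evenly the work spreads.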
I really don't understand what is going on here. Can anyone provide some insight into how to do this kind of execution without reaching a deadlock like the one above?
Khuyen Tran
12/09/2022, 7:55 PM
Thomas Fredriksen
12/09/2022, 8:04 PM
Khuyen Tran
12/14/2022, 5:10 PM
Thomas Fredriksen
12/19/2022, 8:11 AM
Anna Geller
12/19/2022, 12:58 PM