# ask-community
t
Hi everyone -- I need a sanity check. I am attempting to move a hefty module from v2 to v3 and getting some very unexpected behaviour. The workflows have a lot of the following:
```python
list_of_futures = task_1.map(some_iterables)
aggregated_result = task_2.submit(list_of_futures)
```
This behaves very strangely when using a `DaskTaskRunner`, and an MWE for me is below:
```python
"""An attempt to understand how prefect3 works"""

import logging
from time import sleep

from prefect import task, flow, __version__
from prefect.futures import wait

from prefect_dask import DaskTaskRunner

logger = logging.getLogger("tp3")
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)

local_cluster = {
    "cluster_class": "dask.distributed.LocalCluster"
}

@task(log_prints=True, cache_policy=None)
def adder(a: int) -> int:
    logger.info(f"Value of {a=}")
    return a + 1


def summer(numbers: list[int], another_thing: bool) -> int:
    logger.info(f"The numbers to sum: {numbers=}")
    for number in numbers:
        logger.info(number)

    SEC = 5
    logger.info(f"Sleeping now for {SEC}")
    sleep(SEC)
    return sum(numbers)

# I have this usage in my module whenever possible
task_summer = task(summer, log_prints=True, cache_policy=None)

@flow(log_prints=True)
def the_flow() -> None:
    numbers = list(range(4))
    logger.info(f"The numbers as {numbers=}")

    # Make up two sets of futures in different ways. These should be
    # equivalent.
    numbers_list = [adder.submit(n) for n in numbers]
    numbers_map = adder.map(numbers)

    # Iterate over both sets of lists of futures
    for i, _numbers in enumerate((numbers_list, numbers_map)):

        logger.info(f"In the main flow {_numbers=}")

        # This consistently works
        summed_comp = task_summer.with_options(
            name=f"Summer List Comp {i=}"
        ).submit([n.result() for n in _numbers], another_thing=True)

        # This only works with a list of .submit()s, not a single .map()
        summed = task_summer.with_options(
            name=f"summer Direct {i=}"
        ).submit(_numbers, another_thing=True)

        wait([summed, summed_comp])


def main() -> None:
    # This works
    the_flow.with_options(name="My Flow with default")()

    # This does not work when handing `.map()` futures to `.submit()`
    the_flow.with_options(task_runner=DaskTaskRunner(), name="My Flow - DaskTaskRunner Default")()

    # This does not work when handing `.map()` futures to `.submit()`
    the_flow.with_options(task_runner=DaskTaskRunner(**local_cluster), name="My Flow - DaskTaskRunner LocalCluster")()


if __name__ == "__main__":
    main()
```
The `My Flow with default` run seems to work in all situations for all tasks. However, I find that the `summer Direct i=1` task consistently fails when a `DaskTaskRunner` is used. The error I get from `summer Direct i=1` is:
```
Task run failed with exception: AttributeError("'NoneType' object has no attribute 'type'")
Traceback (most recent call last):
  File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/prefect/task_engine.py", line 870, in run_context
    yield self
  File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/prefect/task_engine.py", line 1507, in run_task_sync
    engine.call_task_fn(txn)
  File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/prefect/task_engine.py", line 887, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/scratch3/gal16b/packages/test_prefect3/test_script.py", line 26, in summer
    logger.info(f"The numbers to sum: {numbers=}")
                                      ^^^^^^^^^^
  File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/distributed/client.py", line 602, in __repr__
    if self.type:
       ^^^^^^^^^
  File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/distributed/client.py", line 567, in type
    return self._state.type
           ^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'type'
```
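For what it's worth, the traceback makes more sense once you notice that `f"{numbers=}"` calls `repr()` on each element. A minimal stand-in (pure Python, no Prefect or Dask; `BrokenReprFuture` below is a hypothetical class, not the real `distributed` Future) reproduces the failure mode of a future whose internal state was never populated:

```python
class BrokenReprFuture:
    """Hypothetical stand-in for a future whose internal state is missing,
    e.g. after crossing a serialization boundary."""

    def __init__(self) -> None:
        self._state = None

    @property
    def type(self):
        # mirrors the `return self._state.type` line in the traceback above
        return self._state.type

    def __repr__(self) -> str:
        # mirrors the `if self.type:` line in the traceback above
        if self.type:
            return "<Future: resolved>"
        return "<Future: pending>"

f = BrokenReprFuture()
try:
    message = f"The numbers to sum: {f=}"  # the `=` spec invokes repr(f)
except AttributeError as exc:
    print(exc)  # 'NoneType' object has no attribute 'type'
```

So the task body itself never runs to completion: the very first log line blows up because rendering the future's repr touches state that is gone.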
I am naturally a little confused and am not really able to tie down what the difference between `[adder.submit(n) for n in numbers]` vs `adder.map(numbers)` would be. I have used the `.map()` task method pretty regularly throughout my workflows, so am a little reluctant to axe it. Of course if my usage was not correct then I will. This just seems a little odd.
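To illustrate the intended equivalence with stdlib tooling only (an analogy, not Prefect's API: note that `concurrent.futures` `map()` yields results directly, whereas Prefect's `.map()` returns futures):

```python
from concurrent.futures import ThreadPoolExecutor

def adder(a: int) -> int:
    return a + 1

numbers = list(range(4))
with ThreadPoolExecutor() as pool:
    # style 1: a list comprehension of individual submits -> list of futures
    futures_list = [pool.submit(adder, n) for n in numbers]
    # style 2: a single map call (yields results directly in the stdlib)
    mapped = list(pool.map(adder, numbers))

# explicitly resolve the submitted futures before comparing
resolved = [f.result() for f in futures_list]
print(resolved)  # [1, 2, 3, 4]
print(mapped)    # [1, 2, 3, 4]
```

Once both sides are resolved to concrete values, the two styles should be indistinguishable to a downstream consumer.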
```
Version:              3.4.18
API version:          0.8.4
Python version:       3.12.11
Git commit:           3baaf4f4
Built:                Fri, Sep 12, 2025 04:03 PM
OS/Arch:              linux/x86_64
Profile:              ephemeral
Server type:          ephemeral
Pydantic version:     2.10.6
Server:
  Database:           sqlite
  SQLite version:     3.50.4
Integrations:
  prefect-dask:       0.3.6
```
e.g. these errors are raised when using a `DaskTaskRunner` with default options
n
your code runs as i'd expect for me on `main` - can you share `prefect version` output?
the only real difference 2->3 here is that terminal futures are not automatically resolved. that is, you must either `return` futures directly or call `.result()` or `.wait()` on them to resolve them; prefect 3.x will not magically find unresolved futures and resolve them for you like it did in 2.x (that caused other problems). do you have an MRE for the `AttributeError` you shared above?
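A stdlib sketch of that rule, using `concurrent.futures` as a stand-in for Prefect futures (an analogy only: Prefect's own `wait` lives in `prefect.futures` and its tasks are submitted differently):

```python
from concurrent.futures import ThreadPoolExecutor, wait

def adder(a: int) -> int:
    return a + 1

def summer(numbers: list[int]) -> int:
    return sum(numbers)

with ThreadPoolExecutor() as pool:
    futures = [pool.submit(adder, n) for n in range(4)]
    wait(futures)  # block until every future is in a terminal state
    # resolve explicitly: hand the downstream function values, not futures
    total = summer([f.result() for f in futures])

print(total)  # 10
```

The key habit is the last step: the caller unwraps futures with `.result()` before passing them on, rather than expecting the framework to do it implicitly.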
t
Hi Nate - thanks for replying. I will look over this tomorrow with a fresh mind (it is late here and the pub was a wonderful evening...). The `prefect version` output I attached as a reply to the original message, and the code I shared in the original post was my MWE. When I run it on a 'default' task runner (i.e. leaving it blank in the `@flow` definitions) it worked. It was only when I introduced the `DaskTaskRunner` that it failed. When you say `return` them directly, what do you mean? If I have a simple python function:

```python
@task
def add(i: int) -> int:
    return i + 1

add.submit(i=2)
```

does that suffice? How does it behave if I `.map(i=list(range(20)))`?
n
> The `prefect version` output I attached as a reply to the original message

doh! my bad 🤦 i look through things too quickly sometimes. i copied and pasted your code, ran `uv run --extra dask your_script.py` from the prefect repo, and did not encounter an error. but i'll take a closer look at it to understand what you might be running into. have a great night!
t
The real difference in the initial example is that I am seeing different behaviour between:

```python
numbers = list(range(4))
task_list = [task.submit(i) for i in numbers]
task_map = task.map(numbers)

summer.submit(task_list)  # works
summer.submit(task_map)   # fails
```
The above skeleton works when I use a `@flow` with no task runner options, but fails when I use `@flow(..., task_runner=DaskTaskRunner)`. I can try the same tomorrow - thanks a lot! I will have a closer look at your `uv run` command tomorrow. For full context I have these extra environment variables in my launch script on the HPC I am using, but don't see them as a cause of concern:

```shell
export APIURL=http://my.machine.com:4200/api
export PREFECT_API_URL="${APIURL}"
export WORKDIR=$(pwd)
export PREFECT_HOME="${WORKDIR}/prefect"
export PREFECT_LOGGING_EXTRA_LOGGERS="tp3"
export PREFECT_LOGGING_LEVEL="INFO"
export PREFECT_RESULTS_PERSIST_BY_DEFAULT=true

#echo "Sourcing home"
#source /home/$(whoami)/.bashrc

ENV="flint_prefect3"
echo "Activating conda flint environment: ${ENV}"
conda activate "${ENV}"

python /scratch3/gal16b/packages/test_prefect3/test_script.py
```
Ok, so. I did:

• `conda create -n uvprefect3 python=3.13`
• `pip install uv`
• `git clone git@github.com:PrefectHQ/prefect.git`
• `uv run --extra dask my_script.py`

and it did indeed run all the way through with no error. Same is also true when I set it to point to my self-managed prefect server. So this does not reproduce the error I was / am seeing. I also progressively introduced the environment variables I was using to further configure my environment, and no error was being raised. So, I am a little confused.
n
hmm. so even if i don't run directly against the repo it seems to run fine for me. steps for further digging if you're interested (this is where `uv` is really nice!):

• go up to this message of yours
• literally copy the script to my clipboard
• `pbpaste | uv run --with 'prefect[dask]' -` (i have a mac, so `pbpaste` gives the content of my clipboard)
• `--with` can specify any version of prefect with the dask extra installed: `prefect[dask]==3.x.y`, or pass `prefect` and `prefect-dask` separately if you like
t
I can give that a whirl. It really is an annoying set of gotchas that I am trying to unravel. Thanks for the suggestions. I will make another pass over this in the next day or two
n
> the only real difference 2->3 here is that terminal futures are not automatically resolved. that is, you must either `return` futures directly or call `.result()` or `.wait()` on them to resolve them; prefect 3.x will not magically find unresolved futures and resolve them for you like it did in 2.x (caused other problems)

this is generally the only real difference that should exist in terms of how prefect works from 2 to 3
t
Well, I switched to a different conda environment and ran the `uv run` command as mentioned above and I am getting this again:

```
01:08:16.578 | INFO    | tp3 - Sleeping now for 5
01:08:16.627 | ERROR   | Task run 'summer Direct i=1-d18' - Task run failed with exception: AttributeError("'NoneType' object has no attribute 'type'")
Traceback (most recent call last):
  File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/prefect/task_engine.py", line 870, in run_context
    yield self
  File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/prefect/task_engine.py", line 1507, in run_task_sync
    engine.call_task_fn(txn)
  File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/prefect/task_engine.py", line 887, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/scratch3/gal16b/packages/test_prefect3/test_script.py", line 26, in summer
    logger.info(f"The numbers to sum: {numbers=}")
                                      ^^^^^^^^^^
  File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/distributed/client.py", line 602, in __repr__
    if self.type:
       ^^^^^^^^^
  File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/distributed/client.py", line 567, in type
    return self._state.type
           ^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'type'
01:08:16.638 | ERROR   | Task run 'summer Direct i=1-d18' - Finished in state Failed("Task run encountered an exception AttributeError: 'NoneType' object has no attribute 'type'")
```
So. I am guessing something in this environment specifically is causing me grief
n
it looks like this is something to do with the `distributed` version that `miniconda` is holding onto here:

```
/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/distributed/client.py
```
t
```
(flint_prefect3) gal16b@petrichor-login:/scratch3/gal16b/find_error> dask --version
dask, version 2024.10.0
```

It is a little outdated - I made this environment up this week to try out prefect3, so there might be a dependency pinning it.
n
aha

```
pbpaste | uv run --with 'prefect[dask]' --with distributed==2024.10.0 -
```

this reproduces for me
t
I am not crazy!
n
seems safe to say it's about that `distributed` version; i'd try and upgrade that. we might wanna pin above that in `prefect-dask`
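Until such a pin lands, a hypothetical guard like the following could fail fast on the known-bad release (the `2024.11.0` floor below is an illustrative assumption, not a confirmed fix version):

```python
def parse_version(v: str) -> tuple[int, ...]:
    """Turn a simple CalVer string like '2024.10.0' into a comparable tuple."""
    return tuple(int(part) for part in v.split("."))

# assumption for illustration: anything newer than the broken 2024.10.0 is
# acceptable; the real known-good floor should be confirmed against the issue.
MIN_DISTRIBUTED = "2024.11.0"

def is_supported(installed: str, minimum: str = MIN_DISTRIBUTED) -> bool:
    return parse_version(installed) >= parse_version(minimum)

print(is_supported("2024.10.0"))  # False -> the release that reproduced the bug
print(is_supported("2025.1.0"))   # True
```

In a real workflow you would feed `distributed.__version__` into `is_supported` at startup and raise with a clear message instead of hitting the opaque `AttributeError` mid-run.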
t
Gold. I am running some other flows now. Once they are done (and when I wake up in the morning) I will do that and report back. I am a little unsure why such an old version was installed to begin with. So I might have to go hunting through my dependencies ...
n
sounds good! (PS) can't recommend `uv` enough 🙂 (i know people have reasons for using other things! but) as someone who has used all the things for all the reasons, i think about inscrutable python environment things a tiny fraction of the time that i used to. i also know new things come out a lot and the fatigue of moving is real! just my 2 cents
t
Believe me I love uv as well. But I also have to regularly install `casacore` and `python-casacore`. These are the pain points that keep me in the conda sphere
n
salute canada intensifies
t
I went to raise an issue on this, but see you beat me to it!