Tim Galvin
09/19/2025, 6:45 AM
list_of_futures = task_1.map(some_iterables)
aggregated_result = task_2.submit(list_of_futures)
This seems to behave very strangely when using a DaskTaskRunner, and an MWE for me is below:
"""An attempt to understand how prefect3 works"""
import logging
from time import sleep

from prefect import task, flow, __version__
from prefect.futures import wait
from prefect_dask import DaskTaskRunner

logger = logging.getLogger("tp3")
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)

local_cluster = {
    "cluster_class": "dask.distributed.LocalCluster"
}


@task(log_prints=True, cache_policy=None)
def adder(a: int) -> int:
    logger.info(f"Value of {a=}")
    return a + 1


def summer(numbers: list[int], another_thing: bool) -> int:
    logger.info(f"The numbers to sum: {numbers=}")
    for number in numbers:
        logger.info(number)

    SEC = 5
    logger.info(f"Sleeping now for {SEC}")
    sleep(SEC)

    return sum(numbers)


# I have this usage in my module whenever possible
task_summer = task(summer, log_prints=True, cache_policy=None)


@flow(log_prints=True)
def the_flow() -> None:
    numbers = list(range(4))
    logger.info(f"The numbers as {numbers=}")

    # Make up two sets of futures in different ways. These should be
    # equivalent.
    numbers_list = [adder.submit(n) for n in numbers]
    numbers_map = adder.map(numbers)

    # Iterate over both lists of futures
    for i, _numbers in enumerate((numbers_list, numbers_map)):
        logger.info(f"In the main flow {_numbers=}")

        # This consistently works
        summed_comp = task_summer.with_options(
            name=f"Summer List Comp {i=}"
        ).submit([n.result() for n in _numbers], another_thing=True)

        # This only works with a list of submit() futures, not a single .map()
        summed = task_summer.with_options(
            name=f"summer Direct {i=}"
        ).submit(_numbers, another_thing=True)

        wait([summed, summed_comp])


def main() -> None:
    # This works
    the_flow.with_options(name="My Flow with default")()

    # This does not work when handing `.map()` output to `.submit()`
    the_flow.with_options(task_runner=DaskTaskRunner(), name="My Flow - DaskTaskRunner Default")()

    # This does not work when handing `.map()` output to `.submit()`
    the_flow.with_options(task_runner=DaskTaskRunner(**local_cluster), name="My Flow - DaskTaskRunner LocalCluster")()


if __name__ == "__main__":
    main()
The "My Flow with default" flow seems to work in all situations for all tasks. However, I find that the "Summer Direct i=1" task consistently fails when a DaskTaskRunner is used. The error I get from "Summer Direct i=1" is:
Task run failed with exception: AttributeError("'NoneType' object has no attribute 'type'")
Traceback (most recent call last):
File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/prefect/task_engine.py", line 870, in run_context
yield self
File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/prefect/task_engine.py", line 1507, in run_task_sync
engine.call_task_fn(txn)
File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/prefect/task_engine.py", line 887, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/scratch3/gal16b/packages/test_prefect3/test_script.py", line 26, in summer
logger.info(f"The numbers to sum: {numbers=}")
^^^^^^^^^^
File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/distributed/client.py", line 602, in __repr__
if self.type:
^^^^^^^^^
File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/distributed/client.py", line 567, in type
return self._state.type
^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'type'
Hi all - I need a sanity check. I am going from v2 -> v3, so picking up the differences. The module I am working with uses a lot of
Tim Galvin
09/19/2025, 6:49 AM
[adder.submit(n) for n in numbers] vs adder.map(numbers) would be.
I have used the .map() task method pretty regularly throughout my workflows, so I am a little reluctant to axe it. Of course, if my usage was not correct then I will. This just seems a little odd.
Tim Galvin
09/19/2025, 6:49 AM
Version:             3.4.18
API version:         0.8.4
Python version:      3.12.11
Git commit:          3baaf4f4
Built:               Fri, Sep 12, 2025 04:03 PM
OS/Arch:             linux/x86_64
Profile:             ephemeral
Server type:         ephemeral
Pydantic version:    2.10.6
Server:
  Database:          sqlite
  SQLite version:    3.50.4
Integrations:
  prefect-dask:      0.3.6
Tim Galvin
09/19/2025, 6:57 AM
Nate
09/19/2025, 2:14 PM
main - can you share prefect version output?
Nate
09/19/2025, 2:16 PM
the only real difference 2->3 here is that terminal futures are not automatically resolved
that is, you must either return futures directly or call .result or .wait on them to resolve them, prefect 3.x will not magically find unresolved futures and resolve them for you like it did in 2.x (caused other problems)
do you have an MRE for the AttributeError you shared above?
Tim Galvin
09/19/2025, 2:24 PM
The prefect versions I attached as a reply to the original message, and the code I shared in the original post was my MWE. When I run it on a 'default' task runner (i.e. leaving it blank in the @flow definition) it worked. It was only when I introduced the DaskTaskRunner that it failed.
When you say return them directly, what do you mean? If I have a simple Python function:
@task
def add(i: int) -> int:
    return i + 1

add.submit(i=2)
does that suffice? How does it behave if I .map(i=list(range(20)))?
Nate
09/19/2025, 2:27 PM
"The prefect versions I attached as a reply to the original message"
doh! my bad :facepalm: i look through things too quickly sometimes
i copied and pasted your code and ran
uv run --extra dask your_script.py
from the prefect repo and did not encounter an error but i'll take a closer look at it to understand what you might be running into
have a great night!
Tim Galvin
09/19/2025, 2:30 PM
numbers = list(range(4))
task_list = [task.submit(i) for i in numbers]
task_map = task.map(numbers)
summer.submit(task_list)  # works
summer.submit(task_map)  # fails
The above skeleton works when I use a @flow with no task runner options, but fails when I use @flow(..., task_runner=DaskTaskRunner)
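The branch of the MWE that "consistently works" resolves every future with .result() before handing the list to the downstream task. A minimal sketch of that pattern is below. It uses the standard library's concurrent.futures as a stand-in so it runs without Prefect or Dask, and resolve_all and add_one are hypothetical names chosen for illustration, not part of either library.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Iterable


def resolve_all(items: Iterable[Any]) -> list[Any]:
    """Return plain values, calling .result() on anything future-like."""
    resolved = []
    for item in items:
        result = getattr(item, "result", None)
        # Futures expose a callable .result(); plain values pass through.
        resolved.append(result() if callable(result) else item)
    return resolved


def add_one(n: int) -> int:
    return n + 1


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(add_one, n) for n in range(4)]
    # Block until every upstream future is done, then keep only the values.
    numbers = resolve_all(futures)

print(sum(numbers))  # 10
```

In the Prefect flow, the resolved list is what would be passed to the downstream .submit(...) call, so the downstream task only ever receives plain integers. The trade-off is that the flow blocks on the upstream tasks at that point.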
Tim Galvin
09/19/2025, 2:34 PM
uv run command tomorrow. For full context, I have these extra environment variables in my launch script for the HPC I am using, but I don't see them as a cause for concern:
export APIURL=http://my.machine.com:4200/api
export PREFECT_API_URL="${APIURL}"
export WORKDIR=$(pwd)
export PREFECT_HOME="${WORKDIR}/prefect"
export PREFECT_LOGGING_EXTRA_LOGGERS="tp3"
export PREFECT_LOGGING_LEVEL="INFO"
export PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
#echo "Sourcing home"
#source /home/$(whoami)/.bashrc
ENV="flint_prefect3"
echo "Activating conda flint environment: ${ENV}"
conda activate "${ENV}"
python /scratch3/gal16b/packages/test_prefect3/test_script.py
Tim Galvin
09/20/2025, 9:39 AM
Nate
09/20/2025, 2:07 PM
pbpaste | uv run --with 'prefect[dask]' -
(i have a mac, so pbpaste gives the content of my clipboard)
• --with can specify any version of prefect with the dask extra installed (prefect[dask]==3.x.y), or pass prefect and prefect-dask separately if you like
Tim Galvin
09/20/2025, 3:05 PM
Nate
09/20/2025, 3:05 PM
"the only real difference 2->3 here is that terminal futures are not automatically resolved
that is, you must either return futures directly or call .result or .wait on them to resolve them, prefect 3.x will not magically find unresolved futures and resolve them for you like it did in 2.x (caused other problems)"
this is generally the only real difference that should exist in terms of how prefect works from 2 to 3
Tim Galvin
09/20/2025, 3:10 PM
uv run command as mentioned above and I am getting this again
01:08:16.578 | INFO | tp3 - Sleeping now for 5
01:08:16.627 | ERROR | Task run 'summer Direct i=1-d18' - Task run failed with exception: AttributeError("'NoneType' object has no attribute 'type'")
Traceback (most recent call last):
File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/prefect/task_engine.py", line 870, in run_context
yield self
File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/prefect/task_engine.py", line 1507, in run_task_sync
engine.call_task_fn(txn)
File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/prefect/task_engine.py", line 887, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/prefect/utilities/callables.py", line 210, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/scratch3/gal16b/packages/test_prefect3/test_script.py", line 26, in summer
logger.info(f"The numbers to sum: {numbers=}")
^^^^^^^^^^
File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/distributed/client.py", line 602, in __repr__
if self.type:
^^^^^^^^^
File "/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/distributed/client.py", line 567, in type
return self._state.type
^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'type'
01:08:16.638 | ERROR | Task run 'summer Direct i=1-d18' - Finished in state Failed("Task run encountered an exception AttributeError: 'NoneType' object has no attribute 'type'")
So, I am guessing something in this environment specifically is causing me grief
Nate
09/20/2025, 3:12 PM
distributed version that miniconda is holding onto here
/datasets/work/jones-storage/work/miniconda/miniforge3/envs/flint_prefect3/lib/python3.12/site-packages/distributed/client.py
Tim Galvin
09/20/2025, 3:12 PM
(flint_prefect3) gal16b@petrichor-login:/scratch3/gal16b/find_error> dask --version
dask, version 2024.10.0
It is a little outdated - I made this environment up this week to try out prefect3. So, there might be a dependency setting this
Nate
09/20/2025, 3:14 PM
pbpaste | uv run --with 'prefect[dask]' --with distributed==2024.10.0 -
this reproduces for me
Tim Galvin
09/20/2025, 3:15 PM
Nate
09/20/2025, 3:15 PM
distributed version, i'd try and upgrade that. we might wanna pin above that in prefect-dask
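To see whether an environment still has the distributed release that reproduced the error, something like the following could be run inside it. This is a sketch using only the standard library. The thread establishes only that 2024.10.0 reproduces the AttributeError and that upgrading was suggested, so treating anything newer as fine is an assumption. calver_tuple, is_newer_than_known_bad and check_distributed are hypothetical helper names.

```python
import re
from importlib.metadata import PackageNotFoundError, version

# Release reported in this thread as reproducing the AttributeError.
KNOWN_BAD = "2024.10.0"


def calver_tuple(text: str) -> tuple[int, ...]:
    """Turn a version string such as '2024.10.0' into (2024, 10, 0)."""
    parts = []
    for part in text.split(".")[:3]:
        match = re.match(r"\d+", part)
        if match is None:
            break
        parts.append(int(match.group()))
    return tuple(parts)


def is_newer_than_known_bad(installed: str, known_bad: str = KNOWN_BAD) -> bool:
    return calver_tuple(installed) > calver_tuple(known_bad)


def check_distributed() -> str:
    try:
        installed = version("distributed")
    except PackageNotFoundError:
        return "distributed is not installed in this environment"
    if is_newer_than_known_bad(installed):
        return f"distributed {installed} is newer than {KNOWN_BAD}"
    return f"distributed {installed} is at or below {KNOWN_BAD}; consider upgrading"


print(check_distributed())
```

Reading the version through importlib.metadata reports what the active environment actually resolves, which may differ from what a conda dependency pin was expected to install.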
Tim Galvin
09/20/2025, 3:16 PM
Nate
09/20/2025, 3:19 PM
uv enough 🙂 (i know people have reasons for using other things! but) as someone who has used all the things for all the reasons, i think about inscrutable python environment things a tiny fraction of the time that i used to
Nate
09/20/2025, 3:21 PM
Tim Galvin
09/20/2025, 3:21 PM
casacore and python-casacore. These are the pain that keep me in the conda sphere
Nate
09/20/2025, 3:21 PM
Tim Galvin
09/21/2025, 5:47 AM