Tim Galvin
09/15/2025, 1:48 PM
[...] @task decorated functions without any asyncio.
I am seeing that all the tasks are now returning prefect_dask.task_runners.PrefectDaskFuture. Oddly, they are not being used to create any tasks or a general workflow. Each time I run, I do not see any dask tasks get registered in the web UI. What is confusing me is that I am seeing the logs in the version 3 web UI.
Is there now some difference in how a dask task runner is meant to be used? Is it automatically started and in a state ready to receive work? Or am I meant to be resolving these futures explicitly before they are added to the dask work graph?
alex
09/15/2025, 1:52 PM
Tim Galvin
09/15/2025, 2:59 PM
[...] .result() to the outputs of one of my .submit() calls. It seems like in that case I did get the tasks up to that point registered on the UI, and they mostly ran.
Is there an expectation now of issuing a .compute()-like call, as there is in dask land?
Tim Galvin
09/15/2025, 3:00 PM
Tim Galvin
09/15/2025, 3:02 PM
source ~/.bashrc
conda activate prefect3
# prefect server start --host 0.0.0.0
python -m uvicorn \
    --app-dir /tones-work/work/miniconda/miniconda3/envs/prefect3/lib/python3.13/site-packages \
    --factory prefect.server.api.server:create_app \
    --host 0.0.0.0 \
    --port 4200 \
    --timeout-keep-alive 10 \
    --limit-max-requests 4096 \
    --timeout-graceful-shutdown 7200
I am kicking off uvicorn directly so I can set some additional settings that were not otherwise exposed. From my last reading of this, back in the Prefect 2 days, this is effectively what was going on under the hood.
alex
09/15/2025, 3:06 PM
[...] .result() call causes the tasks to show up, then it's possible that the non-blocking nature of .submit() meant the flow finished before the tasks got the chance to start up. You'll need to wait for the terminal submitted tasks to complete before exiting your flow. That was one of the changes we made in Prefect 3: Prefect won't automatically wait for all tasks to complete before exiting a flow.
Tim Galvin
09/15/2025, 3:07 PM
Tim Galvin
09/15/2025, 3:07 PM
alex
09/15/2025, 3:08 PM
[...] wait from prefect.futures to wait for a group of futures to complete
Tim Galvin
09/15/2025, 3:09 PM
[...] .results()?
Tim Galvin
09/15/2025, 3:09 PM
Tim Galvin
09/15/2025, 3:10 PM
alex
09/15/2025, 3:11 PM
[...] wait was added in Prefect 3, and the futures passed to it need to be in a flat list. Futures that are passed into tasks/flows and returned from tasks/flows are automatically resolved, so that should help with maintaining a flat list of futures.
Tim Galvin
09/15/2025, 3:12 PM
Tim Galvin
09/15/2025, 3:12 PM
Tim Galvin
09/15/2025, 3:12 PM
Tim Galvin
09/16/2025, 3:03 PM
"""An attempt to understand how prefect3 works"""

from prefect import task, flow, __version__
from prefect.futures import PrefectFuture, wait
from prefect_dask import DaskTaskRunner

petrichor = {
    'cluster_class': "dask_jobqueue.SLURMCluster",
    'cluster_kwargs': {
        'cores': 1,
        'processes': 1,
        'job_cpu': 2,
        'name': 'flint-worker',
        'memory': "5GB",
        'walltime': '0-24:00:00',
        'job_extra_directives': ['--no-requeue'],
        # interface for the workers
        'interface': "ib0",
        'log_directory': 'flint_logs',
        'job_script_prologue': [
            'module load apptainer/1.3.3',
            'unset APPTAINER_BINDPATH',
            'unset SINGULARITY_BINDPATH',
            "export OMP_NUM_THREADS=4",
        ],
        # local_directory: $LOCALDIR
        'silence_logs': 'info',
    },
    'adapt_kwargs': {
        'minimum': 1,
        'maximum': 37,
    },
}


def flatten(values):
    flatten_list = []
    for value in values:
        if isinstance(value, list):
            flatten_list.extend(value)
            continue
        flatten_list.append(value)
    return flatten_list


def waiter(**kwargs) -> None:
    # Wait for everything that looks like a prefect future
    to_wait = flatten(kwargs.values())
    to_wait = [var for var in to_wait if isinstance(var, PrefectFuture)]
    print(f"Waiting for {len(to_wait)} prefect futures")
    wait(to_wait)


@task(log_prints=True)
def adder(a: int) -> int:
    print(f"Value of {a=}")
    return a + 1


def summer(numbers: list[int], another_thing: bool) -> int:
    print(f"The numbers to sum: {numbers=}")
    for number in numbers:
        print(number)
    return sum(numbers)


# I have this usage in my module whenever possible
task_summer = task(summer, log_prints=True)


@flow(log_prints=True)
def the_flow() -> None:
    numbers = list(range(4))
    print(f"The numbers as {numbers=}")
    numbers = adder.map(numbers)
    print(numbers)
    summed = task_summer.submit(numbers, another_thing=True)
    numbers = adder.map(numbers)
    waiter(**locals().copy())


def main() -> None:
    # This works
    the_flow.with_options(name="My Flow with default")()
    # This does not work
    the_flow.with_options(task_runner=DaskTaskRunner(), name="My Flow - DaskTaskRunner Default")()
    # This does not work
    the_flow.with_options(
        task_runner=DaskTaskRunner(**petrichor), name="My flow - DaskTaskRunner SlurmCluster"
    )()


if __name__ == "__main__":
    main()
    # print(f"{runner(a=10)=}")
In the base case with My Flow with default, all tasks run without error and I get all the logs I expect from all tasks.
When My Flow - DaskTaskRunner Default runs, the summer task fails with logged output (see attached screen cap).
When My flow - DaskTaskRunner SlurmCluster runs, the summer task fails, but I get no output in the UI (see second screen cap).
In Prefect version 2 the introspection the engine was doing was able to resolve this type of many-to-one relationship. So I am not sure if I am doing something grossly wrong, or if I am getting caught up in how transactions/cached task results are used. For this MWE I put all flows in the one script, and in typing it out I only just realised that this in and of itself may be confusing the results ...
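The flatten helper in the script above only unpacks one level of nesting, while wait is described earlier in the thread as needing a flat list. A minimal sketch of a recursive version follows. It uses the standard library's concurrent.futures as a stand-in so that it runs without a Prefect server; the names iter_futures, wait_for_all, add_one and demo are hypothetical and not part of Prefect. With Prefect 3 the same idea would presumably check against PrefectFuture and hand the flat list to prefect.futures.wait, as waiter does above.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Iterator


def iter_futures(value: Any) -> Iterator[Future]:
    """Yield every Future found in an arbitrarily nested container."""
    if isinstance(value, Future):
        yield value
    elif isinstance(value, dict):
        # Only dict values are searched, mirroring waiter(**kwargs) above.
        for item in value.values():
            yield from iter_futures(item)
    elif isinstance(value, (list, tuple, set)):
        for item in value:
            yield from iter_futures(item)
    # Anything else (ints, strings, None, ...) is ignored.


def wait_for_all(*values: Any) -> list[Future]:
    """Collect all futures into one flat list and block until they finish."""
    flat = list(iter_futures(values))
    wait(flat)
    return flat


def add_one(a: int) -> int:
    return a + 1


def demo() -> tuple[int, int]:
    """Submit a few jobs, nest the futures, then wait on the flat list."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        first = [pool.submit(add_one, n) for n in range(4)]
        nested = {
            "first": first,
            "second": [[pool.submit(add_one, 10)], "not a future"],
        }
        done = wait_for_all(nested, 42)
        total = sum(future.result() for future in done)
        return len(done), total
```

Calling demo() returns (5, 21): five futures are found across the nested containers, and their results (1, 2, 3, 4 and 11) sum to 21. Non-future values such as the string and the integer are skipped rather than raising.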