# ask-community
t
Hi all - I have a set of Prefect version 2 workflows that I am trying to get to work under the version 3 engine. I am using a `DaskTaskRunner` and have very simple `@task`-decorated functions without any asyncio. I am seeing that all the tasks are now returning `prefect_dask.task_runners.PrefectDaskFuture`. Oddly, they are not being used to create any tasks or a general workflow. Each time I run, I do not see any Dask tasks get registered in the web UI. What is confusing me is that I am seeing the logs in the version 3 web UI. Is there now some difference in how a Dask task runner is meant to be used? Is it automatically started and in a state ready to receive work? Or am I meant to be resolving these futures explicitly before they are added to the Dask work graph?
a
Hey @Tim Galvin! There could be a couple of reasons you aren't seeing tasks.
1. It might be an issue with the connection between your Dask instance and the Prefect server. For Prefect 3, you'll need to be able to establish a websocket connection from your Dask workers to your Prefect server to allow task state updates to be streamed to Prefect. You can check the logs on your Dask workers to see if that might be the case.
2. Your server might be missing a background service that records task run state changes. If you share how you set up your Prefect server we can either confirm or rule out if this is the issue.
t
Easy - I will have a look shortly. It is almost my bedtime here. I did, as a test, add a `.result()` to the outputs of one of my `.submit()` calls. It seems that in that case the tasks up to that point did get registered in the UI, and they mostly ran. Is there now an expectation of issuing a `.compute()`-like call, as there is in Dask land?
On point 1 I will need to double check. I think that the workers on the HPC compute nodes can communicate with this particular VM. By websocket do you mean being able to hit some RESTful API endpoint, e.g. talk to a web server? I'll examine the logs of the Dask workers tomorrow morning.
On point 2, I am starting my Prefect server like this:
    source ~/.bashrc
    conda activate prefect3
    # prefect server start  --host 0.0.0.0
    python -m uvicorn \
        --app-dir /tones-work/work/miniconda/miniconda3/envs/prefect3/lib/python3.13/site-packages \
        --factory prefect.server.api.server:create_app \
        --host 0.0.0.0 \
        --port 4200 \
        --timeout-keep-alive 10 \
        --limit-max-requests 4096 \
        --timeout-graceful-shutdown 7200
I am kicking off uvicorn directly so I can set some additional settings that were not exposed. From my last reading of this in the Prefect 2 days, this was effectively what was going on under the hood.
a
Ah, ok, if a `.result()` call causes the tasks to show up, then it's possible the non-blocking nature of `.submit()` was making it so that the flow finished before tasks got the chance to start up. You'll need to wait for the terminal submitted tasks to complete before exiting your flow. That was one of the changes we made in Prefect 3: `prefect` won't automatically wait for all tasks to complete before exiting a flow.
t
Ahh
Is there a helper function or context to do this magically for us, or otherwise help out?
a
You can use `wait` from `prefect.futures` to wait for a group of futures to complete.
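The submit-then-wait pattern described above can be sketched with the standard library's `concurrent.futures`, used here only as a stand-in so the snippet runs without a Prefect server or Dask cluster. `slow_add` and `run_flow` are hypothetical names, and the stdlib `wait` is not the Prefect function; the assumption is that in a Prefect 3 flow the analogous step would be calling `wait` from `prefect.futures` on the futures returned by `.submit()` before the flow returns.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def slow_add(a: int) -> int:
    """Hypothetical stand-in for a task that takes a moment to run."""
    time.sleep(0.2)
    return a + 1


def run_flow() -> list:
    """Hypothetical stand-in for a flow body."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        # submit() returns immediately; the work may still be in flight here.
        futures = [pool.submit(slow_add, n) for n in range(4)]
        # Block until every terminal future has finished before leaving.
        done, not_done = wait(futures)
        assert not not_done
        return [future.result() for future in futures]


results = run_flow()
print(results)
```

The point of the sketch is only the ordering: collect the futures, wait on them as a group, and return afterwards.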
t
For instance, do the results need to be a flat list, or could it be a list of lists of futures, which it will recursively search, calling `.result()` on each?
Ahh ok. Is that backwards compatible with Prefect v2 by chance?
Now I can actually sleep easy tonight - thanks for this!
a
No, `wait` was added in Prefect 3 and it needs to be a flat list. Futures that are passed into tasks/flows and returned from tasks/flows are automatically resolved, so that should help with maintaining a flat list of futures.
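Since the list has to be flat, nested containers of futures need flattening before waiting. Below is a minimal sketch of a recursive flattener. `flatten_futures` and its `future_type` parameter are hypothetical names, and `concurrent.futures` again stands in for Prefect so the snippet runs anywhere. The assumption is that with Prefect 3 one would pass `PrefectFuture` as `future_type` and hand the result to `prefect.futures.wait`.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Iterable


def flatten_futures(values: Iterable[Any], future_type: type = Future) -> list:
    """Recursively collect every future_type instance from nested lists/tuples."""
    flat = []
    for value in values:
        if isinstance(value, future_type):
            flat.append(value)
        elif isinstance(value, (list, tuple)):
            flat.extend(flatten_futures(value, future_type))
        # Anything else (ints, strings, ...) is ignored.
    return flat


with ThreadPoolExecutor(max_workers=2) as pool:
    nested = [
        pool.submit(pow, 2, 3),
        [pool.submit(pow, 2, 4), [pool.submit(pow, 2, 5)]],
        "not a future",
    ]
    flat = flatten_futures(nested)
    wait(flat)  # stdlib wait, used here as a stand-in
    results = [future.result() for future in flat]

print(results)
```

Passing the future class in as a parameter keeps the helper independent of any one library, so the same function could be reused across task runners.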
t
Ok, this gives me something to work with
Thanks, I will give it a whirl tomorrow
Thanks for that
Hi @alex - A question for you. Would you expect Prefect to behave differently depending on the specified task runner that is being used? That is to say, would you have to treat the `PrefectFuture`s being exchanged between tasks differently? I ask because the following MWE is starting to give me this feeling.
"""An attempt to understand how prefect3 works"""

from prefect import task, flow, __version__
from prefect.futures import PrefectFuture, wait

from prefect_dask import DaskTaskRunner


petrichor = {
    'cluster_class': "dask_jobqueue.SLURMCluster",
    'cluster_kwargs': {
        'cores': 1,
        'processes': 1,
        'job_cpu': 2,
        'name': 'flint-worker',
        'memory': "5GB",
        'walltime': '0-24:00:00',
        'job_extra_directives': ['--no-requeue'],
        # interface for the workers
        'interface': "ib0",
        'log_directory': 'flint_logs',
        'job_script_prologue': [
            'module load apptainer/1.3.3',
            'unset APPTAINER_BINDPATH',
            'unset SINGULARITY_BINDPATH',
            "export OMP_NUM_THREADS=4",
        ],
        # local_directory: $LOCALDIR
        'silence_logs': 'info',
    },
    'adapt_kwargs': {
        'minimum': 1,
        'maximum': 37,
    },
}

def flatten(values):
    flatten_list = []
    for value in values:
        if isinstance(value, list):
            flatten_list.extend(value)
            continue
        
        flatten_list.append(value)
            
    return flatten_list

def waiter(**kwargs) -> None:
    # Wait for everything that looks like a prefect future
    to_wait = flatten(kwargs.values())
    to_wait = [var for var in to_wait if isinstance(var, PrefectFuture)]
    
    print(f"Waiting for {len(to_wait)} prefect futures")
    wait(to_wait)

 
@task(log_prints=True)
def adder(a: int) -> int:
    print(f"Value of {a=}")
    return a + 1


def summer(numbers: list[int], another_thing: bool) -> int:
    print(f"The numbers to sum: {numbers=}")
    for number in numbers:
        print(number)
    return sum(numbers)

# I have this usage in my module whenever possible
task_summer = task(summer, log_prints=True)

@flow(log_prints=True)
def the_flow() -> None:
    numbers = list(range(4))
    print(f"The numbers as {numbers=}")
    numbers = adder.map(numbers)
    
    print(numbers)
    summed = task_summer.submit(numbers, another_thing=True)
    
    numbers = adder.map(numbers)
    
    waiter(**locals().copy())
    

def main() -> None:
    # This works
    the_flow.with_options(name="My Flow with default")()
    
    # This does not work
    the_flow.with_options(task_runner=DaskTaskRunner(), name="My Flow - DaskTaskRunner Default")()
    
    # This does not work
    the_flow.with_options(
        task_runner=DaskTaskRunner(**petrichor), name="My flow - DaskTaskRunner SlurmCluster"
    )()

if __name__ == "__main__":
    main()
    # print(f"{runner(a=10)=}")
In the base case with `My Flow with default`, all tasks run without error and I get all the logs I expect from all tasks. When `My Flow - DaskTaskRunner Default` runs, the `summer` task fails with logged output (see attached screen cap). When `My flow - DaskTaskRunner SlurmCluster` runs, the `summer` task fails, but I get no output in the UI (see second screen cap). In Prefect version 2, the introspection the engine was doing was able to resolve this type of many-to-one relationship. So I am not sure if I am doing something grossly wrong, or if I am getting caught up in how transactions/cached task results are used. For this MWE I put all flows in the one script, and in typing it out I only just realised that this in and of itself may be confusing the results ...