# ask-community
j
Hi all, just exploring Prefect v3 and have a (hopefully) simple question that I'm struggling to find a clear answer to. I've created some very basic tasks:
```python
from prefect import flow, task


@task
def say_hello() -> bool:
    print("Hello, world")
    return True


@task
def say_goodbye() -> str:
    raise ValueError("Goodbye")


@task
def see_whats_up(said_hello: bool):
    if said_hello:
        print("What's up?")
```
Now, from what I've read in the docs on final state determination, mainly:
• If the flow does not return a value (or returns `None`), its state is determined by the states of all of the tasks and nested flows within it.
  ◦ If any task run or nested flow run fails, then the final flow run state is marked as `FAILED`.
  ◦ If any task run is cancelled, then the final flow run state is marked as `CANCELLED`.
I should be able to write a flow containing some basic concurrency like this:
```python
@flow(log_prints=True)
def test():
    said_hello = say_hello.submit()
    say_hello.submit()
    say_hello.submit()

    say_goodbye.submit()

    see_whats_up.submit(said_hello=said_hello)
```
However, this throws multiple warnings:
> A future was garbage collected before it resolved. Please call `.wait()` or `.result()` on futures to ensure they resolve.

and errors with:
> Please wait for all submitted tasks to complete before exiting your flow by calling `.wait()` on the `PrefectFuture` returned from your `.submit()` calls.
The simplest way I've found to make this work is to assign and manually return the future from every single task, which feels a bit verbose:
```python
@flow(log_prints=True)
def test():
    said_hello = say_hello.submit()
    said_hello_2 = say_hello.submit()
    said_hello_3 = say_hello.submit()

    goodbye = say_goodbye.submit()

    whats_up = see_whats_up.submit(said_hello=said_hello)

    # all futures must be returned for the flow to execute correctly
    return (
        said_hello,
        said_hello_2,
        said_hello_3,
        goodbye,
        whats_up,
    )
```
I suppose I could also replace the return statement with an equally long list of `future.wait()` calls, though if anything that's more confusing.
1. Is there a cleaner way of handling concurrency within a flow?
2. Have I missed a big warning / explanation about this in the docs? 🫠
n
hi @Julian Regan this was an intentional design change from prefect 2 to prefect 3
in other frameworks like dask, you also need to wait for your futures, or else the program can end before your work is definitely complete
```python
import time
from dask.distributed import Client

client = Client()

def say_hello():
    time.sleep(2)  # Simulate a delay
    print("Task completed.")
    return "Hello"

future = client.submit(say_hello)

print("Exiting program without waiting for the task...")
```
later, you'd see the `Task completed.` come through, and in the case of prefect that would mean that work is happening outside the context of a flow, which is probably not what you want - hence the warning we log if you don't wait for them / return them.

does this help? i'll add that your first 3 invocations can be written something like this (i.e. using `.map`):
```python
from prefect import flow, task


@task
def say(message: str) -> int:
    print(message)
    return len(message)


@flow(log_prints=True)
def test():
    assert say.map(["foo", "bar", "baz"]).result() == [3] * 3
```
these docs may be helpful
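The point about unresolved futures can be reproduced without Dask or Prefect. The following is a minimal sketch using only the standard library's `concurrent.futures`, not Prefect's own future type; the function name `demo` is made up for illustration. It shows that `submit()` returns before the work has finished, and that the result is only guaranteed once something waits on the future.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def say_hello() -> str:
    time.sleep(0.2)  # simulate a delay, as in the Dask example
    return "Hello"


def demo() -> tuple:
    """Hypothetical helper: report the future's status before and after waiting."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(say_hello)          # returns immediately
        done_right_after_submit = future.done()  # the task is still sleeping
        value = future.result()                  # blocks until the task finishes
        done_after_result = future.done()
    return done_right_after_submit, value, done_after_result


if __name__ == "__main__":
    print(demo())
```

Code that returns right after `submit()` has no guarantee the task ever ran to completion inside its scope, which is the situation the Prefect warning is guarding against.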
j
Hi Nate, thanks for the info. I suppose in a real-world example, most tasks will use the output of a previous task, which reduces the need for explicit `future.wait()` calls. However, in a complex branching flow, I can definitely see scenarios where some branches finish and the main flow continues along another route. As I understand it, calling `future.wait()` or `future.result()` inline will block flow execution and prevent new tasks from being submitted until that task completes.
What's the recommended approach here? Collect a list of all orphaned futures and wait for them at the end of the flow?
As a side note, it might be useful to add some notes around this behaviour (i.e. all futures must be consumed) to the v3 task runner docs.
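One way to picture the "collect the orphaned futures and wait at the end" idea is the sketch below. It uses the standard library's `concurrent.futures` as a stand-in, so it is an analogy and not Prefect code; `branch`, `run_flow`, and the branch names are hypothetical. Prefect 3 exposes a similarly named `wait` helper in `prefect.futures`, but whether it behaves the same way in a given flow is worth checking against the docs.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def branch(name: str, seconds: float) -> str:
    """Hypothetical unit of work standing in for a task."""
    time.sleep(seconds)
    return f"{name} done"


def run_flow() -> list:
    pending = []  # futures that nothing downstream consumes
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Side branches: submitted and remembered, but not waited on inline,
        # so they do not hold up later submissions.
        pending.append(pool.submit(branch, "side-a", 0.2))
        pending.append(pool.submit(branch, "side-b", 0.2))

        # Main route: block only on the result that is needed to continue.
        main_result = pool.submit(branch, "main", 0.1).result()

        # More work can still be submitted while the side branches run.
        pending.append(pool.submit(branch, "follow-up", 0.1))

        # A single wait at the end resolves every remaining future.
        done, not_done = wait(pending)
        assert not not_done

    return [main_result] + [future.result() for future in pending]


if __name__ == "__main__":
    print(run_flow())
```

Only the main route blocks inline here; the side branches overlap with it and are resolved together in one place before the function returns.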