Julian Regan

9 months ago
Hi all, just exploring Prefect v3 and have a (hopefully) simple question that I'm struggling to find a clear answer to. I've created some very basic tasks:
```python
from prefect import flow, task


@task
def say_hello() -> bool:
    print("Hello, world")
    return True


@task
def say_goodbye() -> str:
    raise ValueError("Goodbye")


@task
def see_whats_up(said_hello: bool):
    if said_hello:
        print("What's up?")
```
Now, from what I've read in the docs on final state determination, mainly:
• If the flow does not return a value (or returns `None`), its state is determined by the states of all of the tasks and nested flows within it.
◦ If any task run or nested flow run fails, then the final flow run state is marked as `FAILED`.
◦ If any task run is cancelled, then the final flow run state is marked as `CANCELLED`.
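To make the expectation concrete, here is a toy model of the two quoted rules for a flow that returns `None`. The `State` enum and `final_flow_state` helper are hypothetical illustrations, not Prefect's own classes, and two details are assumptions: that `FAILED` wins when both failed and cancelled runs are present, and that a flow with no failed or cancelled children completes.

```python
from enum import Enum
from typing import Iterable


class State(Enum):
    # Hypothetical stand-in for run states; not prefect's State class.
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


def final_flow_state(child_states: Iterable[State]) -> State:
    """Toy model of the quoted rules for a flow that returns None."""
    states = list(child_states)
    # Any failed task run or nested flow run fails the flow.
    if State.FAILED in states:
        return State.FAILED
    # Assumption: FAILED takes precedence over CANCELLED when both occur.
    if State.CANCELLED in states:
        return State.CANCELLED
    # Assumption: otherwise (including no children at all) the flow completes.
    return State.COMPLETED


# Three successful hellos plus one failed goodbye: the flow should fail.
print(final_flow_state([State.COMPLETED] * 3 + [State.FAILED]))
```

Under these rules, the flow below would be expected to end `FAILED` because of `say_goodbye`, whether or not its futures are returned.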
I should be able to write a flow containing some basic concurrency like this:
```python
@flow(log_prints=True)
def test():
    said_hello = say_hello.submit()
    say_hello.submit()
    say_hello.submit()

    say_goodbye.submit()

    see_whats_up.submit(said_hello=said_hello)
```
However, running it logs several warnings:
A future was garbage collected before it resolved. Please call `.wait()` or `.result()` on futures to ensure they resolve.
and then fails with:
Please wait for all submitted tasks to complete before exiting your flow by calling `.wait()` on the `PrefectFuture` returned from your `.submit()` calls.
The simplest way I've found to make this work is to assign and manually return the future from every single task, which feels a bit verbose:
```python
@flow(log_prints=True)
def test():
    said_hello = say_hello.submit()
    said_hello_2 = say_hello.submit()
    said_hello_3 = say_hello.submit()

    goodbye = say_goodbye.submit()

    whats_up = see_whats_up.submit(said_hello=said_hello)

    # all futures must be returned for the flow to execute correctly
    return (
        said_hello,
        said_hello_2,
        said_hello_3,
        goodbye,
        whats_up,
    )
```
I suppose I could also replace the return statement with an equally long list of `future.wait()` calls, though if anything that's more confusing.
1. Is there a cleaner way of handling concurrency within a flow?
2. Have I missed a big warning or explanation about this in the docs? 🫠
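For comparison, the "keep every future in one list and wait on them once" pattern can be sketched with Python's standard-library `concurrent.futures` standing in for Prefect's task runner. This is an analogy, not Prefect code: the plain functions mirror the tasks above, and `run_all` is a hypothetical stand-in for the flow. Prefect 3 also exposes a `wait` helper in `prefect.futures` that accepts a list of futures, which should allow the same shape inside a real flow, though its exact behavior is worth checking in the v3 docs.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait


def say_hello() -> bool:
    print("Hello, world")
    return True


def say_goodbye() -> str:
    raise ValueError("Goodbye")


def see_whats_up(said_hello: "Future[bool]") -> None:
    # Block on the upstream future, mirroring how Prefect resolves a future
    # passed as a task argument before the downstream task runs.
    if said_hello.result():
        print("What's up?")


def run_all() -> str:
    """Hypothetical stand-in for the flow: submit everything, wait once."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        said_hello = pool.submit(say_hello)
        futures = [
            said_hello,
            pool.submit(say_hello),
            pool.submit(say_hello),
            pool.submit(say_goodbye),
            pool.submit(see_whats_up, said_hello),
        ]
        # A single call blocks until every future in the list has finished,
        # instead of one .wait() per variable.
        done, _not_done = wait(futures)
    # Mirror the documented rule: any failed run marks the whole run failed.
    failed = any(f.exception() is not None for f in done)
    return "FAILED" if failed else "COMPLETED"


print(run_all())
```

The design point is that only the list needs to be kept alive and waited on; individual futures need their own variable only when another call consumes them, as `said_hello` does here.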