Julian Regan
08/15/2024, 5:33 PM
from prefect import flow, task

@task
def say_hello() -> bool:
    print("Hello, world")
    return True

@task
def say_goodbye() -> str:
    raise ValueError("Goodbye")

@task
def see_whats_up(said_hello: bool):
    if said_hello:
        print("What's up?")
Now, from what I've read in the docs on final state determination, mainly:
• If the flow does not return a value (or returns `None`), its state is determined by the states of all of the tasks and nested flows within it.
◦ If any task run or nested flow run fails, then the final flow run state is marked as `FAILED`.
◦ If any task run is cancelled, then the final flow run state is marked as `CANCELLED`.
I should be able to write a flow containing some basic concurrency like this:
@flow(log_prints=True)
def test():
    said_hello = say_hello.submit()
    say_hello.submit()
    say_hello.submit()
    say_goodbye.submit()
    see_whats_up.submit(said_hello=said_hello)
However, this throws multiple warnings:
> A future was garbage collected before it resolved. Please call `.wait()` or `.result()` on futures to ensure they resolve.
and errors with:
> Please wait for all submitted tasks to complete before exiting your flow by calling `.wait()` on the `PrefectFuture` returned from your `.submit()` calls.
The simplest way I've found to make this work is to assign and manually return the future from every single task, which feels a bit verbose:
@flow(log_prints=True)
def test():
    said_hello = say_hello.submit()
    said_hello_2 = say_hello.submit()
    said_hello_3 = say_hello.submit()
    goodbye = say_goodbye.submit()
    whats_up = see_whats_up.submit(said_hello=said_hello)
    # all futures must be returned for the flow to execute correctly
    return (
        said_hello,
        said_hello_2,
        said_hello_3,
        goodbye,
        whats_up,
    )
I suppose I could also replace the return statement with an equally long list of `future.wait()` calls, though if anything that's more confusing.
1. Is there a cleaner way of handling concurrency within a flow?
2. Have I missed a big warning / explanation about this in the docs? 🫠
Nate
08/15/2024, 5:53 PM
> I should be able to write a flow containing some basic concurrency like this:
```
@flow(log_prints=True)
def test():
    said_hello = say_hello.submit()
    say_hello.submit()
    say_hello.submit()
    say_goodbye.submit()
    see_whats_up.submit(said_hello=said_hello)
```
> However, this throws multiple warnings:
> A future was garbage collected before it resolved. Please call `.wait()` or `.result()` on futures to ensure they resolve.
> and errors with:
> Please wait for all submitted tasks to complete before exiting your flow by calling `.wait()` on the `PrefectFuture` returned from your `.submit()` calls.
in other frameworks like dask, you also need to wait for your futures, or else the program can end before your work is definitely complete
import time
from dask.distributed import Client

client = Client()

def say_hello():
    time.sleep(2)  # Simulate a delay
    print("Task completed.")
    return "Hello"

future = client.submit(say_hello)
print("Exiting program without waiting for the task...")
later, you'd see the `Task completed.` come through, and in the case of prefect that would mean that work is happening outside the context of a flow, which is probably not what you want - hence the warning we log if you don't wait for them / return them
does this help?
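The point about waiting on futures can be sketched with the standard library alone. This is an analogy using `concurrent.futures`, not Prefect or dask code; the `demo_wait` helper and the `release` event are hypothetical names introduced for illustration. It shows that a submitted task is not guaranteed to be finished until something blocks on its future:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo_wait():
    """Submit one task and compare its future before and after waiting on it."""
    release = threading.Event()

    def say_hello() -> str:
        # Stand-in for slow work: block until the caller releases us.
        release.wait(timeout=5)
        return "Hello"

    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(say_hello)
        finished_before_wait = future.done()  # the task is still blocked here
        release.set()
        greeting = future.result()  # blocks until the task has completed
        finished_after_wait = future.done()

    return finished_before_wait, greeting, finished_after_wait
```

If the program (or, by analogy, the flow) returned right after `submit`, the work would still be in flight; blocking on `result()` is what guarantees it has completed.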
i'll add that your first 3 invocations can be written something like this (i.e. using `.map`)
In [1]: from prefect import flow, task

In [2]: @task
   ...: def say(message: str) -> int:
   ...:     print(message)
   ...:     return len(message)
   ...:

In [3]: @flow(log_prints=True)
   ...: def test():
   ...:     assert say.map(["foo", "bar", "baz"]).result() == [3] * 3
these docs may be helpful
Julian Regan
08/16/2024, 9:06 AM
[...] `future.wait()` calls.
However, in a complex branching flow, I can definitely see scenarios where some branches finish and the main flow continues along another route. As I understand it, calling `future.wait()` or `future.result()` inline will block flow execution and prevent new tasks from being submitted until that task completes.
What's the recommended approach here? Collect a list of all orphaned futures and wait for them at the end of the flow?
As a side note, it might be useful to add some notes around this behaviour (i.e. all futures must be consumed) to the v3 task runner docs
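One way to picture the "collect the orphaned futures and wait at the end" idea is the sketch below. It uses the standard library's `concurrent.futures` as a stand-in rather than Prefect itself, and `branch_task`, `run_flow`, and the delays are hypothetical. Prefect 3 appears to expose a similar `wait` helper in `prefect.futures`, but treat that as an assumption to check against the docs:

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def branch_task(name: str, delay: float) -> str:
    """Stand-in for a task on one branch of the flow."""
    time.sleep(delay)
    return f"{name} done"


def run_flow() -> list[str]:
    pending = []  # futures that nothing downstream consumes inline
    with ThreadPoolExecutor(max_workers=4) as executor:
        main = executor.submit(branch_task, "main", 0.05)
        # Side branches are submitted and tracked, but not waited on yet.
        pending.append(executor.submit(branch_task, "side-a", 0.1))
        pending.append(executor.submit(branch_task, "side-b", 0.02))

        # Block only on the future the main route actually depends on.
        main.result()
        pending.append(executor.submit(branch_task, "follow-up", 0.01))

        # Single barrier at the end: every tracked future must finish.
        done, not_done = wait(pending)
        assert not not_done
        return sorted(f.result() for f in done)
```

The side branches keep running while the main route proceeds, and the only blocking calls are the one real dependency and the final barrier, so submitting new work is not held up by unrelated tasks.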