Vincent Chéry
07/01/2025, 7:10 AM
Marvin
07/01/2025, 7:10 AM
Marvin
07/01/2025, 7:16 AM
python
from prefect import flow, task

@task
def my_task():
    return 42

@flow
def my_flow():
    return my_task()

# In your test
def test_my_flow():
    state = my_flow()
    assert state.result() == 42
2. Using the more detailed get_flow_run_states() to inspect individual task states:
python
from prefect import flow, task
from prefect.testing.utilities import get_flow_run_states

@task
def task_1():
    return "hello"

@task
def task_2():
    return "world"

@flow
def my_flow():
    t1_result = task_1()
    t2_result = task_2()
    return [t1_result, t2_result]

def test_my_flow():
    state = my_flow()
    # Get states of all tasks in the flow run
    task_states = get_flow_run_states(state)
    # Access individual task results
    for task_state in task_states:
        result = task_state.result()
        # Assert something about the result
A few important points to note:
1. The flow execution returns a State object that contains the final result and metadata about the run.
2. State.result() will give you the return value of the flow, which might include task results if they were returned by the flow.
3. get_flow_run_states() is particularly useful when you need to inspect the state of specific tasks within the flow, especially if they're not part of the flow's final return value.
4. These testing approaches work with both synchronous and asynchronous flows.
For more complex scenarios where you need to mock dependencies or test specific failure conditions, you might want to use the prefect.testing module, which provides additional utilities for flow testing.
Remember that when writing tests, it's generally good practice to keep your flows small and focused, making it easier to test their outputs and behavior. This approach allows for more maintainable and reliable tests.
Vincent Chéry
07/04/2025, 3:05 PM
Marvin
07/04/2025, 3:05 PM
Marvin
07/04/2025, 3:05 PM
Vincent Chéry
07/04/2025, 3:08 PM
from prefect import flow, task

@task(log_prints=True)
def get_stars(repo: str):
    count = len(repo)
    return count

@flow()
def github_stars_flow(repos: list[str]):
    stars = 0
    for repo in repos:
        stars += get_stars(repo)
    return stars
test.py
import pytest
import prefect
from prefect.testing.utilities import prefect_test_harness
from flow import github_stars_flow
from prefect.testing.fixtures import

@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    with prefect_test_harness():
        yield

def test_flow_1():
    res = github_stars_flow(["PrefectHQ/Prefect"])
Marvin
07/04/2025, 3:09 PM
Marvin
07/04/2025, 3:09 PM
Marvin
07/04/2025, 3:09 PM
prefect_test_harness, the flow returns its actual return value, not a State object. In your case, since github_stars_flow returns an integer (the sum of star counts), that's exactly what you'll get back.
Your flow:
python
@flow()
def github_stars_flow(repos: list[str]):
    stars = 0
    for repo in repos:
        stars += get_stars(repo)
    return stars  # Returns an integer
When you run this flow in a test:
python
res = github_stars_flow(["PrefectHQ/Prefect"])
# res will be an integer
This is the expected behavior in Prefect 3.x. The flow returns its actual return value rather than wrapping it in a State object. This makes testing more straightforward and intuitive, as you can work directly with the returned values.
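Because the call hands back the plain integer, a test can assert on it directly. A minimal sketch, assuming the same logic as in the thread with the @task and @flow decorators left off so that it runs without a Prefect installation; the expected value is simply the length of the repo string:

```python
def get_stars(repo: str) -> int:
    # Same stand-in logic as in the thread: the "star count" is the string length
    return len(repo)


def github_stars_flow(repos: list[str]) -> int:
    stars = 0
    for repo in repos:
        stars += get_stars(repo)
    return stars


def test_flow_1():
    res = github_stars_flow(["PrefectHQ/Prefect"])
    # "PrefectHQ/Prefect" has 17 characters
    assert res == 17


test_flow_1()
```

With the decorators restored and the prefect_test_harness fixture active, the same assertion should apply, since the flow call returns the integer itself.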
If you need to access the State object of a flow run, you would need to either:
1. Use the Prefect API client to query the flow run after execution
2. Use get_state() on the future returned when running the flow asynchronously
This design choice in Prefect 3.x makes it easier to work with flow returns directly while still maintaining the ability to access state information when needed through the appropriate APIs.
Marvin
07/04/2025, 3:11 PM
python
res = github_stars_flow(["PrefectHQ/Prefect"])
You'll get back the actual return value (the integer) rather than a State object. This is an intentional design choice in Prefect 3.x to make flows more intuitive to work with in development.
Here's how return values work in Prefect 3.x:
1. When you call a flow directly in Python code, you get back the actual return value
2. When a flow runs in production (via a deployment) or when you use flow.serve(), the run returns a State object
3. If you need the State object during development/testing, you can use flow.with_options(return_state=True)
So if you want to test with the State object, modify your test like this:
python
def test_flow_1():
    res = github_stars_flow.with_options(return_state=True)(["PrefectHQ/Prefect"])
    assert res.type == "COMPLETED"
    assert isinstance(res.result(), int)
This behavior is different from Prefect 2.x, where flows would always return State objects. The change in 3.x makes it more natural to work with flows during development while still maintaining all the state management capabilities when needed.
Vincent Chéry
07/04/2025, 3:15 PM
TypeError: Flow.with_options() got an unexpected keyword argument 'return_state'
Marvin
07/04/2025, 3:15 PM
Marvin
07/04/2025, 3:15 PM
Vincent Chéry
07/04/2025, 3:16 PM
.with_options method does not have a return_state argument
Marvin
07/04/2025, 3:16 PM
Marvin
07/04/2025, 3:16 PM
Marvin
07/04/2025, 3:19 PM
Marvin
07/04/2025, 3:19 PM
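Since with_options() rejects return_state, the keyword belongs on the flow call itself: Prefect's flow call accepts return_state=True, so github_stars_flow(["PrefectHQ/Prefect"], return_state=True) is worth trying in the test. The toy below is plain Python, not Prefect code; toy_flow and ToyState are hypothetical stand-ins that only sketch this calling convention, so the two return modes can be compared without a Prefect installation:

```python
from dataclasses import dataclass
from functools import wraps
from typing import Any


@dataclass
class ToyState:
    """Hypothetical stand-in for a run state: a type label plus the payload."""
    type: str
    data: Any = None

    def result(self) -> Any:
        # Re-raise the stored exception for failed runs, else return the value
        if self.type == "FAILED":
            raise self.data
        return self.data


def toy_flow(fn):
    """Hypothetical decorator mimicking a call-time return_state switch."""
    @wraps(fn)
    def wrapper(*args, return_state: bool = False, **kwargs):
        try:
            state = ToyState("COMPLETED", fn(*args, **kwargs))
        except Exception as exc:
            state = ToyState("FAILED", exc)
        # Default: unwrap to the plain value; opt in to get the state object
        return state if return_state else state.result()
    return wrapper


@toy_flow
def github_stars_flow(repos: list[str]) -> int:
    return sum(len(repo) for repo in repos)


plain = github_stars_flow(["PrefectHQ/Prefect"])
state = github_stars_flow(["PrefectHQ/Prefect"], return_state=True)
assert isinstance(plain, int)
assert state.type == "COMPLETED" and state.result() == plain
```

The same two assertions on the state (its type and its result) are what the earlier test_flow_1 was reaching for; only the place where return_state is passed changes.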