Idriss Bellil
08/05/2025, 8:09 AM
prefect_test_harness context manager & calling the flow without the .fn
Is this the best way to do it? I do call Prefect's wait([task_1.submit(), task_2.submit()])
and this part seems a little flaky in my tests: sometimes they complete, sometimes not (I check a queue they write into as a sign of completion).
Marvin
08/05/2025, 8:09 AM
1. prefect_test_harness Usage:
- Using prefect_test_harness is indeed the correct way to test flows, but there are some important considerations.
- The context manager provides a temporary testing environment (flows run against a temporary local database), so every flow call under test should happen inside the with block.
2. Best Practices for Flow Testing:
```python
from prefect import flow, task
from prefect.futures import wait
from prefect.testing.utilities import prefect_test_harness


@task
def task_1():
    return "result_1"


@task
def task_2():
    return "result_2"


@flow
def my_flow():
    t1 = task_1.submit()
    t2 = task_2.submit()
    wait([t1, t2])  # Block until both futures have finished
    # wait() returns futures, not values, so resolve each one explicitly
    return [t1.result(), t2.result()]


def test_my_flow():
    with prefect_test_harness():
        result = my_flow()  # Call the flow directly, not with .fn()
        assert result == ["result_1", "result_2"]
```
3. Regarding Your Specific Issues:
- The flakiness you're experiencing with wait([task_1.submit(), task_2.submit()]) might be related to race conditions in your queue checking mechanism.
- Instead of checking a queue for completion, consider:
  - Using the task results directly (once wait() returns, call .result() on each future to get its value)
  - Using Prefect's built-in state tracking mechanisms
- If you need to test queue behavior specifically, implement proper synchronization mechanisms
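The ordering issue above can be illustrated without Prefect. The sketch below uses the standard library's concurrent.futures as a stand-in for task.submit() and wait(); it is not Prefect's API, and write_after_delay and run_workers are hypothetical names. The point it shows: the queue is only inspected after every future has been confirmed finished.

```python
import queue
import time
from concurrent.futures import ThreadPoolExecutor, wait


def write_after_delay(q: queue.Queue, value: str, delay: float) -> str:
    """Hypothetical worker: sleeps, then records completion on the queue."""
    time.sleep(delay)
    q.put(value)
    return value


def run_workers(q: queue.Queue) -> list:
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [
            pool.submit(write_after_delay, q, "task_1", 0.05),
            pool.submit(write_after_delay, q, "task_2", 0.01),
        ]
        done, not_done = wait(futures, timeout=5)
        assert not not_done, "some workers did not finish in time"
        # result() re-raises any worker exception and returns the value
        return [f.result() for f in futures]


completion_queue: queue.Queue = queue.Queue()
results = run_workers(completion_queue)

# Safe to inspect the queue now: every worker has already returned.
written = []
while not completion_queue.empty():
    written.append(completion_queue.get_nowait())
```

Here results follows submission order, while written follows completion order, so a test should compare the queue contents as a set or sorted list rather than assume an order.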
4. Important Notes:
- Don't use .fn() when testing the full flow - this bypasses Prefect's runtime context
- Only use .fn() when you want to test the raw function logic without Prefect's orchestration
- If you need logging in your tests, be aware that get_run_logger() requires an active flow or task context
5. For Queue Testing:
If you're testing queue behavior, consider implementing a timeout mechanism:
```python
from prefect.testing.utilities import prefect_test_harness

# my_flow is the flow defined in the example above


def test_flow_with_timeout():
    with prefect_test_harness():
        result = my_flow()
        # Add appropriate assertions for your queue state
        # Consider using asyncio.wait_for if dealing with async operations
```
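One possible shape for the timeout mechanism, as a minimal sketch: it assumes your tasks write to a standard queue.Queue (I don't know what queue you actually use), and collect_from_queue is a hypothetical helper. Instead of checking the queue once and hoping the items are there, it blocks on each expected item with a bounded wait. The threading.Timer calls only stand in for tasks that report completion a little later.

```python
import queue
import threading


def collect_from_queue(q: queue.Queue, expected: int, timeout: float = 5.0) -> list:
    """Pull `expected` items, waiting up to `timeout` seconds for each one."""
    items = []
    for _ in range(expected):
        # Blocks until an item arrives; raises queue.Empty if the wait times out
        items.append(q.get(timeout=timeout))
    return items


# Stand-in for tasks that report completion by writing to a queue
q: queue.Queue = queue.Queue()
for name in ("task_1", "task_2"):
    threading.Timer(0.05, q.put, args=(name,)).start()

completed = collect_from_queue(q, expected=2)
```

If an item never arrives, the test fails with queue.Empty after the timeout rather than passing or failing depending on timing.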
Would you like me to provide more specific guidance about your queue checking mechanism? It would help if you could share more details about how you're implementing the queue checks in your tests.