William Hom
09/03/2024, 7:07 PM
`prefect_test_harness`?
Marvin
09/03/2024, 7:07 PM
William Hom
09/03/2024, 7:08 PM
Marvin
09/03/2024, 7:08 PM
You can use `prefect_test_harness` to test your flow that writes data to a database. The harness sets up a temporary local SQLite database for Prefect's own API state, so test runs stay isolated from your real Prefect backend. It does not redirect your flow's own database writes.
Here's the basic idea:
1. Import `prefect_test_harness` from `prefect.testing.utilities`.
2. Wrap your tests with this context manager to run against the temporary database.
3. If you're using a testing framework like `pytest`, you can set up a fixture using `prefect_test_harness` to scope the temporary database for your test session.
For example:
```python
from prefect import flow
from prefect.testing.utilities import prefect_test_harness


@flow
def write_to_db_flow():
    # your flow logic to write to the database
    pass


def test_write_to_db_flow():
    with prefect_test_harness():
        # run the flow against a temporary testing database
        write_to_db_flow()
        # add your assertions and checks here
```
Using `pytest`:
```python
import pytest
from prefect import flow
from prefect.testing.utilities import prefect_test_harness


@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    with prefect_test_harness():
        yield


@flow
def write_to_db_flow():
    # your flow logic to write to the database
    pass


def test_write_to_db_flow():
    write_to_db_flow()
    # add your assertions and checks here
```
For more detailed information, you can refer to the Prefect Testing Guide.
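There, I've laid it all out for you. Enjoy the delightful process of testing!
William Hom
09/03/2024, 7:11 PM
Marvin
09/03/2024, 7:12 PM
1. After running your flow in the context where `prefect_test_harness` applies, connect to the database your flow wrote to. The harness's own temporary SQLite database holds Prefect's state, not your flow's data.
2. Perform SQL queries to verify the expected data is present.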
Here's a complete example:
```python
import os
import sqlite3
import tempfile

from prefect import flow
from prefect.testing.utilities import prefect_test_harness


@flow
def write_to_db_flow(db_path: str):
    # Write to a SQLite file so the data outlives this connection
    connection = sqlite3.connect(db_path)
    cursor = connection.cursor()
    cursor.execute("CREATE TABLE test (value TEXT)")
    cursor.execute("INSERT INTO test (value) VALUES ('hello world')")
    connection.commit()
    connection.close()


def test_write_to_db_flow():
    with tempfile.TemporaryDirectory() as tmp_dir, prefect_test_harness():
        db_path = os.path.join(tmp_dir, "test.db")
        # Run the flow
        write_to_db_flow(db_path)
        # Connect to the same temporary SQLite file the flow wrote to
        connection = sqlite3.connect(db_path)
        cursor = connection.cursor()
        # Query the database to check if the data was written
        cursor.execute("SELECT value FROM test")
        result = cursor.fetchone()
        # Close before asserting so the temporary file is always released
        connection.close()
        # Assert the expected data is there
        assert result == ('hello world',), f"Expected data not found in database, got {result}"
```
Using `pytest`:
```python
import sqlite3

import pytest
from prefect import flow
from prefect.testing.utilities import prefect_test_harness


@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    with prefect_test_harness():
        yield


@flow
def write_to_db_flow(db_path: str):
    # Write to a SQLite file so the data outlives this connection
    connection = sqlite3.connect(db_path)
    cursor = connection.cursor()
    cursor.execute("CREATE TABLE test (value TEXT)")
    cursor.execute("INSERT INTO test (value) VALUES ('hello world')")
    connection.commit()
    connection.close()


def test_write_to_db_flow(tmp_path):
    # tmp_path is pytest's built-in per-test temporary directory
    db_path = str(tmp_path / "test.db")
    # Run the flow
    write_to_db_flow(db_path)
    # Connect to the same temporary SQLite file the flow wrote to
    connection = sqlite3.connect(db_path)
    cursor = connection.cursor()
    # Query the database to check if the data was written
    cursor.execute("SELECT value FROM test")
    result = cursor.fetchone()
    connection.close()
    # Assert the expected data is there
    assert result == ('hello world',), f"Expected data not found in database, got {result}"
```
Essentially, after running your flow, you connect to the temporary database and perform assertions like you would normally do with any database.
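One caveat applies if you use an in-memory SQLite database for this kind of check. `sqlite3.connect` only interprets a `file:` URI when you pass `uri=True` (otherwise the string is typically treated as an ordinary filename), and even a shared-cache in-memory database is discarded once its last connection closes. Here is a minimal stdlib-only sketch of that behavior; the database name `demo_shared_mem` and the helper functions are made up for illustration:

```python
import sqlite3

# Hypothetical database name, used only for this demonstration
SHARED_MEMORY_URI = "file:demo_shared_mem?mode=memory&cache=shared"


def write_row() -> None:
    """Create a table, insert one row, then close the connection."""
    connection = sqlite3.connect(SHARED_MEMORY_URI, uri=True)
    connection.execute("CREATE TABLE test (value TEXT)")
    connection.execute("INSERT INTO test (value) VALUES ('hello world')")
    connection.commit()
    connection.close()


def read_rows() -> list:
    """Return all rows, or an empty list if the table does not exist."""
    connection = sqlite3.connect(SHARED_MEMORY_URI, uri=True)
    try:
        return connection.execute("SELECT value FROM test").fetchall()
    except sqlite3.OperationalError:
        return []
    finally:
        connection.close()


def rows_after_writer_closed() -> list:
    # No connection stays open between the write and the read,
    # so the in-memory database is gone by the time we read
    write_row()
    return read_rows()


def rows_with_keepalive() -> list:
    # An extra open connection keeps the in-memory database alive
    keepalive = sqlite3.connect(SHARED_MEMORY_URI, uri=True)
    try:
        write_row()
        return read_rows()
    finally:
        keepalive.close()
```

If the flow closes its connection before the test reads, as in `rows_after_writer_closed`, the table no longer exists. A file in a temporary directory avoids the problem, and so does holding a keepalive connection open for the duration of the test.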
Again, for more on Prefect's testing, you can check out the Prefect Testing Guide.
There, now you can be absolutely sure your data got written. Enjoy the verification!
William Hom
09/03/2024, 7:48 PM
Marvin
09/03/2024, 7:48 PM
1. Use the `.fn` attribute: every Prefect task has a `.fn` attribute that lets you access the original function.
2. Disable the logger if required: If your task uses a logger, you might want to disable it to avoid a `RuntimeError` raised from a missing flow context.
### Example:
Here's how you can mock tasks using `unittest.mock`:
```python
from unittest.mock import patch

from prefect import flow, task


@task
def my_database_task():
    # Task that writes to the database
    pass


@flow
def my_flow():
    my_database_task()


def test_my_database_task():
    # 'path.to' is a placeholder for the module that defines my_database_task
    with patch('path.to.my_database_task.fn') as mock_task:
        mock_task.return_value = "Mocked result"
        my_flow()
        mock_task.assert_called_once()
```
If you're using `pytest`, it would look similar but could involve additional fixtures.
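To see why patching `.fn` changes what the flow ends up running, here is a toy stand-in for a task wrapper. `ToyTask` is hypothetical and is not Prefect's implementation. It only mimics the one property that matters for mocking, which is that the wrapper looks up `self.fn` at call time, and it assumes the real engine behaves similarly in that respect:

```python
from unittest.mock import patch


class ToyTask:
    """Hypothetical stand-in for a task wrapper; not Prefect's actual class."""

    def __init__(self, fn):
        self.fn = fn  # the original, undecorated function

    def __call__(self, *args, **kwargs):
        # self.fn is looked up on every call, so a patched .fn takes effect
        return self.fn(*args, **kwargs)


@ToyTask
def my_database_task():
    raise RuntimeError("would hit the real database")


def my_flow():
    return my_database_task()


def run_flow_with_mocked_task():
    # Replace .fn only for the duration of the with-block
    with patch.object(my_database_task, "fn", return_value="Mocked result") as mock_fn:
        result = my_flow()
    return result, mock_fn.call_count
```

Once the `with` block exits, `patch.object` restores the original function, so the mock cannot leak into other tests.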
### Using Prefect's Task `.fn` for Unit Testing:
You can directly call the original function behind a task to test its functionality without the Prefect engine running the task:
```python
from prefect import flow, task
from prefect.logging import disable_run_logger


@task
def my_favorite_task():
    return 42


@flow
def my_favorite_flow():
    val = my_favorite_task()
    return val


def test_my_favorite_task():
    assert my_favorite_task.fn() == 42


# If your task uses a logger
def test_my_favorite_task_with_logger():
    with disable_run_logger():
        assert my_favorite_task.fn() == 42
```
For more detailed information, you can check out the Prefect Testing Guide.
There you go, absolutely everything necessary to mock and test those tasks. You're welcome!