<@ULVA73B9P> I have a Flow that writes data to a d...
# marvin-in-the-wild
w
@Marvin I have a Flow that writes data to a database. How would I test this using `prefect_test_harness`?
m
Brilliant. You want to test a data-writing flow using Prefect's testing tools. First, which version of Prefect are you using, 2.x or 3.x?
w
@Marvin 2.x
m
Alright, you can use `prefect_test_harness` to test your flow that writes data to a database. The harness points Prefect at a temporary local SQLite database for its own run metadata, so test runs never touch your real Prefect backend. It does not create or manage the database your flow writes to. Here's the basic idea:
1. Import `prefect_test_harness` from `prefect.testing.utilities`.
2. Wrap your tests with this context manager to run against the temporary database.
3. If you're using a testing framework like `pytest`, you can set up a fixture using `prefect_test_harness` to scope the temporary database for your test session.

For example:
```python
from prefect import flow
from prefect.testing.utilities import prefect_test_harness

@flow
def write_to_db_flow():
    # your flow logic to write to the database
    pass

def test_write_to_db_flow():
    with prefect_test_harness():
        # run the flow against a temporary testing database
        write_to_db_flow()
        # add your assertions and checks here
```
Using `pytest`:
```python
import pytest
from prefect import flow
from prefect.testing.utilities import prefect_test_harness

@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    with prefect_test_harness():
        yield

@flow
def write_to_db_flow():
    # your flow logic to write to the database
    pass

def test_write_to_db_flow():
    write_to_db_flow()
    # add your assertions and checks here
```
For more detailed information, you can refer to the Prefect Testing Guide. There, I've laid it all out for you. Enjoy the delightful process of testing!
w
@Marvin great, now how do I assert the data was written to this temporary database?
m
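Naturally, you want to know if your data was actually written. It's almost like you don't trust your flow. One caveat first: the temporary database that `prefect_test_harness` creates only holds Prefect's own run metadata, so your flow's data lands wherever the flow itself connects. Here's how you can check it:
1. Have the flow write to a database your test can also reach, ideally a throwaway one.
2. After the flow runs, connect to that same database and perform SQL queries to verify the expected data is present.

Here's a complete example: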
```python
import os
import sqlite3
import tempfile

from prefect import flow
from prefect.testing.utilities import prefect_test_harness

@flow
def write_to_db_flow(db_path: str):
    # Write one row to the SQLite database file at db_path
    connection = sqlite3.connect(db_path)
    cursor = connection.cursor()
    cursor.execute("CREATE TABLE test (value TEXT)")
    cursor.execute("INSERT INTO test (value) VALUES ('hello world')")
    connection.commit()
    connection.close()

def test_write_to_db_flow():
    # A file in a temporary directory survives the flow closing its connection
    # and is deleted when the test finishes
    with prefect_test_harness(), tempfile.TemporaryDirectory() as tmp_dir:
        db_path = os.path.join(tmp_dir, "test.db")

        # Run the flow
        write_to_db_flow(db_path)

        # Connect to the same temporary SQLite database
        connection = sqlite3.connect(db_path)
        cursor = connection.cursor()

        # Query the database to check if the data was written
        cursor.execute("SELECT value FROM test")
        result = cursor.fetchone()
        connection.close()

        # Assert the expected data is there
        assert result == ('hello world',), f"Expected data not found in database, got {result}"
```
Using `pytest`:
```python
import sqlite3

import pytest
from prefect import flow
from prefect.testing.utilities import prefect_test_harness

@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    with prefect_test_harness():
        yield

@flow
def write_to_db_flow(db_path: str):
    # Write one row to the SQLite database file at db_path
    connection = sqlite3.connect(db_path)
    cursor = connection.cursor()
    cursor.execute("CREATE TABLE test (value TEXT)")
    cursor.execute("INSERT INTO test (value) VALUES ('hello world')")
    connection.commit()
    connection.close()

def test_write_to_db_flow(tmp_path):
    # tmp_path is pytest's built-in per-test temporary directory
    db_path = str(tmp_path / "test.db")

    # Run the flow
    write_to_db_flow(db_path)

    # Connect to the same temporary SQLite database
    connection = sqlite3.connect(db_path)
    cursor = connection.cursor()

    # Query the database to check if the data was written
    cursor.execute("SELECT value FROM test")
    result = cursor.fetchone()
    connection.close()

    # Assert the expected data is there
    assert result == ('hello world',), f"Expected data not found in database, got {result}"
```
Essentially, after running your flow, you connect to the database it wrote to and perform assertions like you would normally do with any database. Again, for more on Prefect's testing, you can check out the Prefect Testing Guide. There, now you can be absolutely sure your data got written. Enjoy the verification!
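The write-then-verify step can also be sketched on its own with only the standard library. This is a minimal sketch, not part of Prefect: `write_greeting`, `fetch_values`, and `run_check` are hypothetical names, and the temp-file SQLite database stands in for whatever database your flow actually targets.

```python
import os
import sqlite3
import tempfile
from contextlib import closing


def write_greeting(db_path: str) -> None:
    """Hypothetical stand-in for the flow body: writes one row."""
    with closing(sqlite3.connect(db_path)) as connection:
        connection.execute("CREATE TABLE IF NOT EXISTS test (value TEXT)")
        connection.execute("INSERT INTO test (value) VALUES (?)", ("hello world",))
        connection.commit()


def fetch_values(db_path: str) -> list:
    """Open a fresh connection and return every row in the test table."""
    with closing(sqlite3.connect(db_path)) as connection:
        return connection.execute("SELECT value FROM test").fetchall()


def run_check() -> list:
    # A file-backed database persists after the writer closes its connection,
    # so a separate connection can read the rows back.
    with tempfile.TemporaryDirectory() as tmp_dir:
        db_path = os.path.join(tmp_dir, "test.db")
        write_greeting(db_path)
        return fetch_values(db_path)


assert run_check() == [("hello world",)]
```

A file in a temporary directory is used here rather than an in-memory database because an in-memory SQLite database disappears once its last connection closes, which would happen before the reading connection opens.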
w
@Marvin how do I mock tasks?
m
So, you want to mock your tasks for testing purposes? Pretty sure nobody's ever asked that before. Here's how you can mock tasks in Prefect 2.x:
1. Access the original function of your tasks: Tasks in Prefect have a `.fn` attribute that lets you access the original function.
2. Disable the logger if required: If your task uses a logger, you might want to disable it to avoid the `RuntimeError` raised from a missing flow context.

### Example:
Here's how you can mock tasks using `unittest.mock`:
```python
from unittest.mock import patch
from prefect import flow, task

@task
def my_database_task():
    # Task that writes to the database
    pass

@flow
def my_flow():
    my_database_task()

def test_my_flow_with_mocked_task():
    # 'path.to' is a placeholder: use the import path of the module
    # where my_database_task is defined
    with patch('path.to.my_database_task.fn') as mock_task:
        mock_task.return_value = "Mocked result"

        my_flow()

        mock_task.assert_called_once()
```
If you're using `pytest`, the mocked test would look similar but could involve additional fixtures.

### Using Prefect's Task `.fn` for Unit Testing:
You can directly call the original function behind a task to test its functionality without the Prefect engine running the task:
```python
from prefect import flow, task
from prefect.logging import disable_run_logger

@task
def my_favorite_task():
    return 42

@flow
def my_favorite_flow():
    val = my_favorite_task()
    return val

def test_my_favorite_task():
    assert my_favorite_task.fn() == 42

# If your task uses a logger
def test_my_favorite_task_with_logger():
    with disable_run_logger():
        assert my_favorite_task.fn() == 42
```
For more detailed information, you can check out the Prefect Testing Guide. There you go, absolutely everything necessary to mock and test those tasks. You're welcome!
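For a task that writes to a database, the same idea extends to mocking the connection rather than the task. This is a minimal sketch using only `unittest.mock`; `insert_greeting` and `check_insert_greeting_with_mock` are hypothetical names, and the function is left undecorated so the example runs without Prefect installed.

```python
from unittest.mock import MagicMock


def insert_greeting(connection) -> None:
    """Hypothetical task body: writes one row using the given connection."""
    cursor = connection.cursor()
    cursor.execute("INSERT INTO test (value) VALUES (?)", ("hello world",))
    connection.commit()


def check_insert_greeting_with_mock() -> MagicMock:
    # MagicMock records every call made on it, so no real database is needed
    mock_connection = MagicMock()
    insert_greeting(mock_connection)

    # connection.cursor() returned this child mock
    mock_cursor = mock_connection.cursor.return_value
    mock_cursor.execute.assert_called_once_with(
        "INSERT INTO test (value) VALUES (?)", ("hello world",)
    )
    mock_connection.commit.assert_called_once()
    return mock_connection


check_insert_greeting_with_mock()
```

If `insert_greeting` were decorated with `@task`, the check would call `insert_greeting.fn(mock_connection)` instead, following the `.fn` approach described above.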