Solomon Negusse
07/04/2025, 7:08 AM
```python
from prefect import flow, task
from pipelines.disturbance.gadm import load_zarr, clean_data, do_something_special

@task
def task_1():
    load_zarr()

@task
def task_2():
    clean_data()

@task
def task_3():
    do_something_special()

@flow
def gadm_dist_child_flow():
    task_1()
    task_2()

@flow
def gadm_dist_by_natural_lands_child_flow():
    task_1()
    task_2()
    task_3()
```
Marvin
07/04/2025, 7:09 AM
1. Testing Individual Tasks
```python
import pytest
from unittest.mock import patch

# Import the tasks from wherever your flow module lives, e.g.:
# from my_flows import task_1, task_2, task_3  # hypothetical module path

# Mock the imported functions.
# NOTE: patch where the names are *looked up*: since the flow module does
# `from pipelines.disturbance.gadm import load_zarr`, patching the source
# module may not replace the copy already imported into the flow module's
# namespace -- patch e.g. 'my_flows.load_zarr' in that case.
@pytest.fixture(autouse=True)
def mock_dependencies():
    with patch('pipelines.disturbance.gadm.load_zarr') as mock_load, \
         patch('pipelines.disturbance.gadm.clean_data') as mock_clean, \
         patch('pipelines.disturbance.gadm.do_something_special') as mock_special:
        yield {
            'load_zarr': mock_load,
            'clean_data': mock_clean,
            'do_something_special': mock_special,
        }

# NOTE: these call the tasks directly, which works on Prefect 3.x. On Prefect
# 2.x tasks can't run outside a flow, so call the raw function via task_1.fn().
def test_task_1(mock_dependencies):
    task_1()
    mock_dependencies['load_zarr'].assert_called_once()

def test_task_2(mock_dependencies):
    task_2()
    mock_dependencies['clean_data'].assert_called_once()

def test_task_3(mock_dependencies):
    task_3()
    mock_dependencies['do_something_special'].assert_called_once()
```
2. Testing Flow Execution
```python
def test_gadm_dist_child_flow(mock_dependencies):
    gadm_dist_child_flow()
    # Verify the expected tasks ran (call order is checked separately below)
    mock_dependencies['load_zarr'].assert_called_once()
    mock_dependencies['clean_data'].assert_called_once()
    mock_dependencies['do_something_special'].assert_not_called()

def test_gadm_dist_by_natural_lands_child_flow(mock_dependencies):
    gadm_dist_by_natural_lands_child_flow()
    # Verify all tasks were called
    mock_dependencies['load_zarr'].assert_called_once()
    mock_dependencies['clean_data'].assert_called_once()
    mock_dependencies['do_something_special'].assert_called_once()
```
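The assertions above only check that each task ran; if you also want to verify sequencing, one standard unittest.mock pattern is to attach the mocks to a shared parent and inspect its recorded calls. A sketch, reusing the mock_dependencies fixture (this works here because the flow calls the tasks sequentially rather than submitting them concurrently):
```python
from unittest.mock import MagicMock, call

def test_gadm_dist_child_flow_call_order(mock_dependencies):
    manager = MagicMock()
    manager.attach_mock(mock_dependencies['load_zarr'], 'load_zarr')
    manager.attach_mock(mock_dependencies['clean_data'], 'clean_data')

    gadm_dist_child_flow()

    # The parent mock records child calls in the order they happened
    assert manager.mock_calls == [call.load_zarr(), call.clean_data()]
```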
3. Integration Testing
```python
import pytest

from prefect.testing.utilities import prefect_test_harness

@pytest.mark.integration
def test_full_integration():
    # Use actual data but in a test environment
    with prefect_test_harness():
        # The flow body returns None, so assert on the final state instead
        state = gadm_dist_by_natural_lands_child_flow(return_state=True)
        assert state.is_completed()  # Add appropriate assertions
```
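If many tests need the harness, Prefect's testing docs suggest wrapping it in a session-scoped autouse fixture so the temporary local API only starts once; a sketch (the fixture name is arbitrary):
```python
import pytest

from prefect.testing.utilities import prefect_test_harness

@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    # Every test in the session runs against a temporary, ephemeral Prefect API
    with prefect_test_harness():
        yield
```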
4. Testing Error Handling
```python
def test_task_1_error_handling(mock_dependencies):
    mock_dependencies['load_zarr'].side_effect = Exception("Test error")
    with pytest.raises(Exception) as exc_info:
        task_1()
    assert "Test error" in str(exc_info.value)
```
Key Testing Tips:
1. Place these tests in a tests/ directory, following standard Python project structure
2. Use pytest.fixture to set up and tear down test resources
3. Test both success and failure scenarios
4. Mock external dependencies to isolate the testing of your flow logic
5. Use meaningful assertions to verify the expected behavior
To run the tests:
```bash
pytest tests/ -v
```
For integration tests specifically:
```bash
pytest tests/ -v -m integration
```
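Note that pytest warns about unknown marks, so register the custom integration marker. One option, assuming you don't already register it in pytest.ini or pyproject.toml, is conftest.py:
```python
# conftest.py
def pytest_configure(config):
    # Register the "integration" marker used by @pytest.mark.integration
    config.addinivalue_line(
        "markers", "integration: tests that exercise real or emulated services"
    )
```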
Additional Recommendations:
1. Consider adding logging assertions to verify important events are logged
2. Test any parameters or configurations your tasks might accept
3. If your tasks have dependencies between them (e.g., task_2 needs task_1's output), make sure to test these relationships (see the sketch after this list)
4. Consider using pytest-cov to measure test coverage
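For point 3, a minimal sketch of what such a relationship test could look like, assuming a hypothetical variant of the flow in which the second task consumes the first task's return value (the flows above don't pass data yet, so all names here are illustrative):
```python
from prefect import flow, task

@task
def produce():
    return "raw"  # stand-in for load_zarr()'s output

@task
def consume(data):
    return f"cleaned:{data}"  # stand-in for clean_data()

@flow
def pipeline():
    return consume(produce())

def test_consume_receives_produce_output():
    assert pipeline() == "cleaned:raw"
```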
Remember to adapt these examples based on your actual implementation details and requirements. The key is to ensure that both the individual tasks and the flow orchestration are working as expected.

Solomon Negusse
07/04/2025, 7:18 AM

Marvin
07/04/2025, 7:20 AM
1. Using Moto to Mock S3 (Good for Unit Tests)
```python
import boto3
import pytest
from moto import mock_s3  # moto >= 5 renamed this to mock_aws

from prefect.testing.utilities import prefect_test_harness

# Assuming your tasks look something like this:
@task
def task_1():
    s3 = boto3.client('s3')
    data = s3.get_object(Bucket='my-bucket', Key='input/data.zarr')
    load_zarr(data)

@task
def task_2():
    result = clean_data()
    s3 = boto3.client('s3')
    s3.put_object(Bucket='my-bucket', Key='output/cleaned.zarr', Body=result)

# Test setup
@pytest.fixture
def s3_test_bucket():
    with mock_s3():
        # Create mock S3 bucket and test data
        s3 = boto3.client('s3')
        s3.create_bucket(Bucket='my-bucket')
        # Add test data to the bucket
        s3.put_object(
            Bucket='my-bucket',
            Key='input/data.zarr',
            Body=b'test data'  # Replace with actual test data
        )
        yield 'my-bucket'

def test_flow_with_s3(s3_test_bucket):
    with prefect_test_harness():
        gadm_dist_child_flow()
        # Verify output was written to S3 (the mock is still active here)
        s3 = boto3.client('s3')
        output = s3.get_object(Bucket='my-bucket', Key='output/cleaned.zarr')
        assert output['Body'].read() is not None  # Add appropriate assertions
```
2. Using LocalStack (Good for Integration Tests)
```python
import os

import boto3
import pytest

from prefect.testing.utilities import prefect_test_harness

@pytest.fixture(scope="session")
def localstack_s3():
    # Assuming LocalStack is running (via docker-compose or similar)
    os.environ['AWS_ACCESS_KEY_ID'] = 'test'
    os.environ['AWS_SECRET_ACCESS_KEY'] = 'test'
    os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'
    # Point boto3 to LocalStack
    endpoint_url = 'http://localhost:4566'
    s3 = boto3.client('s3', endpoint_url=endpoint_url)
    # Create test bucket
    s3.create_bucket(Bucket='my-bucket')
    return endpoint_url

def test_flow_with_localstack(localstack_s3):
    # Configure your tasks to use the LocalStack endpoint
    os.environ['S3_ENDPOINT_URL'] = localstack_s3
    with prefect_test_harness():
        result = gadm_dist_child_flow()
        # Add assertions
```
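For the endpoint override to take effect, the task code has to honor it when building its client. A minimal sketch, assuming your tasks construct their own boto3 clients and that S3_ENDPOINT_URL is the variable name you standardize on:
```python
import os

import boto3

def get_s3_client():
    # endpoint_url=None falls through to real AWS; tests set S3_ENDPOINT_URL
    return boto3.client('s3', endpoint_url=os.environ.get('S3_ENDPOINT_URL'))
```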
3. Using a Test S3 Bucket (For Full Integration Tests)
```python
import os
import uuid

import boto3
import pytest

from prefect.testing.utilities import prefect_test_harness

@pytest.fixture(scope="session")
def test_s3_bucket():
    # Use a dedicated test bucket with a unique name
    bucket_name = f'test-bucket-{uuid.uuid4()}'
    s3 = boto3.client('s3')
    s3.create_bucket(Bucket=bucket_name)
    yield bucket_name
    # Cleanup: Delete all objects and the bucket
    objects = s3.list_objects_v2(Bucket=bucket_name)
    for obj in objects.get('Contents', []):
        s3.delete_object(Bucket=bucket_name, Key=obj['Key'])
    s3.delete_bucket(Bucket=bucket_name)

def test_flow_with_real_s3(test_s3_bucket):
    # Update your flow's bucket configuration
    os.environ['S3_BUCKET'] = test_s3_bucket
    with prefect_test_harness():
        gadm_dist_child_flow()
        # Add assertions against the real bucket contents
```