Lennert Van de Velde
07/02/2025, 11:36 AM
```python
from prefect_meemoo.prefect.deployment import (
    check_deployment_failed_flows
)
from prefect import flow
from prefect.testing.utilities import prefect_test_harness
from prefect.deployments import run_deployment


def test_check_deployment_failed_flows():
    """
    Test the check_deployment_failed_flows function.
    """
    @flow(name="test_check_deployment_failed_flows")
    def failing_flow():
        """
        A simple flow that simulates a failure.
        """
        raise Exception("Simulated failure for testing purposes.")

    with prefect_test_harness():
        # Deploy the failing flow with Prefect
        failing_flow.deploy(name="failing-flow-deployment")
        # Run the deployment
        deployment_run = run_deployment("failing-flow-deployment")
        # assert that the deployment run fails
        assert check_deployment_failed_flows("failing-flow-deployment") is True
```
This returns this error: ValueError: Could not find work pool None. Please create it before deploying this flow.
Marvin
07/02/2025, 11:36 AM
```python
import asyncio

from prefect_meemoo.prefect.deployment import check_deployment_failed_flows
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import WorkPoolCreate
from prefect.deployments import run_deployment
from prefect.testing.utilities import prefect_test_harness


async def create_test_work_pool(name: str) -> None:
    """Register a process-type work pool through the Prefect client."""
    async with get_client() as client:
        await client.create_work_pool(
            work_pool=WorkPoolCreate(name=name, type="process")
        )


def test_check_deployment_failed_flows():
    """
    Test the check_deployment_failed_flows function.
    """
    @flow(name="test_check_deployment_failed_flows")
    def failing_flow():
        """
        A simple flow that simulates a failure.
        """
        raise Exception("Simulated failure for testing purposes.")

    with prefect_test_harness():
        # Create a work pool for testing
        asyncio.run(create_test_work_pool("test-pool"))
        # Deploy the failing flow with Prefect
        failing_flow.deploy(
            name="failing-flow-deployment",
            work_pool_name="test-pool",
        )
        # Run the deployment. The identifier is {flow_name}/{deployment_name},
        # and a flow name passed explicitly to @flow is used exactly as given.
        # Note: a worker has to be polling "test-pool" for the run to leave SCHEDULED.
        deployment_run = run_deployment(
            "test_check_deployment_failed_flows/failing-flow-deployment",
            timeout=300,
        )
        # assert that the deployment run fails
        assert check_deployment_failed_flows("failing-flow-deployment") is True
```
Key changes made:
1. Added creation of a work pool - this is required for deployments in Prefect 3.x
2. Specified the work_pool_name in the deployment
3. Updated the deployment run reference to include the flow name (the full path is {flow_name}/{deployment_name})
4. Added a timeout to the run_deployment call, which caps how long we wait for the flow run to finish
A few important notes:
- The work pool uses the "process" type, which works well for testing as it runs flows in local processes
- The test harness provides an isolated environment for testing Prefect functionality
- Make sure to use the full path when running the deployment (flow_name/deployment_name)
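To make the naming rule concrete, here is a minimal sketch in plain Python (no Prefect import needed). Both helper names are hypothetical. The underscore-to-dash conversion reflects my understanding of how Prefect derives a default flow name from a function name, while a name passed explicitly via @flow(name=...) should be used as given, so it's worth confirming the exact name with `prefect deployment ls` or in the UI.

```python
def default_flow_name(function_name: str) -> str:
    """Approximate the default flow name derived from a function name.

    Assumption: underscores become dashes when no explicit name is given.
    """
    return function_name.replace("_", "-")


def deployment_path(flow_name: str, deployment_name: str) -> str:
    """Build the '{flow_name}/{deployment_name}' identifier for a deployment."""
    # Sanity check for this sketch only: a slash would make the path ambiguous
    if "/" in flow_name or "/" in deployment_name:
        raise ValueError("flow and deployment names must not contain '/'")
    return f"{flow_name}/{deployment_name}"


# Explicit flow name from the test: used exactly as written
explicit = deployment_path(
    "test_check_deployment_failed_flows", "failing-flow-deployment"
)
# Name derived from the function name instead (hypothetical case)
derived = deployment_path(
    default_flow_name("failing_flow"), "failing-flow-deployment"
)
print(explicit)
print(derived)
```

If run_deployment reports that it cannot find the deployment, comparing the string you pass against the listed deployment names is usually the quickest check.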
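If you're running this in a larger test suite, you might want to consider adding cleanup code to remove the work pool after the test:
```python
import asyncio
from prefect.client.orchestration import get_client

async def delete_work_pool(name: str) -> None:
    async with get_client() as client:
        await client.delete_work_pool(name)

with prefect_test_harness():
    try:
        ...  # your test code
    finally:
        # Clean up the work pool created for the test
        asyncio.run(delete_work_pool("test-pool"))
```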
This will help keep your test environment clean between test runs.
Lennert Van de Velde
07/02/2025, 11:38 AM
Marvin
07/02/2025, 11:38 AM
```python
from prefect_meemoo.prefect.deployment import check_deployment_failed_flows
from prefect import flow
from prefect.deployments import Deployment, run_deployment
from prefect.infrastructure.process import Process
from prefect.testing.utilities import prefect_test_harness


def test_check_deployment_failed_flows():
    """
    Test the check_deployment_failed_flows function.
    """
    @flow(name="test_check_deployment_failed_flows")
    def failing_flow():
        """
        A simple flow that simulates a failure.
        """
        raise Exception("Simulated failure for testing purposes.")

    with prefect_test_harness():
        # Create a deployment
        deployment = Deployment.build_from_flow(
            flow=failing_flow,
            name="failing-flow-deployment",
            infrastructure=Process(),
        )
        deployment.apply()
        # Trigger a run of the deployment (timeout=0 returns without waiting)
        flow_run = run_deployment(
            "test_check_deployment_failed_flows/failing-flow-deployment",
            timeout=0,
        )
        # assert that the deployment run fails
        assert check_deployment_failed_flows("failing-flow-deployment") is True
```
Key differences in Prefect 2.x:
1. Use Deployment.build_from_flow() instead of the .deploy() method
2. Use deployment.apply() to register the deployment
3. Trigger the run with run_deployment(), passing the full {flow_name}/{deployment_name} path
4. No work pool has to be created explicitly, since Prefect 2.x agents run deployments through infrastructure blocks
If you need to specify additional parameters or a schedule, you can add them to the deployment build:
```python
deployment = Deployment.build_from_flow(
    flow=failing_flow,
    name="failing-flow-deployment",
    infrastructure=Process(),
    tags=["test"],
    version="1",
    skip_upload=True,  # Useful for testing to avoid storage requirements
)
```
Note that in Prefect 2.x, you'll need to ensure an agent is running to execute the deployment. The test harness does not start one for you, so without an agent the flow run will stay in the SCHEDULED state.
If you're planning to upgrade to Prefect 3.x in the future, I'd recommend using workers instead of agents in your 2.x setup, as this will make the migration path smoother. You can do this by pointing the deployment at a work pool instead of an infrastructure block:
```python
deployment = Deployment.build_from_flow(
    flow=failing_flow,
    name="failing-flow-deployment",
    # The work pool must already exist and have a worker polling it,
    # e.g. one started with `prefect worker start --pool test-pool`
    work_pool_name="test-pool",
    work_queue_name="test-queue",
)
```
This worker-based approach in 2.x will be more similar to how things work in 3.x, making any future upgrade easier.
Lennert Van de Velde
07/02/2025, 12:29 PM
Marvin
07/02/2025, 12:29 PM
```python
import asyncio
import time

from prefect_meemoo.prefect.deployment import check_deployment_failed_flows
from prefect import flow, get_client
from prefect.agent import PrefectAgent
from prefect.deployments import Deployment, run_deployment
from prefect.infrastructure.process import Process
from prefect.testing.utilities import prefect_test_harness


async def run_agent_until_final(flow_run_id, timeout=30):
    """Poll with an in-process agent until the flow run reaches a final state."""
    deadline = time.monotonic() + timeout
    # The agent is started on entry and shut down when the context exits
    async with PrefectAgent(work_queues=["default"]) as agent, get_client() as client:
        while True:
            # Ask the agent to pick up and submit any scheduled runs
            await agent.get_and_submit_flow_runs()
            flow_run = await client.read_flow_run(flow_run_id)
            if flow_run.state.is_final():
                return flow_run
            if time.monotonic() > deadline:
                raise TimeoutError("Flow run didn't finish within timeout period")
            await asyncio.sleep(1)


def test_check_deployment_failed_flows():
    """
    Test the check_deployment_failed_flows function.
    """
    @flow(name="test_check_deployment_failed_flows")
    def failing_flow():
        """
        A simple flow that simulates a failure.
        """
        raise Exception("Simulated failure for testing purposes.")

    with prefect_test_harness():
        # Create a deployment
        deployment = Deployment.build_from_flow(
            flow=failing_flow,
            name="failing-flow-deployment",
            infrastructure=Process(),
            skip_upload=True,
        )
        deployment.apply()
        # Trigger a flow run without waiting for it (timeout=0 returns immediately)
        flow_run = run_deployment(
            "test_check_deployment_failed_flows/failing-flow-deployment",
            timeout=0,
        )
        # Let the agent pick up and execute the run (adjust timeout as needed)
        asyncio.run(run_agent_until_final(flow_run.id, timeout=30))
        # assert that the deployment run fails
        assert check_deployment_failed_flows("failing-flow-deployment") is True
```
Here are the key additions:
1. Added an agent to pick up and execute the flow run
2. Added polling logic to wait for the flow run to reach a final state
3. Added proper cleanup of the agent (it shuts down when its context manager exits)
Note the additional imports at the top of the file: time, asyncio, and get_client.
If you're still seeing issues, you might want to add more detailed state checking inside the polling loop:
```python
# More detailed state checking (replaces the loop in run_agent_until_final)
while True:
    await agent.get_and_submit_flow_runs()
    flow_run = await client.read_flow_run(flow_run_id)
    print(f"Current state: {flow_run.state.type}")
    if flow_run.state.is_final():
        break
    if time.monotonic() > deadline:
        raise TimeoutError(
            f"Flow run didn't finish within {timeout} seconds. "
            f"Current state: {flow_run.state.type}"
        )
    await asyncio.sleep(1)
print(f"Final state: {flow_run.state.type}")
```
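If it helps to see the polling idea in isolation, here is a small sketch in plain Python that waits for a terminal state rather than only for the run to leave SCHEDULED. The names wait_for_terminal_state and read_state are hypothetical, and the state names are assumed to match Prefect's state types. In a real test, read_state would wrap whatever client call reads the flow run.

```python
import time
from typing import Callable

# Assumed to mirror Prefect's terminal state types
TERMINAL_STATES = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


def wait_for_terminal_state(
    read_state: Callable[[], str],
    timeout: float = 30.0,
    interval: float = 1.0,
) -> str:
    """Call read_state repeatedly until it returns a terminal state name."""
    deadline = time.monotonic() + timeout
    while True:
        state = read_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"No terminal state within {timeout} seconds. Current state: {state}"
            )
        time.sleep(interval)


# Fake state source standing in for the API: the run progresses and then fails
states = iter(["SCHEDULED", "PENDING", "RUNNING", "FAILED"])
final_state = wait_for_terminal_state(lambda: next(states), timeout=5, interval=0.01)
print(final_state)

# A run that nothing ever picks up stays SCHEDULED and hits the timeout
try:
    wait_for_terminal_state(lambda: "SCHEDULED", timeout=0.05, interval=0.01)
    timed_out = False
except TimeoutError:
    timed_out = True
print(timed_out)
```

Keeping the timeout check inside the helper means a run that is never picked up fails the test with a clear message instead of hanging.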
Alternative Approach:
If you're finding the agent-based testing complex, you could also consider testing the flow directly without a deployment:
```python
def test_check_deployment_failed_flows():
    @flow(name="test_check_deployment_failed_flows")
    def failing_flow():
        raise Exception("Simulated failure for testing purposes.")

    with prefect_test_harness():
        # Run the flow directly; return_state=True returns the final state
        # instead of re-raising the flow's exception
        flow_state = failing_flow(return_state=True)
        assert flow_state.is_failed()
```
This simpler approach might be sufficient depending on what exactly you're trying to test. However, if you specifically need to test the deployment functionality, stick with the first approach with the agent.
Remember that in Prefect 2.x, agents are crucial for executing deployments. The agent watches for new flow runs and executes them using the specified infrastructure. Without a running agent, flow runs will remain in the SCHEDULED state.
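As a mental model, here is a toy simulation of that behaviour in plain Python. None of these classes are Prefect APIs: ToyFlowRun and ToyAgent are hypothetical stand-ins that only illustrate why a run stays SCHEDULED until something polls the queue.

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable, Deque


@dataclass
class ToyFlowRun:
    """Stand-in for a flow run created from a deployment."""
    name: str
    fn: Callable[[], object]
    state: str = "SCHEDULED"


class ToyAgent:
    """Stand-in for an agent: takes runs off a queue and executes them."""

    def __init__(self, queue: Deque[ToyFlowRun]) -> None:
        self.queue = queue

    def poll_once(self) -> int:
        """Execute every queued run and return how many were executed."""
        executed = 0
        while self.queue:
            run = self.queue.popleft()
            run.state = "RUNNING"
            try:
                run.fn()
            except Exception:
                run.state = "FAILED"
            else:
                run.state = "COMPLETED"
            executed += 1
        return executed


def failing_flow() -> None:
    raise Exception("Simulated failure for testing purposes.")


queue: Deque[ToyFlowRun] = deque()
run = ToyFlowRun(name="failing-flow-deployment", fn=failing_flow)
queue.append(run)

state_without_agent = run.state  # nothing has polled the queue yet
ToyAgent(queue).poll_once()
state_with_agent = run.state

print(state_without_agent)
print(state_with_agent)
```

The real agent does much more (infrastructure, retries, heartbeats), but the test above hinges on the same point: creating the run and executing it are separate steps.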