<@ULVA73B9P> I want to test a flow accessing certa...
# ask-marvin
l
@Marvin I want to test a flow accessing certain parts of a deployment. How do I do this? Code:
Copy code
from prefect_meemoo.prefect.deployment import (
    check_deployment_failed_flows
)
from prefect import flow
from prefect.testing.utilities import prefect_test_harness
from prefect.deployments import run_deployment


def test_check_deployment_failed_flows():
    """
    Test the check_deployment_failed_flows function.
    """
    @flow(name="test_check_deployment_failed_flows")
    def failing_flow():
        """
        A simple flow that simulates a failure.
        """
        raise Exception("Simulated failure for testing purposes.")
    with prefect_test_harness():
        # Deploy the failing flow with Prefect
        failing_flow.deploy(name="failing-flow-deployment")
        
        # Run the deployment
        deployment_run = run_deployment("failing-flow-deployment")

        # assert that the deployment run fails
        assert check_deployment_failed_flows("failing-flow-deployment") is True
This returns this error: ValueError: Could not find work pool None. Please create it before deploying this flow.
m
thought for 11.1 seconds
Ah yes, when testing deployments in Prefect 3.x, you need to set up a work pool since deployments require a way to execute. Let me help you modify your test to work correctly. Here's the corrected version of your test code:
Copy code
python
from prefect_meemoo.prefect.deployment import check_deployment_failed_flows
from prefect import flow
from prefect.testing.utilities import prefect_test_harness
from prefect.deployments import run_deployment
from prefect.infrastructure.process import Process
from prefect.work_pools import WorkPool

def test_check_deployment_failed_flows():
    """
    Test the check_deployment_failed_flows function.
    """
    @flow(name="test_check_deployment_failed_flows")
    def failing_flow():
        """
        A simple flow that simulates a failure.
        """
        raise Exception("Simulated failure for testing purposes.")

    with prefect_test_harness():
        # Create a work pool for testing
        work_pool = WorkPool(
            name="test-pool",
            work_queue_name="test-queue",
            infrastructure_type="process"
        )
        work_pool.save()

        # Deploy the failing flow with Prefect
        failing_flow.deploy(
            name="failing-flow-deployment",
            work_pool_name="test-pool",
        )
        
        # Run the deployment
        deployment_run = run_deployment(
            "test-check-deployment-failed-flows/failing-flow-deployment",
            timeout=300
        )

        # assert that the deployment run fails
        assert check_deployment_failed_flows("failing-flow-deployment") is True
Key changes made: 1. Added creation of a work pool using
WorkPool
- this is required for deployments in Prefect 3.x 2. Specified the
work_pool_name
in the deployment 3. Updated the deployment run reference to include the flow name (the full path is
{flow_name}/{deployment_name}
) 4. Added a timeout to the
run_deployment
call to ensure we wait for the flow to complete A few important notes: - The work pool uses the "process" infrastructure, which is perfect for testing as it runs flows in local processes - The test harness provides an isolated environment for testing Prefect functionality - Make sure to use the full path when running the deployment (
flow_name/deployment_name
) If you're running this in a larger test suite, you might want to consider adding cleanup code to remove the work pool after the test:
Copy code
python
    with prefect_test_harness():
        try:
            # ... your test code ...
        finally:
            # Clean up the work pool
            work_pool.delete()
This will help keep your test environment clean between test runs.
l
How to do it in Prefect 2.20.16?
@Marvin How to do it in Prefect 2.20.16?
m
thought for 12.2 seconds
Ah, for Prefect 2.20.16, the testing approach is a bit different. Here's how you can modify your test code:
Copy code
python
from prefect_meemoo.prefect.deployment import check_deployment_failed_flows
from prefect import flow
from prefect.testing.utilities import prefect_test_harness
from prefect.deployments import Deployment
from prefect.infrastructure.process import Process

def test_check_deployment_failed_flows():
    """
    Test the check_deployment_failed_flows function.
    """
    @flow(name="test_check_deployment_failed_flows")
    def failing_flow():
        """
        A simple flow that simulates a failure.
        """
        raise Exception("Simulated failure for testing purposes.")

    with prefect_test_harness():
        # Create a deployment
        deployment = Deployment.build_from_flow(
            flow=failing_flow,
            name="failing-flow-deployment",
            infrastructure=Process(),
        )
        deployment.apply()
        
        # Run the deployment
        deployment_id = deployment.run()

        # assert that the deployment run fails
        assert check_deployment_failed_flows("failing-flow-deployment") is True
Key differences in Prefect 2.x: 1. Use
Deployment.build_from_flow()
instead of the
.deploy()
method 2. Use
deployment.apply()
to register the deployment 3. Use
deployment.run()
instead of
run_deployment()
4. No work pool is needed as Prefect 2.x uses agents and infrastructure directly If you need to specify additional parameters or schedule, you can add them to the deployment build:
Copy code
python
deployment = Deployment.build_from_flow(
    flow=failing_flow,
    name="failing-flow-deployment",
    infrastructure=Process(),
    tags=["test"],
    version="1",
    skip_upload=True  # Useful for testing to avoid storage requirements
)
Note that in Prefect 2.x, you'll need to ensure an agent is running to execute the deployment. However, in the test context with Process infrastructure, this is handled automatically. If you're planning to upgrade to Prefect 3.x in the future, I'd recommend using workers instead of agents in your 2.x setup, as this will make the migration path smoother. You can do this by modifying the deployment to use a worker:
Copy code
python
from prefect.infrastructure.docker import DockerContainer

deployment = Deployment.build_from_flow(
    flow=failing_flow,
    name="failing-flow-deployment",
    work_queue_name="test-queue",
    infrastructure=DockerContainer(),
)
This worker-based approach in 2.x will be more similar to how things work in 3.x, making any future upgrade easier.
l
@Marvin The flow run stays in run stat SCHEDULED
m
thought for 30.0 seconds
In Prefect 2.x, if your flow run stays in SCHEDULED state, it means there's no agent running to pick up and execute the flow. Let's modify the test to ensure the flow actually runs:
Copy code
python
from prefect_meemoo.prefect.deployment import check_deployment_failed_flows
from prefect import flow
from prefect.testing.utilities import prefect_test_harness
from prefect.deployments import Deployment
from prefect.infrastructure.process import Process
from prefect.agent import PrefectAgent
import asyncio

def test_check_deployment_failed_flows():
    """
    Test the check_deployment_failed_flows function.
    """
    @flow(name="test_check_deployment_failed_flows")
    def failing_flow():
        """
        A simple flow that simulates a failure.
        """
        raise Exception("Simulated failure for testing purposes.")

    with prefect_test_harness():
        # Create a deployment
        deployment = Deployment.build_from_flow(
            flow=failing_flow,
            name="failing-flow-deployment",
            infrastructure=Process(),
            skip_upload=True
        )
        deployment.apply()
        
        # Create and start an agent
        agent = PrefectAgent(work_queues=["default"])
        
        # Start the agent in the background
        agent_task = asyncio.create_task(agent.start())
        
        # Run the deployment
        deployment_id = deployment.run()
        
        # Wait for the flow run to complete (adjust timeout as needed)
        from prefect.client import get_client
        client = get_client()
        flow_run = asyncio.run(client.read_flow_run(deployment_id))
        
        # Give the agent time to pick up and execute the flow
        timeout = 30  # seconds
        start_time = time.time()
        while flow_run.state.type == "SCHEDULED":
            if time.time() - start_time > timeout:
                raise TimeoutError("Flow run didn't start within timeout period")
            time.sleep(1)
            flow_run = asyncio.run(client.read_flow_run(deployment_id))

        # Stop the agent
        agent_task.cancel()
        
        # assert that the deployment run fails
        assert check_deployment_failed_flows("failing-flow-deployment") is True
Here are the key additions: 1. Added an agent to pick up and execute the flow run 2. Added polling logic to wait for the flow run to complete 3. Added proper cleanup of the agent You'll also need these additional imports at the top:
Copy code
python
import time
import asyncio
from prefect.client import get_client
If you're still seeing issues, you might want to add more detailed state checking:
Copy code
python
# More detailed state checking
while flow_run.state.type == "SCHEDULED":
    print(f"Current state: {flow_run.state.type}")
    if time.time() - start_time > timeout:
        raise TimeoutError(f"Flow run didn't start within {timeout} seconds. Current state: {flow_run.state.type}")
    time.sleep(1)
    flow_run = asyncio.run(client.read_flow_run(deployment_id))

print(f"Final state: {flow_run.state.type}")
Alternative Approach: If you're finding the agent-based testing complex, you could also consider testing the flow directly without a deployment:
Copy code
python
def test_check_deployment_failed_flows():
    @flow(name="test_check_deployment_failed_flows")
    def failing_flow():
        raise Exception("Simulated failure for testing purposes.")

    with prefect_test_harness():
        # Run the flow directly
        flow_state = failing_flow()
        assert flow_state.is_failed()
This simpler approach might be sufficient depending on what exactly you're trying to test. However, if you specifically need to test the deployment functionality, stick with the first approach with the agent. Remember that in Prefect 2.x, agents are crucial for executing deployments. The agent watches for new flow runs and executes them using the specified infrastructure. Without a running agent, flow runs will remain in the SCHEDULED state.