# ask-marvin
j
@Marvin, Tell me everything you know about using Docker (Dockerfile/Docker Compose) with Prefect Testing Harness 🙏
m
Here are some key points about using Docker with Prefect:
1. To ensure that your flow container can communicate with the Prefect API, set `PREFECT_API_URL` to `http://host.docker.internal:4200/api`. If you're running Linux, set `PREFECT_API_URL` to `http://localhost:4200/api` and use the `--network="host"` option instead. For example:
`docker run -e PREFECT_API_URL=http://host.docker.internal:4200/api prefect-docker-guide-image`
or, on Linux:
`docker run --network="host" -e PREFECT_API_URL=http://localhost:4200/api prefect-docker-guide-image`
2. When building a Docker image, you first need a Dockerfile. Once you have your Dockerfile, build your image by running:
`docker build -t prefect-docker-guide-image .`
You can check that your build worked by running a container from your new image. Your container will need an API URL and an API key to communicate with Prefect Cloud; you can get an API key from the API Keys section of the user settings in the Prefect UI. For more detailed information, check out the Prefect Docker Guide.
🙏 1
n
@Marvin tell me about prefect_test_harness
m
`prefect_test_harness` is a context manager provided by Prefect for unit tests. It allows you to run flows and tasks against a temporary local SQLite database. Here is an example of how to use it:
```python
from prefect import flow
from prefect.testing.utilities import prefect_test_harness

@flow
def my_favorite_flow():
    return 42

def test_my_favorite_flow():
    with prefect_test_harness():  # run the flow against a temporary testing database
        assert my_favorite_flow() == 42
```
For more extensive testing, you can leverage `prefect_test_harness` as a fixture in your unit testing framework. For example, when using `pytest`:
```python
from prefect import flow
import pytest
from prefect.testing.utilities import prefect_test_harness

@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    with prefect_test_harness():
        yield

@flow
def my_favorite_flow():
    return 42

def test_my_favorite_flow():
    assert my_favorite_flow() == 42
```
In this example, the fixture is scoped to run once for the entire test session. In most cases, you will not need a clean database for each test and just want to isolate your test runs to a test database. Creating a new test database per test creates significant overhead. You can find more information in the Prefect Testing Guide.
j
Thanks Nate, I just figured it out finally lol, my mocking was super whacked out 😭
@Marvin, do you have to do anything different in testing ConcurrentTaskRunner flows compared to SequentialTaskRunner flows?
m
When testing flows with `ConcurrentTaskRunner` and `SequentialTaskRunner`, there are a few differences to keep in mind:
- `ConcurrentTaskRunner` can run tasks concurrently, allowing tasks to switch when blocking on IO. Tasks will be submitted to a thread pool maintained by anyio. When testing flows that use `ConcurrentTaskRunner`, you should ensure that your tests are designed to handle concurrent execution and that any shared resources are thread-safe (see the sketch below).
- `SequentialTaskRunner` forces tasks to run sequentially rather than concurrently. It works with both synchronous and asynchronous task functions. When testing flows that use `SequentialTaskRunner`, you can generally write simpler tests because you don't have to worry about concurrency issues. However, you should still ensure that your tests correctly handle any asynchronous tasks.

In general, the type of task runner you use can affect the behavior of your flow, so it's important to choose the right one for your needs and to test your flows accordingly. You can find more information in the Prefect Task Runners Guide.
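To make that concrete, here is a minimal sketch (not from the thread; the `double`/`double_all` names and values are illustrative) of a Prefect 2-style flow that declares `task_runner=ConcurrentTaskRunner()` and submits tasks with `.submit()`; the test itself looks the same as for a sequential flow as long as the task logic is thread-safe:
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.testing.utilities import prefect_test_harness

@task
def double(x: int) -> int:
    return x * 2

@flow(task_runner=ConcurrentTaskRunner())
def double_all(numbers: list[int]) -> list[int]:
    # .submit() hands each task to the runner so calls can overlap;
    # .result() blocks until each future resolves
    futures = [double.submit(n) for n in numbers]
    return [f.result() for f in futures]

def test_double_all():
    with prefect_test_harness():  # temporary local database / API
        assert double_all([1, 2, 3]) == [2, 4, 6]
```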
j
@Marvin can you give example of basic ConcurrentTaskRunner flow unit test please 🙏 ?
m
Sure, here's a basic example of how you might test a flow that uses the `ConcurrentTaskRunner`. This example assumes that you're using the `pytest` testing framework and the `prefect_test_harness` context manager from Prefect.
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.testing.utilities import prefect_test_harness

@task
def my_task():
    return 42

@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    # submit the task to the concurrent runner and wait for its result
    return my_task.submit().result()

def test_my_flow():
    with prefect_test_harness():  # run against a temporary testing database
        assert my_flow() == 42
```
In this example, `my_task` is a simple task and `my_flow` is a flow that submits it to the `ConcurrentTaskRunner`. The `test_my_flow` function tests this flow using the `prefect_test_harness` context manager and checks that the flow runs successfully and returns the expected result. Please note that this is a very basic example. In a real-world scenario, your tasks would likely involve more complex operations, and your tests might need to handle things like task dependencies, error handling, and so on.
👀 1
j
@Marvin, keep the task runner on the flow, and add retries to the task
m
Sure, here's an example of a Prefect flow that uses the `ConcurrentTaskRunner` and has a task with retries:
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.testing.utilities import prefect_test_harness
import random

# retry up to 3 times, waiting 1, 10, then 100 seconds between attempts
@task(retries=3, retry_delay_seconds=[1, 10, 100])
def my_concurrent_task():
    # fails roughly half the time; retries give it additional chances
    if random.choice([True, False]):
        raise RuntimeError("not this time!")
    return 42

@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    return my_concurrent_task.submit().result()

def test_my_flow():
    with prefect_test_harness():
        state = my_flow(return_state=True)
        if state.is_failed():
            print(f"Flow failed: {state}")
        else:
            assert state.result() == 42
```
In this example, `my_concurrent_task` has 3 retries with delays of 1, 10, and 100 seconds respectively, and `my_flow` runs it on the `ConcurrentTaskRunner`. The `test_my_flow` function calls the flow with `return_state=True`, checks that it ran successfully, and asserts that the result of the task is as expected; if the flow fails, it prints the failure message. Please note that this is a very basic example. In a real-world scenario, your tasks would likely involve more complex operations, and your tests might need to handle things like task dependencies, error handling, and so on.
🚀 1
n
what are you trying to test with concurrent task runner?
j
Calling in test function:
```python
load_data(
            name_and_details_dict=name_and_details_dict,
            name_bq_mapping_dict=name_bq_mapping_dict,
        )
```
load_data definition:
```python
@flow(
    name="bigquery-load-data",
    description="Loads data from GCS to BigQuery.",
    task_runner=ConcurrentTaskRunner(),
)
def load_data(name_and_details_dict: dict, name_bq_mapping_dict: dict):
    return None
```
Basically I am just trying to get it to call atm, and have it literally just doing this ^. And it's failing with:
```
/usr/local/lib/python3.10/site-packages/prefect/flows.py:1072: in __call__
    return enter_flow_run_engine_from_flow_call(
/usr/local/lib/python3.10/site-packages/prefect/engine.py:282: in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py:243: in wait_for_call_in_loop_thread
    return call.result()
/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:282: in result
    return self.future.result(timeout=timeout)
/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:168: in result
    return self.__get_result()
/usr/local/lib/python3.10/concurrent/futures/_base.py:403: in __get_result
    raise self._exception
/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:345: in _run_async
    result = await coro
/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py:49: in with_injected_client
    async with client_context as new_client:
/usr/local/lib/python3.10/site-packages/prefect/client/orchestration.py:2718: in __aenter__
    self._ephemeral_lifespan = await self._exit_stack.enter_async_context(
/usr/local/lib/python3.10/contextlib.py:619: in enter_async_context
    result = await _cm_type.__aenter__(cm)
/usr/local/lib/python3.10/contextlib.py:199: in __aenter__
    return await anext(self.gen)
/usr/local/lib/python3.10/site-packages/prefect/client/base.py:124: in app_lifespan_context
    await context.__aenter__()
/usr/local/lib/python3.10/site-packages/asgi_lifespan/_manager.py:99: in __aenter__
    await self.startup()
/usr/local/lib/python3.10/site-packages/asgi_lifespan/_manager.py:40: in startup
    await self._concurrency_backend.run_and_fail_after(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asgi_lifespan._concurrency.asyncio.AsyncioBackend object at 0x40d832dab0>
seconds = 30
coroutine = <bound method AsyncioEvent.wait of <asgi_lifespan._concurrency.asyncio.AsyncioEvent object at 0x40d832da20>>

    async def run_and_fail_after(
        self,
        seconds: typing.Optional[float],
        coroutine: typing.Callable[[], typing.Awaitable[None]],
    ) -> None:
        try:
            await asyncio.wait_for(coroutine(), timeout=seconds)
        except asyncio.TimeoutError:
>           raise TimeoutError
E           TimeoutError

/usr/local/lib/python3.10/site-packages/asgi_lifespan/_concurrency/asyncio.py:46: TimeoutError
=============================== warnings summary ===============================
tests/helm/core/integrations/bigquery/test_load_data_to_bq.py::TestLoadDataToBq::test_load_data[name_and_details_dict0-name_bq_mapping_dict0]
  /usr/local/lib/python3.10/contextlib.py:142: SAWarning: Skipped unsupported reflection of expre
```
All of my other tests succeed on flows, but they are SequentialTaskRunner(), so I didn't know if that was what was causing the issue 😕
actually disregard, I changed to SequentialTaskRunner and it's still failing, so weird 🚀 . I'll post if I find solution
Ahh, it's because the harness is taking longer than 30 seconds to start up in pytest, I think. I wonder if there's a way to increase that 30-second startup timeout. My solution is to just have an expected-fail flow run at the beginning of the tests, which 'wakes up' the Prefect test harness, and then it works for all of them 😄
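For reference, a rough sketch of that warm-up workaround (the `prefect_warmup` fixture and `_warmup_flow` names are made up here, not from the thread): a session-scoped fixture can enter `prefect_test_harness` once, run a throwaway flow, and swallow the startup `TimeoutError` shown in the traceback above so that the real tests only ever see a warmed-up harness:
```python
import pytest
from prefect import flow
from prefect.testing.utilities import prefect_test_harness

@flow
def _warmup_flow():
    # trivial flow; its only job is to trigger the ephemeral API startup
    return None

@pytest.fixture(autouse=True, scope="session")
def prefect_warmup():
    with prefect_test_harness():
        try:
            # the first flow run may exceed the 30-second startup window and
            # raise TimeoutError while the harness server is still coming up;
            # swallow it so later tests hit an already-started harness
            _warmup_flow()
        except TimeoutError:
            pass
        yield
```
Whether that first run raises or succeeds depends on how long the harness takes to start on a given machine; either way, subsequent flow calls in the session reuse the already-running test harness.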