Jack P
10/31/2023, 7:07 PM

Marvin
10/31/2023, 7:08 PM
1. To let your container reach a Prefect server running on the host machine, set your PREFECT_API_URL to <http://host.docker.internal:4200/api>. If you're running Linux, you'll need to set your PREFECT_API_URL to <http://localhost:4200/api> and use the --network="host" option instead. You can run your docker container with the following command: docker run --network="host" -e PREFECT_API_URL=<http://localhost:4200/api> prefect-docker-guide-image
2. When building a Docker image, you first need to have a Dockerfile. Once you have your Dockerfile, you can build your image by running: docker build -t prefect-docker-guide-image . You can check that your build worked by running a container from your new image. Your container will need an API URL and an API key to communicate with Prefect Cloud. You can get an API key from the API Keys section of the user settings in the Prefect UI.
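For example, a container pointed at Prefect Cloud might be started like this (a rough sketch, not from the guide; the account ID, workspace ID, key, and image name are placeholders):

docker run \
  -e PREFECT_API_URL="https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>" \
  -e PREFECT_API_KEY="<YOUR_API_KEY>" \
  prefect-docker-guide-image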
For more detailed information, you can check out the Prefect Docker Guide.

Nate
10/31/2023, 7:40 PM

Marvin
10/31/2023, 7:41 PM
prefect_test_harness is a context manager provided by Prefect for unit tests. It allows you to run flows and tasks against a temporary local SQLite database. Here is an example of how to use it:
from prefect import flow
from prefect.testing.utilities import prefect_test_harness

@flow
def my_favorite_flow():
    return 42

def test_my_favorite_flow():
    with prefect_test_harness():  # run the flow against a temporary testing database
        assert my_favorite_flow() == 42
For more extensive testing, you can leverage prefect_test_harness as a fixture in your unit testing framework. For example, when using `pytest`:
from prefect import flow
import pytest
from prefect.testing.utilities import prefect_test_harness

@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    with prefect_test_harness():
        yield

@flow
def my_favorite_flow():
    return 42

def test_my_favorite_flow():
    assert my_favorite_flow() == 42
In this example, the fixture is scoped to run once for the entire test session. In most cases, you will not need a clean database for each test and just want to isolate your test runs to a test database. Creating a new test database per test creates significant overhead.
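If a particular test really does need a completely clean database, one option is a narrower, non-session-scoped fixture so that only the tests that request it pay the startup cost (a minimal sketch, not from the guide):

import pytest
from prefect.testing.utilities import prefect_test_harness

@pytest.fixture  # default function scope: a fresh temporary database per test that uses it
def isolated_prefect_db():
    with prefect_test_harness():
        yield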
You can find more information in the Prefect Testing Guide.

Jack P
10/31/2023, 7:44 PM

Jack P
10/31/2023, 8:56 PM

Marvin
10/31/2023, 8:56 PM
When testing flows that use ConcurrentTaskRunner and SequentialTaskRunner, there are a few differences to keep in mind:
- ConcurrentTaskRunner can run tasks concurrently, allowing tasks to switch when blocking on IO. Tasks will be submitted to a thread pool maintained by anyio. This means that when testing flows that use ConcurrentTaskRunner, you should ensure that your tests are designed to handle concurrent execution and that any shared resources are thread-safe (see the sketch after this list).
- SequentialTaskRunner forces tasks to run sequentially rather than concurrently. It works with both synchronous and asynchronous task functions. When testing flows that use SequentialTaskRunner, you can generally write simpler tests because you don't have to worry about concurrency issues. However, you should still ensure that your tests correctly handle any asynchronous tasks.
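As a minimal sketch of what that looks like in code (the names here are illustrative, not from this thread), the task runner is set on the flow and tasks are handed to it with .submit():

from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def fetch(url: str) -> str:
    # placeholder for some IO-bound work
    return url

@flow(task_runner=ConcurrentTaskRunner())
def fetch_all():
    # .submit() hands each task to the runner so calls can overlap on IO
    futures = [fetch.submit(u) for u in ["a", "b", "c"]]
    return [f.result() for f in futures]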
In general, the type of task runner you use can affect the behavior of your flow, so it's important to choose the right one for your needs and to test your flows accordingly. You can find more information in the Prefect Task Runners Guide.

Jack P
10/31/2023, 8:57 PM

Marvin
10/31/2023, 8:58 PM
Here is a basic example of how you might test a flow that uses the ConcurrentTaskRunner. This example assumes that you're using the pytest testing framework and the prefect_test_harness context manager from Prefect.
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.testing.utilities import prefect_test_harness

@task
def my_concurrent_task():
    return 42

@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    # submit the task to the runner and resolve its future
    return my_concurrent_task.submit().result()

def test_my_flow():
    # run the flow against a temporary testing database
    with prefect_test_harness():
        assert my_flow() == 42
In this example, my_concurrent_task is a task that my_flow runs on the ConcurrentTaskRunner (note that the task runner is set on the flow, not on individual tasks). The test_my_flow function calls the flow inside the prefect_test_harness context manager and checks that the result of the task is as expected.
Please note that this is a very basic example. In a real-world scenario, your tasks would likely involve more complex operations, and your tests might need to handle things like task dependencies, error handling, and so on.

Jack P
10/31/2023, 8:59 PM

Marvin
10/31/2023, 9:00 PM
Here is an example of a flow that uses the ConcurrentTaskRunner and has a task with retries:
import random

from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.testing.utilities import prefect_test_harness

@task(retries=3, retry_delay_seconds=[1, 10, 100])
def my_concurrent_task():
    # fails roughly half the time to exercise the retry logic
    if random.choice([True, False]):
        raise RuntimeError("not this time!")
    return 42

@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    return my_concurrent_task.submit().result()

def test_my_flow():
    with prefect_test_harness():
        # return_state=True returns the final flow run state instead of raising on failure
        state = my_flow(return_state=True)
        if state.is_failed():
            print(f"Flow failed with message: {state.message}")
        else:
            assert state.result() == 42
In this example, my_concurrent_task has 3 retries with delays of 1, 10, and 100 seconds respectively, and my_flow runs it on the ConcurrentTaskRunner. The test_my_flow function calls the flow with return_state=True inside the prefect_test_harness context manager: if the flow fails, it prints the failure message; otherwise it checks that the result of the task is as expected.
Please note that this is a very basic example. In a real-world scenario, your tasks would likely involve more complex operations, and your tests might need to handle things like task dependencies, error handling, and so on.

Nate
10/31/2023, 9:00 PM

Jack P
10/31/2023, 9:04 PM
load_data(
    name_and_details_dict=name_and_details_dict,
    name_bq_mapping_dict=name_bq_mapping_dict,
)
load_data definition:
@flow(
    name="bigquery-load-data",
    description="Loads data from GCS to BigQuery.",
    task_runner=ConcurrentTaskRunner(),
)
def load_data(name_and_details_dict: dict, name_bq_mapping_dict: dict):
    return None
Basically I am just trying to get it to call atm, and have it literally just doing this ^. And it's failing with:
/usr/local/lib/python3.10/site-packages/prefect/flows.py:1072: in __call__
return enter_flow_run_engine_from_flow_call(
/usr/local/lib/python3.10/site-packages/prefect/engine.py:282: in enter_flow_run_engine_from_flow_call
retval = from_sync.wait_for_call_in_loop_thread(
/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py:243: in wait_for_call_in_loop_thread
return call.result()
/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:282: in result
return self.future.result(timeout=timeout)
/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:168: in result
return self.__get_result()
/usr/local/lib/python3.10/concurrent/futures/_base.py:403: in __get_result
raise self._exception
/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:345: in _run_async
result = await coro
/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py:49: in with_injected_client
async with client_context as new_client:
/usr/local/lib/python3.10/site-packages/prefect/client/orchestration.py:2718: in __aenter__
self._ephemeral_lifespan = await self._exit_stack.enter_async_context(
/usr/local/lib/python3.10/contextlib.py:619: in enter_async_context
result = await _cm_type.__aenter__(cm)
/usr/local/lib/python3.10/contextlib.py:199: in __aenter__
return await anext(self.gen)
/usr/local/lib/python3.10/site-packages/prefect/client/base.py:124: in app_lifespan_context
await context.__aenter__()
/usr/local/lib/python3.10/site-packages/asgi_lifespan/_manager.py:99: in __aenter__
await self.startup()
/usr/local/lib/python3.10/site-packages/asgi_lifespan/_manager.py:40: in startup
await self._concurrency_backend.run_and_fail_after(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <asgi_lifespan._concurrency.asyncio.AsyncioBackend object at 0x40d832dab0>
seconds = 30
coroutine = <bound method AsyncioEvent.wait of <asgi_lifespan._concurrency.asyncio.AsyncioEvent object at 0x40d832da20>>
async def run_and_fail_after(
self,
seconds: typing.Optional[float],
coroutine: typing.Callable[[], typing.Awaitable[None]],
) -> None:
try:
await asyncio.wait_for(coroutine(), timeout=seconds)
except asyncio.TimeoutError:
> raise TimeoutError
E TimeoutError
/usr/local/lib/python3.10/site-packages/asgi_lifespan/_concurrency/asyncio.py:46: TimeoutError
=============================== warnings summary ===============================
tests/helm/core/integrations/bigquery/test_load_data_to_bq.py::TestLoadDataToBq::test_load_data[name_and_details_dict0-name_bq_mapping_dict0]
/usr/local/lib/python3.10/contextlib.py:142: SAWarning: Skipped unsupported reflection of expre
All of my other tests succeed on flows, but they are SequentialTaskRunner(), so I didn't know if that was what was causing the issue 😕
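For reference, the session-scoped prefect_test_harness fixture shown earlier in the thread would sit alongside a test like this roughly as follows (a sketch only; the import path and inputs are placeholders, and the real test is parametrized inside a TestLoadDataToBq class):

# conftest.py (assumed location)
import pytest
from prefect.testing.utilities import prefect_test_harness

@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    # run every flow call in the session against a temporary local database
    with prefect_test_harness():
        yield

# tests/helm/core/integrations/bigquery/test_load_data_to_bq.py
from my_project.integrations.bigquery import load_data  # hypothetical import path

def test_load_data():
    # placeholder inputs; the real test supplies real dicts
    assert load_data(name_and_details_dict={}, name_bq_mapping_dict={}) is None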
Jack P
10/31/2023, 9:08 PM
Jack P
10/31/2023, 9:13 PM