Jarvis Stubblefield
10/17/2022, 8:52 PM
10 failed, 4486 passed, 39 skipped, 3 xfailed, 11 errors in 1173.85s (0:19:33)
I wasn’t expecting to have failures… I’ve added my code and am now running the 20 minutes of tests to ensure it’s working as expected… I’m assuming so long as I don’t get to 11 failed
in the summary I should be okay, right?
Zanie
10/17/2022, 9:26 PM
pytest -n auto
which will run the test suite in parallel using workers for a significant speedup.
Jarvis Stubblefield
10/17/2022, 9:29 PM
process.py
file. The change I have made is the definite cause of the errors I’m now seeing for almost all of the Process
tests.
import asyncio
import os
import sys
import tempfile
from typing import Optional, Union
import anyio.abc
import sniffio
from pydantic import Field
from typing_extensions import Literal
from prefect.infrastructure.base import Infrastructure, InfrastructureResult
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.filesystem import tmpchdir
from prefect.utilities.processutils import run_process
def _use_threaded_child_watcher():
if (
sys.version_info < (3, 8)
and sniffio.current_async_library() == "asyncio"
and sys.platform != "win32"
):
from prefect.utilities.compat import ThreadedChildWatcher
# Python < 3.8 does not use a `ThreadedChildWatcher` by default which can
# lead to errors in tests on unix as the previous default `SafeChildWatcher`
# is not compatible with threaded event loops.
asyncio.get_event_loop_policy().set_child_watcher(ThreadedChildWatcher())
class Process(Infrastructure):
    """
    Run a command in a new process.

    Current environment variables and Prefect settings will be included in the created
    process. Configured environment variables will override any current environment
    variables.

    Attributes:
        command: A list of strings specifying the command to run in the container to
            start the flow run. In most cases you should not override this.
        cwd: If set, the process will open within the specified path as the
            working directory. When unset, a temporary directory is used.
        env: Environment variables to set for the new process.
        labels: Labels for the process. Labels are for metadata purposes only and
            cannot be attached to the process itself.
        name: A name for the process. For display purposes only.
        stream_output: If set, output will be streamed from the process to local
            standard output.
    """

    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/39WQhVu4JK40rZWltGqhuC/d15be6189a0cb95949a6b43df00dcb9b/image5.png?h=250"

    type: Literal["process"] = Field(
        default="process", description="The type of infrastructure."
    )
    stream_output: bool = Field(
        default=True,
        description="If set, output will be streamed from the process to local standard output.",
    )
    cwd: Union[str, bytes, None] = Field(
        default=None,
        description="If set, the process will open within the specified path as the working directory.",
    )  # Needs the "PathLike[str]" definition to fully match anyio.open_process

    @sync_compatible
    async def run(
        self,
        task_status: anyio.abc.TaskStatus = None,
    ) -> "ProcessResult":
        """
        Run the configured command in a subprocess and report how it exited.

        Args:
            task_status: Forwarded unchanged to `run_process`.

        Returns:
            A `ProcessResult` carrying the process return code as `status_code`
            and the stringified pid as `identifier`.

        Raises:
            ValueError: If `command` is empty.
        """
        if not self.command:
            raise ValueError("Process cannot be run with empty command.")

        _use_threaded_child_watcher()
        display_name = f" {self.name!r}" if self.name else ""

        # Open a subprocess to execute the flow run
        self.logger.info(f"Opening process{display_name}...")

        if self.cwd is None:
            working_dir_ctx = tempfile.TemporaryDirectory(suffix="prefect")
        else:
            # We have specified a working directory for the process.
            # NOTE: This will not clean up the working directory like the
            # TemporaryDirectory will.
            working_dir_ctx = tmpchdir(self.cwd)

        with working_dir_ctx as working_dir:
            # `_create_process` is a coroutine function; without `await` the
            # name `process` would be bound to a coroutine object and the
            # `.pid` / `.returncode` lookups below would raise AttributeError.
            process = await self._create_process(
                display_name=display_name,
                task_status=task_status,
                working_dir=working_dir,
            )

        # Use the pid for display if no name was given
        display_name = display_name or f" {process.pid}"

        if process.returncode:
            help_message = None
            if process.returncode == -9:
                help_message = (
                    "This indicates that the process exited due to a SIGKILL signal. "
                    "Typically, this is caused by high memory usage causing the "
                    "operating system to terminate the process."
                )
            elif process.returncode == 247:
                help_message = (
                    "This indicates that the process was terminated due to high "
                    "memory usage."
                )

            self.logger.error(
                f"Process{display_name} exited with status code: "
                f"{process.returncode}" + (f"; {help_message}" if help_message else "")
            )
        else:
            self.logger.info(f"Process{display_name} exited cleanly.")

        return ProcessResult(
            status_code=process.returncode, identifier=str(process.pid)
        )

    def preview(self):
        """
        Render the command as a shell-style string.

        Each configured environment variable is emitted as `KEY=value`, followed
        by the space-joined command; entries are separated by a backslash line
        continuation. Variables from `os.environ` are not included.
        """
        environment = self._get_environment_variables(include_os_environ=False)
        return " \\\n".join(
            [f"{key}={value}" for key, value in environment.items()]
            + [" ".join(self.command)]
        )

    async def _create_process(self, display_name, task_status, working_dir):
        """
        Run `self.command` through `run_process` with `working_dir` as its cwd.

        Args:
            display_name: Label used in the debug log line only.
            task_status: Forwarded unchanged to `run_process`.
            working_dir: Directory passed to `run_process` as `cwd`.

        Returns:
            Whatever `run_process` returns; `run` reads `.pid` and `.returncode`
            from it.
        """
        self.logger.debug(
            f"Process{display_name} running command: {' '.join(self.command)} in {working_dir}"
        )
        return await run_process(
            self.command,
            stream_output=self.stream_output,
            task_status=task_status,
            env=self._get_environment_variables(),
            cwd=working_dir,
        )

    def _get_environment_variables(self, include_os_environ: bool = True):
        """
        Build the environment mapping for the child process.

        Later sources win: `os.environ` (optional), then `_base_environment()`,
        then `self.env`. Entries whose final value is `None` are dropped.

        Args:
            include_os_environ: When false, the current process environment is
                left out of the result.
        """
        os_environ = os.environ if include_os_environ else {}
        # The base environment must override the current environment or
        # the Prefect settings context may not be respected
        env = {**os_environ, **self._base_environment(), **self.env}

        # Drop null values allowing users to "unset" variables
        return {key: value for key, value in env.items() if value is not None}

    def _base_flow_run_command(self):
        """Return the argv that runs `prefect.engine` with the current interpreter."""
        return [sys.executable, "-m", "prefect.engine"]
class ProcessResult(InfrastructureResult):
    """Contains information about the final state of a completed process"""
# Use the pid for display if no name was given
> display_name = display_name or f" {process.pid}"
E AttributeError: 'coroutine' object has no attribute 'pid'
src/prefect/infrastructure/process.py:94: AttributeError
async def _create_process() method
. I’m guessing that extra layer has somehow messed with it? @Zanie, thoughts?
await
in place. That seems to have fixed that… now working to see what else I might need. 😉
Zanie
10/17/2022, 9:42 PM
Jarvis Stubblefield
10/17/2022, 9:42 PM
os.PathLike[str]
as one of the types for CWD but the underlying methods allow for it..pydantic
doesn’t support the above.tmp_path
being passed into the test is a PosixPath
… which fails validation.cwd
without using the Field
from pydantic
… I used it for documentation of the description…"PathLike[str]"
into the `Union[]`…
┌─[ballisticpain@BallisticDevelopment] - [~/Development/prefect] - [Mon Oct 17, 16:41]
└─[$] <git:(main*)> pytest tests/infrastructure/test_process.py
ImportError while loading conftest '/Users/ballisticpain/Development/prefect/tests/conftest.py'.
tests/conftest.py:36: in <module>
import prefect
src/prefect/__init__.py:42: in <module>
import prefect.infrastructure.process
src/prefect/infrastructure/__init__.py:11: in <module>
from prefect.infrastructure.process import Process, ProcessResult
src/prefect/infrastructure/process.py:33: in <module>
class Process(Infrastructure):
pydantic/main.py:198: in pydantic.main.ModelMetaclass.__new__
???
pydantic/fields.py:506: in pydantic.fields.ModelField.infer
???
pydantic/fields.py:436: in pydantic.fields.ModelField.__init__
???
pydantic/fields.py:552: in pydantic.fields.ModelField.prepare
???
pydantic/fields.py:663: in pydantic.fields.ModelField._type_analysis
???
pydantic/fields.py:808: in pydantic.fields.ModelField._create_sub_type
???
pydantic/fields.py:436: in pydantic.fields.ModelField.__init__
???
pydantic/fields.py:552: in pydantic.fields.ModelField.prepare
???
pydantic/fields.py:755: in pydantic.fields.ModelField._type_analysis
???
E TypeError: Fields of type "<class 'os.PathLike'>" are not supported.
tmp_path
wrapped in a str()
… now I’m trying to figure out why my test isn’t working… here’s the test
def test_process_runs_command_in_working_dir(mock_open_process, tmp_path):
    """The configured `cwd` is forwarded to the mocked process opener."""
    # Do not assert on the return value of `run()`: the mocked process does not
    # set a real status code, so the result's truthiness says nothing about
    # whether the working directory was honoured.
    Process(command=["echo", "hello"], stream_output=False, cwd=str(tmp_path)).run()
    mock_open_process.assert_awaited_once()
    cwd = mock_open_process.call_args[1].get("cwd")
    assert cwd == str(tmp_path)
┌─[ballisticpain@BallisticDevelopment] - [~/Development/prefect] - [Mon Oct 17, 16:46]
└─[$] <git:(main*)> pytest tests/infrastructure/test_process.py
============================================================================================================================= test session starts ==============================================================================================================================
platform darwin -- Python 3.10.2, pytest-7.1.3, pluggy-1.0.0
rootdir: /Users/ballisticpain/Development/prefect, configfile: setup.cfg
plugins: anyio-3.6.1, xdist-2.5.0, forked-1.4.0, env-0.6.2, flaky-3.7.0, asyncio-0.19.0, timeout-2.1.0, cov-4.0.0, respx-0.20.0
asyncio: mode=auto
timeout: 60.0s
timeout method: signal
timeout func_only: False
collected 19 items
tests/infrastructure/test_process.py .......F........... [100%]
=================================================================================================================================== FAILURES ===================================================================================================================================
___________________________________________________________________________________________________________________ test_process_runs_command_in_working_dir ___________________________________________________________________________________________________________________
mock_open_process = <AsyncMock id='4526772176'>, tmp_path = PosixPath('/private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0')
def test_process_runs_command_in_working_dir(mock_open_process, tmp_path):
> assert Process(command=["echo", "hello"], stream_output=False, cwd=str(tmp_path)).run()
E assert ProcessResult(identifier="<AsyncMock name='mock().pid' id='4526856064'>", status_code=1)
E + where ProcessResult(identifier="<AsyncMock name='mock().pid' id='4526856064'>", status_code=1) = <bound method Process.run of Process(type='process', env={}, labels={}, name=None, command=['echo', 'hello'], stream_o...ate/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0')>()
E + where <bound method Process.run of Process(type='process', env={}, labels={}, name=None, command=['echo', 'hello'], stream_o...ate/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0')> = Process(type='process', env={}, labels={}, name=None, command=['echo', 'hello'], stream_output=False, cwd='/private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0').run
E + where Process(type='process', env={}, labels={}, name=None, command=['echo', 'hello'], stream_output=False, cwd='/private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0') = Process(command=['echo', 'hello'], stream_output=False, cwd='/private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0')
E + where '/private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0' = str(PosixPath('/private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0'))
tests/infrastructure/test_process.py:63: AssertionError
----------------------------------------------------------------------------------------------------------------------------- Captured stderr call -----------------------------------------------------------------------------------------------------------------------------
16:48:35.319 | INFO | prefect.infrastructure.process - Opening process...
16:48:35.319 | INFO | prefect.infrastructure.process - Opening process...
16:48:35.319 | DEBUG | prefect.infrastructure.process - Process running command: echo hello in /private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0
16:48:35.319 | DEBUG | prefect.infrastructure.process - Process running command: echo hello in /private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0
16:48:35.331 | ERROR | prefect.infrastructure.process - Process <AsyncMock name='mock().pid' id='4526856064'> exited with status code: <AsyncMock name='mock().returncode' id='4527051232'>
16:48:35.331 | ERROR | prefect.infrastructure.process - Process <AsyncMock name='mock().pid' id='4526856064'> exited with status code: <AsyncMock name='mock().returncode' id='4527051232'>
------------------------------------------------------------------------------------------------------------------------------ Captured log call -------------------------------------------------------------------------------------------------------------------------------
INFO prefect.infrastructure.process:process.py:76 Opening process...
DEBUG prefect.infrastructure.process:process.py:130 Process running command: echo hello in /private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0
ERROR prefect.infrastructure.process:process.py:111 Process <AsyncMock name='mock().pid' id='4526856064'> exited with status code: <AsyncMock name='mock().returncode' id='4527051232'>
=========================================================================================================================== short test summary info ============================================================================================================================
FAILED tests/infrastructure/test_process.py::test_process_runs_command_in_working_dir - assert ProcessResult(identifier="<AsyncMock name='mock().pid' id='4526856064'>", status_code=1)
======================================================================================================================== 1 failed, 18 passed in 22.21s =========================================================================================================================
Zanie
10/17/2022, 10:31 PM
mock_open_process
does not set a status code
assert Process.run(…)
def test_process_environment_variables(monkeypatch, mock_open_process):
    """Variables set in the current environment reach the opened process."""
    monkeypatch.setenv("MYVAR", "VALUE")
    # The run result is deliberately not asserted; only the forwarded `env` is checked
    Process(command=["echo", "hello"], stream_output=False).run()
    mock_open_process.assert_awaited_once()
    env = mock_open_process.call_args[1].get("env")
    assert env == {**os.environ, **Process._base_environment(), "MYVAR": "VALUE"}
Jarvis Stubblefield
10/18/2022, 12:24 AM
assert
the run. Can you think of another way I could “check” the process to ensure it’s running from the directory I passed in? What I believe I’m checking is that what I passed in is the same value I expect. I’m not actually checking the CWD of the run itself. I’m not sure I can check more than that?
FAILED tests/blocks/test_checksum_consistency.py::test_checksums_are_consistent[process-Process] - AssertionError: process checksum has changed or is not tracked in tests/blocks/checksums.json
FAILED tests/docker/test_registry_pushes.py::test_registry_error - AssertionError: Regex pattern 'lookup.+nowhere' does not match 'Get "<https://nowhere:5678/v2/>": context deadline exceeded'.
FAILED tests/software/test_conda.py::TestCurrentEnvironmentCondaRequirements::test_unmocked_retrieval_succeeds[options0] - json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
FAILED tests/software/test_conda.py::TestCurrentEnvironmentCondaRequirements::test_unmocked_retrieval_succeeds[options1] - json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
FAILED tests/software/test_conda.py::TestCurrentEnvironmentCondaRequirements::test_unmocked_retrieval_succeeds[options2] - json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
Zanie
10/18/2022, 3:01 PM
bash -c "pwd"
to display the working directory
Jarvis Stubblefield
10/18/2022, 8:14 PM
Zanie
10/18/2022, 8:27 PM