I’m trying to contribute to Prefect… I downloaded ...
# prefect-community
j
I’m trying to contribute to Prefect… I downloaded the repository and ran the tests (without modifications). This is the summary of the output I received…
10 failed, 4486 passed, 39 skipped, 3 xfailed, 11 errors in 1173.85s (0:19:33)
I wasn’t expecting to have failures… I’ve added my code and am now running the 20 minutes of tests to ensure it’s working as expected… I’m assuming that as long as I don’t get to `11 failed` in the summary I should be okay, right?
z
Thanks for contributing!
Can you share which tests failed?
I’d recommend running a subset of tests targeting the area you’re working on. We count on GitHub Actions to run the full suite. You can also run `pytest -n auto`, which will run the test suite in parallel using workers for a significant speedup.
j
You’re the one who wrote most (all) of the Process code… I definitely need to run a subset.
I think I can share…
I have definitely broken Process … almost all the tests fail now… lol
O_o I need to re-run the test(s) in tmux so I can gather all of the output.
So… here is my `process.py` file. The change I made is definitely the cause of the errors I’m now seeing for almost all of the `Process` tests.
import asyncio
import os
import sys
import tempfile
from typing import Optional, Union

import anyio.abc
import sniffio
from pydantic import Field
from typing_extensions import Literal

from prefect.infrastructure.base import Infrastructure, InfrastructureResult
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.filesystem import tmpchdir
from prefect.utilities.processutils import run_process


def _use_threaded_child_watcher():
    if (
        sys.version_info < (3, 8)
        and sniffio.current_async_library() == "asyncio"
        and sys.platform != "win32"
    ):
        from prefect.utilities.compat import ThreadedChildWatcher

        # Python < 3.8 does not use a `ThreadedChildWatcher` by default which can
        # lead to errors in tests on unix as the previous default `SafeChildWatcher`
        # is not compatible with threaded event loops.
        asyncio.get_event_loop_policy().set_child_watcher(ThreadedChildWatcher())


class Process(Infrastructure):
    """
    Run a command in a new process.

    Current environment variables and Prefect settings will be included in the created
    process. Configured environment variables will override any current environment
    variables.

    Attributes:
        command: A list of strings specifying the command to run in the container to
            start the flow run. In most cases you should not override this.
        env: Environment variables to set for the new process.
        labels: Labels for the process. Labels are for metadata purposes only and
            cannot be attached to the process itself.
        name: A name for the process. For display purposes only.
    """

    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/39WQhVu4JK40rZWltGqhuC/d15be6189a0cb95949a6b43df00dcb9b/image5.png?h=250"

    type: Literal["process"] = Field(
        default="process", description="The type of infrastructure."
    )
    stream_output: bool = Field(
        default=True,
        description="If set, output will be streamed from the process to local standard output.",
    )
    cwd: Union[str, bytes, None] = Field(
        default=None,
        description="If set, the process will open within the specified path as the working directory."
    )  # Needs the "PathLike[str]" definition to fully match anyio.open_process

    @sync_compatible
    async def run(
        self,
        task_status: anyio.abc.TaskStatus = None,
    ) -> Optional[bool]:
        if not self.command:
            raise ValueError("Process cannot be run with empty command.")

        _use_threaded_child_watcher()
        display_name = f" {self.name!r}" if self.name else ""

        # Open a subprocess to execute the flow run
        self.logger.info(f"Opening process{display_name}...")
        if self.cwd is None:
            with tempfile.TemporaryDirectory(suffix="prefect") as tmp_dir:
                process = self._create_process(
                    display_name=display_name,
                    task_status=task_status,
                    working_dir=tmp_dir,
                )
        else:
            # We have specified a working directory for the process.
            # NOTE: This will not clean up the working directory like the TemporaryDirectory will.
            with tmpchdir(self.cwd) as tmp_dir:
                process = self._create_process(
                    display_name=display_name,
                    task_status=task_status,
                    working_dir=tmp_dir,
                )

        # Use the pid for display if no name was given
        display_name = display_name or f" {process.pid}"

        if process.returncode:
            help_message = None
            if process.returncode == -9:
                help_message = (
                    "This indicates that the process exited due to a SIGKILL signal. "
                    "Typically, this is caused by high memory usage causing the "
                    "operating system to terminate the process."
                )
            elif process.returncode == 247:
                help_message = (
                    "This indicates that the process was terminated due to high "
                    "memory usage."
                )

            self.logger.error(
                f"Process{display_name} exited with status code: "
                f"{process.returncode}" + (f"; {help_message}" if help_message else "")
            )
        else:
            self.logger.info(f"Process{display_name} exited cleanly.")

        return ProcessResult(
            status_code=process.returncode, identifier=str(process.pid)
        )

    def preview(self):
        environment = self._get_environment_variables(include_os_environ=False)
        return " \\\n".join(
            [f"{key}={value}" for key, value in environment.items()]
            + [" ".join(self.command)]
        )

    async def _create_process(self, display_name, task_status, working_dir):
        self.logger.debug(
            f"Process{display_name} running command: {' '.join(self.command)} in {working_dir}"
        )

        return await run_process(
            self.command,
            stream_output=self.stream_output,
            task_status=task_status,
            env=self._get_environment_variables(),
            cwd=working_dir,
        )

    def _get_environment_variables(self, include_os_environ: bool = True):
        os_environ = os.environ if include_os_environ else {}
        # The base environment must override the current environment or
        # the Prefect settings context may not be respected
        env = {**os_environ, **self._base_environment(), **self.env}

        # Drop null values allowing users to "unset" variables
        return {key: value for key, value in env.items() if value is not None}

    def _base_flow_run_command(self):
        return [sys.executable, "-m", "prefect.engine"]


class ProcessResult(InfrastructureResult):
    """Contains information about the final state of a completed process"""
Error Received:
# Use the pid for display if no name was given
>       display_name = display_name or f" {process.pid}"
E       AttributeError: 'coroutine' object has no attribute 'pid'

src/prefect/infrastructure/process.py:94: AttributeError
Literally, all of them are reporting that error. I simply moved the code into an `async def _create_process()` method. I’m guessing that extra layer has somehow messed with it? @Zanie, thoughts?
Okay… I needed to put an `await` in place. That seems to have fixed that… now working to see what else I might need. 😉
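A minimal sketch of the failure mode behind the `AttributeError` above: calling an `async def` function without `await` returns a coroutine object rather than the function's result. `FakeProcess` and `create_process` are hypothetical stand-ins for illustration only, not Prefect's actual code.

```python
import asyncio


class FakeProcess:
    """Hypothetical stand-in for the object a process runner returns."""

    pid = 1234
    returncode = 0


async def create_process():
    # Stand-in for an async helper such as `_create_process`
    await asyncio.sleep(0)
    return FakeProcess()


async def run_without_await():
    process = create_process()  # a coroutine object, not a FakeProcess
    try:
        return process.pid
    except AttributeError as exc:
        return str(exc)
    finally:
        process.close()  # avoids a "coroutine was never awaited" warning


async def run_with_await():
    process = await create_process()  # now the coroutine has actually run
    return process.pid


error_message = asyncio.run(run_without_await())
pid = asyncio.run(run_with_await())
print(error_message)
print(pid)
```

In the pasted `process.py`, this would correspond to the two `process = self._create_process(...)` calls inside `run`, which is presumably where the `await` was added.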
z
😄 glad to be of assistance.
j
Okay, real quick… I can’t specify `os.PathLike[str]` as one of the types for `cwd`, but the underlying methods allow for it.
I get an error because `pydantic` doesn’t support it.
The `tmp_path` being passed into the test is a `PosixPath`… which fails validation.
I could define `cwd` without using the `Field` from `pydantic`… I used it to document the description…
This is the output when I added `"PathLike[str]"` to the `Union[]`…
┌─[ballisticpain@BallisticDevelopment] - [~/Development/prefect] - [Mon Oct 17, 16:41]
└─[$] <git:(main*)> pytest tests/infrastructure/test_process.py
ImportError while loading conftest '/Users/ballisticpain/Development/prefect/tests/conftest.py'.
tests/conftest.py:36: in <module>
    import prefect
src/prefect/__init__.py:42: in <module>
    import prefect.infrastructure.process
src/prefect/infrastructure/__init__.py:11: in <module>
    from prefect.infrastructure.process import Process, ProcessResult
src/prefect/infrastructure/process.py:33: in <module>
    class Process(Infrastructure):
pydantic/main.py:198: in pydantic.main.ModelMetaclass.__new__
    ???
pydantic/fields.py:506: in pydantic.fields.ModelField.infer
    ???
pydantic/fields.py:436: in pydantic.fields.ModelField.__init__
    ???
pydantic/fields.py:552: in pydantic.fields.ModelField.prepare
    ???
pydantic/fields.py:663: in pydantic.fields.ModelField._type_analysis
    ???
pydantic/fields.py:808: in pydantic.fields.ModelField._create_sub_type
    ???
pydantic/fields.py:436: in pydantic.fields.ModelField.__init__
    ???
pydantic/fields.py:552: in pydantic.fields.ModelField.prepare
    ???
pydantic/fields.py:755: in pydantic.fields.ModelField._type_analysis
    ???
E   TypeError: Fields of type "<class 'os.PathLike'>" are not supported.
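One possible way to sidestep the `TypeError` above, sketched under the assumption that the field stays typed as `Union[str, bytes, None]`, is to normalize path-like values with the standard library's `os.fspath` before they reach the model. `normalize_cwd` is a hypothetical helper name, not something from Prefect or pydantic.

```python
import os
from pathlib import PurePosixPath
from typing import Union


def normalize_cwd(
    cwd: Union[str, bytes, "os.PathLike[str]", None]
) -> Union[str, bytes, None]:
    """Hypothetical helper: reduce any path-like value to plain str or bytes."""
    if cwd is None:
        return None
    # os.fspath returns str/bytes unchanged and calls __fspath__ on path objects
    return os.fspath(cwd)


from_path = normalize_cwd(PurePosixPath("/tmp/work"))
from_str = normalize_cwd("/tmp/work")
from_none = normalize_cwd(None)
print(from_path, from_str, from_none)
```

For a `pathlib` path this gives the same string as wrapping it in `str()`, but it also passes `bytes` through unchanged and raises a `TypeError` for values that are not path-like.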
So to get past it for now, in my test I’ve simply passed the `tmp_path` wrapped in `str()`… now I’m trying to figure out why my test isn’t working… here’s the test:
def test_process_runs_command_in_working_dir(mock_open_process, tmp_path):
    assert Process(command=["echo", "hello"], stream_output=False, cwd=str(tmp_path)).run()
    mock_open_process.assert_awaited_once()
    cwd = mock_open_process.call_args[1].get("cwd")
    assert cwd == str(tmp_path)
Full output…
┌─[ballisticpain@BallisticDevelopment] - [~/Development/prefect] - [Mon Oct 17, 16:46]
└─[$] <git:(main*)> pytest tests/infrastructure/test_process.py
============================================================================================================================= test session starts ==============================================================================================================================
platform darwin -- Python 3.10.2, pytest-7.1.3, pluggy-1.0.0
rootdir: /Users/ballisticpain/Development/prefect, configfile: setup.cfg
plugins: anyio-3.6.1, xdist-2.5.0, forked-1.4.0, env-0.6.2, flaky-3.7.0, asyncio-0.19.0, timeout-2.1.0, cov-4.0.0, respx-0.20.0
asyncio: mode=auto
timeout: 60.0s
timeout method: signal
timeout func_only: False
collected 19 items

tests/infrastructure/test_process.py .......F...........                                                                                                                                                                                                                 [100%]

=================================================================================================================================== FAILURES ===================================================================================================================================
___________________________________________________________________________________________________________________ test_process_runs_command_in_working_dir ___________________________________________________________________________________________________________________

mock_open_process = <AsyncMock id='4526772176'>, tmp_path = PosixPath('/private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0')

    def test_process_runs_command_in_working_dir(mock_open_process, tmp_path):
>       assert Process(command=["echo", "hello"], stream_output=False, cwd=str(tmp_path)).run()
E       assert ProcessResult(identifier="<AsyncMock name='mock().pid' id='4526856064'>", status_code=1)
E        +  where ProcessResult(identifier="<AsyncMock name='mock().pid' id='4526856064'>", status_code=1) = <bound method Process.run of Process(type='process', env={}, labels={}, name=None, command=['echo', 'hello'], stream_o...ate/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0')>()
E        +    where <bound method Process.run of Process(type='process', env={}, labels={}, name=None, command=['echo', 'hello'], stream_o...ate/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0')> = Process(type='process', env={}, labels={}, name=None, command=['echo', 'hello'], stream_output=False, cwd='/private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0').run
E        +      where Process(type='process', env={}, labels={}, name=None, command=['echo', 'hello'], stream_output=False, cwd='/private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0') = Process(command=['echo', 'hello'], stream_output=False, cwd='/private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0')
E        +        where '/private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0' = str(PosixPath('/private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0'))

tests/infrastructure/test_process.py:63: AssertionError
----------------------------------------------------------------------------------------------------------------------------- Captured stderr call -----------------------------------------------------------------------------------------------------------------------------
16:48:35.319 | INFO    | prefect.infrastructure.process - Opening process...
16:48:35.319 | INFO    | prefect.infrastructure.process - Opening process...
16:48:35.319 | DEBUG   | prefect.infrastructure.process - Process running command: echo hello in /private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0
16:48:35.319 | DEBUG   | prefect.infrastructure.process - Process running command: echo hello in /private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0
16:48:35.331 | ERROR   | prefect.infrastructure.process - Process <AsyncMock name='mock().pid' id='4526856064'> exited with status code: <AsyncMock name='mock().returncode' id='4527051232'>
16:48:35.331 | ERROR   | prefect.infrastructure.process - Process <AsyncMock name='mock().pid' id='4526856064'> exited with status code: <AsyncMock name='mock().returncode' id='4527051232'>
------------------------------------------------------------------------------------------------------------------------------ Captured log call -------------------------------------------------------------------------------------------------------------------------------
INFO     prefect.infrastructure.process:process.py:76 Opening process...
DEBUG    prefect.infrastructure.process:process.py:130 Process running command: echo hello in /private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/pytest-of-ballisticpain/pytest-4/test_process_runs_command_in_w0
ERROR    prefect.infrastructure.process:process.py:111 Process <AsyncMock name='mock().pid' id='4526856064'> exited with status code: <AsyncMock name='mock().returncode' id='4527051232'>
=========================================================================================================================== short test summary info ============================================================================================================================
FAILED tests/infrastructure/test_process.py::test_process_runs_command_in_working_dir - assert ProcessResult(identifier="<AsyncMock name='mock().pid' id='4526856064'>", status_code=1)
======================================================================================================================== 1 failed, 18 passed in 22.21s =========================================================================================================================
z
The `mock_open_process` does not set a status code
You’ll want to remove the assertion that the run is successful
e.g. this test does not `assert Process.run(…)`
def test_process_environment_variables(monkeypatch, mock_open_process):
    monkeypatch.setenv("MYVAR", "VALUE")
    Process(command=["echo", "hello"], stream_output=False).run()
    mock_open_process.assert_awaited_once()
    env = mock_open_process.call_args[1].get("env")
    assert env == {**os.environ, **Process._base_environment(), "MYVAR": "VALUE"}
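A rough sketch of why asserting on the run result fails with a mocked process, using a plain `unittest.mock.AsyncMock` as a hypothetical stand-in for the `mock_open_process` fixture (the real fixture lives in Prefect's test suite and may be configured differently). An unconfigured mock attribute such as `returncode` is itself a mock, which is truthy, so code that checks `if process.returncode:` takes the error branch.

```python
import asyncio
from unittest.mock import AsyncMock

# Hypothetical stand-in for the `mock_open_process` fixture
mock_open_process = AsyncMock()


async def open_and_inspect():
    process = await mock_open_process(["echo", "hello"], cwd="/tmp/example")
    # returncode was never configured, so it is an auto-created mock attribute
    return process.returncode


returncode = asyncio.run(open_and_inspect())
took_error_branch = bool(returncode)  # mocks are truthy by default

# The call arguments are still recorded and can be checked directly
mock_open_process.assert_awaited_once()
called_cwd = mock_open_process.call_args[1].get("cwd")
print(took_error_branch, called_cwd)
```

This is why the tests that use the mock only inspect `call_args` rather than the value returned by `run()`.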
j
Okay… so I didn’t need to `assert` the run. Can you think of another way I could “check” the process to ensure it’s running from the directory I passed in? What I believe I’m checking is that the value I passed in is the same value I expect. I’m not actually checking the CWD of the run itself. I’m not sure I can check more than that?
Here are the failed tests now … only 5, and none are what I added. I think this means I can submit a PR.
FAILED tests/blocks/test_checksum_consistency.py::test_checksums_are_consistent[process-Process] - AssertionError: process checksum has changed or is not tracked in tests/blocks/checksums.json
FAILED tests/docker/test_registry_pushes.py::test_registry_error - AssertionError: Regex pattern 'lookup.+nowhere' does not match 'Get "https://nowhere:5678/v2/": context deadline exceeded'.
FAILED tests/software/test_conda.py::TestCurrentEnvironmentCondaRequirements::test_unmocked_retrieval_succeeds[options0] - json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
FAILED tests/software/test_conda.py::TestCurrentEnvironmentCondaRequirements::test_unmocked_retrieval_succeeds[options1] - json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
FAILED tests/software/test_conda.py::TestCurrentEnvironmentCondaRequirements::test_unmocked_retrieval_succeeds[options2] - json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
z
You could actually run the process instead of mocking it and run `bash -c "pwd"` to display the working directory
🎉 1
It’s better to test the actual behavior where feasible
👍 1
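A minimal sketch of the idea of checking the real working directory, using the standard library's `subprocess` directly rather than Prefect's `Process` block. As an assumption for portability, the child is a Python one-liner instead of `bash -c "pwd"`; `working_dir_of_child` is a hypothetical helper name.

```python
import os
import subprocess
import sys
import tempfile


def working_dir_of_child(cwd: str) -> str:
    """Run a child Python process in `cwd` and return the directory it reports."""
    completed = subprocess.run(
        [sys.executable, "-c", "import os; print(os.getcwd())"],
        cwd=cwd,
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout.strip()


with tempfile.TemporaryDirectory() as tmp_dir:
    reported = working_dir_of_child(tmp_dir)
    # realpath resolves symlinks such as /var -> /private/var on macOS
    same_dir = os.path.realpath(reported) == os.path.realpath(tmp_dir)

print(same_dir)
```

A test against the real `Process` would presumably capture the streamed output instead, but the comparison step would look similar, including resolving symlinks before comparing paths.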
j
I’ll fix that now! Thanks!
I knew there was a better way. 🙂
Sweet… the submission will be coming soon. I think it’s doing as I want it to.
I’m hoping this allows me to remove all of the Django Setup code from each of my flows.
Though I have at least pulled it out into its own function so I don’t have to copy and paste it into each flow.
z
😄 Sweet! Looking forward to it