# prefect-community
a
Hi everyone: We are having an issue running a mocked flow. We have a flow, and to test it we mock a few of its tasks and create a new test flow. The test flow is mocked correctly, but when we run it, it uses the registration_settings of the original flow rather than those of the new test flow. Would you be able to help? @Milly gupta
a
@Asmita I’m not sure I can follow: are you trying to register flows that are unit tests? This documentation page can be helpful if you want to build unit tests for your flows and tasks: https://docs.prefect.io/core/idioms/testing-flows.html#testing-prefect-flows-and-tasks
a
@Anna Geller These are not unit tests but end-to-end tests of the flow run. We mock a few tasks and create a new flow, the test flow, so that we can test the original flow with those tasks mocked out. Hopefully this makes sense.
a
@Asmita Got it. But do you think you need to register those? Tests usually run locally, or as part of CI/CD, rather than registering and running with a backend. But if you want to register, this should work, too. How do you register those flows? Can you perhaps try using the CLI for this? e.g.:
prefect register --project yourproject -p yourflow.py
prefect register --project yourproject -p tests/test_yourflow.py
a
@Anna Geller We do not want to use the CLI as we have a lot of project settings. If I give you the flow name / flow run, would you be able to see the metadata for it?
a
No, I wouldn’t. But registering with flow.register() should work, too. Perhaps you should configure it differently for the tests to avoid mixup with normal flows? Maybe you can use a different project for that?
a
We are using a different project for the test flow, and while registering the flows we can see that the register_settings for the new flow are correct (we do not want to use a state handler for the test flow). But when the flow runs, it uses the settings of the original flow. Would I be able to see the settings on the test_flow?
a
Which settings do you mean exactly? Can you share your flow definition?
a
Can you share a screenshot instead of a URL? I think sharing the flow code that runs with the incorrect settings would be helpful.
You can either try to build a minimal example I can reproduce, or, if you don’t want to share it publicly, you can send me the flow definition via DM and we can still continue the discussion here.
k
What storage did you use?
m
@Kevin Kho We use Module storage
k
Yeah I think we’d need to see the code or a minimal example of what you are doing
a
import cdpcommon.prefect.flow_helpers as helpers
from typing import List, Optional
from unittest import mock

from cdpprefect.flows import minimal_test as minimal_test1


def flow(
    register_settings: Optional[helpers.Configuration],
    state_handlers: Optional[List],
):
    print("state_handler: ", state_handlers)
    # Replace the dummy2 task in the original flow module with a no-op mock
    minimal_test1.dummy2 = mock.Mock(return_value=None)
    # Build the original flow's tasks inside the Flow context opened by initialise()
    minimal_test1.flow(register_settings, state_handlers)


# Builds one Flow per registration configured for this module
flows = helpers.initialise(__file__, flow)
minimal_test.py: this is the test flow that we have mocked from the main flow.
The main flow has state_handlers set, whereas the test flow has state_handlers set to null.
But when the test flow runs, it executes the state handler at the end as well.
k
I think I am beginning to understand. Can I see an example of helpers.Configuration and how you define the storage?
a
"""
This module provides helpers for using flows in the cloud data platform
"""
import importlib
import os
from collections import namedtuple
from pathlib import Path
from typing import Any, Callable, Dict, Tuple, Optional

from cdpsettings.settings import settings
from config import Configuration
from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import UniversalRun
from prefect.storage import Module

RUN_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z"


def _get_state_handler_func(handler_path: str):
    module_name, func_name = handler_path.rsplit(".", 1)
    return getattr(importlib.import_module(f"cdpprefect.{module_name}"), func_name)


def _extract_relative_path(path: str, parent_level: int = 0) -> str:
    rel_path_with_os_sep = os.path.relpath(
        path, start=Path(os.path.dirname(path)).parents[parent_level]
    )
    rel_path_with_os_sep_no_suffix = rel_path_with_os_sep.split(".")[0]
    return rel_path_with_os_sep_no_suffix.replace(os.sep, ".")


def initialise(module_file: str, flow_function: Callable) -> Tuple[Flow, ...]:
    """Initialise a flow module with settings defined in the CDP settings file.
    In almost all cases invoke using
    `flows = helpers.initialise(__file__, flow)`

    :param module_file: The module file containing the flow
    :type module_file: str
    :param flow_function: \
        A function defining the Flow according to Prefect's functional API
    :type flow_function: Callable
    :return: A named tuple with a named flow for each configuration defined in settings
    :rtype: Tuple[Flow, ...]
    """
    prefect_settings: Configuration = settings["prefect"]
    print("prefect_settings: ", prefect_settings)
    module_name: str = _extract_relative_path(module_file, parent_level=1)
    flow_key: str = _extract_relative_path(module_file)
    registrations: Configuration = prefect_settings[flow_key]
    number_of_workers: int = int(
        prefect_settings["local_dask_executor.number_of_workers"]
    )

    flows_dict: Dict[str, Any] = {}

    for r in registrations:
        registration = str(r)
        flow_name = f"{module_name}:flows.{registration}"
        flow_registrations = registrations[registration]
        state_handlers = registrations[registration]["state_handlers"]
        optional_flow_args = {}
        if state_handlers is not None:
            optional_flow_args["state_handlers"] = [
                _get_state_handler_func(handler_path) for handler_path in state_handlers
            ]
        with Flow(
            flow_name,
            run_config=UniversalRun(labels=[settings["pipeline.environment_prefix"]]),
            storage=Module(flow_name),
            executor=LocalDaskExecutor(
                scheduler="threads", num_workers=number_of_workers
            ),
            **optional_flow_args,
        ) as f:
            flow_function(
                register_settings=flow_registrations["register_settings"],
                state_handlers=state_handlers,
            )
            print("register_settings: ", flow_registrations["register_settings"])
            print("state_handlers: ", state_handlers)
            flows_dict[registration] = f.copy()
            print("flows_dict: ", flows_dict)

    flows = namedtuple("flows", flows_dict.keys())(*flows_dict.values())
    print(flows)
    return flows
Configurations:
"flows": {
    "minimal_test": {
        "minimal_test_name": {
            "register_settings": null,
            "state_handlers": [
                "state_handlers.email_report.report_flow_end_status"
            ]
        }
    }
},
"test_flows": {
    "minimal_test": {
        "minimal_test_name": {
            "register_settings": null,
            "state_handlers": null
        }
    }
},
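To make the configuration concrete, here is a stdlib-only sketch of how `initialise` turns each `state_handlers` entry into a module path and a function name. It mirrors the `rsplit(".", 1)` in `_get_state_handler_func`. `CONFIG` copies the two entries above, and `handler_targets` is a hypothetical helper. It deliberately stops before importing anything, since `cdpprefect` is assumed to be unavailable outside the project.

```python
from typing import Dict, List, Optional, Tuple

# The two entries from the configuration above as Python data (JSON null -> None).
CONFIG = {
    "flows": {
        "minimal_test": {
            "minimal_test_name": {
                "register_settings": None,
                "state_handlers": ["state_handlers.email_report.report_flow_end_status"],
            }
        }
    },
    "test_flows": {
        "minimal_test": {
            "minimal_test_name": {"register_settings": None, "state_handlers": None}
        }
    },
}


def handler_targets(section: str, flow_key: str) -> Dict[str, List[Tuple[str, str]]]:
    """Hypothetical helper: map each registration to (module, function) pairs.

    Mirrors initialise(): a null state_handlers entry yields no handlers at all.
    """
    targets: Dict[str, List[Tuple[str, str]]] = {}
    for registration, reg_settings in CONFIG[section][flow_key].items():
        paths: Optional[List[str]] = reg_settings["state_handlers"]
        # Same split as _get_state_handler_func; the real helper then imports
        # f"cdpprefect.{module_name}" and looks up func_name on it.
        targets[registration] = [tuple(path.rsplit(".", 1)) for path in (paths or [])]
    return targets


print(handler_targets("flows", "minimal_test"))
print(handler_targets("test_flows", "minimal_test"))
```

In other words, at registration time the test flow object is built without any handlers. The open question is what gets loaded when the flow actually runs.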
k
OK, I think I understand what is happening. When you register a flow, the flow metadata gets serialized and sent to Prefect Cloud. The metadata that is sent can be found here. Take a look at what is saved: parameters, run_config, storage, schedule… Whatever is not there is retrieved at runtime from the Flow definition in storage. The executor is one example. It is not part of that metadata, so at runtime the agent loads the Flow from its file and determines the executor from that. The same goes for state handlers. Suppose you have two files, A and B, where B imports a Flow from A, adds an executor, and then registers it with storage still pointing to file A. That executor is not stored anywhere, so when the flow runs, the executor will be whatever is in file A. For these configurations to apply, they need to be defined in the same file as the Flow.
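The A/B example above can be modelled without Prefect. This is a toy sketch: `FakeFlow`, `register`, `agent_run` and the `MODULE_STORAGE` dict are hypothetical stand-ins, not Prefect APIs. It assumes, as described above, that registration saves only metadata such as the name and storage location. State handlers and the executor come from re-loading the flow from the module that storage points to.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class FakeFlow:
    """Hypothetical stand-in for a Prefect Flow: only the fields that matter here."""
    name: str
    state_handlers: List[str] = field(default_factory=list)
    executor: str = "LocalExecutor"


def build_flow_in_file_a() -> FakeFlow:
    # What "file A" defines when it is imported: the real email handler.
    return FakeFlow("minimal_test", state_handlers=["email_report"], executor="LocalDaskExecutor")


# Stand-in for Module storage: a module path that, when loaded, rebuilds the flow.
MODULE_STORAGE: Dict[str, Callable[[], FakeFlow]] = {
    "cdpprefect.flows.minimal_test": build_flow_in_file_a,
}


def register(flow: FakeFlow, storage_module: str) -> Dict[str, str]:
    # Only serialized metadata reaches the backend; handlers and executor do not.
    return {"name": flow.name, "storage": storage_module}


def agent_run(metadata: Dict[str, str]) -> FakeFlow:
    # At run time the flow is rebuilt from storage, so file A's definition wins.
    return MODULE_STORAGE[metadata["storage"]]()


# "File B": take the flow from A, strip its handlers, register with storage pointing at A.
test_flow = build_flow_in_file_a()
test_flow.state_handlers = []
metadata = register(test_flow, "cdpprefect.flows.minimal_test")
loaded = agent_run(metadata)
print(test_flow.state_handlers, loaded.state_handlers)
```

The edit made in B is real but never leaves B. For it to survive, it has to live in the module that storage loads.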
m
@Kevin Kho Can we mock the state handler?
k
Are you thinking that you don’t want a statehandler to trigger in a test run so you want to mock the behavior so that nothing happens?
a
yes
k
You should be able to since it is a general Python function.
Actually, maybe not, because you need to patch it where it is being used, and that’s in the FlowRunner (I think). I’ll ask the team to be sure.
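The point about patching "where it is being used" is the standard `unittest.mock` rule. `mock.patch` replaces a name in one namespace, so any module that already imported the object by name keeps its own reference. The same applies even more directly to a handler already stored in a flow's `state_handlers` list. Below is a stdlib-only sketch using two hypothetical in-memory modules, `handlers` and `runner`, standing in for the state handler module and whatever calls it. Whether Prefect's FlowRunner looks handlers up this way is not confirmed here.

```python
import sys
import types
from unittest import mock

# Hypothetical module that defines the real (email-sending) state handler.
handlers = types.ModuleType("handlers")


def _send_email(flow, old_state, new_state):
    return "email sent"


handlers.send_email = _send_email
sys.modules["handlers"] = handlers

# Hypothetical module that imported the handler by name. exec() with the
# module's __dict__ makes finish() look send_email up in runner's globals.
runner = types.ModuleType("runner")
exec(
    "from handlers import send_email\n"
    "def finish():\n"
    "    return send_email(None, None, None)\n",
    runner.__dict__,
)
sys.modules["runner"] = runner

# Patching the defining module does not reach runner's own reference...
with mock.patch("handlers.send_email", return_value=None):
    patched_at_definition = runner.finish()

# ...while patching the namespace where the name is looked up does.
with mock.patch("runner.send_email", return_value=None):
    patched_at_use = runner.finish()

print(patched_at_definition, patched_at_use)
```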
m
Thanks @Kevin Kho, that would be great if you can find out. We are kind of blocked at the moment.
And do you have any examples of how we can test a flow end to end while mocking some of its tasks?
k
So the best example I’ve seen is this. I haven’t gotten confirmation, but I think it’s more likely that you can’t mock the state handler.
So instead, you need to pass the mocked state handler to your Flow’s state_handlers.
a
Would you be able to give an example of passing a mocked state handler to the Flow’s state handlers?
k
You can just pass a MagicMock:
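import prefect
from prefect import task, Flow
from unittest.mock import MagicMock


def my_state_handler(flow, old_state, new_state):
    # The "real" handler: log once the flow run reaches a finished state
    if new_state.is_finished():
        prefect.context.logger.info("it ran")
    return new_state


@task
def task1():
    return 1


with Flow("skip", state_handlers=[my_state_handler]) as flow:
    task1()

# Swap the real handler for a mock before running. The side_effect passes the
# new state through unchanged, and the mock still records every call.
dummy = MagicMock(side_effect=lambda flow, old_state, new_state: new_state)
flow.state_handlers = [dummy]
flow.run()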
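The mock's return value matters here. To the best of our knowledge, Prefect 1's FlowRunner applies flow state handlers in a loop of the form `new_state = handler(flow, old_state, new_state) or new_state`. The sketch below models only that loop. `apply_handlers` and the string states are hypothetical stand-ins. It shows that a bare `MagicMock()` returns another, truthy `MagicMock` that would replace the state. By contrast, `return_value=None` or a pass-through `side_effect` keeps the state and still records the call.

```python
from unittest.mock import MagicMock


def apply_handlers(handlers, flow, old_state, new_state):
    # Model of the runner loop: a falsy return (e.g. None) keeps new_state.
    for handler in handlers:
        new_state = handler(flow, old_state, new_state) or new_state
    return new_state


bare = MagicMock()
returns_none = MagicMock(return_value=None)
pass_through = MagicMock(side_effect=lambda flow, old_state, new_state: new_state)

for name, handler in [("bare", bare), ("returns_none", returns_none), ("pass_through", pass_through)]:
    result = apply_handlers([handler], "flow", "Running", "Success")
    # Was the state preserved, and how many times was the handler called?
    print(name, result == "Success", handler.call_count)
```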
a
I am doing this while registering the flow but it doesn't help
import cdpcommon.prefect.flow_helpers as helpers
from typing import List, Optional
from unittest import mock

from cdpprefect.flows import minimal_test as minimal_test1


def flow(
    register_settings: Optional[helpers.Configuration],
    state_handlers: Optional[List],
):
    print("state_handler: ", state_handlers)
    minimal_test1.dummy2 = mock.Mock(return_value=None)
    # Attempt to mock the state handlers on the original flow module
    minimal_test1.state_handlers = mock.Mock(return_value=None)
    minimal_test1.flow(register_settings, state_handlers)


flows = helpers.initialise(__file__, flow)
k
It won’t, because the registration here still points to the file/module that contains minimal_test1. This issue is unrelated to testing: the agent is still picking up the state handlers from cdpprefect.flows, and it will only get state handlers defined in that same file. State handlers are not part of the registration; they are taken from the Flow when it is loaded from storage.
a
Can I get the flow from its flow_id and mock the state handler on it before calling create_flow_run?
k
What are you trying to do? Given a flow_id, you want to load the Flow object and attach the mock at run time before it starts? That would require editing the agent code.
What is the end goal you are trying to achieve?
a
We are mocking the main flow, which has an email state handler registered, and we don’t want the test flow to trigger the email.
k
Do you use flow.register() for the test or flow.run()?
Does test mean run the flow end to end?
a
To register the flow, we use the Flow class as below:
with Flow(
    flow_name,
    run_config=UniversalRun(labels=[settings["pipeline.environment_prefix"]]),
    storage=Module(flow_name),
    executor=LocalDaskExecutor(
        scheduler="threads", num_workers=number_of_workers
    ),
    **optional_flow_args,
) as f:
    flow_function(
        register_settings=flow_registrations["register_settings"],
        state_handlers=state_handlers,
    )
    print("register_settings: ", flow_registrations["register_settings"])
    print("state_handlers: ", state_handlers)
    flows_dict[registration] = f.copy()
    print("flows_dict: ", flows_dict)
To run the flow, we use client.create_flow_run.
k
Ok will think about how to go about this and respond a bit later
So I don’t think there is a way for you to get that Flow just before it starts and edit it, at least not without going into the agent code. In order, the agent finds a flow, loads the run config (from the database), loads the flow from storage, and then finds the Flow in that storage before running it with the RunConfig. All of this happens in the agent code, and there are no hooks for changing the Flow object that gets loaded. With regard to testing, I have seen this setup, which I think resembles what you want to do. You can subclass the Flow object and make its behavior depend on an environment variable (or maybe the Prefect context). This is not working code, but it gives an idea:
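import os

from prefect import Flow
from prefect.executors import DaskExecutor, LocalDaskExecutor

class MyFlow(Flow):
    def __init__(self, name, **kwargs):
        # Env vars are strings; .get() avoids a KeyError when "test" is unset
        if os.environ.get("test") == "True":
            super().__init__(name, state_handlers=[], executor=LocalDaskExecutor(), **kwargs)
        else:
            # real_state_handler: your production handler, defined elsewhere
            super().__init__(name, state_handlers=[real_state_handler], executor=DaskExecutor(), **kwargs)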
This lets you create two agents, one of which has the test environment variable:
prefect agent local start --env test=True
And then you can set the env variable to change the behavior for this flow. If you do:
with MyFlow(...) as flow:
    insert_tasks_here
Then this will be evaluated when the Flow is run, and the env variable determines whether the state handler is set. The same switch can also pick the executor, e.g. LocalExecutor or LocalDaskExecutor. I’ll try this more tomorrow.
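The core of the idea is choosing handlers and executor from an environment variable at the moment the flow module is loaded. That choice can be isolated in a plain function. This is a sketch, assuming the `test=True` convention from the agent command above. `flow_kwargs_for` and the string placeholders for handlers and executors are hypothetical. In a real flow they would be the handler function and executor instances passed to `super().__init__`.

```python
import os
from typing import Dict, List, Mapping, Optional


def flow_kwargs_for(env: Optional[Mapping[str, str]] = None) -> Dict[str, object]:
    """Hypothetical helper: pick Flow kwargs from the agent's environment."""
    env = os.environ if env is None else env
    # Environment variables are always strings, so compare with "True",
    # and use .get() so a missing variable means "not a test run".
    if env.get("test") == "True":
        handlers: List[str] = []  # no email report on test runs
        executor = "LocalDaskExecutor"
    else:
        handlers = ["report_flow_end_status"]
        executor = "DaskExecutor"
    return {"state_handlers": handlers, "executor": executor}


print(flow_kwargs_for({"test": "True"}))
print(flow_kwargs_for({}))
```

The decision is made in the process that loads the flow, which is why it is tied to the agent's environment rather than to registration settings.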
This is a fully working version:
import os
import prefect
from prefect import Flow, Task
from prefect.executors import DaskExecutor, LocalDaskExecutor
from prefect.run_configs import LocalRun
from prefect.storage import Local
prefect.config.logging.level = "DEBUG"

def real_state_handler(obj, old, new):
    if new.is_running():
        prefect.context.logger.info("I AM RUNNING")
    return new

class MyConditionalFlow(Flow):
    def __init__(self):
        env = os.environ
        if 'test' in env:
            super().__init__(
                name="test",
                state_handlers=[],
                executor=LocalDaskExecutor(),
                storage=Local(path='test2.py', stored_as_script=True),
            )
        else:
            super().__init__(
                name="test",
                state_handlers=[real_state_handler],
                executor=DaskExecutor(),
                storage=Local(path='test2.py', stored_as_script=True),
            )
with MyConditionalFlow() as flow:
    logger = prefect.context.get("logger")
    logger.setLevel("DEBUG")
    logger.info('hi')
    
flow.register("bristech")
a
Hi Kevin, we’re looking into this, but it would require separate agents for the test and actual flows. We have just a single agent and use different projects: one test project and one actual project. Can we do anything with different projects rather than agents?
a
@Asmita projects are used to organize flows and they are relevant during the registration. You can definitely have one flow in a test project and another in prod project.
a
We do have flows in different projects but needed to know if we can use this property to change state handlers rather than different agents
a
Not sure I understand the question; can you explain? Your state handler should work the same way regardless of which project or agent your flow is running under.
k
I don’t think you can: nothing about the project is available in the flow’s context during execution, so you definitely cannot modify flow behavior based on projects.
But if the goal is dev-staging-prod separation, it would normally imply one agent per env. You can have two agents on the same machine.
a
Hi everyone: We are having issues where the agent is not picking up the flows. We can see that the agent is running with the same label as the flow, but the flow is still in Scheduled status. This was working up until Saturday but has suddenly stopped. Would you be able to help here? @Milly gupta