Asmita
11/17/2021, 11:44 AMAnna Geller
11/17/2021, 12:04 PMAsmita
11/17/2021, 12:32 PMAnna Geller
11/17/2021, 12:42 PMprefect register --project yourproject -p yourflow.py
prefect register --project yourproject -p tests/test_yourflow.py
Asmita
11/17/2021, 12:49 PMAnna Geller
11/17/2021, 12:51 PMAsmita
11/17/2021, 1:07 PMAnna Geller
11/17/2021, 1:09 PMAsmita
11/17/2021, 1:18 PMAnna Geller
11/17/2021, 1:18 PMKevin Kho
11/17/2021, 2:33 PMMilly gupta
11/17/2021, 2:51 PMKevin Kho
11/17/2021, 2:53 PMAsmita
11/17/2021, 3:38 PMimport cdpcommon.prefect.flow_helpers as helpers
from typing import List, Optional
from cdpprefect.flows import minimal_test as minimal_test1
from typing import List, Optional
from unittest import mock
def flow(
    register_settings: Optional[helpers.Configuration],
    state_handlers: Optional[List],
):
    """Test wrapper around ``minimal_test1.flow`` with ``dummy2`` stubbed out.

    :param register_settings: Forwarded unchanged to ``minimal_test1.flow``.
    :param state_handlers: Forwarded unchanged to ``minimal_test1.flow``.
    """
    print("state_handler: ", state_handlers)
    # Replace the module attribute with a Mock that returns None before the
    # wrapped flow function runs. NOTE(review): the patch is never undone.
    setattr(minimal_test1, "dummy2", mock.Mock(return_value=None))
    minimal_test1.flow(register_settings, state_handlers)


flows = helpers.initialise(__file__, flow)
minimal_test.py
This is the test flow that we have mocked from the main flow.
Kevin Kho
11/17/2021, 3:44 PM
… `helpers.Configuration` and how you define the storage?
Asmita
11/17/2021, 3:49 PM"""
This module provides helpers for using flows in the cloud data platform
"""
import importlib
import logging
import os
from collections import namedtuple
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Tuple

from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import UniversalRun
from prefect.storage import Module

from cdpsettings.settings import settings
from config import Configuration
# Timestamp format (strftime/strptime codes): ISO-8601-style date and time with
# microseconds (%f) and UTC offset (%z). Presumably used for flow run times —
# NOTE(review): not referenced in the visible code; confirm where it is used.
RUN_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z"
def _get_state_handler_func(handler_path: str):
module_name, func_name = handler_path.rsplit(".", 1)
return getattr(importlib.import_module(f"cdpprefect.{module_name}"), func_name)
def _extract_relative_path(path: str, parent_level: int = 0) -> str:
rel_path_with_os_sep = os.path.relpath(
path, start=Path(os.path.dirname(path)).parents[parent_level]
)
rel_path_with_os_sep_no_suffix = rel_path_with_os_sep.split(".")[0]
return rel_path_with_os_sep_no_suffix.replace(os.sep, ".")
def initialise(module_file: str, flow_function: Callable) -> Tuple[Flow, ...]:
    """Initialise a flow module with settings defined in the CDP settings file.

    In almost all cases invoke using
    `flows = helpers.initialise(__file__, flow)`

    One Prefect ``Flow`` is built per registration found under this module's
    key in the ``prefect`` settings. Each flow is named
    ``"<module_name>:flows.<registration>"`` and is created with:

    - ``Module`` storage under that name;
    - a ``UniversalRun`` labelled with the ``pipeline.environment_prefix`` setting;
    - a threaded ``LocalDaskExecutor``;
    - the resolved state handlers, when the registration lists any.

    :param module_file: The module file containing the flow
    :type module_file: str
    :param flow_function: A function defining the Flow according to Prefect's
        functional API. It is called inside the ``Flow`` context with two
        keyword arguments: the registration's ``register_settings``, and its
        raw ``state_handlers`` setting (dotted paths, not the resolved callables).
    :type flow_function: Callable
    :return: A named tuple with a named flow for each configuration defined in settings
    :rtype: Tuple[Flow, ...]
    :raises ValueError: If a registration name is not a valid
        ``collections.namedtuple`` field name.
    """
    logger = logging.getLogger(__name__)

    prefect_settings: Configuration = settings["prefect"]
    logger.debug("prefect_settings: %s", prefect_settings)
    # Dotted path of the flow module including its package, e.g.
    # "cdpprefect.flows.minimal_test" for .../cdpprefect/flows/minimal_test.py.
    module_name: str = _extract_relative_path(module_file, parent_level=1)
    # Settings key for this module, e.g. "flows.minimal_test".
    flow_key: str = _extract_relative_path(module_file)
    registrations: Configuration = prefect_settings[flow_key]
    number_of_workers: int = int(
        prefect_settings["local_dask_executor.number_of_workers"]
    )

    flows_dict: Dict[str, Any] = {}
    for r in registrations:
        registration = str(r)
        flow_name = f"{module_name}:flows.{registration}"
        flow_registrations = registrations[registration]
        # Reuse the entry fetched above instead of indexing `registrations` again.
        state_handlers = flow_registrations["state_handlers"]

        optional_flow_args: Dict[str, Any] = {}
        if state_handlers is not None:
            # Settings hold dotted paths; Flow needs the actual callables.
            optional_flow_args["state_handlers"] = [
                _get_state_handler_func(handler_path) for handler_path in state_handlers
            ]

        with Flow(
            flow_name,
            run_config=UniversalRun(labels=[settings["pipeline.environment_prefix"]]),
            storage=Module(flow_name),
            executor=LocalDaskExecutor(
                scheduler="threads", num_workers=number_of_workers
            ),
            **optional_flow_args,
        ) as f:
            flow_function(
                register_settings=flow_registrations["register_settings"],
                state_handlers=state_handlers,
            )
        logger.debug(
            "register_settings: %s", flow_registrations["register_settings"]
        )
        logger.debug("state_handlers: %s", state_handlers)
        flows_dict[registration] = f.copy()

    logger.debug("flows_dict: %s", flows_dict)
    # Build the namedtuple type once (the old debug print built it a second time).
    flows_type = namedtuple("flows", flows_dict.keys())
    flows = flows_type(*flows_dict.values())
    logger.debug("flows: %s", flows)
    return flows
"flows": {
"minimal_test": {
"minimal_test_name": {
"register_settings": null,
"state_handlers": [
"state_handlers.email_report.report_flow_end_status"
]
}
}
},
"test_flows": {
"minimal_test": {
"minimal_test_name": {
"register_settings": null,
"state_handlers": null
}
}
},
Kevin Kho
11/17/2021, 3:54 PMMilly gupta
11/17/2021, 4:01 PMKevin Kho
11/17/2021, 4:03 PMAsmita
11/17/2021, 4:03 PMKevin Kho
11/17/2021, 4:07 PMMilly gupta
11/17/2021, 4:47 PMKevin Kho
11/17/2021, 5:08 PMAsmita
11/17/2021, 11:33 PMKevin Kho
11/17/2021, 11:42 PMfrom prefect import task, Flow
import prefect
def my_state_handler(flow, old_state, new_state):
    """Log "it ran" once the flow reaches a finished state.

    Called as ``handler(flow, old_state, new_state)``. The first parameter was
    previously named ``Flow``, which shadowed the imported class.

    :return: ``new_state`` unchanged.
    """
    if new_state.is_finished():
        # The pasted version carried Slack link markup here, which is not valid Python.
        prefect.context.logger.info("it ran")
    return new_state
@task
def task1():
    """Return the constant 1; a minimal task so the demo flow has something to run."""
    return 1


with Flow("skip") as flow:
    task1()

from unittest.mock import MagicMock

# Stand-in state handler for the demo run. ``return_value=None`` matters: a
# bare MagicMock() returns another (truthy) MagicMock when called, which the
# flow runner may adopt as the handler's replacement state instead of the real
# new state.
dummy = MagicMock(return_value=None)
flow.state_handlers = [dummy]
flow.run()
Asmita
11/18/2021, 12:38 AMimport cdpcommon.prefect.flow_helpers as helpers
from typing import List, Optional
from cdpprefect.flows import minimal_test as minimal_test1
from typing import List, Optional
from unittest import mock
def flow(
    register_settings: Optional[helpers.Configuration],
    state_handlers: Optional[List],
):
    """Build ``minimal_test1``'s flow for testing, with module attributes stubbed.

    Both ``minimal_test1.dummy2`` and ``minimal_test1.state_handlers`` are
    replaced by ``Mock(return_value=None)`` before delegating to
    ``minimal_test1.flow``.

    NOTE(review): the handlers actually attached to each Flow are resolved by
    ``helpers.initialise`` from the ``state_handlers`` setting. Replacing the
    module attribute is therefore not expected to remove them — confirm.

    :param register_settings: Forwarded unchanged to ``minimal_test1.flow``.
    :param state_handlers: Forwarded unchanged to ``minimal_test1.flow``.
    """
    print("state_handler: ", state_handlers)
    for attr_name in ("dummy2", "state_handlers"):
        setattr(minimal_test1, attr_name, mock.Mock(return_value=None))
    minimal_test1.flow(register_settings, state_handlers)


flows = helpers.initialise(__file__, flow)
Kevin Kho
11/18/2021, 12:42 AM
… `minimal_test1`. This issue is unrelated to testing. What is happening is that the agent is still picking up state handlers from `cdpprefect.flows`. It will only get state handlers defined in that same file.
Asmita
11/18/2021, 12:45 AMKevin Kho
11/18/2021, 12:50 AMAsmita
11/18/2021, 12:54 AMKevin Kho
11/18/2021, 12:59 AMAsmita
11/18/2021, 1:07 AMwith Flow(
flow_name,
run_config=UniversalRun(labels=[settings["pipeline.environment_prefix"]]),
storage=Module(flow_name),
executor=LocalDaskExecutor(
scheduler="threads", num_workers=number_of_workers
),
**optional_flow_args,
) as f:
flow_function(
register_settings=flow_registrations["register_settings"],
state_handlers=state_handlers,
)
print("register_settings: ", flow_registrations["register_settings"])
print("state_handlers: ", state_handlers)
flows_dict[registration] = f.copy()
print("flows_dict: ", flows_dict)
Kevin Kho
11/18/2021, 1:08 AMclass MyFlow(Flow):
def __init__(self, name: str = "test"):
    """Choose the flow's state handlers and executor from the ``test`` env var.

    When ``test`` is set to a truthy string (e.g. an agent started with
    ``--env test=True``), the flow gets no state handlers and a
    ``LocalDaskExecutor``. Otherwise it gets ``real_state_handler`` and a
    ``DaskExecutor``.

    :param name: Flow name forwarded to ``Flow.__init__``, which requires one.
    """
    # Environment values are always strings. The old check
    # ``os.environ["test"] == True`` was never true, and it raised KeyError
    # when ``test`` was unset.
    in_test_env = os.environ.get("test", "").strip().lower() in ("true", "1", "yes")
    if in_test_env:
        super().__init__(name, state_handlers=[], executor=LocalDaskExecutor())
    else:
        super().__init__(
            name, state_handlers=[real_state_handler], executor=DaskExecutor()
        )
What this lets you do is create two agents, one of them started with the test environment variable:
prefect agent local start --env test=True
Then you can set the environment variable to change the behavior of this flow. If you do:
with MyFlow(...) as flow:
    insert_tasks_here
then this will be evaluated when the flow is run, so the environment variable determines whether the state handler is set. You can switch between LocalExecutor and LocalDaskExecutor the same way.
Will try this more tomorrowimport os
import prefect
from prefect import Flow, Task
from prefect.executors import DaskExecutor, LocalDaskExecutor
from prefect.run_configs import LocalRun
from prefect.storage import Local
# Set Prefect's configured logging level to DEBUG for this script.
prefect.config.logging.level = "DEBUG"
def real_state_handler(obj, old, new):
    """Log "I AM RUNNING" when the new state is running.

    :param obj: The object whose state changed (passed by the caller, unused here).
    :param old: The previous state (unused).
    :param new: The incoming state.
    :return: ``new`` unchanged.
    """
    if new.is_running():
        # The pasted version carried Slack link markup here, which is not valid Python.
        prefect.context.logger.info("I AM RUNNING")
    return new
class MyConditionalFlow(Flow):
    """Flow named ``test`` whose handlers and executor depend on the environment.

    If a ``test`` environment variable is present, the flow is built with no
    state handlers and a ``LocalDaskExecutor``. Otherwise it is built with
    ``real_state_handler`` and a ``DaskExecutor``. Both variants use ``Local``
    storage pointing at the script ``test2.py``.
    """

    def __init__(self):
        if 'test' in os.environ:
            handlers, executor = [], LocalDaskExecutor()
        else:
            handlers, executor = [real_state_handler], DaskExecutor()
        super().__init__(
            name="test",
            state_handlers=handlers,
            executor=executor,
            storage=Local(path='test2.py', stored_as_script=True),
        )
with MyConditionalFlow() as flow:
    logger = prefect.context.get("logger")
    logger.setLevel("DEBUG")
    # The pasted version carried Slack link markup here, which is not valid Python.
    # NOTE(review): this is not a task, so it logs while the flow is being
    # built (when this script is executed), not during a flow run.
    logger.info('hi')

flow.register("bristech")
Asmita
11/24/2021, 12:41 PMAnna Geller
11/24/2021, 12:52 PMAsmita
11/24/2021, 4:21 PMAnna Geller
11/24/2021, 4:24 PMKevin Kho
11/24/2021, 4:29 PMAsmita
08/01/2022, 9:32 AM