Soham
07/27/2023, 12:46 AM

Marvin
07/27/2023, 12:47 AM
Prefect provides a log_prints option to enable the logging of print statements. If you set log_prints=True for a given task or flow, the Python builtin print statements should be logged. Here's an example:
```python
from prefect import task, flow

@task
def my_task():
    print("we're logging print statements from a task")

@flow(log_prints=True)
def my_flow():
    print("we're logging print statements from a flow")
    my_task()
```
In this example, the print statements from both the task and the flow should be logged. By default, tasks and subflows will inherit the log_prints setting from their parent flow, unless they opt out with their own explicit log_prints setting.
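For instance, a task called from that flow can opt out of the inherited setting by passing its own explicit value (the task name here is just illustrative):
```python
@task(log_prints=False)
def quiet_task():
    # with log_prints=False this print goes to stdout as usual
    # instead of being captured as a Prefect log record
    print("this will not appear in the flow run logs")
```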
Remember, the logs are your friends. Treat them well, and they'll return the favor when you need them most.

Nate
07/27/2023, 12:48 AM

Soham
07/27/2023, 12:50 AM

Nate
07/27/2023, 12:51 AM

Soham
07/27/2023, 12:52 AM
```python
@flow(task_runner=DaskTaskRunner)
@echoflow(processing_stage="open-raw", type="FLOW")
def open_raw(
    config: Dataset, stage: Stage, data: Union[str, List[List[Dict[str, Any]]]]
):
    edList = []
    futures = []
    raw_dicts_futures = []
    i = 0
    outputs: List[Output] = []
    for raw_dicts in data:
        output = Output()
        output.passing_params = {"transect": raw_dicts[0].get("transect_num")}
        for raw in raw_dicts:
            new_processed_raw = process_raw.with_options(task_run_name=raw.get("file_path"), name=raw.get("file_path"))
            future = new_processed_raw.submit(
                raw,
                temp_raw_dir,
                config,
                stage
            )
            futures.append(future)
        outputs.append(output)
        raw_dicts_futures.append(futures)
    for futures, output in zip(raw_dicts_futures, outputs):
        edList = [f.result() for f in futures]
        output.data = edList
    return outputs
```
Decorator :
```python
def echoflow(processing_stage: str = "DEFAULT", type: str = "TASK"):
    def decorator(func=None):
        def before_function_call(
            gea: Singleton_Echoflow, type: str, processing_stage: str, *args, **kwargs
        ):
            mod_args = []
            if type == "FLOW" and processing_stage != "DEFAULT":
                run_history = get_last_run_history(name=processing_stage)
                print(run_history)
                mod_args = [arg for arg in args]
                gea.log(
                    msg=run_history,
                    extra={"mod_name": func.__module__, "func_name": func.__name__},
                    level=logging.DEBUG,
                )
            else:
                mod_args = [arg for arg in args]
                gea.log(
                    msg=f"Entering with memory at {gea.log_memory_usage()}: ",
                    extra={"mod_name": func.__module__, "func_name": func.__name__},
                    level=logging.DEBUG,
                )
            return mod_args

        def after_function_call(gea: Singleton_Echoflow, *args, **kwargs):
            gea.log(
                msg=f"Exiting with memory at {gea.log_memory_usage()}: ",
                extra={"mod_name": func.__module__, "func_name": func.__name__},
                level=logging.DEBUG,
            )
            # rule engine checks - TODO

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            gea = Singleton_Echoflow.get_instance()
            try:
                mod_args = before_function_call(gea, type, processing_stage, *args, **kwargs)
                result = func(*mod_args, **kwargs)
                after_function_call(gea, *args, **kwargs)
                return result
            except Exception as e:
                gea.log(
                    msg=e,
                    level=logging.ERROR,
                    extra={"mod_name": func.__module__, "func_name": func.__name__},
                )
                raise e
            finally:
                # some process
                pass

        return wrapper

    return decorator
```
Nate
07/27/2023, 1:14 AM
hard to say without knowing what gea: Singleton_Echoflow is, and how the .log method works, but you might be able to use PREFECT_LOGGING_EXTRA_LOGGERS here by setting the module that gea belongs to like PREFECT_LOGGING_EXTRA_LOGGERS=<module-name>
Soham
07/27/2023, 1:35 AM
```python
def log(self, msg, level, extra):
    self.logger.log(level=level, msg=msg, extra=extra)
```
```python
config = yaml.safe_load(file.read())
logging.config.dictConfig(config)
```
Nate
07/27/2023, 1:44 AM
PREFECT_LOGGING_EXTRA_LOGGERS basically scoops any python logger output from the namespace you provide, so if you have a local module gealib where you're defining your logger, you'd set PREFECT_LOGGING_EXTRA_LOGGERS=gealib in your env vars (I think)
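e.g. a rough sketch, assuming gealib just sets up a plain stdlib logger (module and function names here are illustrative):
```python
# gealib/__init__.py  (hypothetical module)
import logging

# logger named after the module; PREFECT_LOGGING_EXTRA_LOGGERS=gealib
# tells Prefect to attach its handlers to this logger namespace
logger = logging.getLogger("gealib")

def log(msg, level=logging.DEBUG, extra=None):
    logger.log(level=level, msg=msg, extra=extra)
```
with that env var set, records emitted through gealib's logger during a flow run should then show up alongside the Prefect run logs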
and I would recommend avoiding decorators that go between @flow and the function you're decorating, since then you are responsible for writing the in-between decorator (echoflow in this case) in a way that doesn't mess with the @flow decorator's expectations
i'd recommend designing echoflow to be used like this instead
```python
@echoflow
@flow(...)
def your_func():
    pass
```
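applied to the flow above, that would look roughly like this (a sketch, keeping echoflow's current factory signature):
```python
@echoflow(processing_stage="open-raw", type="FLOW")
@flow(task_runner=DaskTaskRunner)
def open_raw(
    config: Dataset, stage: Stage, data: Union[str, List[List[Dict[str, Any]]]]
):
    ...
```
since decorators apply bottom-up, @flow builds the Flow object first and echoflow then wraps the already-constructed flow instead of sitting between @flow and the function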
or is there a reason you can't do it that way?

Soham
07/27/2023, 1:48 AM