@Marvin: Why are logs on console not printing...
# ask-marvin
s
@Marvin: Why are logs on console not printing when we use DaskTaskRunner for a flow?
m
Ah, the joy of logs. The silent whispers of your code, often ignored until something goes wrong. Then we scramble to decipher their cryptic messages, hoping they'll reveal the secrets of the latest catastrophe. It's like a tragic love story, really. But I digress. Your issue with the logs not printing when using the DaskTaskRunner might be due to the way logging is handled in Prefect. By default, print statements aren't logged. However, Prefect provides the `log_prints` option to enable the logging of print statements. If you set `log_prints=True` for a given task or flow, the Python builtin `print` statements should be logged. Here's an example:
```python
from prefect import task, flow

@task
def my_task():
    print("we're logging print statements from a task")

@flow(log_prints=True)
def my_flow():
    print("we're logging print statements from a flow")
```
In this example, the print statements from both the task and the flow should be logged. By default, tasks and subflows will inherit the `log_prints` setting from their parent flow, unless they opt out with their own explicit `log_prints` setting. Remember, the logs are your friends. Treat them well, and they'll return the favor when you need them most.
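For example, a task can opt out of its parent flow's `log_prints=True` with an explicit `log_prints=False` of its own; a minimal sketch (`quiet_task` is an illustrative name):
```python
from prefect import task, flow

@task(log_prints=False)  # opts out of the parent flow's log_prints=True
def quiet_task():
    print("this print is not captured by Prefect's logger")

@flow(log_prints=True)
def my_flow():
    quiet_task()
```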
n
are you using get_logger or get_run_logger in the tasks executed with dask?
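(For context: `get_run_logger` returns a logger attached to the current task or flow run, which is how log records from tasks running on Dask workers make it back to Prefect; a minimal sketch:)
```python
from prefect import task, get_run_logger

@task
def my_task():
    logger = get_run_logger()  # logger tied to the current task run
    logger.info("captured even when this task runs on a Dask worker")
```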
s
No, @Nate. I am using a custom decorator to log the start and end of the function to a file. It works well as long as the flows run sequentially, but when I call a flow marked with DaskTaskRunner, nothing gets logged (console or files).
n
can i see how the decorator works?
s
Sure!
```python
import functools
import logging

# Singleton_Echoflow and get_last_run_history are defined elsewhere
# in the user's codebase

def echoflow(processing_stage: str = "DEFAULT", type: str = "TASK"):
    def decorator(func):

        def before_function_call(
            gea: Singleton_Echoflow, type: str, processing_stage: str, *args, **kwargs
        ):
            mod_args = list(args)
            if type == "FLOW" and processing_stage != "DEFAULT":
                run_history = get_last_run_history(name=processing_stage)
                print(run_history)
                gea.log(
                    msg=run_history,
                    extra={"mod_name": func.__module__, "func_name": func.__name__},
                    level=logging.DEBUG,
                )

            gea.log(
                msg=f"Entering with memory at {gea.log_memory_usage()}: ",
                extra={"mod_name": func.__module__, "func_name": func.__name__},
                level=logging.DEBUG,
            )

            return mod_args

        def after_function_call(gea: Singleton_Echoflow, *args, **kwargs):
            gea.log(
                msg=f"Exiting with memory at {gea.log_memory_usage()}: ",
                extra={"mod_name": func.__module__, "func_name": func.__name__},
                level=logging.DEBUG,
            )

        # rule engine checks - TODO

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            gea = Singleton_Echoflow.get_instance()
            try:
                mod_args = before_function_call(gea, type, processing_stage, *args, **kwargs)
                result = func(*mod_args, **kwargs)
                after_function_call(gea, *args, **kwargs)
                return result
            except Exception as e:
                gea.log(
                    msg=e,
                    level=logging.ERROR,
                    extra={"mod_name": func.__module__, "func_name": func.__name__},
                )
                raise e
            finally:
                pass  # some process

        return wrapper

    return decorator
```
Sure @Nate! Usage:
```python
from typing import Any, Dict, List, Union

from prefect import flow
from prefect_dask import DaskTaskRunner

# Dataset, Stage, Output, process_raw, and temp_raw_dir are defined
# elsewhere in the user's codebase

@flow(task_runner=DaskTaskRunner)
@echoflow(processing_stage="open-raw", type="FLOW")
def open_raw(
    config: Dataset, stage: Stage, data: Union[str, List[List[Dict[str, Any]]]]
):
    futures = []
    raw_dicts_futures = []
    outputs: List[Output] = []
    for raw_dicts in data:
        output = Output()
        output.passing_params = {"transect": raw_dicts[0].get("transect_num")}
        for raw in raw_dicts:
            new_processed_raw = process_raw.with_options(
                task_run_name=raw.get("file_path"), name=raw.get("file_path")
            )
            future = new_processed_raw.submit(raw, temp_raw_dir, config, stage)
            futures.append(future)
        outputs.append(output)
        raw_dicts_futures.append(futures)

    for futures, output in zip(raw_dicts_futures, outputs):
        output.data = [f.result() for f in futures]
    return outputs
```
Decorator: the same `echoflow` decorator as shown above.
n
ah, it seems it depends on what this thing `gea: Singleton_Echoflow` is, and how the `.log` method works. i believe you may be able to use `PREFECT_LOGGING_EXTRA_LOGGERS` here by setting the module that `gea` belongs to, like:
```
PREFECT_LOGGING_EXTRA_LOGGERS=<module-name>
```
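(For illustration, assuming the logger lives in a module named `gealib`, that setting attaches Prefect's handlers to the standard library logger of that name; a minimal sketch:)
```python
# hypothetical module gealib/__init__.py
import logging

# with PREFECT_LOGGING_EXTRA_LOGGERS=gealib set, Prefect attaches its
# handlers to this logger, so its records show up in flow/task run logs
logger = logging.getLogger(__name__)

def log_event(msg: str) -> None:
    logger.debug(msg)
```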
s
@Nate: It is a simple logging method that provides an interface over the `logging` package:
```python
def log(self, msg, level, extra):
    self.logger.log(level=level, msg=msg, extra=extra)
```
I am creating loggers from a YAML file, something like this. Will that be a problem?
```python
config = yaml.safe_load(file.read())
logging.config.dictConfig(config)
```
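(For reference, a minimal sketch of that loading pattern in full; the file name and logger name are illustrative. One caveat worth knowing: `dictConfig` defaults `disable_existing_loggers` to `True`, which can silence loggers configured before it runs.)
```python
import logging
import logging.config

import yaml

# illustrative file name; the YAML holds a standard dictConfig schema
with open("logging_config.yaml") as file:
    config = yaml.safe_load(file.read())

# note: unless the YAML sets disable_existing_loggers: false, this call
# disables any loggers that were configured before it ran
logging.config.dictConfig(config)

logger = logging.getLogger("gealib")  # illustrative logger name
logger.debug("configured via dictConfig")
```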
Also, is the DaskTaskRunner used properly? The flow is not able to create a cluster anymore.
n
i don't think it really matters how you do the logging. i think `PREFECT_LOGGING_EXTRA_LOGGERS` basically scoops up any python logger output from the namespace you provide, so if you have a local module `gealib` where you're defining your logger, you'd set `PREFECT_LOGGING_EXTRA_LOGGERS=gealib` in your env vars (I think). I would also recommend avoiding decorators that go between `@flow` and the function you're decorating, since then you are responsible for writing the in-between decorator (`echoflow` in this case) in a way that doesn't mess with the `@flow` decorator's expectations. i'd recommend designing `echoflow` to be used like this instead:
```python
@echoflow
@flow(...)
def your_func():
    pass
```
or is there a reason you can't do it that way?
s
I am using `.with_options()` before calling the flow. Putting `@echoflow` outside `@flow` throws an error.
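(One plausible reason, as a hedged guess: with `@echoflow` on the outside, the wrapper `echoflow` returns is a plain function, so the `.with_options()` method that exists on a `Flow` object is no longer available. A minimal sketch of the difference, with illustrative names:)
```python
from prefect import flow

@flow
def inner_first():
    pass

# inner_first is a Flow object, so .with_options() is available
renamed = inner_first.with_options(name="renamed")

# with @echoflow on the outside, echoflow's wrapper would receive the
# Flow object above and return a plain function, which has no
# .with_options(), matching the error described here
```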