
Soham

07/27/2023, 12:46 AM
@Marvin: Why are logs on console not printing when we use DaskTaskRunner for a flow?

Marvin

07/27/2023, 12:47 AM
Ah, the joy of logs. The silent whispers of your code, often ignored until something goes wrong. Then we scramble to decipher their cryptic messages, hoping they'll reveal the secrets of the latest catastrophe. It's like a tragic love story, really. But I digress. Your issue with the logs not printing when using the DaskTaskRunner might be due to the way logging is handled in Prefect. By default, print statements aren't logged. However, Prefect provides the `log_prints` option to enable the logging of print statements. If you set `log_prints=True` for a given task or flow, the Python builtin print statements should be logged. Here's an example:
```python
from prefect import task, flow

@task
def my_task():
    print("we're logging print statements from a task")

@flow(log_prints=True)
def my_flow():
    print("we're logging print statements from a flow")
    my_task()  # the task must actually run for its prints to be captured
```
In this example, the print statements from both the task and the flow should be logged. By default, tasks and subflows will inherit the `log_prints` setting from their parent flow, unless they opt out with their own explicit `log_prints` setting. Remember, the logs are your friends. Treat them well, and they'll return the favor when you need them most.
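And if you ever want a task to opt back out while its parent flow keeps `log_prints=True`, a minimal sketch of that would be:
```python
from prefect import task, flow

@task(log_prints=False)  # explicit setting overrides the inherited one
def quiet_task():
    print("this print should NOT be captured as a log")

@flow(log_prints=True)
def my_flow():
    print("this print should be captured")
    quiet_task()
```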

Nate

07/27/2023, 12:48 AM
are you using `get_logger` or `get_run_logger` in the tasks executed with dask?

Soham

07/27/2023, 12:50 AM
No, @Nate. I am using a custom decorator to log the start and end of each function to a file. It works well as long as the flows are sequential, but when I call a flow marked with DaskTaskRunner, nothing gets logged (console or files).

Nate

07/27/2023, 12:51 AM
Can I see how the decorator works?

Soham

07/27/2023, 12:52 AM
Sure!
```python
import functools
import logging

# Singleton_Echoflow and get_last_run_history are defined elsewhere in the project

def echoflow(processing_stage: str = "DEFAULT", type: str = "TASK"):
    def decorator(func=None):

        def before_function_call(
            gea: Singleton_Echoflow, type: str, processing_stage: str, *args, **kwargs
        ):
            mod_args = list(args)
            if type == "FLOW" and processing_stage != "DEFAULT":
                run_history = get_last_run_history(name=processing_stage)
                print(run_history)
                gea.log(
                    msg=run_history,
                    extra={"mod_name": func.__module__, "func_name": func.__name__},
                    level=logging.DEBUG,
                )

            gea.log(
                msg=f"Entering with memory at {gea.log_memory_usage()}: ",
                extra={"mod_name": func.__module__, "func_name": func.__name__},
                level=logging.DEBUG,
            )
            return mod_args

        def after_function_call(gea: Singleton_Echoflow, *args, **kwargs):
            gea.log(
                msg=f"Exiting with memory at {gea.log_memory_usage()}: ",
                extra={"mod_name": func.__module__, "func_name": func.__name__},
                level=logging.DEBUG,
            )
            # rule engine checks - TODO

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            gea = Singleton_Echoflow.get_instance()
            try:
                mod_args = before_function_call(gea, type, processing_stage, *args, **kwargs)
                result = func(*mod_args, **kwargs)
                after_function_call(gea, *args, **kwargs)
                return result
            except Exception as e:
                gea.log(
                    msg=e,
                    level=logging.ERROR,
                    extra={"mod_name": func.__module__, "func_name": func.__name__},
                )
                raise
            finally:
                pass  # some process

        return wrapper

    return decorator
```
Sure @Nate! Usage:
```python
@flow(task_runner=DaskTaskRunner)
@echoflow(processing_stage="open-raw", type="FLOW")
def open_raw(
    config: Dataset, stage: Stage, data: Union[str, List[List[Dict[str, Any]]]]
):
    raw_dicts_futures = []
    outputs: List[Output] = []
    for raw_dicts in data:
        output = Output()
        output.passing_params = {"transect": raw_dicts[0].get("transect_num")}
        futures = []  # reset per group so each entry in raw_dicts_futures is independent
        for raw in raw_dicts:
            new_processed_raw = process_raw.with_options(
                task_run_name=raw.get("file_path"), name=raw.get("file_path")
            )
            # temp_raw_dir is defined elsewhere in the module
            future = new_processed_raw.submit(raw, temp_raw_dir, config, stage)
            futures.append(future)
        outputs.append(output)
        raw_dicts_futures.append(futures)

    for futures, output in zip(raw_dicts_futures, outputs):
        output.data = [f.result() for f in futures]
    return outputs
```

Nate

07/27/2023, 1:14 AM
ah, it seems it depends on what this thing `gea: Singleton_Echoflow` is, and how the `.log` method works. I believe you may be able to use `PREFECT_LOGGING_EXTRA_LOGGERS` here, by setting the module that `gea` belongs to, like:
```
PREFECT_LOGGING_EXTRA_LOGGERS=<module-name>
```
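for example, if the logger lives in a local module (the name `gealib` below is just a placeholder), something like this should get picked up once that setting is in place:
```python
# gealib.py  (hypothetical module, for illustration only)
import logging

# PREFECT_LOGGING_EXTRA_LOGGERS matches on logger names, and loggers are
# conventionally named after their module, so getLogger(__name__) here
# yields a logger called "gealib"
logger = logging.getLogger(__name__)

def log_event(msg: str) -> None:
    # with PREFECT_LOGGING_EXTRA_LOGGERS=gealib set before the run starts,
    # these records should also be forwarded into the Prefect run logs
    logger.info(msg)
```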

Soham

07/27/2023, 1:35 AM
@Nate: It is a simple logging method that provides an interface over the standard `logging` package:
```python
def log(self, msg, level, extra):
    self.logger.log(level=level, msg=msg, extra=extra)
```
I am creating the loggers from a YAML file, something like this. Will that be a problem?
```python
import logging.config
import yaml

# "logging.yaml" is illustrative; the file holds a dictConfig-style mapping
with open("logging.yaml") as file:
    config = yaml.safe_load(file.read())
logging.config.dictConfig(config)
```
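One detail I want to double-check there is `disable_existing_loggers`: `dictConfig` defaults it to true, and that would switch off any loggers that already exist, including ones Prefect has set up. In dict form, what I have is roughly this (names are illustrative):
```python
import logging.config

config = {
    "version": 1,
    # without this, dictConfig disables every pre-existing logger,
    # which can silence Prefect's own loggers
    "disable_existing_loggers": False,
    "formatters": {
        "simple": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"}
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "simple"}
    },
    "loggers": {
        # "gealib" is a placeholder for the module that owns the logger
        "gealib": {"level": "DEBUG", "handlers": ["console"]}
    },
}
logging.config.dictConfig(config)
```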
Also, is the DaskTaskRunner being used properly? The flow is not able to create a cluster anymore.

Nate

07/27/2023, 1:44 AM
I don't think it really matters how you do the logging. I think `PREFECT_LOGGING_EXTRA_LOGGERS` basically scoops up any Python logger output from the namespace you provide, so if you have a local module `gealib` where you're defining your logger, you'd set `PREFECT_LOGGING_EXTRA_LOGGERS=gealib` in your env vars (I think).

I would also recommend avoiding decorators that go between `@flow` and the function you're decorating, since then you are responsible for writing the in-between decorator (`echoflow` in this case) in a way that doesn't mess with the `@flow` decorator's expectations. I'd recommend designing `echoflow` to be used like this instead:
```python
@echoflow
@flow(...)
def your_func():
    pass
```
or is there a reason you can't do it that way?
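for context, an argument-free sketch of that shape, where the prints are just placeholders for your `gea.log` hooks:
```python
import functools

from prefect import flow

def echoflow(func):
    # func here is already the Flow object built by @flow, so @flow sees the
    # original, unwrapped function and its signature stays intact
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print(f"entering {func.name}")  # before-call hook
        result = func(*args, **kwargs)
        print(f"exiting {func.name}")   # after-call hook
        return result
    return wrapper

@echoflow
@flow(log_prints=True)
def your_func():
    pass
```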

Soham

07/27/2023, 1:48 AM
I am using `.with_options()` before calling the flow. Putting `@echoflow` outside `@flow` throws an error, presumably because the wrapper returned by `echoflow` is a plain function and no longer exposes the Flow's `.with_options`.