# ask-community
m
I use MLflow with Prefect, and I run an MLflow experiment inside a Prefect task. How can I get the logging from the MLflow experiment run into the Prefect UI? The documentation only shows how to log inside the task code, not from a function called within the task. Here is my task:
Copy code
@task(log_stdout=True)
def run_mlflow(project_path, experiment):
    mlflow.projects.run(
        project_path, experiment_name=experiment,
    )
And here is my MLflow experiment script, where I would like to add Prefect logging:
Copy code
initialize(config_path="conf", job_name="gojob")
cfg = compose(config_name="config")
print(OmegaConf.to_yaml(cfg))
tracking = cfg["var"]["MLFLOW_TRACKING_URI"]
params = {}
project_path = cfg["project_path"]
experiment = cfg["experiment"]
mlflow.set_tracking_uri(tracking)
mlflow.set_experiment(experiment)

# print(subprocess.run(["ls"]))
with mlflow.start_run(nested=True):

    set_env(cfg)

    get_data = mlflow.run(project_path, "process_data", experiment_name=experiment)

    train = mlflow.run(project_path, "train", experiment_name=experiment)
The logging from print() only appears for the task itself, not for the MLflow experiment. Thank you in advance.
m
As I said, it only works inside the Prefect task, not in functions called from the task code. Here is the task in flow.py:
Copy code
@task()
def run_mlflow(project_path, experiment):
    logger = prefect.context.get("logger")
    logger.info("------------An info message.------------------")
    mlflow.projects.run(
        project_path, experiment_name=experiment,
    )
And here is main.py, the script that runs when I call mlflow.projects.run():
Copy code
initialize(config_path="conf", job_name="gojob")
cfg = compose(config_name="config")
print(OmegaConf.to_yaml(cfg))
tracking = cfg["var"]["MLFLOW_TRACKING_URI"]
params = {}
project_path = cfg["project_path"]
experiment = cfg["experiment"]
mlflow.set_tracking_uri(tracking)
mlflow.set_experiment(experiment)

# print(subprocess.run(["ls"]))
with mlflow.start_run(nested=True):
    logger = prefect.context.get("logger")
    logger.info("------------An MLFLOW RUN ------------------")
    set_env(cfg)

    get_data = mlflow.run(project_path, "process_data", experiment_name=experiment)

    train = mlflow.run(project_path, "train", experiment_name=experiment)
Here are the output logs; as you can see, only logging done directly in the task shows up.
MLflow has its own logs; could I capture them with the extra loggers method?
z
Yep, if it's using the Python standard logger.
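A quick way to check (a sketch): after importing mlflow, look at the standard-library logger registry and see whether any "mlflow" loggers were created through the logging module:
Copy code
import logging
import mlflow  # noqa: F401  (importing should register mlflow's loggers, if it uses logging)

# loggerDict holds every logger name created through the logging module
print([name for name in logging.root.manager.loggerDict if "mlflow" in name])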
m
I edited my main.py according to the docs and set the env var:
Copy code
initialize(config_path="conf", job_name="gojob")
cfg = compose(config_name="config")
print(OmegaConf.to_yaml(cfg))
tracking = cfg["var"]["MLFLOW_TRACKING_URI"]
params = {}
project_path = cfg["project_path"]
experiment = cfg["experiment"]
mlflow.set_tracking_uri(tracking)
mlflow.set_experiment(experiment)

# print(subprocess.run(["ls"]))
with mlflow.start_run(nested=True):

    set_env(cfg)
    for l in ["mlflow", "snowflake.connector", "boto3", "custom_lib"]:
        logger = logging.getLogger(l)
        logger.setLevel("INFO")
        log_stream = logging.StreamHandler(sys.stdout)
        LOG_FORMAT = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
        log_stream.setFormatter(LOG_FORMAT)
        logger.addHandler(log_stream)
        logger.info("---------test------------")
    logger.info("-----------test2-----------")

    logger = prefect.context.get("logger")
    logger.info("------------An MLFLOW RUN ------------------")
    get_data = mlflow.run(project_path, "process_data", experiment_name=experiment)

    train = mlflow.run(project_path, "train", experiment_name=experiment)
But I didn't get any logs. I don't think MLflow uses the standard logger; its own logs are really just output from saving artifacts, so MLflow doesn't log properly. Anyway, is it possible to use generic logging and capture it in Prefect without specifying a library, i.e. pass a generic logger that works no matter where the code is executed?
k
The MLflow source code looks like it uses a Python logger, so it should work. How did you set the environment variable? It needs to be set before importing Prefect, so in your case it would be:
Copy code
export PREFECT__LOGGING__EXTRA_LOGGERS="['mlflow']"
or in the
config.toml
Copy code
[logging]
# Extra loggers for Prefect log configuration
extra_loggers = "['mlflow']"
This should be done in the place that is running the Flow. If you are registering and running this flow, it should be set on the agent.
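For example, a minimal sketch (untested; the task and messages are just placeholders) showing the env variable being set before the prefect import:
Copy code
import os

# must be set before prefect is imported, or it is not picked up
os.environ["PREFECT__LOGGING__EXTRA_LOGGERS"] = "['mlflow']"

import logging
import prefect
from prefect import task, Flow


@task
def emit_log():
    # a record sent through the standard "mlflow" logger should now
    # also reach the Prefect logging machinery
    logging.getLogger("mlflow").info("hello from an extra logger")


with Flow("extra-logger-demo") as flow:
    emit_log()

flow.run()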
m
I will try that; setting the env var after importing Prefect could have been the problem.
I set the env var in an sh file that runs when my conda venv is activated. To be sure, I restarted the Prefect server and checked it with echo:
Copy code
PREFECT__LOGGING__EXTRA_LOGGERS
Then I checked the logs, and I still don't have any MLflow logs.
Here is my new main.py file:
Copy code
import os
import sys
import uuid
import logging
import subprocess
import warnings

import hydra
from hydra import compose, initialize
from omegaconf import DictConfig, OmegaConf
from funcy.colls import project
import mlflow
import prefect

# @hydra.main(config_path="conf", config_name="config")
# print(os.environ)
# print(subprocess.run(["ls"]))

def set_env(cfg: DictConfig) -> None:
    """
    set all env var
    """
    env = cfg["var"]
    for k, v in env.items():
        os.environ[k] = v


initialize(config_path="conf", job_name="gojob")
cfg = compose(config_name="config")
print(OmegaConf.to_yaml(cfg))
tracking = cfg["var"]["MLFLOW_TRACKING_URI"]
params = {}
project_path = cfg["project_path"]
experiment = cfg["experiment"]
mlflow.set_tracking_uri(tracking)
mlflow.set_experiment(experiment)
l = "mlflow"
logger = logging.getLogger(l)
logger.setLevel("INFO")
log_stream = logging.StreamHandler(sys.stdout)
LOG_FORMAT = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
log_stream.setFormatter(LOG_FORMAT)
logger.addHandler(log_stream)
# print(subprocess.run(["ls"]))
with mlflow.start_run(nested=True):

    set_env(cfg)

    logger.info("---------test------------")

    logger.info("-----------test2-----------")

    # logger = prefect.context.get("logger")
    # logger.info("------------An MLFLOW RUN ------------------")
    get_data = mlflow.run(project_path, "process_data", experiment_name=experiment)

    train = mlflow.run(project_path, "train", experiment_name=experiment)
And here is my new flow file:
Copy code
import os
import sys
import logging
import subprocess
from dataclasses import dataclass

import mlflow
from mlflow import projects
import prefect
from prefect import task, Flow, Parameter, Client
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.run_configs import LocalRun
import hydra
from hydra.core.config_store import ConfigStore
from omegaconf import DictConfig, OmegaConf
import yaml

# os.environ["AWS_SECRET_ACCESS_KEY"] = "admin1598753"
# os.environ["AWS_ACCESS_KEY_ID"] = "adminminio"
# os.environ["MLFLOW_S3_ENDPOINT_URL"] = "<http://localhost:3001>"
# os.environ["MLFLOW_S3_IGNORE_TLS"] = "true"
# os.environ["MLFLOW_TRACKING_URI"] = "localhost:3000"
# print("os")
# project_path = "./project"
# experiment = "gojob"
# tracking = "http://localhost:5000"
# params = {}


@task
def set_env(cfg: DictConfig) -> None:
    """
    set all env var
    """
    env = cfg["var"]
    for k, v in env.items():
        os.environ[k] = v


@task
def set_uri(tracking):
    mlflow.set_tracking_uri(tracking)
    return tracking


@task(log_stdout=True)
def set_exp(experiment):
    print("log set experiment ")
    mlflow.set_experiment(experiment)
    return experiment


@task()
def run_mlflow(project_path, experiment):
    logger = prefect.context.get("logger")
    logger.info("------------An info message.------------------")
    l = "mlflow"
    logger = logging.getLogger(l)
    logger.setLevel("INFO")
    log_stream = logging.StreamHandler(sys.stdout)
    LOG_FORMAT = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    log_stream.setFormatter(LOG_FORMAT)
    logger.addHandler(log_stream)
    mlflow.projects.run(
        project_path, experiment_name=experiment,
    )


print("experiment setted")


@dataclass
class Run:
    flow_id: str = "None"


@hydra.main(config_path="project/conf", config_name="config")
def workflow(cfg: DictConfig):
    with Flow("gojobflow", run_config=LocalRun()) as flow:
        logger = prefect.context.get("logger")
        logger = prefect.utilities.logging.get_logger()
        experiment = cfg["experiment"]
        tracking = cfg["var"]["MLFLOW_TRACKING_URI"]
        project_path = cfg["project_path"]

        v = set_env(cfg)
        s = set_uri(tracking)
        e = set_exp(experiment)
        r = run_mlflow(project_path, e)
    try:
        idf = flow.register(project_name="gojob", set_schedule_active=False)
        run_1 = {"flow_id": idf}
        logger.info(cfg["project_path"] + "/conf/run/run_1.yaml")
        with open(cfg["project_path"] + "/conf/run/run_1.yaml", "w+") as outfile:
            yaml.dump(run_1, outfile, default_flow_style=False)
    except Exception:
        subprocess.run(["prefect", "create", "project", "gojob"])
        idf = flow.register(project_name="gojob", set_schedule_active=False)
        run_1 = {"flow_id": idf}
        logger.info(cfg["project_path"] + "/conf/run/run_1.yaml")
        with open(cfg["project_path"] + "/conf/run/run_1.yaml", "w+") as outfile:
            yaml.dump(run_1, outfile, default_flow_style=False)

    # ri = Run(flow_id=idf)
    # Registering the Config class with the name 'config'.
    # cs.store(group="run", name="run_1", node=ri)
    # print(OmegaConf.to_yaml(cfg))


if __name__ == "__main__":
    workflow()
Is there a way to use regular logging with the logging library in Prefect Server? That could solve my problem, but I didn't see anything about it in the docs. If it is possible to capture logs from an external library that uses the regular logging library, it should be possible to capture regular logging directly, right?
a
@m I think you are doing too much 🙂 If I'm not mistaken, if you follow the advice from Kevin here and set the extra logger either as an env variable or in the config.toml, that's all. You don't need to configure anything more in your flow file. You only need to get the Prefect logger once in a task:
Copy code
logger = prefect.context.get("logger")
I see that sometimes you defined it, but then redeclared the logger object with a different configuration, e.g. here:
Copy code
logger = prefect.context.get("logger")
logger.info("------------An info message.------------------")
l = "mlflow"
logger = logging.getLogger(l)
logger.setLevel("INFO")
log_stream = logging.StreamHandler(sys.stdout)
LOG_FORMAT = logging.Formatter(
    "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
log_stream.setFormatter(LOG_FORMAT)
logger.addHandler(log_stream)
so you can replace this entire block with just this line:
Copy code
logger = prefect.context.get("logger")
Can you try that and report back?
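For reference, the whole task would then reduce to something like this (a sketch reusing your names; the log message is a placeholder):
Copy code
@task
def run_mlflow(project_path, experiment):
    # the context logger is already wired into Prefect's log handlers,
    # so no extra handler configuration is needed here
    logger = prefect.context.get("logger")
    logger.info("Starting the MLflow project run")
    mlflow.projects.run(project_path, experiment_name=experiment)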
Not sure how MLflow logging works, but I know that e.g. with boto3, if I set these two variables on both my agent and the machine from which I register the flow:
Copy code
export PREFECT__LOGGING__EXTRA_LOGGERS="['boto3', 'botocore']"
export PREFECT__LOGGING__LEVEL="DEBUG"
then when I use boto3 in my flow code and set the level to DEBUG explicitly, all of boto3's debug logs (response headers etc.) become available in the logs. Here is an example:
Copy code
from prefect import task, Flow
import boto3
import pandas as pd
import prefect
import logging

@task
def get_s3_data():
    logging.getLogger("boto3").setLevel(logging.DEBUG)
    logging.getLogger("botocore").setLevel(logging.DEBUG)
    prefect_logger = prefect.context.get("logger")
    s3 = boto3.client("s3")
    obj = s3.get_object(
        Bucket="data-lake-bronze",
        Key="global_power_plant_dataset/global_power_plant_database.csv",
    )
    df = pd.read_csv(obj["Body"])
    prefect_logger.info("Read df: %s", df.head(2))

with Flow("log_test",) as flow:
    get_s3_data()
Then, I register the flow and run it, and here are the logs:
m
Yes, that works for me too when I log inside the task code.
But I need to capture logging from an external file that is executed in a Python subprocess inside the task.
Those logs are produced outside the definition of the flow,
but the file is still executed during the task, so I need to trace it.
k
You are saying there is a `task_a` that creates some subprocess, and the subprocess has logs that Prefect needs to see? I think this now depends on the subprocess. Is it a Python subprocess, and how are you starting it?
m
In fact, I use MLflow with Prefect and I run an MLflow experiment in my task. I need to log it to debug my file; for now I'm forced to debug outside Prefect, and I have no monitoring of what happens during my task if the logging is not written directly inside it.
That means we can't develop anything outside the workflow, like importing functions from another file, setting up logging inside them, and monitoring it in Prefect.
That is really limited functionality, whereas logging is a top priority when building workflows.
It would be convenient to pass parameters such as the run id and task id to trace what is running during a task, outside of its code.
Maybe if I pass prefect.context.get("logger") from the task into my subprocess script, it could work?
k
I read the MLflow code for `mlflow.projects.run`; the problem is that I am not seeing anything they expose to send logs to the standard Python logger. I think we can help you if you can give us a code snippet showing how you would collect those logs if you were not using Prefect, and then we can find how to do it in Prefect. I think what you are suggesting is that Prefect should expose a way for other processes to send logs to the API if you provide the `flow_run_id` or something. The problem is that the Prefect logger is heavily tied to `context`, which is created during the flow run because Prefect Cloud creates the metadata when the flow runs. So in the source code, you will see it instantiates this handler from the context.
I don't know for sure how MLflow does it, but think about it this way: if Prefect kicks off a Databricks job, there is a completely separate Python process that knows nothing about Prefect. You would need to pass the `context` to the subprocess, create the `CloudHandler`, and then attach it to the Python logger of that subprocess. It might be possible, but you need to be able to accept the `context` somehow. If your subprocess is still attached to the parent process and you can pass context easily, the best thing to try is to pass `logger` into the subprocess and then try doing `logger.info(...)` like you are suggesting. If that doesn't work, you can try passing context, then creating a `CloudHandler`, and then attaching the `CloudHandler` to your logger there. This snippet might help in showing how to attach a handler.
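For illustration, an untested sketch of that idea (it assumes your Prefect version exposes `CloudHandler` in `prefect.utilities.logging` and that the flow-run context is available where this runs; the logger name and message are placeholders):
Copy code
import logging
from prefect.utilities.logging import CloudHandler

# attach Prefect's CloudHandler to a plain Python logger so that its
# records are shipped to the Prefect API; this only works if the
# handler can find the flow-run metadata (normally taken from context)
sub_logger = logging.getLogger("subprocess_logger")
sub_logger.setLevel(logging.INFO)
sub_logger.addHandler(CloudHandler())
sub_logger.info("forwarded to Prefect, assuming context is available")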
m
Wow, thank you for this awesome answer. I will experiment with it tomorrow and give feedback here.
OK, so passing a pickled logger to the subprocess via a Redis server doesn't work.
My flow code:
Copy code
import pickle
import subprocess

import cloudpickle
import redis
import prefect
from prefect import task, Flow, Parameter

# multi key usage


@task
def set_print():
    r = redis.Redis(host="localhost", port=6379, db=0)
    r.set("foo", "bar")
    print("regular key ", r.get("foo"))


@task
def redis_dict():
    user = {
        "Name": "Pradeep",
        "Company": "SCTL",
        "Address": "Mumbai",
        "Location": "RCP",
    }
    r = redis.Redis(host="localhost", port=6379, db=0)
    r.hmset("pythonDict", user)

    print(r.hgetall("pythonDict"))


@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello world!")
    r = redis.Redis(host="localhost", port=6379, db=0)
    r.set("logger", cloudpickle.dumps(logger))
    subprocess.run(["python", "task_subprocess.py"])


with Flow("hello-flow") as flow:
    hello_task()

flow.run()
And here is the task_subprocess.py code:
Copy code
import pickle
import redis
print('start')
r = redis.Redis(host="localhost", port=6379, db=0)

logger = pickle.loads(r.get("logger"))
logger.info("It is working!")
print("runned")
Here is the output:
k
You can't pass the native Python logger with `cloudpickle` or `pickle`, because loggers lose their configuration when they are unpickled. But in this case, you might be able to just use the `print("runned")`, and then on the task side you can do:
Copy code
@task(log_stdout=True)
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello world!")
    subprocess.run(["python", "task_subprocess.py"])
And this might capture the `print("runned")`, because the task will capture `stdout`.
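A variation that may be more reliable (a sketch; `capture_output` needs Python 3.7+): capture the child's output in the parent and re-print it there, since `log_stdout` only sees what goes through this process's `sys.stdout`:
Copy code
import subprocess
from prefect import task


@task(log_stdout=True)
def hello_task():
    # run the child, capture its stdout/stderr, and re-emit them from
    # this process so the log_stdout redirection can pick them up
    result = subprocess.run(
        ["python", "task_subprocess.py"],
        capture_output=True,
        text=True,
    )
    print(result.stdout)
    print(result.stderr)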
m
OK, so it works locally,
but not when I run the flow with an agent on Prefect Server.
So I will try your method with the CloudHandler.
k
I see. I guess it doesn't work because that `stdout` comes from another process. Yes, you may have to attach the `CloudHandler` somehow and pass the context to point that subprocess's logging at Prefect Cloud.
m
@Kevin Kho,
I got this error:
State Message: Error during execution of task: TypeError('Pickling context objects is explicitly not supported. You should always access context as an attribute of the `prefect` module, as in `prefect.context`')
I tried pickling the Prefect context to pass it into the subprocess script:
Copy code
@task(log_stdout=True)
def run_mlflow(project_path, experiment):
    print("mlflowrun")
    context = prefect.context
    r = redis.Redis(host="localhost", port=6379, db=0)
    print("pickling")
    r.set("context", cloudpickle.dumps(context))
    subprocess.run(["python", "redis/task_subprocess.py"])
    mlflow.projects.run(
        project_path,
        experiment_name=experiment,
    )
And my subprocess:
Copy code
import pickle
import redis
from prefect.utilities import logging

print("start")
handler = logging.CloudHandler()
r = redis.Redis(host="localhost", port=6379, db=0)
logger = pickle.loads(r.get("context")).get("logger")
logger.addHandler(handler)
logger.info("+++++++++++++++++++++++++")
logger.info("It is working!")
print("regular key ", r.get("foo"))
print(logger)
print("runned")
k
Ah OK, I guess it can't be done. I will chat with an engineer on Monday to see if there are any other ideas, but this is looking too hard because it's another subprocess.
m
What do you think about pickling the logger, but also pickling the logger's config as a dict? Since you said the config is reset when unpickling, I could reload the saved config into the unpickled logger.
@Kevin Kho did you have the opportunity to talk about the logging issue with an engineer?
I tried to modify the `__getstate__` method of the `Context` class, but it didn't work.
z
Hi! I haven't had a chance to create a demo of what you should do here, but I would strongly recommend against attempting to pass loggers via pickle. Instead, I would log standard output from your task, then ensure that the standard output from your subprocess is forwarded to your main process.
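For example, something along these lines (a sketch; the script name is a placeholder):
Copy code
import subprocess

import prefect
from prefect import task


@task
def run_script():
    logger = prefect.context.get("logger")
    # stream the child's stdout line by line into the Prefect logger,
    # so subprocess output shows up in the Prefect UI as it is produced
    proc = subprocess.Popen(
        ["python", "task_subprocess.py"],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
    for line in proc.stdout:
        logger.info(line.rstrip())
    proc.wait()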
m
What do you mean by "standard output"?
m
OK, I understand, but if there are many logs in the subprocess, this will not help me debug the program when there is an exception.