m — 11/03/2021, 11:49 PM

@task(log_stdout=True)
def run_mlflow(project_path, experiment):
    mlflow.projects.run(
        project_path, experiment_name=experiment,
    )
and here is my MLflow experiment script where I would put Prefect logging:
initialize(config_path="conf", job_name="gojob")
cfg = compose(config_name="config")
print(OmegaConf.to_yaml(cfg))
tracking = cfg["var"]["MLFLOW_TRACKING_URI"]
params = {}
project_path = cfg["project_path"]
experiment = cfg["experiment"]
mlflow.set_tracking_uri(tracking)
mlflow.set_experiment(experiment)
# print(subprocess.run(["ls"]))
with mlflow.start_run(nested=True):
    set_env(cfg)
    get_data = mlflow.run(project_path, "process_data", experiment_name=experiment)
    train = mlflow.run(project_path, "train", experiment_name=experiment)
The logging from print() appears only in the task logs, not in the MLflow experiment.
Thank you in advance

Zanie
m — 11/04/2021, 12:12 AM

@task()
def run_mlflow(project_path, experiment):
    logger = prefect.context.get("logger")
    logger.info("------------An info message.------------------")
    mlflow.projects.run(
        project_path, experiment_name=experiment,
    )
and here is main.py, the script that runs when I call mlflow.projects.run():
initialize(config_path="conf", job_name="gojob")
cfg = compose(config_name="config")
print(OmegaConf.to_yaml(cfg))
tracking = cfg["var"]["MLFLOW_TRACKING_URI"]
params = {}
project_path = cfg["project_path"]
experiment = cfg["experiment"]
mlflow.set_tracking_uri(tracking)
mlflow.set_experiment(experiment)
# print(subprocess.run(["ls"]))
with mlflow.start_run(nested=True):
    logger = prefect.context.get("logger")
    logger.info("------------An MLFLOW RUN ------------------")
    set_env(cfg)
    get_data = mlflow.run(project_path, "process_data", experiment_name=experiment)
    train = mlflow.run(project_path, "train", experiment_name=experiment)
Here are the output logs; as you can see, only the log emitted directly in the task shows up.

m — 11/04/2021, 12:14 AM

Zanie

m — 11/04/2021, 12:41 AM

initialize(config_path="conf", job_name="gojob")
cfg = compose(config_name="config")
print(OmegaConf.to_yaml(cfg))
tracking = cfg["var"]["MLFLOW_TRACKING_URI"]
params = {}
project_path = cfg["project_path"]
experiment = cfg["experiment"]
mlflow.set_tracking_uri(tracking)
mlflow.set_experiment(experiment)
# print(subprocess.run(["ls"]))
with mlflow.start_run(nested=True):
    set_env(cfg)
    for l in ["mlflow", "snowflake.connector", "boto3", "custom_lib"]:
        logger = logging.getLogger(l)
        logger.setLevel("INFO")
        log_stream = logging.StreamHandler(sys.stdout)
        LOG_FORMAT = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
        log_stream.setFormatter(LOG_FORMAT)
        logger.addHandler(log_stream)
    logger.info("---------test------------")
    logger.info("-----------test2-----------")
    logger = prefect.context.get("logger")
    logger.info("------------An MLFLOW RUN ------------------")
    get_data = mlflow.run(project_path, "process_data", experiment_name=experiment)
    train = mlflow.run(project_path, "train", experiment_name=experiment)
But I still didn't get any logs.
I don't think MLflow uses the standard logger; its own logs are, in fact, logs for saving artifacts, so MLflow doesn't really have proper logging. Anyway, even so, is it possible to use generic logging and configure it in Prefect without specifying a library, i.e. pass a generic logger no matter where it executes?

Kevin Kho
export PREFECT__LOGGING__EXTRA_LOGGERS="['mlflow']"
or in the config.toml
[logging]
# Extra loggers for Prefect log configuration
extra_loggers = "['mlflow']"
This should be done in the place that is running the Flow. If you are registering and running this flow, it should be set on the agent.
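For example, a minimal sketch assuming Prefect 1.x with a local agent (the agent start command is an assumption about the setup, not from the thread):

export PREFECT__LOGGING__EXTRA_LOGGERS="['mlflow']"
prefect agent local start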
m — 11/04/2021, 7:47 AM

m — 11/04/2021, 8:51 AM
PREFECT__LOGGING__EXTRA_LOGGERS

m — 11/04/2021, 8:57 AM

m — 11/04/2021, 9:21 AM

import hydra
from omegaconf import DictConfig, OmegaConf
from hydra import compose, initialize
import os
import uuid
import warnings
import sys
import logging
import subprocess
import mlflow
import prefect
from funcy.colls import project
# @hydra.main(config_path="conf", config_name="config")
# print(os.environ)
# print(subprocess.run(["ls"]))
def set_env(cfg: DictConfig) -> None:
    """Set all env vars from the config."""
    env = cfg["var"]
    for k, v in env.items():
        os.environ[k] = v
initialize(config_path="conf", job_name="gojob")
cfg = compose(config_name="config")
print(OmegaConf.to_yaml(cfg))
tracking = cfg["var"]["MLFLOW_TRACKING_URI"]
params = {}
project_path = cfg["project_path"]
experiment = cfg["experiment"]
mlflow.set_tracking_uri(tracking)
mlflow.set_experiment(experiment)

l = "mlflow"
logger = logging.getLogger(l)
logger.setLevel("INFO")
log_stream = logging.StreamHandler(sys.stdout)
LOG_FORMAT = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
log_stream.setFormatter(LOG_FORMAT)
logger.addHandler(log_stream)
# print(subprocess.run(["ls"]))
with mlflow.start_run(nested=True):
    set_env(cfg)
    logger.info("---------test------------")
    logger.info("-----------test2-----------")
    # logger = prefect.context.get("logger")
    # logger.info("------------An MLFLOW RUN ------------------")
    get_data = mlflow.run(project_path, "process_data", experiment_name=experiment)
    train = mlflow.run(project_path, "train", experiment_name=experiment)

m — 11/04/2021, 9:22 AM

import mlflow
from mlflow import projects
import os
import prefect
from prefect import task, Flow, Parameter, Client
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.run_configs import LocalRun
import subprocess
import hydra
from omegaconf import DictConfig, OmegaConf
from hydra.core.config_store import ConfigStore
from dataclasses import dataclass
import yaml
import logging
import sys
# os.environ["AWS_SECRET_ACCESS_KEY"] = "admin1598753"
# os.environ["AWS_ACCESS_KEY_ID"] = "adminminio"
# os.environ["MLFLOW_S3_ENDPOINT_URL"] = "http://localhost:3001"
# os.environ["MLFLOW_S3_IGNORE_TLS"] = "true"
# os.environ["MLFLOW_TRACKING_URI"] = "localhost:3000"
# print("os")
# project_path = "./project"
# experiment = "gojob"
# tracking = "http://localhost:5000"
# params = {}
@task
def set_env(cfg: DictConfig) -> None:
    """Set all env vars from the config."""
    env = cfg["var"]
    for k, v in env.items():
        os.environ[k] = v
@task
def set_uri(tracking):
    mlflow.set_tracking_uri(tracking)
    return tracking

@task(log_stdout=True)
def set_exp(experiment):
    print("log set experiment")
    mlflow.set_experiment(experiment)
    return experiment
@task()
def run_mlflow(project_path, experiment):
    logger = prefect.context.get("logger")
    logger.info("------------An info message.------------------")
    l = "mlflow"
    logger = logging.getLogger(l)
    logger.setLevel("INFO")
    log_stream = logging.StreamHandler(sys.stdout)
    LOG_FORMAT = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    log_stream.setFormatter(LOG_FORMAT)
    logger.addHandler(log_stream)
    mlflow.projects.run(
        project_path, experiment_name=experiment,
    )
    print("experiment set")
@dataclass
class Run:
    flow_id: str = "None"

@hydra.main(config_path="project/conf", config_name="config")
def workflow(cfg: DictConfig):
    with Flow("gojobflow", run_config=LocalRun()) as flow:
        logger = prefect.context.get("logger")
        logger = prefect.utilities.logging.get_logger()
        experiment = cfg["experiment"]
        tracking = cfg["var"]["MLFLOW_TRACKING_URI"]
        project_path = cfg["project_path"]
        v = set_env(cfg)
        s = set_uri(tracking)
        e = set_exp(experiment)
        r = run_mlflow(project_path, e)
    try:
        idf = flow.register(project_name="gojob", set_schedule_active=False)
        run_1 = {"flow_id": idf}
        logger.info(cfg["project_path"] + "/conf/run/run_1.yaml")
        with open(cfg["project_path"] + "/conf/run/run_1.yaml", "w+") as outfile:
            yaml.dump(run_1, outfile, default_flow_style=False)
    except:
        subprocess.run(["prefect", "create", "project", "gojob"])
        idf = flow.register(project_name="gojob", set_schedule_active=False)
        run_1 = {"flow_id": idf}
        logger.info(cfg["project_path"] + "/conf/run/run_1.yaml")
        with open(cfg["project_path"] + "/conf/run/run_1.yaml", "w+") as outfile:
            yaml.dump(run_1, outfile, default_flow_style=False)
    # ri = Run(flow_id=idf)
    # Registering the Config class with the name 'config'.
    # cs.store(group="run", name="run_1", node=ri)
    # print(OmegaConf.to_yaml(cfg))

if __name__ == "__main__":
    workflow()
m — 11/04/2021, 9:37 AM

Anna Geller
logger = prefect.context.get("logger")
I see that you sometimes defined the logger this way, but then redeclared the logger object with a different configuration, e.g. here:
logger = prefect.context.get("logger")
logger.info("------------An info message.------------------")
l = "mlflow"
logger = logging.getLogger(l)
logger.setLevel("INFO")
log_stream = logging.StreamHandler(sys.stdout)
LOG_FORMAT = logging.Formatter(
    "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
log_stream.setFormatter(LOG_FORMAT)
logger.addHandler(log_stream)
so you can replace this entire block with only this line:
logger = prefect.context.get("logger")
Can you try that and report back?
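For instance, the run_mlflow task shown earlier could shrink to something like this (a sketch assembled from code already posted in this thread, not a verified fix):

import mlflow
import prefect
from prefect import task

@task(log_stdout=True)
def run_mlflow(project_path, experiment):
    # The context logger is already wired to Prefect's own handlers.
    logger = prefect.context.get("logger")
    logger.info("Starting MLflow project run")
    mlflow.projects.run(project_path, experiment_name=experiment)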
Anna Geller

export PREFECT__LOGGING__EXTRA_LOGGERS="['boto3', 'botocore']"
export PREFECT__LOGGING__LEVEL="DEBUG"
Now, when I use boto3 in my flow code and set the level to DEBUG explicitly, all of boto3's debug logs, like response headers etc., become available in the logs. Here is an example:
from prefect import task, Flow
import boto3
import pandas as pd
import prefect
import logging

@task
def get_s3_data():
    logging.getLogger("boto3").setLevel(logging.DEBUG)
    logging.getLogger("botocore").setLevel(logging.DEBUG)
    prefect_logger = prefect.context.get("logger")
    s3 = boto3.client("s3")
    obj = s3.get_object(
        Bucket="data-lake-bronze",
        Key="global_power_plant_dataset/global_power_plant_database.csv",
    )
    df = pd.read_csv(obj["Body"])
    prefect_logger.info("Read df: %s", df.head(2))

with Flow("log_test") as flow:
    get_s3_data()
Then, I register the flow and run it, and here are the logs:

m — 11/05/2021, 12:47 AM

m — 11/05/2021, 12:48 AM

m — 11/05/2021, 12:48 AM

m — 11/05/2021, 12:49 AM

Kevin Kho
You have a task_a that creates some subprocess, and the subprocess has logs that Prefect needs to see? I think this now depends on the subprocess. Is it a Python subprocess, and how are you starting it?

m — 11/05/2021, 1:03 AM

m — 11/05/2021, 1:05 AM

m — 11/05/2021, 1:07 AM

m — 11/05/2021, 1:08 AM

m — 11/05/2021, 1:10 AM

Kevin Kho
With mlflow.projects.run, the problem is that I am not seeing anything they expose to send logs to the standard Python logger. I think we can help you if you can give us a code snippet showing how you would collect those logs without Prefect, and then we can find out how to do it in Prefect.
I think what you are suggesting is that Prefect should expose a way for other processes to send logs to the API if you provide the flow_run_id or something. The problem is that the Prefect logger is heavily tied to context, which is created during the flow run because Prefect Cloud creates the metadata when the flow runs. So in the source code, you will see it instantiates this handler from the context.
I don't know for sure how MLflow does it, but think about it this way: if Prefect kicks off a Databricks job, there is a completely separate Python process that does not know about Prefect. You would need to be able to pass the context to the subprocess, create the CloudHandler, and then attach it to the Python logger of that subprocess. It might be possible, but you need to be able to accept the context somehow.
If your subprocess is still attached to the parent process and you can pass context easily, I think the best thing to try is to pass logger into the subprocess and then try doing logger.info(...) like you are suggesting.
If that doesn't work, you can try passing context, then creating a CloudHandler, and then attaching the CloudHandler to your logger there. This snippet might help in showing how to attach a handler.
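As an illustration of the handler-attachment idea, here is a minimal sketch assuming Prefect 1.x and that the code runs inside a task, so prefect.context is populated (the "mlflow" logger name is just an example):

import logging
from prefect.utilities.logging import CloudHandler  # Prefect 1.x

# Attach Prefect's Cloud handler to an arbitrary library logger so its
# records are shipped to the Prefect backend alongside the task's own logs.
lib_logger = logging.getLogger("mlflow")
lib_logger.setLevel(logging.INFO)
lib_logger.addHandler(CloudHandler())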
m — 11/05/2021, 2:01 AM

m — 11/06/2021, 7:45 PM

m — 11/06/2021, 7:45 PM

m — 11/06/2021, 7:46 PM

import redis
import cloudpickle
import pickle
import prefect
from prefect import task, Flow, Parameter
import subprocess

@task
def set_print():
    r = redis.Redis(host="localhost", port=6379, db=0)
    r.set("foo", "bar")
    print("regular key ", r.get("foo"))

# multi key usage
@task
def redis_dict():
    user = {
        "Name": "Pradeep",
        "Company": "SCTL",
        "Address": "Mumbai",
        "Location": "RCP",
    }
    r = redis.Redis(host="localhost", port=6379, db=0)
    r.hmset("pythonDict", user)
    print(r.hgetall("pythonDict"))

@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello world!")
    r = redis.Redis(host="localhost", port=6379, db=0)
    r.set("logger", cloudpickle.dumps(logger))
    subprocess.run(["python", "task_subprocess.py"])

with Flow("hello-flow") as flow:
    hello_task()

flow.run()
m — 11/06/2021, 7:46 PM

m — 11/06/2021, 7:47 PM

import pickle
import redis

print("start")
r = redis.Redis(host="localhost", port=6379, db=0)
logger = pickle.loads(r.get("logger"))
logger.info("It is working!")
print("runned")

m — 11/06/2021, 7:48 PM

Kevin Kho
Loggers don't survive cloudpickle or pickle, because they lose their configuration when they are unpickled. But in this case, you might be able to just use the print("runned"), and then on the task side, you can do:

@task(log_stdout=True)
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello world!")
    subprocess.run(["python", "task_subprocess.py"])

And this might capture print("runned"), because the task will capture stdout.
m — 11/06/2021, 8:10 PM

m — 11/06/2021, 8:10 PM

m — 11/06/2021, 8:11 PM

Kevin Kho
That won't work if the stdout is from another process. Yes, you may have to attach the CloudHandler somehow and pass the context to point that subprocess logging to Prefect Cloud.
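One hedged workaround sketch for the cross-process stdout issue (not from the thread; the script name follows the example above): capture the child's output in the parent and re-print it there, so that log_stdout sees it.

import subprocess
from prefect import task

@task(log_stdout=True)
def run_child():
    # log_stdout only redirects this process's sys.stdout, so capture the
    # child's output explicitly and re-emit it from the parent process.
    result = subprocess.run(
        ["python", "task_subprocess.py"],
        capture_output=True, text=True, check=True,
    )
    print(result.stdout)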
m — 11/07/2021, 12:01 AM

m — 11/07/2021, 12:01 AM

m — 11/07/2021, 12:01 AM
TypeError: Pickling context objects is explicitly not supported. You should always access context as an attribute of the `prefect` module, as in `prefect.context`

m — 11/07/2021, 12:02 AM

m — 11/07/2021, 12:02 AM

@task(log_stdout=True)
def run_mlflow(project_path, experiment):
    print("mlflowrun")
    logger = prefect.context
    r = redis.Redis(host="localhost", port=6379, db=0)
    print(" pickling ")
    r.set("context", cloudpickle.dumps(logger))
    print("")
    subprocess.run(["python", "redis/task_subprocess.py"])
    mlflow.projects.run(
        project_path,
        experiment_name=experiment,
    )
m — 11/07/2021, 12:02 AM

import pickle
import redis
from prefect.utilities import logging

print("start")
handler = logging.CloudHandler()
r = redis.Redis(host="localhost", port=6379, db=0)
logger = pickle.loads(r.get("context")).get("logger")
logger.addHandler(handler)
logger.info("+++++++++++++++++++++++++")
logger.info("It is working!")
print("regular key ", r.get("foo"))
print(logger)
print("runned")

Kevin Kho
m — 11/07/2021, 8:53 AM

m — 11/09/2021, 1:22 AM

m — 11/09/2021, 1:23 AM
I tried the Context class, but it didn't work.

Zanie

m — 11/09/2021, 1:40 AM

Zanie

Zanie

m — 11/09/2021, 2:29 AM