# ask-community
h
Hi there! Does anybody here work with Prefect together with Python notebooks and MLOps/Feature/Model stores?
If so, if I'm on GCP, what would you recommend using?
k
Hey @haf, on the Jupyter notebooks, have you seen the Jupyter Notebook task to execute a notebook? This uses the papermill library.
h
I haven't, but I'm looking for a workflow where I can onboard data scientists and have them focus on the data science and not infrastructure
Right now I'm on Colab, and from there I can connect to a local runtime, e.g. a local Jupyter server, but I could potentially spawn JupyterHub (https://jupyter.org/hub) in my k8s cluster and work on top of that
but it seems this just takes care of the design-build parts, and nothing of the deploy-operate parts
k
OK, there’s quite a bit to unpack here. First is using notebooks in production: the papermill library does that. You can also use papermill + Prefect to orchestrate running those notebooks.
In terms of best practice, the industry is quite divided: some people are against productionizing notebooks, while others are “all-in” on it. In my experience, deployments are easier if you package the data science code as a Python wheel.
Side note: I think nteract is the organization behind papermill
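To make the papermill route concrete, here is a minimal sketch of a parameterised notebook run through papermill's command-line interface. The function names, notebook paths and parameter names are made up for illustration; only the `papermill INPUT OUTPUT -p NAME VALUE` invocation is papermill's own. Actually running it assumes papermill and a Jupyter kernel are installed.

```python
import subprocess
from typing import Dict, List


def build_papermill_command(
    input_nb: str, output_nb: str, parameters: Dict[str, object]
) -> List[str]:
    """Build the argv for `papermill INPUT OUTPUT -p NAME VALUE ...`."""
    cmd = ["papermill", input_nb, output_nb]
    for name, value in parameters.items():
        # papermill infers the parameter type from the string value
        cmd += ["-p", name, str(value)]
    return cmd


def run_notebook(input_nb: str, output_nb: str, parameters: Dict[str, object]) -> None:
    """Execute the notebook; raises CalledProcessError if papermill fails."""
    subprocess.run(build_papermill_command(input_nb, output_nb, parameters), check=True)


if __name__ == "__main__":
    # Hypothetical notebook and parameters, shown without executing anything
    print(build_papermill_command("train.ipynb", "runs/train-out.ipynb", {"alpha": 0.1}))
```

A function like `run_notebook` could then be called from a Prefect task, so the notebook becomes one step in a flow and a failing notebook fails the task.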
h
I think there's something to be said for feature engineering, data backfill and management of that data, parameterisation of models, and sharing of models between team members as well; I'd rather avoid all the hidden pits that people might fall into
and monitoring, logging, looking for model divergence are also factors
k
My personal suggestion is to start with MLFlow, and then see what isn’t covered by that. MLFlow helps with experiment tracking, model registries, and even model deployment.
MLFlow should be agnostic to the cloud provider. For example, for experiment tracking, you have a code snippet that logs parameters and metrics after model training (potentially wrapped as a Prefect task). I believe the results can be saved with Google Cloud Storage as the backend.
h
Cool, will look into it
Thanks
k
You can also then keep track of models in the model registry (this I haven’t used). I have seen deployments hooked up so that they swap the model in the deployment once its metrics are beaten.
I think Prefect can work well with the MLFlow Python API.
A feature store is honestly quite hard to pull off in an organization. It needs a lot of discipline and clear abstractions for the data. I think an organization is mature enough to do it after a couple of ML deployments.
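The "swap the model once the metrics are beaten" idea can be sketched as a small comparison step that runs after training. This is only an illustration: `should_promote`, its arguments and the metric names are hypothetical, and the actual swap (for example through a model registry) is left out.

```python
from typing import Mapping


def should_promote(
    candidate: Mapping[str, float],
    current: Mapping[str, float],
    metric: str,
    higher_is_better: bool = True,
    min_improvement: float = 0.0,
) -> bool:
    """Return True if the candidate model beats the deployed one on `metric`."""
    if metric not in candidate:
        raise KeyError(f"candidate has no metric named {metric!r}")
    if metric not in current:
        # Nothing comparable is deployed yet, so the candidate wins by default
        return True
    delta = candidate[metric] - current[metric]
    if not higher_is_better:
        # For error-style metrics (e.g. RMSE) a decrease is the improvement
        delta = -delta
    return delta > min_improvement


if __name__ == "__main__":
    print(should_promote({"auc": 0.91}, {"auc": 0.88}, "auc"))
```

Setting `min_improvement` above zero avoids swapping models on differences that are likely noise.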
h
Do you have any idea how people store large quantities of semi-structured data like features?
k
For large tabular datasets, Parquet helps because of compression and loading optimizations (Spark specifically has these). Unlike CSV, it also stores the schema.
If you’re talking JSON, I don’t know how it can be compressed… binary blob? I’m just less familiar with it.
If you use a distributed engine like Spark/Dask, Parquet files are already partitioned, so it makes loading easier
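For context on the partitioning point: engines like Spark typically lay out a partitioned Parquet dataset as a directory tree with one `column=value` folder per partition value. Below is a minimal sketch of that naming convention only. The bucket, column names and helper functions are hypothetical, and the code builds and parses paths without writing any Parquet.

```python
from typing import Dict


def partition_path(root: str, partitions: Dict[str, str], filename: str) -> str:
    """Build a Hive-style path: root/key=value/.../filename."""
    parts = [f"{key}={value}" for key, value in partitions.items()]
    return "/".join([root.rstrip("/"), *parts, filename])


def parse_partitions(path: str) -> Dict[str, str]:
    """Recover partition column values from the directory names in a path."""
    values = {}
    for segment in path.split("/")[:-1]:  # skip the file name itself
        if "=" in segment:
            key, value = segment.split("=", 1)
            values[key] = value
    return values


if __name__ == "__main__":
    path = partition_path(
        "gs://my-bucket/features",  # hypothetical bucket
        {"event_date": "2021-06-01", "country": "SE"},
        "part-0000.parquet",
    )
    print(path)
    print(parse_partitions(path))
```

Because the partition values live in the path, a reader that filters on `event_date` can skip whole directories instead of opening every file.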
h
I'm happy to use anything useful really. The data initially arrives in Kafka; do I then simply pass it from its topic to Spark and write it to, say, Google Cloud Storage?
Can I query it with Spark then?
k
Parquet can be loaded with Spark as a flat file. I have not used Spark Streaming + Kafka.
a
@haf I know it’s not exactly what you asked, but you may want to check out Netflix’s Metaflow for your uses; maybe you’ll even find a way to combine it with Prefect somehow: https://metaflow.org/ (P.S. I wrote the `JupyterTask`.)
I also use MLFlow with my Prefect flow, but I wouldn’t call it an integration: I created a Prefect task that registers stuff (params, metrics, artifacts) with MLFlow. If you want, I can share it.
h
I see: I’m on GCP and it seems Metaflow is very AWS-centric. I’d prefer something that runs on k8s; maybe that’s MLFlow? I’d be happy to see your task that uses MLFlow
a
My use of MLFlow is very basic. I have a task that initializes a “run”:
```python
import os

import prefect
from mlflow.tracking import MlflowClient
from prefect import task

# DEFAULT_MLFLOW_TRACKING_URI, DEFAULT_BUCKET_PATH and
# get_or_create_mlflow_experiment_id are not shown in this snippet.


@task
def init_mlflow_run(experiment_id=None):
    client = MlflowClient(DEFAULT_MLFLOW_TRACKING_URI)
    if experiment_id is None:
        # Default to an experiment named after the Prefect flow
        flow_name = prefect.context.get('flow_name')
        artifact_location = os.path.join(DEFAULT_BUCKET_PATH, "mlflow-data", flow_name)
        experiment_id = get_or_create_mlflow_experiment_id(flow_name, artifact_location)
    run = client.create_run(
        experiment_id=experiment_id,
        tags=dict(prefect_run_id=prefect.context.get('flow_run_id'))
    )
    run_id = run.info.run_id
    # Log the flow's parameters as MLFlow params
    for k, v in prefect.context.get('parameters', {}).items():
        client.log_param(run_id, k, v)
    return run_id
```
And then whenever I want to log something, I call the following task, which also accepts the `run_id` that the init task generated. It’s a bit customized to my use case, but I think you can tweak it to serve you better (and perhaps generalize it and contribute it back to Prefect).
```python
import json
import os
import pickle
import shutil
from typing import Optional

import prefect
from mlflow.tracking import MlflowClient
from prefect import task
from prefect.engine.signals import PrefectStateSignal
from prefect.utilities.logging import get_logger

# Imports assume pre-2.0 Prefect module paths. DEFAULT_MLFLOW_TRACKING_URI
# and ARTIFACTS_PATH are not shown in this snippet.


class IterablesAsListsJSONEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, frozenset):
            return list(o)
        return super().default(o)


@task(trigger=prefect.triggers.always_run)
def mlflow_logging(
        run_id: Optional[str] = None,
        params: Optional[dict] = None,
        metrics: Optional[dict] = None,
        artifacts: Optional[dict] = None,
        terminate=True
):
    if run_id is None:
        get_logger().warning("Got empty run_id. Skipping MLFlow logging")
        return None
    client = MlflowClient(DEFAULT_MLFLOW_TRACKING_URI)
    # Skip inputs that are Prefect signals (e.g. from a failed upstream task) rather than data
    if params is not None and not isinstance(params, PrefectStateSignal):
        for k, v in params.items():
            client.log_param(run_id, k, v)
    if metrics is not None and not isinstance(metrics, PrefectStateSignal):
        for k, v in metrics.items():
            client.log_metric(run_id, k, v)
    if artifacts is not None and not isinstance(artifacts, PrefectStateSignal):
        shutil.rmtree(ARTIFACTS_PATH, ignore_errors=True)
        os.makedirs(ARTIFACTS_PATH)  # TODO: figure out how to upload stuff without actually saving to disk
        for filename, data in artifacts.items():
            json_path = os.path.join(ARTIFACTS_PATH, f"{filename}.json")
            try:
                with open(json_path, "w") as json_f:
                    json.dump(data, json_f, cls=IterablesAsListsJSONEncoder)
            except TypeError:
                get_logger().warning("Failed to save data as JSON. Saving as pickle")
                os.remove(json_path)  # don't upload the partially written JSON file
                with open(os.path.join(ARTIFACTS_PATH, filename), "wb") as f:
                    pickle.dump(data, f)
        client.log_artifacts(run_id, ARTIFACTS_PATH)
    if terminate:
        client.set_terminated(run_id)

    return run_id
```
Things to consider / note:
1. I’m terminating the run manually. In the basic MLFlow examples, the flow runs inside an MLFlow run context, which terminates the run once it completes. Here I initialize the run differently, so I need to terminate it manually.
2. The serialization part is very ugly (it tries to JSON-serialize and falls back to pickle); you may want to accept the serialization method as a parameter.
3. Prefect is planning support for artifacts. Not sure what it would include; perhaps @Kevin Kho knows better.
4. The MLOps landscape is exploding now and there are way too many solutions. One nice solution that I know of (but haven’t tried) is DAGsHub. I mention them because they use MLFlow under the hood (and DVC for data versioning), so I like their paradigm of integrating existing tools instead of building everything from scratch.
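On the serialization point, one possible shape for "accept the method to serialize as a parameter" is sketched below. It is an illustration rather than the task's actual code: `Serializer`, `json_serializer` and `write_artifacts` are hypothetical names, and the sketch still writes files to disk, only into a temporary directory that cleans itself up.

```python
import json
import pickle
import tempfile
from pathlib import Path
from typing import Any, Callable, Dict, List

# A serializer turns one artifact into bytes; the caller chooses which one to use.
Serializer = Callable[[Any], bytes]


def json_serializer(data: Any) -> bytes:
    """Serialize to JSON, converting sets and frozensets to lists."""
    def to_jsonable(obj: Any) -> Any:
        if isinstance(obj, (set, frozenset)):
            return list(obj)
        raise TypeError(f"Cannot JSON-serialize {type(obj).__name__}")
    return json.dumps(data, default=to_jsonable).encode("utf-8")


def write_artifacts(
    artifacts: Dict[str, Any],
    directory: Path,
    serializer: Serializer = json_serializer,
    extension: str = ".json",
) -> List[Path]:
    """Write each artifact into `directory` with the chosen serializer."""
    written = []
    for name, data in artifacts.items():
        path = directory / f"{name}{extension}"
        # The serializer runs before the file is opened, so a failure leaves no partial file
        path.write_bytes(serializer(data))
        written.append(path)
    return written


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp_dir:
        paths = write_artifacts({"labels": frozenset({"cat"})}, Path(tmp_dir))
        print([p.name for p in paths])
        # An upload such as client.log_artifacts(run_id, tmp_dir) would go here
```

Passing `serializer=pickle.dumps, extension=".pkl"` switches the format explicitly, so the caller decides instead of relying on a silent fallback.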
k
Hey @Avi A, thanks for sharing that code snippet. Prefect only supports markdown artifacts for now while MLFlow can handle anything that can be saved because it’s just backed by storage.
Btw @Avi A, I believe a lot of people are considering MLFlow with Prefect. I encourage you to make a post in the #show-us-what-you-got channel about your work.
a
Thanks @Kevin Kho, but I’m not convinced it’s mature enough to share it that way 🙂