はじめに
以下の記事で、Databricks のマネージド型MLflowを使ってモデルのトレーニング、ライフサイクル管理を行いました。
DatabricksでMLflowを使う① – ノートブック上での実験トラッキング –
DatabricksでMLflowを使う② – 実験パラメータとメトリクスの可視化 –
DatabricksでMLflowを使う③ – モデルのライフサイクル管理 –
今回はトレーニングとステージングを行ったモデルを別のノートブックから読み込みたいと思います。
イメージとしてはトレーニングしたモデルを Pyspark ユーザー定義関数として読み込み、 pyspark のデータフレームに対して分散処理をかけるという流れになります。
セットアップ
呼び出したいモデルに対して“Run ID”を読み込みます。
| 1 2 3 | # run_id = "<run-id>" run_id = "d35dff588112486fa1684f38******" model_uri = "runs:/" + run_id + "/model" | 
scikit-learn モデルをロードする
実験済みのトレーニングモデルを MLflow API を利用してロードします。
| 1 2 3 | import mlflow.sklearn model = mlflow.sklearn.load_model(model_uri=model_uri) model.coef_ | 
次にトレーニングにも利用した糖尿病データセットを読み込んで”progression”のカラムを落とします。
そして、読み込んだ pandas データフレームを pyspark データフレームに変換しておきます。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | # Import various libraries including sklearn, mlflow, numpy, pandas from sklearn import datasets import numpy as np import pandas as pd # Load Diabetes datasets diabetes = datasets.load_diabetes() X = diabetes.data y = diabetes.target # Create pandas DataFrame for sklearn ElasticNet linear_model Y = np.array([y]).transpose() d = np.concatenate((X, Y), axis=1) cols = ['age', 'sex', 'bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6', 'progression'] data = pd.DataFrame(d, columns=cols) dataframe = spark.createDataFrame(data.drop(["progression"], axis=1)) | 
MLflow モデルの呼び出し
MLflow API を使って Pyspark ユーザー定義関数としてトレーニング済みのモデルを呼び出します。
| 1 2 | import mlflow.pyfunc pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri) | 
ユーザー定義関数を使って予測を行います。
| 1 2 | predicted_df = dataframe.withColumn("prediction", pyfunc_udf('age', 'sex', 'bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6')) display(predicted_df) | 
Pyspark モデルを使って分散処理することができました。
おわりに
今回はトレーニング済みのモデルを MLflow API を使って呼び出し、Pyspark で分散処理させることができました。
Databricks では日々新しい機能がアップデートされており、どんどん使いやすくなっています。
これからも新しい機能がでたら追いかけていきたいと思います。


