はじめに
以下の記事で、Databricks のマネージド型MLflowを使ってモデルのトレーニング、ライフサイクル管理を行いました。
DatabricksでMLflowを使う① – ノートブック上での実験トラッキング –
DatabricksでMLflowを使う② – 実験パラメータとメトリクスの可視化 –
DatabricksでMLflowを使う③ – モデルのライフサイクル管理 –
今回はトレーニングとステージングを行ったモデルを別のノートブックから読み込みたいと思います。
イメージとしてはトレーニングしたモデルを Pyspark ユーザー定義関数として読み込み、 pyspark のデータフレームに対して分散処理をかけるという流れになります。
セットアップ
呼び出したいモデルに対して“Run ID”を読み込みます。
1 2 3 |
# run_id = "<run-id>" run_id = "d35dff588112486fa1684f38******" model_uri = "runs:/" + run_id + "/model" |
scikit-learn モデルをロードする
実験済みのトレーニングモデルを MLflow API を利用してロードします。
1 2 3 |
import mlflow.sklearn model = mlflow.sklearn.load_model(model_uri=model_uri) model.coef_ |
次にトレーニングにも利用した糖尿病データセットを読み込んで”progression”のカラムを落とします。
そして、読み込んだ pandas データフレームを pyspark データフレームに変換しておきます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
# Import various libraries including sklearn, mlflow, numpy, pandas from sklearn import datasets import numpy as np import pandas as pd # Load Diabetes datasets diabetes = datasets.load_diabetes() X = diabetes.data y = diabetes.target # Create pandas DataFrame for sklearn ElasticNet linear_model Y = np.array([y]).transpose() d = np.concatenate((X, Y), axis=1) cols = ['age', 'sex', 'bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6', 'progression'] data = pd.DataFrame(d, columns=cols) dataframe = spark.createDataFrame(data.drop(["progression"], axis=1)) |
MLflow モデルの呼び出し
MLflow API を使って Pyspark ユーザー定義関数としてトレーニング済みのモデルを呼び出します。
1 2 |
import mlflow.pyfunc pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri) |
ユーザー定義関数を使って予測を行います。
1 2 |
predicted_df = dataframe.withColumn("prediction", pyfunc_udf('age', 'sex', 'bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6')) display(predicted_df) |
Pyspark モデルを使って分散処理することができました。
おわりに
今回はトレーニング済みのモデルを MLflow API を使って呼び出し、Pyspark で分散処理させることができました。
Databricks では日々新しい機能がアップデートされており、どんどん使いやすくなっています。
これからも新しい機能がでたら追いかけていきたいと思います。