Introduction
In this post, we will work through the Databricks ML Quickstart.
ML Quickstart: Model Training – Databricks
[Figure: cluster configuration]
Walkthrough
1. Create a new notebook in Azure Databricks
2. Install the libraries
```python
%pip install mlflow
%pip install numpy
%pip install pandas
%pip install scikit-learn
%pip install hyperopt
```
3. Import the libraries
```python
import mlflow
import numpy as np
import pandas as pd
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import sklearn.ensemble

from hyperopt import fmin, tpe, hp, SparkTrials, Trials, STATUS_OK
from hyperopt.pyll import scope
```
4. Download the wine quality dataset
Wine Quality Dataset
UCI Machine Learning Repository: Wine Quality Data Set
5. Upload winequality_white.csv and winequality_red.csv via Create Table
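As an aside, if you'd rather skip the manual upload, the CSVs can also be read straight from the UCI repository. A minimal sketch, assuming the classic UCI download URLs (which are not part of the quickstart itself) are still valid:

```python
import pandas as pd

# Assumed UCI download URLs (not from the quickstart); note the hyphens in
# the original file names, vs. the underscores used by the FileStore upload
base = "https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality"
white_wine = pd.read_csv(base + "/winequality-white.csv", sep=';')
red_wine = pd.read_csv(base + "/winequality-red.csv", sep=';')
```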
6. Load the dataset
```python
white_wine = pd.read_csv("/dbfs/FileStore/tables/winequality_white.csv", sep=';')
red_wine = pd.read_csv("/dbfs/FileStore/tables/winequality_red.csv", sep=';')

white_wine['is_red'] = 0.0
red_wine['is_red'] = 1.0

data_df = pd.concat([white_wine, red_wine], axis=0)

# Define classification labels based on the wine quality
data_labels = data_df['quality'] >= 7
data_df = data_df.drop(['quality'], axis=1)

# Split 80/20 train-test
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
    data_df,
    data_labels,
    test_size=0.2,
    random_state=1
)
```
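Because the label is quality >= 7, the positive class ("good" wines) is a minority, which is one reason AUC is a sensible metric for this task. A quick sanity check you can run on the split (not in the original notebook):

```python
# Fraction of positive (quality >= 7) labels in each split
print("Positive rate (train): {:.3f}".format(y_train.mean()))
print("Positive rate (test):  {:.3f}".format(y_test.mean()))
```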
7. Enable autologging
```python
# Enable MLflow autologging for this notebook
mlflow.autolog()
```
We got warnings complaining that the 'JavaPackage' object is not callable:
```
2022/04/12 15:46:01 INFO mlflow.tracking.fluent: Autologging successfully enabled for sklearn.
2022/04/12 15:46:01 WARNING mlflow.utils.autologging_utils: Encountered unexpected error during spark autologging: Exception while attempting to initialize JVM-side state for Spark datasource autologging. Please create a new Spark session and ensure you have the mlflow-spark JAR attached to your Spark session as described in http://mlflow.org/docs/latest/tracking.html#automatic-logging-from-spark-experimental. Exception: 'JavaPackage' object is not callable
2022/04/12 15:46:01 WARNING mlflow.tracking.fluent: Exception raised while enabling autologging for pyspark: Exception while attempting to initialize JVM-side state for Spark datasource autologging. Please create a new Spark session and ensure you have the mlflow-spark JAR attached to your Spark session as described in http://mlflow.org/docs/latest/tracking.html#automatic-logging-from-spark-experimental. Exception: 'JavaPackage' object is not callable
2022/04/12 15:46:01 WARNING mlflow.utils.autologging_utils: Encountered unexpected error during spark autologging: Exception while attempting to initialize JVM-side state for Spark datasource autologging. Please create a new Spark session and ensure you have the mlflow-spark JAR attached to your Spark session as described in http://mlflow.org/docs/latest/tracking.html#automatic-logging-from-spark-experimental. Exception: 'JavaPackage' object is not callable
2022/04/12 15:46:01 INFO mlflow.tracking.fluent: Autologging successfully enabled for pyspark.ml.
```
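These warnings are harmless here: they come from Spark datasource autologging, which this notebook never uses, and the log confirms that sklearn autologging was enabled successfully. If you want a quieter notebook, one option is to enable only the sklearn flavor instead of the catch-all mlflow.autolog(). This is a sketch, not the quickstart's approach:

```python
# Enable autologging for scikit-learn only, so MLflow never tries to
# initialize Spark datasource autologging (the source of the warnings)
mlflow.sklearn.autolog()
```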
8. Start an MLflow run
```python
with mlflow.start_run(run_name='gradient_boost') as run:
    model = sklearn.ensemble.GradientBoostingClassifier(random_state=0)

    # Models, parameters, and training metrics are tracked automatically
    model.fit(X_train, y_train)

    predicted_probs = model.predict_proba(X_test)
    roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])

    # The AUC score on test data is not automatically logged, so log it manually
    mlflow.log_metric("test_auc", roc_auc)
    print("Test AUC of: {}".format(roc_auc))
```
```
Test AUC of: 0.8834365701533531
```
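To see what autologging actually recorded for this run, you can pull it back with the tracking client. A minimal sketch using MlflowClient (not in the original notebook):

```python
from mlflow.tracking import MlflowClient

client = MlflowClient()
run_data = client.get_run(run.info.run_id).data

# Autologged training metrics plus our manually logged test_auc
print(run_data.metrics)
# Autologged GradientBoostingClassifier hyperparameters
print(run_data.params)
```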
9. Set n_estimators and run MLflow again
```python
# Start a new run and assign a run_name for future reference
with mlflow.start_run(run_name='gradient_boost') as run:
    model_2 = sklearn.ensemble.GradientBoostingClassifier(
        random_state=0,
        # Try a new parameter setting for n_estimators
        n_estimators=200,
    )
    model_2.fit(X_train, y_train)

    predicted_probs = model_2.predict_proba(X_test)
    roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
    mlflow.log_metric("test_auc", roc_auc)
    print("Test AUC of: {}".format(roc_auc))
```
```
Test AUC of: 0.8914761673151751
```
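Now that two runs share the run name 'gradient_boost', you can compare them side by side with mlflow.search_runs, which returns a pandas DataFrame of the notebook experiment's runs. A sketch (not part of the quickstart):

```python
# Fetch the most recent runs and compare their test AUC and n_estimators
# (the run name appears in the 'tags.mlflow.runName' column)
runs = mlflow.search_runs(order_by=['start_time DESC'], max_results=5)
print(runs[['run_id', 'tags.mlflow.runName', 'metrics.test_auc', 'params.n_estimators']])
```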
10. The logged model can be used from other notebooks or jobs
```python
# After a model has been logged, you can load it in different notebooks or jobs
# mlflow.pyfunc.load_model makes model prediction available under a common API
model_loaded = mlflow.pyfunc.load_model(
    'runs:/{run_id}/model'.format(
        run_id=run.info.run_id
    )
)

predictions_loaded = model_loaded.predict(X_test)
predictions_original = model_2.predict(X_test)

# The loaded model should match the original
assert(np.array_equal(predictions_loaded, predictions_original))
```
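Beyond loading by run ID, a logged model can also be promoted to the MLflow Model Registry so other notebooks and jobs can reference it by name and version. A sketch, where the registry name 'wine_quality' is just an example:

```python
# Register the logged model under an example name; repeated calls
# create new versions of the same registered model
model_uri = 'runs:/{}/model'.format(run.info.run_id)
registered = mlflow.register_model(model_uri, 'wine_quality')
print(registered.name, registered.version)
```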
11. Hyperparameter tuning
```python
# Define the search space to explore
search_space = {
    'n_estimators': scope.int(hp.quniform('n_estimators', 20, 1000, 1)),
    'learning_rate': hp.loguniform('learning_rate', -3, 0),
    'max_depth': scope.int(hp.quniform('max_depth', 2, 5, 1)),
}

def train_model(params):
    # Enable autologging on each worker
    mlflow.autolog()
    with mlflow.start_run(nested=True):
        model_hp = sklearn.ensemble.GradientBoostingClassifier(
            random_state=0,
            **params
        )
        model_hp.fit(X_train, y_train)
        predicted_probs = model_hp.predict_proba(X_test)

        # Tune based on the test AUC
        # In production settings, you could use a separate validation set instead
        roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
        mlflow.log_metric('test_auc', roc_auc)

        # Set the loss to -1*auc_score so fmin maximizes the auc_score
        return {'status': STATUS_OK, 'loss': -1*roc_auc}

# SparkTrials distributes the tuning using Spark workers
# Greater parallelism speeds processing, but each hyperparameter trial has less information from other trials
# On smaller clusters or Databricks Community Edition try setting parallelism=2
spark_trials = SparkTrials(
    parallelism=8
)

with mlflow.start_run(run_name='gb_hyperopt') as run:
    # Use hyperopt to find the parameters yielding the highest AUC
    best_params = fmin(
        fn=train_model,
        space=search_space,
        algo=tpe.suggest,
        max_evals=32,
        trials=spark_trials)
```
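Note that fmin returns the raw sampled values (e.g. floats for quniform parameters), not the values after scope.int is applied. To recover the actual parameter settings, hyperopt's space_eval maps the result back through the search space; a sketch:

```python
from hyperopt import space_eval

# Map fmin's raw output back through the search space so that
# quniform-sampled integers come out as ints, not floats
print(space_eval(search_space, best_params))
```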
12. Run predictions with the best-scoring model
```python
# Sort runs by their test auc; in case of ties, use the most recent run
best_run = mlflow.search_runs(
    order_by=['metrics.test_auc DESC', 'start_time DESC'],
    max_results=10,
).iloc[0]

print('Best Run')
print('AUC: {}'.format(best_run["metrics.test_auc"]))
print('Num Estimators: {}'.format(best_run["params.n_estimators"]))
print('Max Depth: {}'.format(best_run["params.max_depth"]))
print('Learning Rate: {}'.format(best_run["params.learning_rate"]))

best_model_pyfunc = mlflow.pyfunc.load_model(
    'runs:/{run_id}/model'.format(
        run_id=best_run.run_id
    )
)

best_model_predictions = best_model_pyfunc.predict(X_test[:5])
print("Test Predictions: {}".format(best_model_predictions))
```
```
Best Run
AUC: 0.9142824444953079
Num Estimators: 782
Max Depth: 5
Learning Rate: 0.08908078790759665
Test Predictions: [False False False True False]
```
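The pyfunc flavor only exposes predict(), which for this model returns class labels. If you also want the predicted probabilities, you can load the native sklearn flavor of the same logged model. A sketch, assuming the model was logged by sklearn autologging as above:

```python
# Load the native sklearn flavor to get access to predict_proba
best_model_sklearn = mlflow.sklearn.load_model(
    'runs:/{}/model'.format(best_run.run_id)
)
probs = best_model_sklearn.predict_proba(X_test[:5])
print("P(good wine): {}".format(probs[:, 1]))
```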
That's all for this walkthrough. Good work!
References
Notebook (Databricks ML Quickstart: Model Training)
ML Quickstart: Model Training – Databricks