Introduction

Let's give Petastorm a spin.
Development environment
Simplify data conversion from Spark to TensorFlow
1. We'll work through this notebook:
https://docs.microsoft.com/ja-jp/azure/databricks/_static/notebooks/deep-learning/petastorm-spark-converter-tensorflow.html
2. Install the libraries
%pip install petastorm
%pip install tensorflow
%pip install hyperopt
%pip install horovod
%pip install sparkdl
%pip install tensorframes
3. Import the libraries
from pyspark.sql.functions import col
from petastorm.spark import SparkDatasetConverter, make_spark_converter
import io
import numpy as np
import tensorflow as tf
from PIL import Image
from petastorm import TransformSpec
from tensorflow import keras
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input

from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK

import horovod.tensorflow.keras as hvd
from sparkdl import HorovodRunner
Got an error:
/databricks/python_shell/dbruntime/PythonPackageImportsInstrumentation/__init__.py:167: FutureWarning: pyarrow.LocalFileSystem is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
  original_result = python_builtin_import(name, globals, locals, fromlist, level)
/local_disk0/.ephemeral_nfs/envs/pythonEnv-30dab267-e5ed-4c28-9b42-2c7adb85cdb4/lib/python3.8/site-packages/petastorm/spark/spark_dataset_converter.py:28: FutureWarning: pyarrow.LocalFileSystem is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
  from pyarrow import LocalFileSystem
ImportError: cannot import name 'resnet50' from 'keras.applications' (/local_disk0/.ephemeral_nfs/envs/pythonEnv-30dab267-e5ed-4c28-9b42-2c7adb85cdb4/lib/python3.8/site-packages/keras/applications/__init__.py)
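The final ImportError comes out of sparkdl's import chain, which still tries "from keras.applications import resnet50"; recent standalone keras releases no longer expose that module. As a hedged, untested workaround, pinning the last standalone keras release that still shipped keras.applications.resnet50 might get past this point:

%pip install keras==2.3.1

# Hypothetical check (run in a fresh cell): the symbol sparkdl needs should now resolve.
from keras.applications import resnet50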
Simplify data conversion from Spark to PyTorch
1. We'll work through this notebook:
https://docs.microsoft.com/ja-jp/azure/databricks/_static/notebooks/deep-learning/petastorm-spark-converter-pytorch.html
2. Install the libraries
%pip install petastorm
%pip install torch==1.5.0
%pip install torchvision==0.6.0
%pip install hyperopt
%pip install horovod
%pip install sparkdl
%pip install tensorflow
%pip install tensorframes
3. Import the libraries
from pyspark.sql.functions import col
from petastorm.spark import SparkDatasetConverter, make_spark_converter
import io
import numpy as np
import torch
import torchvision
from PIL import Image
from functools import partial
from petastorm import TransformSpec
from torchvision import transforms

from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK

import horovod.torch as hvd
from sparkdl import HorovodRunner
Got an error here, too:
/databricks/python_shell/dbruntime/PythonPackageImportsInstrumentation/__init__.py:167: FutureWarning: pyarrow.LocalFileSystem is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
  original_result = python_builtin_import(name, globals, locals, fromlist, level)
/local_disk0/.ephemeral_nfs/envs/pythonEnv-ba09db16-2f6f-4866-949c-4d5aa9fbed19/lib/python3.8/site-packages/petastorm/spark/spark_dataset_converter.py:28: FutureWarning: pyarrow.LocalFileSystem is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
  from pyarrow import LocalFileSystem
Extension horovod.torch has not been built: /local_disk0/.ephemeral_nfs/envs/pythonEnv-ba09db16-2f6f-4866-949c-4d5aa9fbed19/lib/python3.8/site-packages/horovod/torch/mpi_lib/_mpi_lib.cpython-38-x86_64-linux-gnu.so not found
If this is not expected, reinstall Horovod with HOROVOD_WITH_PYTORCH=1 to debug the build error.
Warning! MPI libs are missing, but python applications are still available.
ImportError: cannot import name 'resnet50' from 'keras.applications' (/local_disk0/.ephemeral_nfs/envs/pythonEnv-ba09db16-2f6f-4866-949c-4d5aa9fbed19/lib/python3.8/site-packages/keras/applications/__init__.py)
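Two separate problems show up here. First, the horovod.torch native extension was never compiled, most likely because PyTorch was not yet installed when the horovod wheel was built, and the log itself suggests the fix. A sketch of the rebuild using Horovod's documented HOROVOD_WITH_PYTORCH build flag (untested on this cluster):

%sh
# Force a fresh source build of Horovod with the PyTorch extension enabled.
HOROVOD_WITH_PYTORCH=1 pip install --no-cache-dir --force-reinstall horovod

Second, the trailing ImportError is the same sparkdl/keras incompatibility hit in the TensorFlow notebook above.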
Prepare data for deep learning using Spark and Petastorm
1. We'll work through this notebook.
2. Install the libraries
%pip install tensorflow
%pip install petastorm
3. Import the libraries
import os
import subprocess
import uuid
4. Create a working directory
# Set a unique working directory for this notebook.
work_dir = os.path.join("/ml/tmp/petastorm", str(uuid.uuid4()))
dbutils.fs.mkdirs(work_dir)

def get_local_path(dbfs_path):
  return os.path.join("/dbfs", dbfs_path.lstrip("/"))
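get_local_path simply rewrites a DBFS path onto the /dbfs FUSE mount, so plain Python file APIs (and the wget call below) can reach it. A quick sanity check, with the uuid elided:

print(work_dir)                  # /ml/tmp/petastorm/<uuid>
print(get_local_path(work_dir))  # /dbfs/ml/tmp/petastorm/<uuid>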
5. Download the dataset (MNIST)
data_url = "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/mnist.bz2"
libsvm_path = os.path.join(work_dir, "mnist.bz2")
subprocess.check_output(["wget", data_url, "-O", get_local_path(libsvm_path)])
Out[13]: b''

The empty byte string is expected: subprocess.check_output returns wget's stdout, and wget writes its progress to stderr.
6. Load the dataset
df = spark.read.format("libsvm") \
  .option("numFeatures", "784") \
  .load(libsvm_path)
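The libsvm data source yields a label column plus a sparse feature vector, and numFeatures pins the vector length to 784 (28 x 28 pixels). Roughly the schema I would expect:

df.printSchema()
# root
#  |-- label: double (nullable = true)
#  |-- features: vector (nullable = true)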
7. Convert the feature vectors to arrays
%scala

import org.apache.spark.ml.linalg.Vector

val toArray = udf { v: Vector => v.toArray }
spark.udf.register("toArray", toArray)
import org.apache.spark.ml.linalg.Vector
toArray: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$10093/495988232@1dd62a2c,ArrayType(DoubleType,false),List(Some(class[value[0]: vector])),Some(class[value[0]: array<double>]),None,true,true)
res1: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$10093/495988232@1dd62a2c,ArrayType(DoubleType,false),List(Some(class[value[0]: vector])),Some(class[value[0]: array<double>]),Some(toArray),true,true)
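The UDF is registered under the name toArray so the next cell can call it from selectExpr. On Spark 3.0+ you could skip the %scala cell entirely; a hedged, untested Python-only alternative using the built-in vector_to_array:

from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

# Same idea as the Scala UDF: dense double arrays plus integer labels.
df_arrays = df.select(vector_to_array(col("features")).alias("features"),
                      col("label").cast("int").alias("label"))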
8. Convert to Parquet format
# Convert sparse vectors to dense arrays and write the data as Parquet.
# Petastorm will sample Parquet row groups into batches.
# Batch size is important for the utilization of both I/O and compute.
# You can use parquet.block.size to control the size.
parquet_path = os.path.join(work_dir, "parquet")

df.selectExpr("toArray(features) AS features", "int(label) AS label") \
  .repartition(10) \
  .write.mode("overwrite") \
  .option("parquet.block.size", 1024 * 1024) \
  .parquet(parquet_path)
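Since Petastorm serves one batch per Parquet row group, the 1 MB parquet.block.size is effectively the batch-size knob here. To confirm what was actually written, a hypothetical inspection with pyarrow:

import os
import pyarrow.parquet as pq

local_parquet = get_local_path(parquet_path)
for name in sorted(os.listdir(local_parquet)):
  if name.endswith(".parquet"):
    meta = pq.ParquetFile(os.path.join(local_parquet, name)).metadata
    print(name, "row groups:", meta.num_row_groups, "rows:", meta.num_rows)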
9. Import the libraries
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import models, layers

from petastorm import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset
10. Define the model
def get_model():
  model = models.Sequential()
  model.add(layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)))
  model.add(layers.Conv2D(64, (3, 3), activation='relu'))
  model.add(layers.MaxPooling2D(pool_size=(2, 2)))
  model.add(layers.Dropout(0.25))
  model.add(layers.Flatten())
  model.add(layers.Dense(128, activation='relu'))
  model.add(layers.Dropout(0.5))
  model.add(layers.Dense(10, activation='softmax'))
  return model
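This is the classic Keras MNIST convnet: input_shape=(28, 28, 1) matches the reshape applied at training time, and the final Dense(10, softmax) layer pairs with the one-hot labels and categorical_crossentropy loss used below. To eyeball the layer shapes and parameter count:

get_model().summary()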
11. Exclude Spark's metadata files from the Parquet directory
import os
import pyarrow.parquet as pq

underscore_files = [f for f in os.listdir(get_local_path(parquet_path)) if f.startswith("_")]
pq.EXCLUDED_PARQUET_PATHS.update(underscore_files)
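Spark leaves bookkeeping files such as _SUCCESS (and, on Databricks, _committed_* / _started_* markers) next to the part files; registering them in pyarrow's EXCLUDED_PARQUET_PATHS keeps Petastorm from trying to parse them as Parquet. To see what was caught (exact names will vary):

print(underscore_files)
# e.g. ['_SUCCESS', '_committed_1234567890', '_started_1234567890']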
12. Read the Parquet data with make_batch_reader and train
# We use make_batch_reader to load Parquet row groups into batches.
# HINT: Use cur_shard and shard_count params to shard data in distributed training.
petastorm_dataset_url = "file://" + get_local_path(parquet_path)

with make_batch_reader(petastorm_dataset_url, num_epochs=100) as reader:
  dataset = make_petastorm_dataset(reader) \
    .map(lambda x: (tf.reshape(x.features, [-1, 28, 28, 1]), tf.one_hot(x.label, 10)))
  model = get_model()
  optimizer = keras.optimizers.Adadelta()
  model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
  model.fit(dataset, steps_per_epoch=10, epochs=10)
/local_disk0/.ephemeral_nfs/envs/pythonEnv-480ab404-1e1a-48b7-b616-46583213ddef/lib/python3.8/site-packages/petastorm/fs_utils.py:88: FutureWarning: pyarrow.localfs is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
  self._filesystem = pyarrow.localfs
Epoch 1/10
10/10 [==============================] - 3s 159ms/step - loss: 42.4297 - accuracy: 0.0952
Epoch 2/10
...
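Note the two different "epoch" knobs above: num_epochs=100 caps how many passes the Petastorm reader may serve, while steps_per_epoch=10 and epochs=10 mean Keras consumes only 100 batches in total, so the reader never runs dry mid-fit. As a hedged follow-up, the same pattern can score the trained model on a single fresh pass:

# Untested sketch: evaluate with one pass over the same Parquet data.
with make_batch_reader(petastorm_dataset_url, num_epochs=1) as reader:
  eval_dataset = make_petastorm_dataset(reader) \
    .map(lambda x: (tf.reshape(x.features, [-1, 28, 28, 1]), tf.one_hot(x.label, 10)))
  loss, acc = model.evaluate(eval_dataset, steps=10)
  print(f"loss={loss:.4f}, accuracy={acc:.4f}")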
13. Delete the working directory
# Clean up the working directory.
dbutils.fs.rm(work_dir, recurse=True)
14. Check whether the directory was left behind
%fs ls /ml/tmp/petastorm
path,name,size,modificationTime
dbfs:/ml/tmp/petastorm/7f3d8aa0-0ecf-4b75-b649-f13a8259f2bd/,7f3d8aa0-0ecf-4b75-b649-f13a8259f2bd/,0,1650708482000
15. Delete it by specifying the uuid
dbutils.fs.rm("/ml/tmp/petastorm/7f3d8aa0-0ecf-4b75-b649-f13a8259f2bd/", recurse=True)
If you're considering adopting Azure Databricks, leave it to ナレコム.
We'll support you from deployment through to day-to-day use. Feel free to get in touch.