Azure Databricks: 4. PySpark基本操作

LINEで送る
Pocket

前回記事

Azure Databricks: 1. リソースの作成
Azure Databricks: 2. Databricksの基本事項
Azure Databricks: 3-1. DBFSにBlob Storageをマウント
Azure Databricks: 3-2. DBFSにAzure Data Lake Storage Gen2をマウント

サンプルデータセット

今回はkaggleのデータセット「Brazilian E-Commerce Public Dataset by Olist」をサンプルとして、Azure Databricksを使ったSparkの操作を行っていきます。
このデータはOlist StoreというブラジルのECサイトで行われた2016年から2018年までの約10万件の注文に関するデータが含まれています。
データ量としてはビッグデータというほどに多くありませんが、注文の商品明細やレビューなどが複数のCSVに分かれて保存され、それぞれがIDで紐づけられているため、PySparkやSpark SQLの練習に適しています。

CSVの読み込み

注文ごとの商品の明細情報「olist_order_items_dataset.csv」を使ってデータの読み込みとPySparkの操作を行っていきます。

DataFrameに読み込み

下記スクリプトでCSVをSpark DataFrameとして読み込みます。
読み込むCSVはカラム名を示す行が先頭にあるため、読み込みオプションとして「header=”true”」、またカラムのデータ型を自動推定するため「inferSchema=”true”」として読み込んでいます。
(※CSV読み込みオプションの詳細はDatabricksドキュメントも参照してください)

スキーマを指定して読み込み

スキーマを指定して読み込みを行う場合は下記のようにします。

※指定できる型はSparkドキュメントも参照
カラムのデータ型指定にinferSchemaを使用した場合、型推定のため1回余計に読み込むことになり、読み込みのパフォーマンスが低下します。
データのスキーマがわかっている場合は、スキーマを指定して読み込むことを推奨します。

データ型の確認

PySparkでのDataFrameの基本操作

読み込んだCSVでPySparkの基本操作を実行します。
指定行数抽出して表示

全レコード数のカウント

計算列の追加


カラムを指定して抽出

条件でレコード抽出

レコードのカウント


レコードの集計

Spark SQLを使用した操作

DataFrameをTemp Tableに登録することでSpark SQLを使用した集計が可能になります。
Temp Tableの登録


Spark SQLによるカラム抽出

%sqlマジックコマンドを使用する場合

DataFrameの結合

データセットに含まれる他のCSVファイルと組み合わせて商品カテゴリごとの売り上げを集計してみます。
各商品の詳細情報「olist_products_dataset.csv」をDataFrameに読み込みます。


2018年1月の商品カテゴリごとの売り上げ金額を集計します。
「sum(price)」をソートすると「relogios_presentes」の売り上げが最も高いことがわかりますが、ポルトガル語なので何の商品カテゴリかわかりません。


データセットにカテゴリを英語翻訳したCSVファイルがあるため、これも読み込みます。


結合して翻訳します。


Databricksではデータの可視化も簡単にできます。
売り上げが最も高いのは「watches_gifts」であることがわかりました。

CSVの書き出し

DataFrameを書き出す場合は下記コマンドを使用します。


CSVは指定したパスに直接書き出されるのではなく、指定パスのディレクトリが作成され、直下に分割されたCSVファイルとして出力されます。


ファイルを1つのCSVとして出力する場合は、HadoopのFileUtil.copyMergeを使用し、上記で出力したファイルをマージして1ファイルにまとめます。


Clusterのメモリ量に余裕がある場合は、下記スクリプトで1ファイルにデータを書き出すことができます。
この場合でも「output.csv/part-00000-xxxxxxxxx.csv」のような名称でファイルが出力されるため、出力後必要に応じてファイルの移動を行います。

LINEで送る
Pocket