はじめに
タイトル通り、SparkSQL で時系列仮想データテーブルを作る関数です。以下のような方を想定。
- 最近 Spark さわり始めた
- 時系列データで Window 処理をバッチ的に行いたい
- SparkSQL で普通にクエリが書けると聞いたけど、
- MySQL でのいつものメソッドは使えなさそう
- PostgreSQL の Generate Series が使えればいいのに
関数
こんな感じで書きました。開始日、終了日、秒単位のインターバル、テーブルの行名を引数にしています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
def generate_series(day_start, day_end, sec_interval, col_name): # day type: str # day format: 'yyyy/MM/dd' from pyspark.sql.functions import col from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() # day_start から day_end までのタイムスタンプ df 作成 start, stop = spark\ .createDataFrame([(day_start, day_end)], ("start", "stop"))\ .select([col(c).cast("timestamp").cast("long") for c in ("start", "stop")])\ .first() # 指定したインターバルで select calendar_table = spark\ .range(start, stop, sec_interval)\ .select(col("id").cast("timestamp")\ .alias(col_name)) return calendar_table |
使用例
1秒ごと
1 2 3 4 5 6 |
start_date = '2020/01/01' end_date = '2020/01/31' # #1秒毎のテーブル cal = generate_series(start_date, end_date, 1, 'time_stamp') display(cal) |
1時間ごと
1 2 3 4 5 6 |
start_date = '2020/01/01' end_date = '2020/01/31' # #1時間毎のテーブル cal = generate_series(start_date, end_date, 60*60, 'time_stamp') display(cal) |
おわりに
とりあえずは動作しますが、、
- この関数使えば一発じゃん
- こんな非効率なことして…
という方、情報いただけると大変うれしいです。