見出し画像

第四弾:Pyspark 問題集(Window編)

こんにちは。コグラフSSD−2事業部の安山です。
データサイエンスの世界では、大規模なデータセットを扱う能力が不可欠です。その中核をなすのが、Apache SparkとそのPythonインターフェイスであるPySparkです。PySparkは、ビッグデータ処理における強力なツールであり、データ分析、機械学習、リアルタイムデータストリーミングなど、多岐にわたる用途に利用されています。
PySparkを使った具体的なデータ処理の例と解答例を示し、理解を深めることができるように設計しました。
アウトプットをしながら楽しんで取り組んでください!


環境

GoogleのColaboratoryで実行できるようになっています。

事前準備

下記記事からcsvファイルをダウンロードしてください。


インストール関連

!pip install pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField
import pyspark.sql.functions as fn
from pyspark.sql.window import Window ##今回の主役
import pandas as pd

csvファイルの判定

環境によっては文字化けを起こす可能性があるので、こちらを使用し判定して、読み込む際に活用してください。

import chardet    

def detect_encoding(file_path):
    with open(file_path, 'rb') as file:
     
        result = chardet.detect(file.read(10000))
        return result['encoding']

file_name = 'order_history.csv'  # CSVファイル名(別の階層にある場合はパス)
encoding = detect_encoding(file_name)
print(f"Detected encoding: {encoding}")

DataFrameの作成

member_sdf = spark.read.format("csv").option("header", "true").option("encoding", "ascii").load("members.csv")
product_sdf = spark.read.format("csv").option("header", "true").option("encoding", "utf-8").load("products.csv")
order_sdf = spark.read.format("csv").option("header", "true").option("encoding", "ascii").load("order_history.csv")

Window関数

window関数は、分析において非常に強力なツールであり、特定のグループ内で各行に対して集計や計算を行う際に利用されます。groupBy 関数と比較されることが多いですが、window関数には groupBy にはない柔軟性と機能があり、より複雑なデータ操作を可能にします。

問題

問1 各ユーザーごとに、購入履歴を日付順に並べ、累積購入金額を計算する

from pyspark.sql.functions import col, sum

df = order_sdf.join(product_sdf, "product_id")

windowSpec = Window.partitionBy("member_id").orderBy("purchase_time").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df = df.withColumn("cumulative_total", sum(col("price") * col("quantity")).over(windowSpec))

df.select("member_id", "purchase_time", "price", "quantity", "cumulative_total").show()

window関数と一緒によく登場するpartitionですが「分割」や「区分」という意味があります。(コロナが流行っていた時期に机の上によく置かれていたのもパーティションと呼んでいましたね。)
何で区切りたいかを考えながら使うといいと思います!
この問題では、各ユーザーごとなのでmember_idで区切ります。これまでの記事では「〜ごと」というワードが出てきた場合は「groupBy」を使うことを紹介してきましたが、「groupBy」と大きく異なる点は「データの縦幅(レコード数)」を変えずに「〜ごと」の処理を実行できる点です。
また、最終的な目的が「ある日付時点での累積購入金額を既存のレコードに付与すること」なので、データの縦幅が変わらないようにwindow関数を用いて、partitionByで処理を実現していくことになります。
windowの仕様としては、partitionBy("member_id")でユーザーIDごとにデータを分割し、orderBy("purchase_date")で各ユーザーの購入履歴を購入日順に並べます。また、累積を計算するために処理する行を指定するためにrowsBetween(Window.unboundedPreceding, Window.currentRow)を追記します。(Window.unboundedPrecedingはウィンドウの先頭行、Window.currentRowは現在の行を指します)
sum(col("price") * col("quantity")).over(windowSpec)では定義したウィンドウの仕様に基づいて、price*quantityの累積合計を計算します。withcolumnで新しいカラムを作成しているので、各ユーザーの購入金額が累積され、新しい列に保存されます。

問2 各ユーザーごとに、購入履歴を金額の高い順に並べ、購入ランキングを計算する

from pyspark.sql.functions import rank

windowSpec = Window.partitionBy("member_id").orderBy(col("price").desc())

df = order_sdf.join(product_sdf, "product_id").withColumn("rank", rank().over(windowSpec))

df.select("member_id", "product_name", "price", "rank").show()

rank関数:は、ウィンドウ関数で、指定したウィンドウ内で順位付けを行います。同じ順位が存在する場合、その後の順位がスキップされます(例えば、2つのアイテムが1位の場合、次の順位は3位になります)
orderBy(col("price").desc()): 各member_idごとに、price(購入価格)で降順に並べ替えます。これにより、最も高額な購入から順に並べられます。WithColumn("rank", rank().over(windowSpec))は、rank関数を用いて前述のwindowSpecに基づいて各ユーザーの購入履歴に順位を付けます。rank関数は、windowSpec内での順位を計算し、新しい列であるrankとしてデータフレームに追加します。この列には、各member_idごとに高額な購入から順にランクが付けられます。

終わりに

この問題集を通じて、PySparkにおけるwindowの基本的な操作についての理解を深めることができたでしょうか。
第五弾も公開予定です!

この記事が気に入ったらサポートをしてみませんか?