第三弾:Pyspark 紹介(結合編)
こんにちは。コグラフSSD−2事業部の安山です。
データサイエンスの世界では、大規模なデータセットを扱う能力が不可欠です。その中核をなすのが、Apache SparkとそのPythonインターフェイスであるPySparkです。PySparkは、ビッグデータ処理における強力なツールであり、データ分析、機械学習、リアルタイムデータストリーミングなど、多岐にわたる用途に利用されています。
今回は結合の紹介となります。
インストール関連
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField
import pyspark.sql.functions as fn
Join編
今回はwindow編でjoinを利用するので、それの紹介をする記事となっております。
早速本題ですが、生徒のデータ(student_id, student_name, ageカラムを持つ)と成績データ(student_id, subject, gradeカラムを持つ)を結合しようと思います。
joinの際の考え方はSQLと同様です。
# サンプルデータ
data_students = [
(1, "Alice", 20),
(2, "Bob", 22),
(3, "Charlie", 23)
]
data_grades = [
(1, "Math", "A"),
(1, "English", "B"),
(2, "Math", "C"),
(3, "English", "A"),
(3, "History", "B")
]
df_students = spark.createDataFrame(data_students, ["student_id", "student_name", "age"])
df_grades = spark.createDataFrame(data_grades, ["student_id", "subject", "grade"])
# 結合処理
result_df = df_students.join(df_grades, on="student_id", how="inner")
result_df.show()
Union編
# サンプルデータの作成
data_employees_2023 = [
(101, "John Doe", "HR"),
(102, "Jane Smith", "Engineering"),
(103, "Emily Davis", "Marketing")
]
data_employees_2024 = [
(102, "Jane Smith", "Engineering"),
(104, "Michael Brown", "Finance"),
(105, "Jennifer Wilson", "HR")
]
df_employees_2023 = spark.createDataFrame(data_employees_2023, ["employee_id", "employee_name", "department"])
df_employees_2024 = spark.createDataFrame(data_employees_2024, ["employee_id", "employee_name", "department"])
# 結合処理
result_df = df_employees_2023.unionByName(df_employees_2024)
二つのデータフレームが同質のデータを持っているので縦積みが可能。
unionの中でも特におすすめなのがunionByNameです。
unionを使った場合は、DFのカラム名の順番につい良い影響を受けますが、unionByNameはカラム名に合わせるようにunionが行われるため、結合をする際にカラムの順番を意識しなくて良いので便利です。
合わせ技編
# データ数の設定
num_rows = 5000
# 2023年のランダムな施策データの生成
campaign_data_2023 = [
(random.randint(200, 300), f"Campaign_{i % 50}", f"2023-{random.randint(1, 12):02d}-{random.randint(1, 28):02d}", round(random.uniform(10000, 50000), 2))
for i in range(num_rows)
]
df_campaigns_2023 = spark.createDataFrame(campaign_data_2023, ["campaign_id", "campaign_name", "start_date", "budget"])
# 2023年のimpressionとclickデータの生成(CTRが最高でも10%に設定する)
imp_click_data_2023 = [
(random.randint(200, 300), imp := random.randint(5000, 100000), random.randint(0, int(imp * 0.1)))
for _ in range(num_rows)
]
df_imp_click_2023 = spark.createDataFrame(imp_click_data_2023, ["campaign_id", "impression", "click"])
# 2024年のランダムな施策データの生成
campaign_data_2024 = [
(random.randint(200, 300), f"Campaign_{i % 50}", f"2024-{random.randint(1, 12):02d}-{random.randint(1, 28):02d}", round(random.uniform(10000, 50000), 2))
for i in range(num_rows)
]
df_campaigns_2024 = spark.createDataFrame(campaign_data_2024, ["campaign_id", "campaign_name", "start_date", "budget"])
# 2024年のimpressionとclickデータの生成(CTRが最高でも10%に設定する)
imp_click_data_2024 = [
(random.randint(200, 300), imp := random.randint(5000, 100000), random.randint(0, int(imp * 0.1)))
for _ in range(num_rows)
]
df_imp_click_2024 = spark.createDataFrame(imp_click_data_2024, ["campaign_id", "impression", "click"])
# 2023年と2024年の施策データを結合
df_all_campaigns = df_campaigns_2023.unionByName(df_campaigns_2024)
# 2023年と2024年のimpressionとclickデータを結合
df_all_imp_click = df_imp_click_2023.union(df_imp_click_2024).dropDuplicates(["campaign_id"])
# impressionとclickとの結合
df_campaign_effect = df_all_campaigns.join(df_all_imp_click, on="campaign_id", how="left")
# CTRの計算と追加
df_campaign_effect = df_campaign_effect.withColumn("CTR", (fn.col("click") / fn.col("impression")) * 100)
# 効果検証の結果を表示
df_campaign_effect.show(5)
(データ生成の内包表記部分の解説に関しては、飛ばします。。。)
まずは2023年と2024年の施策データを作成します。またimpressionとclickのデータも施策データと同様に2年分作成します。
まずは2023年と2024年の施策データを結合しますが、同質のデータを持っているので、union結合となります。
df_imp_clickに関しては、施策データとは異質のデータになります。結合をする際は、joinを使うことを意識しましょう。
最後にweb広告あるあるのCTRを算出して終了です。
終わりに
この問題集を通じて、PySparkにおける結合の基本的な操作についての理解を深めることができたでしょうか。
第四弾も公開予定です!
この記事が気に入ったらサポートをしてみませんか?