第五弾:Pyspark UDF紹介(おそらく最後)
こんにちは。コグラフSSD−2事業部の安山です。
データサイエンスの世界では、大規模なデータセットを扱う能力が不可欠です。その中核をなすのが、Apache SparkとそのPythonインターフェイスであるPySparkです。PySparkは、ビッグデータ処理における強力なツールであり、データ分析、機械学習、リアルタイムデータストリーミングなど、多岐にわたる用途に利用されています。
今回はUDFの紹介となります。
インストール関連
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField
import pyspark.sql.functions as fn
UDFとは
UDF(User-Defined Function)は、ユーザーが独自に定義した関数をPySparkのデータフレーム内で使用できるようにするものです。複雑なロジックを組むことも可能なので、使い所によってはとても力を発揮します!
UDF手順
Pythonの関数を定義する。
その関数をUDFとして登録する。
データフレームのカラムに対してUDFを適用する。
基本例
from pyspark.sql.functions import udf
# 1. Pythonの関数を定義
def format_name(name):
return name.strip().title()
# 2. UDFとして登録
format_name_udf = udf(format_name, StringType())
# 3. データフレームに適用
df = spark.createDataFrame([(" alice ",), ("BOB",), (" Charlie ",)], ["name"])
df = df.withColumn("formatted_name", format_name_udf(df["name"]))
df.show()
この例では、format_name 関数をUDFとして登録し、name 列に適用することで、名前の余分な空白を削除し、適切なフォーマット(先頭大文字)に変換しています。
ちょっと複雑な例
from pyspark.sql.functions import udf, col
# データ数の設定
num_rows = 5000
# 2024年のランダムな施策データの生成
campaign_data_2024 = [
(random.randint(200, 300), f"Campaign_{i % 50}", f"2024-{random.randint(1, 12):02d}-{random.randint(1, 28):02d}", round(random.uniform(10000, 50000), 2))
for i in range(num_rows)
]
df_campaigns_2024 = spark.createDataFrame(campaign_data_2024, ["campaign_id", "campaign_name", "start_date", "budget"])
# 2024年のimpressionとclickデータの生成(CTRが最高でも10%設定)
imp_click_data_2024 = [
(random.randint(200, 300), imp := random.randint(5000, 100000), random.randint(0, int(imp * 0.1)))
for _ in range(num_rows)
]
df_imp_click_2024 = spark.createDataFrame(imp_click_data_2024, ["campaign_id", "impression", "click"])
# impressionとclickとの結合
df_campaign_effect = df_campaigns_2024.join(df_imp_click_2024, on="campaign_id", how="left")
# CTRの計算と追加
df_campaign_effect = df_campaign_effect.withColumn("CTR", (col("click") / col("impression")) * 100)
# 1. Pythonの関数を定義
def evaluate_campaign(ctr):
if ctr > 7.0:
return "High"
elif ctr > 3.0:
return "Medium"
else:
return "Low"
# 2. UDFとして登録する
evaluate_campaign_udf = udf(evaluate_campaign, StringType())
# 3. データフレームに適用
df_campaign_effect = df_campaign_effect.withColumn("performance", evaluate_campaign_udf(df_campaign_effect["CTR"]))
df_campaign_effect.show(5)
(結合編で紹介した施策データの転用をしています。。。)
データの作成後にCTRの算出を行い、UDFとして登録したevaluate_campaign関数で、ctrが7.0より大きければhighを値として、0.3より大きければmediumを値として、それ以外であればlowとしてperformanceカラムを作成しています。このようにUDFを登録することによって、より可読性の高いコードとなります。whenなどを使うことによってwithColumn()内で条件分岐を表現する方法もありますが、条件が複雑になったりする場合は、UDFの登録をすることによって可読性を高めることを意識しましょう。
終わりに
この問題集を通じて、PySparkにおけるUDFの基本的な操作についての理解を深めることができたでしょうか。
これでpysparkの記事は最後になるかと思いますが、何か面白いネタを思いついたら、都度書いていきますので、今後ともよろしくお願いいたします!
この記事が気に入ったらサポートをしてみませんか?