見出し画像

第五弾:Pyspark UDF紹介(おそらく最後)

こんにちは。コグラフSSD−2事業部の安山です。
データサイエンスの世界では、大規模なデータセットを扱う能力が不可欠です。その中核をなすのが、Apache SparkとそのPythonインターフェイスであるPySparkです。PySparkは、ビッグデータ処理における強力なツールであり、データ分析、機械学習、リアルタイムデータストリーミングなど、多岐にわたる用途に利用されています。
今回はUDFの紹介となります。


インストール関連

!pip install pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField
import pyspark.sql.functions as fn

UDFとは

UDF(User-Defined Function)は、ユーザーが独自に定義した関数をPySparkのデータフレーム内で使用できるようにするものです。複雑なロジックを組むことも可能なので、使い所によってはとても力を発揮します!

UDF手順

  1. Pythonの関数を定義する。

  2. その関数をUDFとして登録する。

  3. データフレームのカラムに対してUDFを適用する。

基本例

from pyspark.sql.functions import udf

# 1. Pythonの関数を定義
def format_name(name):
    return name.strip().title()

# 2. UDFとして登録
format_name_udf = udf(format_name, StringType())

# 3. データフレームに適用
df = spark.createDataFrame([(" alice ",), ("BOB",), (" Charlie  ",)], ["name"])
df = df.withColumn("formatted_name", format_name_udf(df["name"]))

df.show()

この例では、format_name 関数をUDFとして登録し、name 列に適用することで、名前の余分な空白を削除し、適切なフォーマット(先頭大文字)に変換しています。

ちょっと複雑な例

from pyspark.sql.functions import udf, col

# データ数の設定
num_rows = 5000

# 2024年のランダムな施策データの生成
campaign_data_2024 = [
    (random.randint(200, 300), f"Campaign_{i % 50}", f"2024-{random.randint(1, 12):02d}-{random.randint(1, 28):02d}", round(random.uniform(10000, 50000), 2))
    for i in range(num_rows)
]
df_campaigns_2024 = spark.createDataFrame(campaign_data_2024, ["campaign_id", "campaign_name", "start_date", "budget"])

# 2024年のimpressionとclickデータの生成(CTRが最高でも10%設定)
imp_click_data_2024 = [
    (random.randint(200, 300), imp := random.randint(5000, 100000), random.randint(0, int(imp * 0.1)))
    for _ in range(num_rows)
]
df_imp_click_2024 = spark.createDataFrame(imp_click_data_2024, ["campaign_id", "impression", "click"])

# impressionとclickとの結合
df_campaign_effect = df_campaigns_2024.join(df_imp_click_2024, on="campaign_id", how="left")

# CTRの計算と追加
df_campaign_effect = df_campaign_effect.withColumn("CTR", (col("click") / col("impression")) * 100)

# 1. Pythonの関数を定義
def evaluate_campaign(ctr):
    if ctr > 7.0:
        return "High"
    elif ctr > 3.0:
        return "Medium"
    else:
        return "Low"

# 2. UDFとして登録する
evaluate_campaign_udf = udf(evaluate_campaign, StringType())

# 3. データフレームに適用
df_campaign_effect = df_campaign_effect.withColumn("performance", evaluate_campaign_udf(df_campaign_effect["CTR"]))

df_campaign_effect.show(5)

(結合編で紹介した施策データの転用をしています。。。)
データの作成後にCTRの算出を行い、UDFとして登録したevaluate_campaign関数で、ctrが7.0より大きければhighを値として、0.3より大きければmediumを値として、それ以外であればlowとしてperformanceカラムを作成しています。このようにUDFを登録することによって、より可読性の高いコードとなります。whenなどを使うことによってwithColumn()内で条件分岐を表現する方法もありますが、条件が複雑になったりする場合は、UDFの登録をすることによって可読性を高めることを意識しましょう。

終わりに

この問題集を通じて、PySparkにおけるUDFの基本的な操作についての理解を深めることができたでしょうか。
これでpysparkの記事は最後になるかと思いますが、何か面白いネタを思いついたら、都度書いていきますので、今後ともよろしくお願いいたします!

https://www.wantedly.com/companies/co-graph

この記事が気に入ったらサポートをしてみませんか?