見出し画像

第一弾:Pyspark 問題集(基礎編)

こんにちは。コグラフSSD−2事業部の安山です。
データサイエンスの世界では、大規模なデータセットを扱う能力が不可欠です。その中核をなすのが、Apache SparkとそのPythonインターフェイスであるPySparkです。PySparkは、ビッグデータ処理における強力なツールであり、データ分析、機械学習、リアルタイムデータストリーミングなど、多岐にわたる用途に利用されています。
PySparkを使った具体的なデータ処理の例と解答例を示し、理解を深めることができるように設計しました。
アウトプットをしながら楽しんで取り組んでください!


環境

GoogleのColaboratoryで実行できるようになっています。

事前準備

csvファイルをダウンロードしてください。

インストール関連

!pip install pyspark
!pip install findspark
!pip install pyarrow
!pip install xlrd
!pip install openpyxl
!pip install chardet
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField
import pyspark.sql.functions as fn
from pyspark.sql.functions import col, lit
import pandas as pd


csvファイルの判定

環境によっては文字化けを起こす可能性があるので、こちらを使用し判定して、読み込む際に活用してください。

import chardet    

def detect_encoding(file_path):
    with open(file_path, 'rb') as file:
     
        result = chardet.detect(file.read(10000))
        return result['encoding']

file_name = 'order_history.csv'  # CSVファイル名(別の階層にある場合はパス)
encoding = detect_encoding(file_name)
print(f"Detected encoding: {encoding}")

DataFrameの作成


member_sdf = spark.read.format("csv").option("header", "true").option("encoding", "ascii").load("members.csv")
product_sdf = spark.read.format("csv").option("header", "true").option("encoding", "utf-8").load("products.csv")
order_sdf = spark.read.format("csv").option("header", "true").option("encoding", "ascii").load("order_history.csv")

データの中身を確認

member_sdf.show()


このような出力結果になると思います。他の二つも試してみてください。

問題

問1 会員マスタからselectを使用してIDカラムを取得する

member_sdf.select("member_id").show()

問2 会員マスタから性別カラム、IDカラムの順に並ぶように五行抽出する

member_sdf.select("gender", "member_id").show(5)

selectは「,」で区切ることにより複数のカラムを指定することができます。
また、何行表示させるかはshow()の中で指定することができます。
※show()の引数で、「vertical = True」を指定すると縦横が逆になり、「truncate = False」を指定するとフィールド(エクセルでいうセル)内のデータが省略されずに表示されます。

問3 会員マスタ内に格納されている性別の種類を確認する

member_sdf.select("gender").distinct().show()

問4 商品マスタから食事を抽出する(1000円以上の商品が食事です)

product_sdf.filter(col("price").cast("int") >= 1000).show()

sdf.printSchema()をするとスキーマを確認することができます。priceのカラムはstring型になっているので、cast( )を使用してint型に変換します。
filter(条件文)で条件文に該当するレコードを取得することができます。

問5 会員マスタから20歳以上女性の会員を抽出する

member_sdf.filter(
    (col("age").cast("int") >= 20) & (col("gender") =="felame")
).show()

問6 飲み物(800以下の商品)の平均価格を算出する

product_sdf.filter(col("price") <=800).agg(fn.avg("price")).show()

まずは飲み物を商品マスタから抽出します。その後抽出したデータフレームに対して集計関数であるagg( )を使用して平均を算出します。
※ aggは、データ分析において頻繁に使用される関数で、特にApache Sparkやpandasなどのライブラリで見られます。aggは"aggregate"の略で、データを集約するために使用されます。これは、データセット全体または特定のキーに基づいてグループ化されたデータに対して、一つ以上の操作(例えば、合計、平均、最大、最小など)を適用することを可能にします。

問7 groupByを使用して、男女の会員数をそれぞれ算出する

member_sdf.groupBy("gender").agg(fn.count("member_id")).show()

groupByはPySparkや他のデータ処理フレームワークで使われる関数で、データを特定の列(または複数の列)に基づいてグループ化するために使います。この関数は、大量のデータを要約し、集約する際に特に役立ちます。

問8 注文履歴から商品ごとの注文数を集計する

order_sdf.groupBy("product_id").agg(
    fn.sum(col("quantity").cast("int")).alias("cnt"),
    ).select("product_id", "cnt").orderBy(col("product_id").cast("int")).show()

groupByを使用するタイミングは「〜ごとに」です。今回も「商品ごと」になのでgroupByを使用します。算出したいものは注文数なのでquantityを合計するsum( )を使用しますが、quantityもstring型になっているので、cast( )を使用してint型に変換します。alias は英語で「別名」の意味がありますが、alias( )を使用することで名称を指定することができます。

終わりに

この問題集を通じて、PySparkにおけるselectfiltergroupByなどの基本的な操作についての理解を深めることができたでしょうか。
第二弾も公開予定です!


この記事が気に入ったらサポートをしてみませんか?