見出し画像

PySparkでデータの差分チェック

こんにちは。@yuki_saito_enです

この記事は Pythonその4 Advent Calendar 2020 24日目クリスマスイブの投稿記事です。

pythonはいろいろな用途で使えるのですが、私の利用用途の一つであるデータのチェックの観点での投稿になります。

データエンジニアな世界のpython

Java、pythonが多いですね。時間かかってもいいことが多いのでコンパイル言語よりこう入ったインタプリタな言語を選ぶことが多い気がします。

逆に、リアルタイムとかを求める場合はJavaであったりGo出会ったりすることが多いです。

データのチェック?

データなお仕事をしていると、AからBにデータを移動したり、並行稼働して別のデータのフォーマットに変換する時があります。

このようにデータが移動する時にデータの欠落によって、データ品質の劣化が発生してー>後続の機械学習や分析にて間違えた結果を出してしまうことの要因になってしまうことがあります。

その可能性を低くするために、少しでも打てる手は事前に打ちたいのものです。

チェック用のプログラム

環境はamazonのEMRです。

今回は単純にorcフォーマットのテーブルをparquetに変えてみた時のテストツールです。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import argparse
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


def main():
    
   #決まり文句
   spark = SparkSession.builder \
   .appName("activity") \
   .config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
   .enableHiveSupport() \
   .getOrCreate()

   parser = argparse.ArgumentParser(description='args')
   parser.add_argument('--dt', type=str, required=True, help='show message')
   args = parser.parse_args()
  
  #orcのテーブルからデータを取得する
   sql_original="select * from hoge.peke_orc where dt='{dt}'".format(dt=args.dt)
   df_original=spark.sql(sql_original)
   
   #parquetのテーブルからデータを取得する
   sql_parq="select * from hoge.peke_parq where dt='{dt}'".format(dt=args.dt)
   df_parq=spark.sql(sql_parq)
   
   #データの型が一緒である
   print("dtype={dt}".format(dt=df_original.dtypes==df_parq.dtypes))
   print("dt={dt}".format(dt=args.dt))
   #データの件数が一緒である
   check=df_original.count()==df_parq.count()
   print("count={count}".format(count=check))

   if not check:
       print("originalcount={count}  paqrqtcount={count2}".format(count=df_original.count(),count2=df_parq.count()))
   
   if check:
       #ここで集合計算して、違うデータをあぶり出す。
       print("diff={diff}".format(diff=df_original.subtract(df_parq).first()))

   spark.stop()
if __name__ == '__main__':
   main()

単純に集合の計算をしているだけです。差分が多いと見るの大変なので最初の差分があった場合は、一件目だけ取得をしています。

orcからparquetに変換くらいなら起きないと思われる方もいらっしゃるかもしれないですが、少数表記が、指数表記に変わっていたり、こっそり一個だけ失敗していたり。なんてことが結構あります。

データの世界だとこのちょっとしたミスが、3ヶ月後などに発覚するともう大変です。バックフィルやら、気持ち的な面でやら平気で作業に1ヶ月くらい使ったりします。

サクッと作ってあとは、ぶん回しておけばいいので、作業時間はそんな取りません。後の1ヶ月より今の30分です。

これをやらない人は、いつもデータの障害を出しているイメージですが、きちんとやっている人は、障害を起こしていることをみたことがありません。

確認て大事だなーと思う一コマでした。

単純な例 以外はどうする??

ちょっとpythonと話がそれてしまうので、割愛しますが、ETLしたデータやデータマート的なものに関してはこのようなプログラムは通用しません。

そのため、データプロファイリングを使って炙り出していくことが重要です。

ただ、データプロファイリングのツール自体もpythonで自作することは可能なので興味のある方はデータプロファイリングでググってみるといいかと思います。

データ周りで、クリスマスプレゼントとして購入するなら、以下がおすすめです。

抽象的な本ですが、サービスを作るヒントになると思います。


以上 24日目クリスマスイブの投稿でした!

この記事が気に入ったらサポートをしてみませんか?