見出し画像

マルチチャネル販売における在庫管理を Databricksで最適化してみた

はじめに

ジール採用チームです。先日スタートいたしましたnoteですが、是非ジールの技術面についても記事を掲載していきたいと考えています。

ジールはデータ活用に特化した事業を展開していますが、各部門がそれぞれ取り組んでいること、また、会社としても様々な新しい事業・サービス・ソリューションを積極的に取り入れており、幅広くご紹介します。

本日は、アプライドアナリティクス&インテリジェンスユニット(AAI)のAIエンジニアが取り組んでいる内容の一部をご紹介します。

AAIという組織では、AI等の先端領域の知見を持った人材が在籍しており、プロジェクト支援や、クライアントへのコンサルティング等を中心に、ジールがこれまで取り扱っていない領域や製品についての検証やソリューション開発を行っています。

今回は、市場からの注目度も高く、ジールも活用に力を入れているDatabricks(+Azure)について、身近な例を用いてご紹介します。

Databricksとは、データ・エンジニアリング、データ・サイエンス、ビジネスアナリティクスにまたがる統合データ分析プラットフォームで、
・構造化 & 非構造化データを統合して管理可能
・常時処理(ストリーミング) & 都度処理(バッチ処理)の処理プロセスを統合可能
等、様々な特徴があります。詳細は、こちらをご覧ください。
https://www.zdh.co.jp/products/databricks/

ジールでは、昨年パートナー契約を締結しています。
https://www.zdh.co.jp/topics/20201124/

それでは、記事をご覧ください。

画像9

1.1. 概要

パンデミックの影響で店舗での販売以外にオンラインで注文し店舗で受け取るなどのパターンが増えています。POSデータからリアルタイムでこういう販売のデータを把握できますが、店員が行う棚卸からのデータでは売れた数量だけでなく、故障、盗難や返品などのデータも取得することが出来ます。

POSデータは頻繫に更新されますが棚卸データは週次であることも多く、こういう頻度が違うデータを連携して最新の在庫データを維持する必要があります。

POSデータと棚卸データを結合することで在庫管理が週一回ではなく、よりリアルタイムになることによって、売上機会損失を防止、発注サイクルの最適化に役立ちます。

1.2. 構成図

構築する環境は以下の図通りです。

画像1

Azure IoTHubとは、Azureを提供するIoT機器、現場のデバイスをクラウドに登録、認証、管理できる、デバイスとクラウド間の信頼性の高い双方向メッセージングを提供出来るサービスです。Databricksとは、Databricks社が開発した統合データ分析プラットフォームです。

1.3. データフロー

画像2

今回の試みでは実際のPOSシステムがないため、POSデータと棚卸データをシミュレートするため疑似環境作成して、Databricks上でデータをファイルから読み込んでAzure上のAzure IoTHubとAzureストレージに配信します。

Databricksでデータをストリーミング+デルタテーブルを使って受け取ることで新たなデータを出てくる時に自動で更新されて最新の在庫データを示すことができるようになります。

1.4. 全体の流れ

●環境構築
● Databricksワークスペース作成
● Azure IoTHub作成
● Azure ストレージ作成
● 疑似環境作成
● データ結合・分析

1.5. 環境構築

1. Azure Databricksワークスペース作成

「リソース作成」から「Azure Databricks」を選択します。

画像3

サブスクリプション、リソースグループ、ワークスペース名を入力して、
リージョンと価格レベルを選択して、「確認および作成」をクリックして、作成をクリックします。

画像4

2. Azure IoTHubの作成

「リソース作成」から「IoT Hub」を検索して、IoT Hubを作成します。

AAI追加

サブスクリプション、リソースグループ、IoT Hub名を入力して、地域を選択して「確認および作成」を選択して、「作成」をクリックします。

画像6

3. ストレージアカウント作成

「リソース作成」から「Storage Account」を検索して、「作成」をクリックします。

画像7

必要な情報を入力して、「確認および作成」をクリックして、「作成」をクリックします。

画像8

1.6. 疑似環境作成

POSデータと棚卸データの送信をシミュレートするため、下記通りDatabricksでデータをファイルからを読み込んで、POSデータをストリーミングデータとしてAzure IoTHubへ送信し、店員の棚卸しデータをストレージに保存します。

サンプルデータファイルをストレージアカウントのコンテナーに保存して、Databricksからアクセスします。

データフローの「1-1」と「1-2」を実装します。

●必要なライブラリをインストールする

from pyspark.sql.types import *
import pyspark.sql.functions as f

import datetime, time

from azure.iot.device import IoTHubDeviceClient
from azure.storage.blob import BlobServiceClient
from delta.tables import *

● 販売における在庫変更と棚卸しでの在庫を管理するスキーマを定義します。

inventory_change_schema = StructType([
StructField('trans_id', StringType()),
StructField('item_id', IntegerType()),  
StructField('store_id', IntegerType()),
StructField('date_time', TimestampType()),
StructField('quantity', IntegerType()),
StructField('change_type_id', IntegerType())
])

inventory_snapshot_schema = StructType([
StructField('item_id', IntegerType()),
StructField('employee_id', IntegerType()),
StructField('store_id', IntegerType()),
StructField('date_time', TimestampType()),
StructField('quantity', IntegerType())
])

●店舗での販売とオンラインでの販売取引データを読み込みます。

inventory_change_files = [
config['inventory_change_store001_filename'],
config['inventory_change_online_filename']
]

inventory_change = (
spark
  .read
  .csv(
    inventory_change_files, 
    header=True, 
    schema=inventory_change_schema, 
    timestampFormat='yyyy-MM-dd HH:mm:ss'
    )
  .withColumn('trans_id', f.expr('substring(trans_id, 2, length(trans_id)-2)')) 
  .withColumn('item', f.struct('item_id', 'quantity')) 
  .groupBy('date_time','trans_id')
    .agg(
      f.first('store_id').alias('store_id'),
      f.first('change_type_id').alias('change_type_id'),
      f.collect_list('item').alias('items')  
      )
  .orderBy('date_time','trans_id')
  .toJSON()
  .collect()
)

● 棚卸しデータを読み込みます。

inventory_snapshot_files = [ 
config['inventory_snapshot_store001_filename'],
config['inventory_snapshot_online_filename']
]

inventory_snapshots = (
spark
  .read
  .csv(
    inventory_snapshot_files, 
    header=True, 
    timestampFormat='yyyy-MM-dd HH:mm:ss.SSSXXX', 
    schema=inventory_snapshot_schema
    )
)

● 取引データを送信するためAzureIoT Hubに接続します。

if 'client' in locals():
try:
  client.disconnect()
except:
  pass

client = IoTHubDeviceClient.create_from_connection_string( config['iot_device_connection_string'] )
client.connect()

●Azure IoT Hubのイベントメッセージの上限が256KBですので、最大値を設定する。取引データがこれを超える場合、データを分割して送信する。

event_speed_factor = 10 
max_msg_size = 256 * 1024 

データフロー図の「2-1」と「2-2」を実施します。

● データを送信します。取引データをAzure IoT HubへストリーミングデータとしてIoTHubへ送信しますが、棚卸しデータをストレージで保存します。

last_dt = None
for event in inventory_change:
d = eval(event)
dt = datetime.datetime.strptime( d['date_time'], '%Y-%m-%dT%H:%M:%S.000Z')

for snapshot_dt, store_id in inventory_snapshot_times_for_loop:  
  if dt < snapshot_dt:
    break
  else:
    snapshot_pd = (
      inventory_snapshots
        .filter(f.expr("store_id={0} AND date_time='{1}'".format(store_id, snapshot_dt)))
        .withColumn('date_time', f.expr("date_format(date_time, 'yyyy-MM-dd HH:mm:ss')"))
        .toPandas()  
         )
    blob_service_client = BlobServiceClient.from_connection_string(config['storage_connection_string'])
    blob_client = blob_service_client.get_blob_client(
      container=config['storage_container_name'], 
      blob=(config['inventory_snapshot_path'].replace(config['dbfs_mount_name'],'')[1:]+
            'inventory_snapshot_{0}_{1}'.format(store_id,snapshot_dt.strftime('%Y-%m-%d %H:%M:%S')))
      )
    blob_client.upload_blob(str(snapshot_pd.to_csv()), overwrite=True)
    blob_client.close()
    inventory_snapshot_times.pop(0)
    
if last_dt is None: last_dt = dt
delay = (dt - last_dt).seconds
delay = int(delay/event_speed_factor)
if delay < 0: delay = 0 
time.sleep(delay) 
if len(event) > max_msg_size:
  items = d.pop('items')
  for i, item in enumerate(items):
    d['items'] = item
    client.send_message(str(d))
    if (i+1)%25==0:
      time.sleep(1)
else:
  client.send_message(event)
last_dt = dt

1.7. データ結合・分析

● Azure IoT Hubで受け取るデータを読み込みます。

eventhub_config = {}
eventhub_config['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(config['event_hub_connection_string'])
eventhub_config['maxEventsPerTrigger'] = (100 * sc.defaultParallelism)

raw_inventory_change = (
spark
  .readStream
  .format('eventhubs')
  .options(**eventhub_config)
  .load()
  .withColumn('body', f.expr('cast(body as string)'))
)

● 読み込んだ取引データを、定義するスキーマで交換します。

transaction_schema = StructType([
StructField('trans_id', StringType()),
StructField('store_id', IntegerType()),
StructField('date_time', TimestampType()),
StructField('change_type_id', IntegerType()),
StructField('items', ArrayType(
  StructType([
    StructField('item_id', IntegerType()), 
    StructField('quantity', IntegerType())
    ])
  ))
])

parsed_inventory_change = (
raw_inventory_change
  .withColumn('event', f.from_json('body', transaction_schema))
  .select(
    f.col('event.trans_id').alias('trans_id'),
    f.col('event.store_id').alias('store_id'), 
    f.col('event.date_time').alias('date_time'), 
    f.col('event.change_type_id').alias('change_type_id'), 
    f.explode('event.items').alias('item') 
    )
  .withColumn('item_id', f.col('item.item_id'))
  .withColumn('quantity', f.col('item.quantity'))
  .drop('item')
  .withWatermark('date_time', '1 hour') 
  .dropDuplicates(['trans_id','item_id']) 
)

● 変換したデータをDelta形で保存します。

_ = (
parsed_inventory_change
  .writeStream
  .format('delta')
  .outputMode('append')
  .option('checkpointLocation',config['inventory_change_checkpoint_path'])
  .table('pos.inventory_change')
  )

● 棚卸しデータをストレージから読み込んで、デルタ形に変換して保存します。

inventory_snapshot_schema = StructType([
StructField('id', IntegerType()),
StructField('item_id', IntegerType()),
StructField('employee_id', IntegerType()),
StructField('store_id', IntegerType()),
StructField('date_time', TimestampType()),
StructField('quantity', IntegerType())
])

raw_inventory_snapshot = (
spark
  .readStream
  .format('cloudFiles')  # auto loader
  .option('cloudFiles.format', 'csv')
  .option('cloudFiles.includeExistingFiles', 'true') 
  .option('header', 'true')
  .schema(inventory_snapshot_schema)
  .load(config['inventory_snapshot_path'])
  .drop('id')
)

_ = (
raw_inventory_snapshot
  .writeStream
  .format('delta')
  .outputMode('append')
  .option('checkpointLocation', config['inventory_snapshot_checkpoint_path'])
  .table('pos.inventory_snapshot')
  )

●最新の棚卸しからの在庫データを管理するためのDeltaテーブルを作成します。ストレージで新たな棚卸しデータを出てくる時に棚卸し在庫データをマイクロバッチで更新します。最新棚卸在庫テーブル定義します。

%sql

create table if not exists pos.latest_inventory_snapshot (
item_id int,
employee_id int,
store_id int,
date_time timestamp,
quantity int
)
using delta;

● 新たな棚卸データをマイクロバッチで更新する機能を作成します。

snapshot_target = DeltaTable.forName(spark, 'pos.latest_inventory_snapshot')
def upsertToDelta(microBatchOutputDF, batchId):
( 
  snapshot_target.alias('t')
    .merge(
      microBatchOutputDF.alias('s'),
      's.store_id=t.store_id AND s.item_id=t.item_id AND s.date_time<=t.date_time'
      )
  .whenMatchedUpdateAll() 
  .whenNotMatchedInsertAll()
  .execute()
  )

( 
  microBatchOutputDF
    .selectExpr(
      'NULL as trans_id', 
      'store_id', 
      'date_time',
      '-1 as change_type_id',
      'item_id',
      '0 as quantity'
      )
    .write
    .format('delta')
    .mode('append')
    .saveAsTable('pos.inventory_change')
  )

● マイクロバッチ在庫データを更新するストリーミングを開始します。

_ = (
raw_inventory_snapshot
  .writeStream
  .format('delta')
  .foreachBatch(upsertToDelta)
  .outputMode('update')
  .start()
)

● 取引データと棚卸データを加えて最新在庫データを作成します。

inventory_change = (
spark.table('pos.inventory_change').alias('x')
  .join(spark.table('pos.store').alias('y'), on='store_id')
  .join(spark.table('pos.inventory_change_type').alias('z'), on='change_type_id')
  .filter(f.expr("NOT(y.name='online' AND z.change_type='bopis')"))
  .select('store_id','item_id','date_time','quantity')
)

inventory_snapshot = spark.table('pos.latest_inventory_snapshot')
inventory_current = (
inventory_snapshot.alias('a')
  .join(
    inventory_change.alias('b'), 
    on=f.expr('''
      a.store_id=b.store_id AND 
      a.item_id=b.item_id AND 
      a.date_time<=b.date_time
      '''), 
    how='leftouter'
    )
  .groupBy('a.store_id','a.item_id')
    .agg(
      f.first('a.quantity').alias('snapshot_quantity'),
      f.sum('b.quantity').alias('change_quantity'),
      f.first('a.date_time').alias('snapshot_datetime'),
      f.max('b.date_time').alias('change_datetime')
      )
  .withColumn('change_quantity', f.coalesce('change_quantity', f.lit(0)))
  .withColumn('current_quantity', f.expr('snapshot_quantity + change_quantity'))
  .withColumn('date_time', f.expr('GREATEST(snapshot_datetime, change_datetime)'))
  .drop('snapshot_datetime','change_datetime')
  .orderBy('current_quantity')
)

1.8. まとめ

今回はリアルタイムデータの仕組みが無いため、疑似環境を用意してデモを作りましたが、リアルタイムデータとバッチデータをStrucuted StreamingでDatabricksのdeltaテーブルに格納して、異なる頻度のデータを1つのテーブルとして分析、活用するのを非常に簡単に行うことができました。

ジールでは、今回ご紹介した在庫適正化というテーマを含め、
容易に需要予測・予知保全・在庫適正化を実現するソリューションの提供を目指しています。
https://www.zdh.co.jp/topics/20210611/

今回は、ほんの一例を記載しましたが、AI・ディープラーニングなどの先端技術の活用が難しかったお客様についても、デジタルトランスフォーメーションを実現するソリューションの提供をジールは目指しております。



この記事が気に入ったらサポートをしてみませんか?