CUSUM(累積和)手法を使った異常検出

CUSUMとは?

CUSUMは「累積和 (Cumulative Sum)」の略で、統計的プロセス制御手法の一つです。主に製造業などの品質管理や監視の分野で利用されます。

この記事では、CUSUM手法を使った異常検知を実施してみます。

データセット

CUSUMによって、どのように変化を検出するかを確認するために意図的に平均がシフトする区間を設けたデータセットを生成します。

import numpy as np

np.random.seed(42)
# 初期の平均
mu = 0 
sigma = 0.5
# データ点の数
num_points = 60
data = np.random.normal(mu, sigma, num_points)
# 20番目から40番目の観測まで平均を2だけ上昇
data[20:40] += 2  
# 40番目以降の観測から平均を3だけ下降
data[40:] -= 3 

グラフにプロットすると下記のように平均が始めに上がり、その後に下がるようなデータとなっています。

CUSUM

正の異常検知と負の異常検知をそれぞれ下記の式で計算します。

正の異常検知:$${S_{H {n+1}}​=max(0,S_{H_{n}}​+Z_{n+1}​-w))}$$
負の異常検知:$${S_{L {n+1}}​=max(0,S_{L_{n}}​-Z_{n+1}​-w))}$$

$${Z}$$は正規化された観測値です。
$${w}$$は変化検出の感度を調整するための閾値です。値を大きくすると、CUSUMの変化に対する感度が低下します。

# 正規化された観測値
Z = (data - mu) / sigma

# CUSUM
omega = 0.5 
threshold = 5 
S_H = np.zeros(num_points) 
S_L = np.zeros(num_points)  

for i in range(1, num_points):
    S_H[i] = max(0, S_H[i-1] + Z[i] - omega)
    S_L[i] = max(0, S_L[i-1] - Z[i] - omega)

# 異常検出点の抽出
anomalies_high = [i for i in range(num_points) if S_H[i] > threshold]
anomalies_low = [i for i in range(num_points) if S_L[i] > threshold]

可視化

検出した異常をグラフにプロットして可視化してみます。
異常が発生した観測値をそれぞれ赤(高異常)と緑(低異常)で表示しています。

plt.figure(figsize=(12, 8))
plt.subplot(311)
plt.plot(data, 'bo-', label='Data Points')
for i in anomalies_high:
    plt.plot(i, data[i], 'ro', markersize=10, label='High Anomaly' if i == anomalies_high[0] else "")
for i in anomalies_low:
    plt.plot(i, data[i], 'go', markersize=10, label='Low Anomaly' if i == anomalies_low[0] else "")
plt.title("Data Points with Detected Anomalies")
plt.xlabel("Observation")
plt.ylabel("Value")
plt.legend()

plt.subplot(312)
plt.plot(S_H, 'r-', label='High CUSUM ($S_H$)')
plt.axhline(y=threshold, color='green', linestyle='--', label="Threshold")
plt.title("High CUSUM ($S_H$)")
plt.xlabel("Observation")
plt.ylabel("$S_H$")
plt.legend()

plt.subplot(313)
plt.plot(S_L, 'g-', label='Low CUSUM ($S_L$)')
plt.axhline(y=threshold, color='red', linestyle='--', label="Threshold")
plt.title("Low CUSUM ($S_L$)")
plt.xlabel("Observation")
plt.ylabel("$S_L$")
plt.legend()

plt.tight_layout()
plt.show()

平均が上にシフトした時と下にシフトした時で、異常検知が行われ赤いマークで正の異常検知が、緑のマークで負の異常検知が行われている事が確認できました。