[글또] SMOTE
개요
분류 문제에서 데이터의 레이블이 특정 클래스에 몰려 있는 경우, 우리는 그 데이터가 불균형(imbalanced)하다고 합니다. 현업에서 축적되는 데이터의 대다수에서 불균형이 관찰됩니다. 이진 분류 문제에서 우리가 원하는 것은 관심 있는 클래스의 데이터, 대개 일반적이지 않은(abnormal) 데이터 검출입니다. 불량, 이탈, 승진, 양성 등 샘플은 전체에서 소수일 가능성이 높고 자연스레 데이터셋은 불균형하게 됩니다.
불균형 데이터를 처리하는 방법으로는 크게 아래 세 가지를 들 수 있습니다.
- Under-sampling
- Over-sampling
- 임곗값 이동(Threshold moving)
언더 샘플링은 다수(major) 샘플을 솎아 내는 방식이고 오버 샘플링은 소수(minor) 샘플을 불리는 작업입니다. 각각 여러 세부 기법들이 존재하는데 그에 대한 설명은 데이터 사이언스 스쿨에서 확인하실 수 있습니다. ROC 곡선 상 FPR(False positive rate)과 TPR(True positive rate. sensitivity 혹은 recall로도 불림)의 균형을 맞추는 방향으로 임곗값을 조정하기도 하는데 이 역시 매우 다양한 접근법이 있습니다.
- 주의: 언더 샘플링이나 오버 샘플링은 학습 데이터에만 적용합니다. 당연하게도 validation 혹은 test 데이터엔 쓰지 않습니다.
이번 포스트에서는 오버 샘플링의 한 종류인 SMOTE(Synthetic Minority Over-sampling Technique) 기법을 구현해보며 해당 알고리즘의 작동 원리에 대해 살펴보겠습니다.
필요한 라이브러리와 데이터 불러오기
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.decomposition import PCA
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, recall_score, roc_auc_score, roc_curve
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB, BernoulliNB
from imblearn.over_sampling import SMOTE
테스트를 위해 이진 분류 데이터셋을 생성합니다. 샘플 개수는 1,000개이며 class 0 : class 1의 비율은 9:1로 설정했습니다.
data = make_classification(n_features=4, n_samples=1000, weights=[0.9], random_state= 42, )
X_train, X_test, y_train, y_test = train_test_split(data[0], data[1], test_size = .2, random_state=42,)
불균형 측정하기
불균형의 정도를 어떻게 측정할 수 있을까요? StackExchange 답변을 참조하면, Shannon entropy를 이용해 데이터가 얼마만큼 불균형한지 수치로 판단해볼 수 있습니다. 전체 샘플 개수를 $n$, 클래스의 개수를 $k$, 각 클래스에 속한 샘플의 개수를 $c_i$라 하면, 엔트로피는 아래와 같이 계산됩니다.
$\displaystyle H = -\sum_{i=0}^{k-1} \frac{c_i}{n} \log{\frac{c_i}{n}}$
완전한 균형 상태인 경우 엔트로피는 $\log{k}$가 되며 완전한 불균형 즉 하나의 클래스만 존재하면 엔트로피는 0이 됩니다. 위 식을 $\log{k}$로 나눠주면 데이터 균형의 정도를 0과 1 사이의 값으로 나타낼 수 있습니다. 이를 코드로 구현하면 아래와 같습니다.
# https://stats.stackexchange.com/a/239982
def balance(seq):
n = len(seq)
classes, counts = np.unique(seq, return_counts=True)
k = len(classes)
H = -sum([ (count/n) * np.log((count/n)) for count in counts])
return H/np.log(k)
위에서 생성된 학습 데이터의 불균형 정도를 계산해봅시다.
print('Balance of train set:', balance(y_train))
Balance of train set: 0.46502061726394456
SMOTE
SMOTE는 대표적인 오버 샘플링 기법 가운데 하나입니다. 컨셉은 간단합니다.
- 소수(minor) 클래스 샘플 하나의 최근접 이웃(the nearest neighbors) k개를 선택합니다.
- 샘플과 각 이웃을 잇는 선 위의 점들 중 하나를 임의로 골라 소수 클래스의 샘플로 설정합니다.
- 불균형이 해소될 때까지 각 소수 클래스 샘플마다 위 작업을 반복합니다.
논문 SMOTE(Chawla et al. 2002)에 있는 pseudo-code를 참고하여 SMOTE 알고리즘을 아래와 같이 구현했습니다.
- 이진 분류 데이터를 가정하고 작성했습니다. 가장 소수의 클래스 샘플 개수를 가장 다수의 클래스 샘플 개수와 같게 맞춥니다.
k_neighbors
: 최근접 이웃의 개수random_state
: 최근접 이웃을 고르거나 소수 클래스 샘플과 그 이웃 사이의 점 중 하나를 선택할 때 쓰이는 수synthetic
: 알고리즘에 의해 새로이 생성, 추가된 소수 클래스 샘플들_get_nearest
: 샘플 데이터(행렬) i-th 행의 최근접(유클리드 거리 기준) 이웃 k개 행의 indices를 리턴하는 함수_populate
: 소수 클래스 샘플 하나와 그 이웃 샘플을 잇는 선 위의 점 가운데 하나를 골라 소수 클래스에 추가하는 함수fit
: 오버 샘플링을 수행합니다. 즉synthetic
matrix를 생성합니다.resample
: 실제 데이터와 생성된 소수 클래스 샘플들을 합치고 무작위 shuffle한 후 리턴합니다.
from sklearn.neighbors import BallTree
class smote:
def __init__(self, k_neighbors=5, random_state=None):
self.k_neighbors = k_neighbors
self.random_state = random_state
# Get the indices of the k(=k_neighbors) nearest neighbors of the idx-th row in the given array
def _get_nearest(self, array, idx):
tree = BallTree(array, leaf_size=2)
_, indices = tree.query(array[idx:idx+1], k = self.k_neighbors+1)
return indices.flatten()[1:]
# Generate the synthetic samples for each minority class sample and stack them
def _populate(self, n, i, nnarr, ):
rng = np.random.default_rng(self.random_state + i)
nn = rng.integers(0, self.k_neighbors, n)
Gap = rng.random(n)
for e in range(n):
dif = self.sample[nnarr[nn[e]], :] - self.sample[i, :]
gap = np.tile(Gap[e], self.numattrs)
self.synthetic = np.vstack((self.synthetic, self.sample[i, :] + dif * gap))
def _populate_last(self, n, i, nnarr, ):
rng = np.random.default_rng(self.random_state + i)
nn = rng.integers(0, self.k_neighbors, n)
Gap = rng.random(n)
for e in range(n):
dif = self.sample[nnarr[nn[e]], :] - self.sample[i%self.sample.shape[0], :]
gap = np.tile(Gap[e], self.numattrs)
self.synthetic = np.vstack((self.synthetic, self.sample[i%self.sample.shape[0], :] + dif * gap))
# Fit the SMOTE instance with an imbalanced dataset
def fit(self, X, y, return_synthetic = False):
y = y.flatten()
cls_major = np.unique(y, return_counts=True)[0][0]
cls_minor = np.unique(y, return_counts=True)[0][-1]
num_major = max(np.unique(y, return_counts=True)[1])
num_minor = min(np.unique(y, return_counts=True)[1])
# sample: Minority class samples
# N: the number of synthetic samples generated for each minority class sample
# numattrs: the number of columns i.e. features
# synthetic: an empty array in which the generated synthetic samples will be stacked
self.sample = X[np.where(y==cls_minor)[0]]
self.N = (num_major // num_minor) - 1
self.numattrs = X.shape[1]
self.synthetic = np.array([]).reshape(-1, self.numattrs)
for ind in range(self.sample.shape[0]+1):
if ind==self.sample.shape[0]:
nnarray = self._get_nearest(array = self.sample, idx = 0)
self._populate_last(num_major- num_minor - self.synthetic.shape[0], ind, nnarray)
else:
nnarray = self._get_nearest(array = self.sample, idx= ind)
self._populate(self.N, ind, nnarray)
self.synthetic_y = np.tile(cls_minor, self.synthetic.shape[0])
if return_synthetic == True:
return self.synthetic, self.synthetic_y
else:
pass
# Get the synthetic features and label only
def get_symthetic(self):
try:
return self.synthetic, self.synthetic_y
except: raise AttributeError('Please fit first.') from None
# Resample the dataset.
def resample(self, X, y):
try:
resample = np.hstack((np.vstack((X, self.synthetic)), np.append(y, self.synthetic_y).reshape(-1, 1) ))
rng = np.random.default_rng(self.random_state)
rng.shuffle(resample)
return resample[:, :-1], resample[:, -1]
# Raise AttributeError if dataset were not fitted.
except: raise AttributeError('Please fit first.') from None
imbalanced-learn 라이브러리의 SMOTE와 비교함으로써 구현된 코드를 검증해보겠습니다. 먼저 랜덤포레스트 분류 모델을 선언합니다.
model_rf = RandomForestClassifier(n_estimators=100,random_state=42)
오버 샘플링되지 않은, 실제 학습 데이터를 train하고 테스트 데이터를 예측했을 때 성능은 아래와 같습니다.
model_rf.fit(X_train, y_train)
pred_rf = model_rf.predict(X_test)
print('Accuracy:', model_rf.score(X_test, y_test))
print('Precision score:', precision_score(y_test, pred_rf))
print('Recall score:', recall_score(y_test, pred_rf))
print('F1 score:', f1_score(y_test, pred_rf))
print('AUC:', roc_auc_score(y_test, model_rf.predict_proba(X_test)[:, 1] ))
Accuracy: 0.955
Precision score: 1.0
Recall score: 0.64
F1 score: 0.7804878048780487
AUC: 0.9488
라이브러리를 사용한 결과입니다. resample된 학습 데이터 각 클래스의 샘플 개수는 동일하며 모델은 불균형이 해소된 데이터를 학습합니다. 위와 동일한 threshold value임에도 F1 score와 AUC가 상승한 것을 확인할 수 있습니다.
X_samp, y_samp = SMOTE(k_neighbors=3, random_state=42).fit_resample(X_train, y_train)
classes, counts = np.unique(y_samp, return_counts=True)
model_rf.fit(X_samp, y_samp)
pred_rf = model_rf.predict(X_test)
print('Re-sampled data:', dict(zip(['# of class {}'.format(int(i)) for i in classes], counts)))
print('Accuracy:', model_rf.score(X_test, y_test))
print('Precision score:', precision_score(y_test, pred_rf))
print('Recall score:', recall_score(y_test, pred_rf))
print('F1 score:', f1_score(y_test, pred_rf))
print('AUC:', roc_auc_score(y_test, model_rf.predict_proba(X_test)[:, 1] ))
Re-sampled data: {'# of class 0': 721, '# of class 1': 721}
Accuracy: 0.965
Precision score: 0.9090909090909091
Recall score: 0.8
F1 score: 0.8510638297872342
AUC: 0.9538285714285714
끝으로 제가 구현한 코드로 도출된 결과입니다. 마찬가지로 데이터 불균형이 해소되었고 성능이 개선되었습니다.
sm = smote(k_neighbors=3, random_state=42)
sm.fit(X_train, y_train)
X_samp, y_samp = sm.resample(X_train, y_train)
classes, counts = np.unique(y_samp, return_counts=True)
model_rf.fit(X_samp, y_samp)
pred_rf = model_rf.predict(X_test)
print('Re-sampled data:', dict(zip(['# of class {}'.format(int(i)) for i in classes], counts)))
print('Accuracy:', model_rf.score(X_test, y_test))
print('Precision score:', precision_score(y_test, pred_rf))
print('Recall score:', recall_score(y_test, pred_rf))
print('F1 score:', f1_score(y_test, pred_rf))
print('AUC:', roc_auc_score(y_test, model_rf.predict_proba(X_test)[:, 1] ))
Re-sampled data: {'# of class 0': 721, '# of class 1': 721}
Accuracy: 0.965
Precision score: 0.9090909090909091
Recall score: 0.8
F1 score: 0.8510638297872342
AUC: 0.9704
시각화
마지막으로 SMOTE로 추가된 샘플들을 시각화해보겠습니다. 샘플들을 2차원으로 만든 후 평면에 나타냈습니다. 앞서 설명한 바와 같이, 두 소수 클래스 샘플(적색 큰 점)을 잇는 선 위에 SMOTE로 생성 및 추가된 소수 샘플(적색 작은 점)이 관찰됩니다.
X_syn, y_syn = sm.get_symthetic()
X_df = pd.DataFrame(np.vstack((X_train, X_syn)), columns = ['feat_{}'.format(str(i).zfill(2)) for i in range(data[0].shape[1])])
X_df['class'] = ['class: {}'.format(i) for i in y_train.tolist()] + ['class: 1(SMOTE)']*len(y_syn)
pca = PCA(n_components=2, random_state=42)
scaler = StandardScaler()
xpca = pca.fit_transform(scaler.fit_transform(X_df.values[:, :-1]))
fig, ax = plt.subplots(1,1, figsize = (20,12))
ax.set_aspect(1)
ax.axline((.0, .0), (pca.transform(pca.components_)[0,0], pca.transform(pca.components_)[0,1]), c='k', label='1st component')
ax.axline((.0, .0), (pca.transform(pca.components_)[1,0], pca.transform(pca.components_)[1,1]), c='navy', label='2nd component')
sns.scatterplot(x = xpca[:,0], y=xpca[:,1], hue=X_df.values[:, -1], palette={'class: 0':'green', 'class: 1':'red', 'class: 1(SMOTE)':'orangered'}, size=X_df.values[:, -1],
sizes={'class: 0':40, 'class: 1':60, 'class: 1(SMOTE)':20}, alpha=.7 )
plt.title('PCA of the training dataset: standardized', fontsize = 16)
plt.show()
Leave a comment