来源:宅码 本文约7100字,建议阅读10+分钟 本文收集整理了公开网络上一些常见的异常检测方法(附资料来源和代码)。
def three_sigma(s):
mu, std = np.mean(s), np.std(s)
lower, upper = mu-3*std, mu+3*std
return lower, upper
def z_score(s): z_score = (s - np.mean(s)) / np.std(s) return z_score br
def boxplot(s):
q1, q3 = s.quantile(.25), s.quantile(.75)
iqr = q3 - q1
lower, upper = q1 - 1.5*iqr, q3 + 1.5*iqr
return lower, upper
from outliers import smirnov_grubbs as grubbs
print(grubbs.test([8, 9, 10, 1, 9], alpha=0.05))
print(grubbs.min_test_outliers([8, 9, 10, 1, 9], alpha=0.05))
print(grubbs.max_test_outliers([8, 9, 10, 1, 9], alpha=0.05))
print(grubbs.max_test_indices([8, 9, 10, 50, 9], alpha=0.05))
from pyod.models.knn import KNN
# 初始化检测器clf
clf = KNN( method='mean', n_neighbors=3, )
clf.fit(X_train)
# 返回训练数据上的分类标签 (0: 正常值, 1: 异常值)
y_train_pred = clf.labels_
# 返回训练数据上的异常值 (分值越大越异常)
y_train_scores = clf.decision_scores_
数据点P的局部相对密度(局部异常因子)=点P邻域内点的平均局部可达密度跟数据点P的局部可达密度的比值:
数据点P的局部可达密度=P最近邻的平均可达距离的倒数。距离越大,密度越小。
点P到点O的第k可达距离=max(点O的k近邻距离,点P到点O的距离)。
from sklearn.neighbors import LocalOutlierFactor as LOF
X = [[-1.1], [0.2], [100.1], [0.3]]
clf = LOF(n_neighbors=2)
res = clf.fit_predict(X)
print(res)
print(clf.negative_outlier_factor_)
# https://zhuanlan.zhihu.com/p/
from pyod.models.cof import COF
cof = COF(contamination = 0.06, 异常值所占的比例
n_neighbors = 20, 近邻数量
)
cof_label = cof.fit_predict(iris.values) # 鸢尾花数据
print("检测出的异常值数量为:",np.sum(cof_label == 1))
# Ref: https://github.com/jeroenjanssens/scikit-sos
import pandas as pd
from sksos import SOS
iris = pd.read_csv("http://bit.ly/iris-csv")
X = iris.drop("Name", axis=1).values
detector = SOS()
iris["score"] = detector.predict(X)
iris.sort_values("score", ascending=False).head(10)
# Ref: https://zhuanlan.zhihu.com/p/
from sklearn.cluster import DBSCAN
import numpy as np
X = np.array([[1, 2], [2, 2], [2, 3],
[8, 7], [8, 8], [25, 80]])
clustering = DBSCAN(eps=3, min_samples=2).fit(X)
clustering.labels_
array([ 0, 0, 0, 1, 1, -1])
# 0,,0,,0:表示前三个样本被分为了一个群
# 1, 1:中间两个被分为一个群
# -1:最后一个为异常点,不属于任何一个群
# Ref:https://zhuanlan.zhihu.com/p/
from sklearn.datasets import load_iris
from sklearn.ensemble import IsolationForest
data = load_iris(as_frame=True)
X,y = data.data,data.target
df = data.frame
# 模型训练
iforest = IsolationForest(n_estimators=100, max_samples='auto',
contamination=0.05, max_features=4,
bootstrap=False, n_jobs=-1, random_state=1)
# fit_predict 函数 训练和预测一起 可以得到模型是否异常的判断,-1为异常,1为正常
df['label'] = iforest.fit_predict(X)
# 预测 decision_function 可以得出 异常评分
df['scores'] = iforest.decision_function(X)
# Ref: https://zhuanlan.zhihu.com/p/
from sklearn.decomposition import PCA
pca = PCA()
pca.fit(centered_training_data)
transformed_data = pca.transform(training_data)
y = transformed_data
# 计算异常分数
lambdas = pca.singular_values_
M = ((y*y)/lambdas)
# 前k个特征向量和后r个特征向量
q = 5
print "Explained variance by first q terms: ", sum(pca.explained_variance_ratio_[:q])
q_values = list(pca.singular_values_ < .2)
r = q_values.index(True)
# 对每个样本点进行距离求和的计算
major_components = M[:,range(q)]
minor_components = M[:,range(r, len(features))]
major_components = np.sum(major_components, axis=1)
minor_components = np.sum(minor_components, axis=1)
# 人为设定c1、c2阈值
components = pd.DataFrame({'major_components': major_components,
'minor_components': minor_components})
c1 = components.quantile(0.99)['major_components']
c2 = components.quantile(0.99)['minor_components']
# 制作分类器
def classifier(major_components, minor_components):
major = major_components > c1
minor = minor_components > c2
return np.logical_or(major,minor)
results = classifier(major_components=major_components, minor_components=minor_components)
# Ref: https://zhuanlan.zhihu.com/p/
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Dense
# 标准化数据
scaler = preprocessing.MinMaxScaler()
X_train = pd.DataFrame(scaler.fit_transform(dataset_train),
columns=dataset_train.columns,
index=dataset_train.index)
# Random shuffle training data
1) =
X_test = pd.DataFrame(scaler.transform(dataset_test),
columns=dataset_test.columns,
index=dataset_test.index)
tf.random.set_seed(10)
act_func = 'relu'
# Input layer:
model=Sequential()
# First hidden layer, connected to input vector X.
act_func, =
kernel_initializer='glorot_uniform',
kernel_regularizer=regularizers.l2(0.0),
input_shape=(X_train.shape[1],)
)
)
act_func, =
kernel_initializer='glorot_uniform'))
act_func, =
kernel_initializer='glorot_uniform'))
model.add(Dense(X_train.shape[1],
kernel_initializer='glorot_uniform'))
'mse',optimizer='adam') =
print(model.summary())
# Train model for 100 epochs, batch size of 10:
NUM_EPOCHS=100
BATCH_SIZE=10
history=model.fit(np.array(X_train),np.array(X_train),
batch_size=BATCH_SIZE,
epochs=NUM_EPOCHS,
validation_split=0.05,
verbose = 1)
plt.plot(history.history['loss'],
'b',
label='Training loss')
plt.plot(history.history['val_loss'],
'r',
label='Validation loss')
'upper right') =
plt.xlabel('Epochs')
[mse]')
plt.ylim([0,.1])
plt.show()
# 查看训练集还原的误差分布如何,以便制定正常的误差分布范围
X_pred = model.predict(np.array(X_train))
X_pred = pd.DataFrame(X_pred,
columns=X_train.columns)
X_train.index =
scored = pd.DataFrame(index=X_train.index)
np.mean(np.abs(X_pred-X_train), axis = 1) =
plt.figure()
sns.distplot(scored['Loss_mae'],
bins = 10,
kde= True,
color = 'blue')
plt.xlim([0.0,.5])
# 误差阈值比对,找出异常值
X_pred = model.predict(np.array(X_test))
X_pred = pd.DataFrame(X_pred,
columns=X_test.columns)
X_test.index =
threshod = 0.3
scored = pd.DataFrame(index=X_test.index)
np.mean(np.abs(X_pred-X_test), axis = 1) =
threshod =
scored['Loss_mae'] > scored['Threshold'] =
scored.head()
from sklearn import svm
# fit the model
clf = svm.OneClassSVM(nu=0.1, kernel='rbf', gamma=0.1)
clf.fit(X)
y_pred = clf.predict(X)
n_error_outlier = y_pred[y_pred == -1].size
版权声明:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权、违法违规、事实不符,请将相关资料发送至xkadmin@xkablog.com进行投诉反馈,一经查实,立即处理!
转载请注明出处,原文链接:https://www.xkablog.com/te-aq/19528.html