redhat9i 发表于 2022-6-28 08:00

【数据挖掘】基于卷积神经网络的非侵入式负荷分解(NILM)Python实现

本方法主要利用基于卷积神经网络的非侵入式负荷分解方法实现住宅设备的识别,输入数据为在设备运行时获得的瞬态功率信号数据。训练卷积神经网络使用数据为开源数据REDD(1Hz),具体实现原理请参考文献下载链接。只供学习参考,Python实现代码如下:
1 第一部分:数据可视化
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
data = pd.read_csv("data.csv")
#print(data)

n = int(data.max(axis = 0, skipna = True)) + 1# gets the number of readings
h = int(data.max(axis = 0, skipna = True)) # gets the number of houses

houses = np.zeros((n,5,h))# time, tv inst power, aggr power, filtered tv , on/off

for i in range(h):
    houses[:,0:3,i] =data==i+1].values[:,1:]   # data is now seperated by house

# visulise data
plt.figure()
plt.suptitle('Tv power and agg power seperated ')
plt.subplot(211)
for i in range(h):
    plt.plot(houses[:,0,i],houses[:,1,i], label = 'House ' + str(i+1) )
plt.subplot(212)
for i in range(h):
    plt.plot(houses[:,0,i],houses[:,2,i], label = 'House ' + str(i+1) )
plt.legend()
# plt.show()

plt.figure()
plt.suptitle('Normalised data, shown by house')
for i in range(h):
    plt.subplot(3,1,i+1)
    plt.plot(houses[:,0,i],houses[:,1,i]/np.average(houses[:,1,i]) )
    plt.plot(houses[:, 0, i], houses[:, 2, i]/np.average( houses[:, 2, i]) )
plt.legend()
plt.show()实验结果:




过滤并标准化电视即时功率
from scipy.signal import *
houses[:,3,:] = savgol_filter(houses[:, 1, :] / np.average(houses[:, 1, :],axis=0), 11, 2,axis=0)
thres = 1.15# the threshold for seperation
plt.figure()
plt.suptitle('Filtered inst power, and determined state')
for i in range(h):
    houses[ np.argwhere( houses[:,3,i] >= thres ) ,4,i] +=1   # makes the 4th row qual to 1 if the filtered result is higher then the threshold
    plt.subplot(3, 1, i + 1)
    plt.plot(houses[:, 0, i],houses[:, 3, i]) # plot filtered curve
    plt.plot(houses[:, 0, i], houses[:, 4, i]) # plot on state
    plt.plot(houses[:, 0, i], houses[:, 1, i] / np.average(houses[:, 1, i])) # plot normalised curve
plt.show()实验结果:


2 卷积神经网络实现负荷识别
#总功率问题
# - 存在非常大的峰
# - 其他电器出现的一些周期性峰值,这些在每个房子中的频率不同
# - 噪声/峰值平坦度与一号房的电视电源使用量相似
# 想法
# - 针对开/关状态测试/检查
# - 测试/检查状态改变的时间
# - 使用 CNN/ANN/MLP 进行分类
# - 输入应该是整个时间,还是 N 个时间步长的夹头
# - N 时间步长意味着它可以推广到不同的测试输入大小
# - 然后应该建立这 N 个大输入的训练集和测试集,每一张幻灯片
# - 允许在输入中打开状态的位置进行概括
# - 输出N 1/0(或开启的概率,然后是阈值)每个时间步对应的输出import numpy as np
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Dropout
from keras.models import Sequential
from keras.layers.convolutional import Conv1D
from keras.layers.convolutional import MaxPooling1D
def sep_data(houses, T=0.8):
    # seperate the house data into windowed sections winin each house
    # split into training (T%) and test set, no validation set is being used
    # record the class as a one hot vector of if the last state is on/off

    window_len = 20
    #at size 20 the classifier was classfiing the periodic sections as the tv being on in house one #filter sized 6,3 wights 1.8,1
    #at length 30 a similar thing happened, filters 6,3 weights at 3:1, not as accurate as ^
    # at 10, with 3,3, stride and 2.2:1 similar problems

    ntrain = int((n-window_len)*T) # amount of data from each house to train on
    train = np.zeros((ntrain*h,window_len,1,2))                         # [#windows x samples per window x 1(as only one input feature) x 2 (for rata, and weights)
    test = np.zeros( ( (n-window_len-ntrain)*h,window_len,1,2))
    for j in range(h): #each house
      for i in range(n-window_len):
            train_index = i+j*ntrain ## which row in the reformatted array is being filled
            test_index = i-ntrain + j*(n-window_len-ntrain)

            if i<ntrain:#part of training set
                train =np.reshape(houses , (window_len,1))   # window up the aggregated power
                #train = np.reshape(houses, (window_len,1)) # no longer used, was used when every state in window was a class
                train = houses                            # identify state of last step in window
                train = -(train -1)                     # one hot encode the category,cat 0 = on, cat 1 = off
            else: # test set
                test =np.reshape( houses, (window_len,1))
                #test = np.reshape(houses, (window_len,1))
                test = houses
                test = -(test - 1)
    #return train[:,:,:,0],train[:,:,0,1],test[:,:,:,0],test[:,:,0,1]

    # how uneven is the data? this could be a problem when training, as a large group can overpower and always be predicted
    wratio = np.sum(train[:,1,:,1])/np.sum(train[:,0,:,1])
    print(wratio)
    return train[:,:,:,0],train[:,0:2,0,1],test[:,:,:,0],test[:,0:2,0,1]




# 训练与评估CNN模型
def run_model(trainx, trainy, testx, testy):
    verbose, epochs, batch_size = 0, 10, 64
    n_timesteps, n_features, n_outputs = trainx.shape, trainx.shape, trainy.shape
    # 使用 keras 构建卷积神经网络
    #卷积应该有希望识别窗口时间序列数据中的特征,以指示最终时间状态
    model = Sequential()
    model.add(Conv1D(filters=120, kernel_size=6, activation='sigmoid', input_shape=(n_timesteps, n_features)))
    model.add(Conv1D(filters=120, kernel_size=3, activation='sigmoid'))
    model.add(Dropout(0.5)) #这一层有助于规范化,并希望减少对训练集的过度拟合
    model.add(MaxPooling1D(pool_size=2))
    model.add(Flatten()) # 展平
    model.add(Dense(100, activation='sigmoid')) #全连接层
    model.add(Dense(n_outputs, activation='softmax'))# 在这里使用 softmax 来预测每个类中的概率

    #当每个时间步被预测为开或关时使用,sigmoid函数可以作为多个类,并且需要不同的损失度量

    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    # 编译模型
    model.summary()
    model.fit(trainx, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose, class_weight = {0:2.2,1:1 }) # 实施类权重以修复类不平衡,并停止对所有状态进行预测

    # 评估模型
    pr = model.predict(testx)
    pt = model.predict(trainx)

    plt.figure()
    ax1 = plt.subplot(211)
    ax1.title.set_text('Training set')
    ax1.plot(np.round(pt[:,0]), label = 'Predicted class')
    ax1.plot(trainy[:,0], label ='class (1=on) ' )
    ax1.plot(pt[:, 0], label='Class probability')


    ax2 = plt.subplot(212)
    ax2.title.set_text('Test set')
    ax2.plot(np.round(pr[:, 0]), label = 'Predicted class')
    ax2.plot(testy[:, 0],label ='class (1=on) ' )
    ax2.plot(pr[:, 0],label='Class probability')
    ax2.legend(loc='upper center', bbox_to_anchor=(0.5, -0.05), fancybox=True, shadow=True, ncol=3)

    plt.show()

    loss1, accuracy = model.evaluate(testx, testy, batch_size=batch_size, verbose=0)
    loss2, at = model.evaluate(trainx, trainy, batch_size=batch_size, verbose=0)
    print('Training accuracy: %.3f ' % at)
    print('Training loss: %.3f ' % loss2)
    print('Test accuracy: %.3f' % accuracy)
    print('Test loss: %.3f' % loss1)
    return accuracy,model


#模型评估
def model_variability(scores):
    print(scores)
    m, s = np.mean(scores), np.std(scores)
    print('Accuracy: %.3f%% (+/-%.3f)' % (m, s))


#将数据重新格式化为训练集和测试集。
# 数据正在加窗,最后一次的开/关状态记录在一个热向量中
trainx, trainy, testx, testy = sep_data(houses)

repeats = 3
scores = list()
for r in range(repeats):
    score,model = run_model(trainx, trainy, testx, testy)
    score *=100.0
    print('>#%d: %.3f' % (r + 1, score))
    scores.append(score)


model_variability(scores)



数据:下载
页: [1]
查看完整版本: 【数据挖掘】基于卷积神经网络的非侵入式负荷分解(NILM)Python实现