|
本方法主要利用基于卷积神经网络的非侵入式负荷分解方法实现住宅设备的识别,输入数据为在设备运行时获得的瞬态功率信号数据。训练卷积神经网络使用数据为开源数据REDD(1Hz),具体实现原理请参考文献下载链接。只供学习参考,Python实现代码如下:
1 第一部分:数据可视化- import pandas as pd
- import matplotlib.pyplot as plt
- import numpy as np
- data = pd.read_csv("data.csv")
- #print(data)
- n = int(data.max(axis = 0, skipna = True)[1]) + 1 # gets the number of readings
- h = int(data.max(axis = 0, skipna = True)[0]) # gets the number of houses
- houses = np.zeros((n,5,h)) # time, tv inst power, aggr power, filtered tv , on/off
- for i in range(h):
- houses[:,0:3,i] = data[data['House']==i+1 ].values[:,1:] # data is now seperated by house
- # visulise data
- plt.figure()
- plt.suptitle('Tv power and agg power seperated ')
- plt.subplot(211)
- for i in range(h):
- plt.plot(houses[:,0,i],houses[:,1,i], label = 'House ' + str(i+1) )
- plt.subplot(212)
- for i in range(h):
- plt.plot(houses[:,0,i],houses[:,2,i], label = 'House ' + str(i+1) )
- plt.legend()
- # plt.show()
- plt.figure()
- plt.suptitle('Normalised data, shown by house')
- for i in range(h):
- plt.subplot(3,1,i+1)
- plt.plot(houses[:,0,i],houses[:,1,i]/np.average(houses[:,1,i]) )
- plt.plot(houses[:, 0, i], houses[:, 2, i]/np.average( houses[:, 2, i]) )
- plt.legend()
- plt.show()
复制代码 实验结果:
过滤并标准化电视即时功率- from scipy.signal import *
- houses[:,3,:] = savgol_filter(houses[:, 1, :] / np.average(houses[:, 1, :],axis=0), 11, 2,axis=0)
- thres = 1.15 # the threshold for seperation
- plt.figure()
- plt.suptitle('Filtered inst power, and determined state')
- for i in range(h):
- houses[ np.argwhere( houses[:,3,i] >= thres ) ,4,i ] +=1 # makes the 4th row qual to 1 if the filtered result is higher then the threshold
- plt.subplot(3, 1, i + 1)
- plt.plot( houses[:, 0, i],houses[:, 3, i] ) # plot filtered curve
- plt.plot(houses[:, 0, i], houses[:, 4, i]) # plot on state
- plt.plot(houses[:, 0, i], houses[:, 1, i] / np.average(houses[:, 1, i])) # plot normalised curve
- plt.show()
复制代码 实验结果:
2 卷积神经网络实现负荷识别- #总功率问题
- # - 存在非常大的峰
- # - 其他电器出现的一些周期性峰值,这些在每个房子中的频率不同
- # - 噪声/峰值平坦度与一号房的电视电源使用量相似
- # 想法
- # - 针对开/关状态测试/检查
- # - 测试/检查状态改变的时间
- # - 使用 CNN/ANN/MLP 进行分类
- # - 输入应该是整个时间,还是 N 个时间步长的夹头
- # - N 时间步长意味着它可以推广到不同的测试输入大小
- # - 然后应该建立这 N 个大输入的训练集和测试集,每一张幻灯片
- # - 允许在输入中打开状态的位置进行概括
- # - 输出N 1/0(或开启的概率,然后是阈值)每个时间步对应的输出
复制代码- import numpy as np
- from keras.layers import Dense
- from keras.layers import Flatten
- from keras.layers import Dropout
- from keras.models import Sequential
- from keras.layers.convolutional import Conv1D
- from keras.layers.convolutional import MaxPooling1D
- def sep_data(houses, T=0.8):
- # seperate the house data into windowed sections winin each house
- # split into training (T%) and test set, no validation set is being used
- # record the class as a one hot vector of if the last state is on/off
- window_len = 20
- #at size 20 the classifier was classfiing the periodic sections as the tv being on in house one #filter sized 6,3 wights 1.8,1
- #at length 30 a similar thing happened, filters 6,3 weights at 3:1, not as accurate as ^
- # at 10, with 3,3, stride and 2.2:1 similar problems
- ntrain = int((n-window_len)*T) # amount of data from each house to train on
- train = np.zeros((ntrain*h,window_len,1,2)) # [#windows x samples per window x 1(as only one input feature) x 2 (for rata, and weights)
- test = np.zeros( ( (n-window_len-ntrain)*h,window_len,1,2) )
- for j in range(h): #each house
- for i in range(n-window_len):
- train_index = i+j*ntrain ## which row in the reformatted array is being filled
- test_index = i-ntrain + j*(n-window_len-ntrain)
- if i<ntrain: #part of training set
- train[train_index,:,:,0 ] =np.reshape(houses[i:i+window_len,2,j] , (window_len,1)) # window up the aggregated power
- #train[train_index, :,:,1] = np.reshape(houses[i:i + window_len,4, j], (window_len,1)) # no longer used, was used when every state in window was a class
- train[train_index, 0 , :, 1] = houses[i + window_len, 4, j] # identify state of last step in window
- train[train_index, 1, :, 1] = -(train[train_index, 0 , :, 1] -1) # one hot encode the category,cat 0 = on, cat 1 = off
- else: # test set
- test[test_index , :,:, 0] =np.reshape( houses[i:i + window_len, 2, j], (window_len,1))
- #test[test_index), :,:, 1] = np.reshape(houses[i:i + window_len, 4, j], (window_len,1))
- test[test_index, 0, :, 1] = houses[i + window_len, 4, j]
- test[test_index, 1, :, 1] = -(test[test_index, 0, :, 1] - 1)
- #return train[:,:,:,0],train[:,:,0,1],test[:,:,:,0],test[:,:,0,1]
- # how uneven is the data? this could be a problem when training, as a large group can overpower and always be predicted
- wratio = np.sum(train[:,1,:,1])/np.sum(train[:,0,:,1])
- print(wratio)
- return train[:,:,:,0],train[:,0:2,0,1],test[:,:,:,0],test[:,0:2,0,1]
- # 训练与评估CNN模型
- def run_model(trainx, trainy, testx, testy):
- verbose, epochs, batch_size = 0, 10, 64
- n_timesteps, n_features, n_outputs = trainx.shape[1], trainx.shape[2], trainy.shape[1]
- # 使用 keras 构建卷积神经网络
- #卷积应该有希望识别窗口时间序列数据中的特征,以指示最终时间状态
- model = Sequential()
- model.add(Conv1D(filters=120, kernel_size=6, activation='sigmoid', input_shape=(n_timesteps, n_features)))
- model.add(Conv1D(filters=120, kernel_size=3, activation='sigmoid'))
- model.add(Dropout(0.5)) #这一层有助于规范化,并希望减少对训练集的过度拟合
- model.add(MaxPooling1D(pool_size=2))
- model.add(Flatten()) # 展平
- model.add(Dense(100, activation='sigmoid')) #全连接层
- model.add(Dense(n_outputs, activation='softmax')) # 在这里使用 softmax 来预测每个类中的概率
- #当每个时间步被预测为开或关时使用,sigmoid函数可以作为多个类,并且需要不同的损失度量
- model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
- # 编译模型
- model.summary()
- model.fit(trainx, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose, class_weight = {0:2.2,1:1 }) # 实施类权重以修复类不平衡,并停止对所有状态进行预测
- # 评估模型
- pr = model.predict(testx)
- pt = model.predict(trainx)
- plt.figure()
- ax1 = plt.subplot(211)
- ax1.title.set_text('Training set')
- ax1.plot(np.round(pt[:,0]), label = 'Predicted class')
- ax1.plot(trainy[:,0], label ='class (1=on) ' )
- ax1.plot(pt[:, 0], label='Class probability')
- ax2 = plt.subplot(212)
- ax2.title.set_text('Test set')
- ax2.plot(np.round(pr[:, 0]), label = 'Predicted class')
- ax2.plot(testy[:, 0],label ='class (1=on) ' )
- ax2.plot(pr[:, 0],label='Class probability')
- ax2.legend(loc='upper center', bbox_to_anchor=(0.5, -0.05), fancybox=True, shadow=True, ncol=3)
- plt.show()
- loss1, accuracy = model.evaluate(testx, testy, batch_size=batch_size, verbose=0)
- loss2, at = model.evaluate(trainx, trainy, batch_size=batch_size, verbose=0)
- print('Training accuracy: %.3f ' % at)
- print('Training loss: %.3f ' % loss2)
- print('Test accuracy: %.3f' % accuracy)
- print('Test loss: %.3f' % loss1)
- return accuracy,model
- #模型评估
- def model_variability(scores):
- print(scores)
- m, s = np.mean(scores), np.std(scores)
- print('Accuracy: %.3f%% (+/-%.3f)' % (m, s))
- #将数据重新格式化为训练集和测试集。
- # 数据正在加窗,最后一次的开/关状态记录在一个热向量中
- trainx, trainy, testx, testy = sep_data(houses)
- repeats = 3
- scores = list()
- for r in range(repeats):
- score,model = run_model(trainx, trainy, testx, testy)
- score *=100.0
- print('>#%d: %.3f' % (r + 1, score))
- scores.append(score)
- model_variability(scores)
复制代码
数据:下载 |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
×
|