我們是通訊行業動力維護專業,因為外市電(三相交流電)的波動會對我們的設備造成一定的影響。于是想做一個交流電監測的應用,監測交流電的過壓或欠壓,所以我做了這么一個實時監測的工具。它可以在過壓和欠壓一定程度的時候告警。或者進行長期監測并記錄電壓數據,分析可能發生過壓或欠壓的原因。
樹莓派作為一個簡單易用功能強大的計算平臺,很好地支持 Python。而且剛好有 MCC 118 數據采集模塊(HAT)可以直接擴展出數據采集功能,MCC 118 提供 8 通道模擬電壓輸入,基于樹莓派的數據采集/數據記錄系統。每張 MCC 118 最大采樣率為 100kS/s,可以進行單電壓點和電壓波形采集。
項目的代碼就是在 MCC DAQ HATs 提供的 SDK 示例代碼中,continuous_scan.py 的基礎上修改來的。實現一個高速的采集及告警設備,來監測外市電的波動情況。
長期監測主要通過長期數據記錄,分析是否有未超出告警范圍的波動,以分析供電質量。
這只是一個工程樣機,可以進行實時(延時3秒+)告警和儲存發生告警時的電壓波形;可以進行長期監測(不實時告警)、也可以長期監測并實時告警。
內部構造如下圖,其中 MCC 118 在圖中正上方。
警告分為4種:本機的光警告、本機的聲警告、繼電器輸出的外部警告、物聯網平臺(OneNet)警告。
目前支持兩路三相交流電監測,每路三相電的 A、B、C 相分別用:黃、綠、紅,發光二極管警告。
物聯網平臺的警告頁面。分別是:一路三相電的一相警告,以及一路三相電的三相警告。
采集到的正常交流電波形如下圖。
數據分析部分是用 NumPy 進行交流電每個周期的最大值、最小值判斷,然后確定是否報警,是否寫入硬盤。
還有一個程序每一做任何判斷,把采集到的所有數據都直接記錄到硬盤,大概一小時會生成 1.2G 的數據。
也是用 Python 寫的腳本,通過 NumPy 把數據做一個轉換 ,然后用 NumPy 的 amax 和 amin 對數值進行分析判斷。如果數據超過閾值,就寫硬盤文件,同時用 GPIO 輸出控制 LED 來發出預警信號。
以下是完整代碼,其中關鍵代碼已經添加了注釋。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
MCC 118 Functions Demonstrated:
mcc118.a_in_scan_start
mcc118.a_in_scan_read
mcc118.a_in_scan_stop
mcc118.a_in_scan_cleanup
Purpose:
Perform a continuous acquisition on 1 or more channels.
Description:
Continuously acquires blocks of analog input data for a
user-specified group of channels until the acquisition is
stopped by the user. The last sample of data for each channel
is displayed for each block of data received from the device.
MCC118共8個通道,可以監測兩個三相變壓器的6路市電,分別為變壓器1的A、B、C相和變壓器2的A、B、C相
"""
from __future__ import print_function
from daqhats import mcc118, OptionFlags, HatIDs, HatError
from daqhats_utils import select_hat_device, enum_mask_to_string, \
chan_list_to_mask
import os
import threading
import numpy as np
import datetime
import time
from socket import *
import RPi.GPIO as GPIO
# ===GPIO setup=====================================================================
GPIO.setmode(GPIO.BCM)
led_list=[5,6,19,16,20,21] #LED使用的GPIO
GPIO.setup(led_list, GPIO.OUT, initial=GPIO.LOW)
delta = datetime.timedelta(minutes=5)
# ===================================================================================
READ_ALL_AVAILABLE = -1
CURSOR_BACK_2 = '\x1b[2D's
ERASE_TO_END_OF_LINE = '\x1b[0K'
# ===================================================================================
arraydata=[[0 for i in range(72000)] for h in range(10)] #定義一個采樣緩存數組,72000為read_request_size*信道數,10為#連續采樣的次數c_time
# =======OneNet param===================================================================
device_id = "Your 設備ID" # Your 設備ID
api_key = "Your API_KEY" # Your API_KEY
HOST = 'api.heclouds.com'
PORT = 80
BUFSIZ = 1024
ADDR = (HOST, PORT)
# =======Channel set==================================================================
# Store the channels in a list and convert the list to a channel mask that
# can be passed as a parameter to the MCC 118 functions.
channels = [0, 1, 2, 3, 4, 5]
channel_mask = chan_list_to_mask(channels)
num_channels = len(channels)
samples_per_channel = 0
options = OptionFlags.CONTINUOUS
scan_rate = 10000.0 #采樣速率 最大值為100k/信道數
read_request_size = 12000 #每通道此次采樣的點數
c_time = 10 #連續采樣的次數
timeout = 5.0
# =======上下限=================================================================
volt=220 #根據需要設定標準電壓
thread=0.2 #根據需要設定
upline=volt*(1+thread)*1.414*0.013558 #0.013558是根據電阻分壓確定的
downline=volt*(1-thread)*1.414*0.013558 #0.013558是根據電阻分壓確定的
# =======LED監測=================================================================
base_time=datetime.datetime.now()
#定義時間臨時數組
led_stime=[base_time,base_time,base_time,base_time,base_time,base_time]
#定義狀態臨時數組
led_status=[-1,-1,-1,-1,-1,-1]
#=====================================================================================
def led_detect():
global led_stime,led_status
#變壓器的A、B、C相
transno=["Trans1A","Trans1B","Trans1C","Trans2A","Trans2B","Trans2C"]
try:
#判斷每個LED的狀態
while PORT > 0 :
time.sleep(1)
if GPIO.input(5) == GPIO.HIGH and led_status[0]<0 : ??????
led_stime[0]=datetime.datetime.now()
led_status[0]=1
if GPIO.input(6) == GPIO.HIGH and led_status[1]<0 :
led_stime[1]=datetime.datetime.now()
led_status[1]=1
if GPIO.input(19) == GPIO.HIGH and led_status[2]<0 :
led_stime[2]=datetime.datetime.now()
led_status[2]=1
if GPIO.input(16) == GPIO.HIGH and led_status[3]<0 :
led_stime[3]=datetime.datetime.now()
led_status[3]=1
if GPIO.input(20) == GPIO.HIGH and led_status[4]<0 :
led_stime[4]=datetime.datetime.now()
led_status[4]=1
if GPIO.input(21) == GPIO.HIGH and led_status[5]<0 :
led_stime[5]=datetime.datetime.now()
led_status[5]=1
#相應引腳延時5分鐘熄滅,并向平臺發送數據清除告警
for i in range(6):
now=datetime.datetime.now()
if now > ( led_stime[i] + delta ) and led_status[i]>=0 :
GPIO.output(led_list[i], GPIO.LOW)
led_status[i]=-1
t5 = threading.Thread(target=senddata, args=(transno[i],"0"))
t5.start()
except KeyboardInterrupt:
# Clear the '^C' from the display.
print(CURSOR_BACK_2, ERASE_TO_END_OF_LINE, '\n')
print('Stopping')
# ========senddata to OneNet==============================================================
def senddata(T_dev,alm):
Content = ""
Content += "{"datastreams":[{"id": ""+T_dev+"","datapoints": [{"value": " + alm + "}]}"
Content += "]}\r\n"
value = len(Content)
data = ""
data += "POST /devices/" + device_id + "/datapoints HTTP/1.1\r\n"
data += "Host:api.heclouds.com\r\n"
data += "Connection: close\r\n"
data += "api-key:" + api_key + "\r\n"
data += "Content-Length:" + str(value) + "\r\n"
data += "\r\n"
data += Content
tcpCliSock = socket(AF_INET, SOCK_STREAM)
tcpCliSock.connect(ADDR)
tcpCliSock.send(data.encode())
#data1 = tcpCliSock.recv(BUFSIZ).decode()
#print(data1)
# =====writefile==========================================================================
def writefile(arrayatt, fnat):
print("write file ", datetime.datetime.now())
np.savetxt(fnat, arrayatt.T, fmt="%1.2f", delimiter=" ")
print("\033[0;31m","finish write ", datetime.datetime.now(),"\033[0m")
#====Analyse and writefile and Alarm=======================================================
def analyse(array,fnat):
break_1_1 = -1
break_2_1 = -1
break_1_2 = -1
break_2_2 = -1
break_1_3 = -1
break_2_3 = -1
break_1 = -1
break_2 = -1
print("analyse file ", datetime.datetime.now())
#等于read_request_size = 12000
lll = read_request_size
file1 = "1-"+fnat
file2 = "2-"+fnat
a=np.array(array)
b = np.empty([3, c_time*read_request_size], dtype=float) # 變壓器1定義一個數組
c = np.empty([3, c_time*read_request_size], dtype=float) # 變壓器2定義一個數組
#板子的數據記錄格式是:CH0、CH1、CH2、CH3、CH4、CH5、CH0、CH1、CH2、CH3、CH4、CH5、CH0、CH1、CH2、CH3、CH4、CH5、CH0、CH1、CH2、CH3、CH4、CH5、CH0、CH1、CH2、CH3、CH4、CH5、
#通過下面拆分成6個CH每個CH一個list以便進行分CH判斷
b[0] = np.concatenate((a[0, 0::6], a[1, 0::6], a[2, 0::6], a[3, 0::6], a[4, 0::6], a[5, 0::6], a[6, 0::6],a[7, 0::6], a[8, 0::6], a[9, 0::6])) #提取變壓器1,A相數據
b[1] = np.concatenate((a[0, 1::6], a[1, 1::6], a[2, 1::6], a[3, 1::6], a[4, 1::6], a[5, 1::6], a[6, 1::6],a[7, 1::6], a[8, 1::6], a[9, 1::6])) #提取變壓器1,B相數據
b[2] = np.concatenate((a[0, 2::6], a[1, 2::6], a[2, 2::6], a[3, 2::6], a[4, 2::6], a[5, 2::6], a[6, 2::6],a[7, 2::6], a[8, 2::6], a[9, 2::6])) #提取變壓器1,C相數據
c[0] = np.concatenate((a[0, 3::6], a[1, 3::6], a[2, 3::6], a[3, 3::6], a[4, 3::6], a[5, 3::6], a[6, 3::6],a[7, 3::6], a[8, 3::6], a[9, 3::6])) #提取變壓器2,A相數據
c[1] = np.concatenate((a[0, 4::6], a[1, 4::6], a[2, 4::6], a[3, 4::6], a[4, 4::6], a[5, 4::6], a[6, 4::6],a[7, 4::6], a[8, 4::6], a[9, 4::6])) #提取變壓器2,B相數據
c[2] = np.concatenate((a[0, 5::6], a[1, 5::6], a[2, 5::6], a[3, 5::6], a[4, 5::6], a[5, 5::6], a[6, 5::6],a[7, 5::6], a[8, 5::6], a[9, 5::6])) #提取變壓器2,C相數據
#=====判斷越線=================================================================================
# 200 是 scan_rate = 10000.0 除以 50(交流電的頻率)得出 每個周期 采樣200個點
# 600 是 120000/200=600 read_request_size*c_time本次總的采樣點數,除以每周期200點,總共600個周期
# 如果 scan_rate = 5000.0 就是 100, alens 是 120000/100=1200
for i in range(600):
#每個CH拆分成每個正弦波周期進行峰值判斷
a1 = b[0][(i * 200):((i + 1) * 200 - 1)]
b1 = b[1][(i * 200):((i + 1) * 200 - 1)]
c1 = b[2][(i * 200):((i + 1) * 200 - 1)]
a2 = c[0][(i * 200):((i + 1) * 200 - 1)]
b2 = c[1][(i * 200):((i + 1) * 200 - 1)]
c2 = c[2][(i * 200):((i + 1) * 200 - 1)]
#每一項分別判斷上下限并分別告警
if np.amax(a1) > upline or np.amax(a1) < downline :
if break_1_1 <0:
GPIO.output(5, GPIO.HIGH) #相應引腳置高電平,點亮LED
break_1_1 = 1
t3 = threading.Thread(target=senddata, args=("Trans1A","100")) #調用上傳進程
t3.start()
if np.amax(b1) > upline or np.amax(b1) < downline ?:
if break_1_2< 0:
GPIO.output(6, GPIO.HIGH) #相應引腳置高電平,點亮LED
break_1_2 = 1
t3 = threading.Thread(target=senddata, args=("Trans1B","100")) #調用上傳進程
t3.start()
if np.amax(c1) > upline or np.amax(c1) < downline:
if break_1_3 < 0:
GPIO.output(19, GPIO.HIGH) #相應引腳置高電平,點亮LED
break_1_3 = 1
t3 = threading.Thread(target=senddata, args=("Trans1C","100")) #調用上傳進程
t3.start()
if np.amax(a2) > upline or np.amax(a2) < downline:
if break_2_1 < 0:
GPIO.output(16, GPIO.HIGH) #相應引腳置高電平,點亮LED
break_2_1=1
t3 = threading.Thread(target=senddata, args=("Trans2A","100")) #調用上傳進程
t3.start()
if np.amax(b2) > upline or np.amax(b2) < downline:
if break_2_2 < 0:
GPIO.output(20, GPIO.HIGH) #相應引腳置高電平,點亮LED
break_2_1 =1
t3 = threading.Thread(target=senddata, args=("Trans2B","100")) #調用上傳進程
t3.start()
if np.amax(c2) > upline or np.amax(c2) < downline:
if break_2_3 < 0:
GPIO.output(21, GPIO.HIGH) #相應引腳置高電平,點亮LED
break_2_2 = 1
t3 = threading.Thread(target=senddata, args=("Trans2C","100")) #調用上傳進程
t3.start()
#每個變壓器任何一項有告警就調用寫文件進程寫文件
if np.amax(a1) > upline or np.amax(a1) < downline or np.amax(b1) > upline or np.amax(b1) < downline or np.amax(c1) > upline or np.amax(c1) < downline:
if break_1 < 0:
t1 = threading.Thread(target=writefile, args=(b, file1,)) #調用寫文件進程
t1.start()
break_1 = 2
if np.amax(a2) > upline or np.amax(a2) < downline or np.amax(b2) > upline or np.amax(b2) < downline or np.amax(c2) > upline or np.amax(c2) < downline:
if break_2 < 0:
t1 = threading.Thread(target=writefile, args=(c, file2,)) #調用寫文件進程
t1.start()
break_2 = 2
print("\033[0;32m","analyse finish ", datetime.datetime.now(),"\033[0m") #font color
# ============================================================================================
def main():
"""
This function is executed automatically when the module is run directly.
"""
# ============================================================================================
try:
# Select an MCC 118 HAT device to use.
address = select_hat_device(HatIDs.MCC_118)
hat = mcc118(address)
'''
print('\nSelected MCC 118 HAT device at address', address)
actual_scan_rate = hat.a_in_scan_actual_rate(num_channels, scan_rate)
print('\nMCC 118 continuous scan example')
print(' Functions demonstrated:')
print(' mcc118.a_in_scan_start')
print(' mcc118.a_in_scan_read')
print(' mcc118.a_in_scan_stop')
print(' Channels: ', end='')
print(', '.join([str(chan) for chan in channels]))
print(' Requested scan rate: ', scan_rate)
print(' Actual scan rate: ', actual_scan_rate)
print(' Options: ', enum_mask_to_string(OptionFlags, options))
# ============================================================================================
try:
input('\nPress ENTER to continue ...')
except (NameError, SyntaxError):
pass
'''
# Configure and start the scan.
# Since the continuous option is being used, the samples_per_channel
# parameter is ignored if the value is less than the default internal
# buffer size (10000 * num_channels in this case). If a larger internal
# buffer size is desired, set the value of this parameter accordingly.
print('Starting scan ... Press Ctrl-C to stop\n')
hat.a_in_scan_start(channel_mask, samples_per_channel, scan_rate,options)
# ============================================================================================
try:
while True:
fna = str(datetime.datetime.now()) + ".txt"
#連續采樣10次
for i in range(c_time):
read_result = hat.a_in_scan_read(read_request_size, timeout) # read channel Data
arraydata[i]=read_result.data
if read_result.hardware_overrun:
print('\n\nHardware overrun\n')
break
elif read_result.buffer_overrun:
print('\n\nBuffer overrun\n')
break
hat.a_in_scan_stop()
hat.a_in_scan_cleanup()
hat.a_in_scan_start(channel_mask, samples_per_channel, scan_rate,options)
#調用分析進程
arraydatat = arraydata
t2 = threading.Thread(target=analyse, args=(arraydatat, fna, ))
t2.start()
except KeyboardInterrupt:
# Clear the '^C' from the display.
print(CURSOR_BACK_2, ERASE_TO_END_OF_LINE, '\n')
print('Stopping')
hat.a_in_scan_stop()
hat.a_in_scan_cleanup()
except (HatError, ValueError) as err:
print('\n', err)
if __name__ == '__main__':
#表用進程實時監測告警狀態,判斷是否到時間消除告警
t4 = threading.Thread(target=led_detect)
t4.start()
main()
程序部署
參考這篇教程安裝好 MCC DAQ HATs 的 SDK 和代碼示例。
https://shumeipai.nxez.com/2018/10/18/get-started-with-the-evaluation-for-mcc-118.html
將上面的程序保存為 main.py 然后在程序所在目錄運行下面的命令即可啟動程序。
-
交流電
+關注
關注
14文章
658瀏覽量
34009 -
樹莓派
+關注
關注
116文章
1707瀏覽量
105670
發布評論請先 登錄
相關推薦
評論