硬件介绍:
本次项目使用的是M5Stack UnitV2 - AI摄像头 (SSD202D)。它是M5Stack推出的一款高效率的AI识别模块,采用Sigmstar SSD202D(集成双核Cortex-A7 1.2Ghz处理器)控制核心,集成128MB-DDR3内存,512MB NAND Flash, 1080P摄像头。还有麦克风、TF卡插槽、UART端口、USB-C、WiFi 2.4GHz等外设。在次硬件上跑着一个linux操作系统,支持Python。本次任务使用Python来实现。
原理框图如下:
任务介绍:
任务4:识别办公室/宿舍出入人员是否有正常佩戴口罩,发现异常立即报警,并将每日的数据上传到PC端,生成报表。
任务流程如下:AI摄像头端,通过收集数据,训练模型,使用模型判断人员是否戴口罩,将是否戴口罩的信息上传至PC同时如果未佩戴口罩则亮起红灯警示。
流程图如下:
方案思路:
1.拍摄照片用作生成模型的基础数据,模型生成:可以使用 V-Training (m5stack.com) 提供的在线模型生成,也可以离线生成。
这里拍摄照片是按照老师直播中那样一个一个点那个拍摄再保存的按钮,挺麻烦的,后来意识到其实可以用它内部的micropython识别到人后直接拍照再存到SD卡中,会方便快捷的多。
若损失曲线显示一张下降的曲线,则说明模型训练成功。
2.生成后的压缩包可以直接上传到 UnitV2 中进行使用。
3.使用模型来识别口罩佩戴情况,根据佩戴情况决定是否亮灯警示(录屏所以放了图片),将情况反馈至PC端(使用python爬虫获取json格式的数据),PC端根据反馈的数据生成相应的弹窗提示,并将数据记录至txt文件中。
戴口罩时:
没有戴口罩时:
设备端:
from json.decoder import JSONDecodeError
import subprocess
import json
import time
import sys
import base64
import requests
def control_white_led(value):
open('/sys/class/gpio/export', 'w').write('0') # Export the GPIO0
open('/sys/class/gpio/gpio0/direction', 'w').write('out') # Set the direction of the GPIO
open('/sys/class/gpio/gpio0/value', 'w').write(str(value)) # Set the calute, '1' or '0'
def control_red_led(value):
open('/sys/class/gpio/export', 'w').write('1') # Export the GPIO0
open('/sys/class/gpio/gpio1/direction', 'w').write('out') # Set the direction of the GPIO
open('/sys/class/gpio/gpio1/value', 'w').write(str(value)) # Set the calute, '1' or '0'
reconizer = subprocess.Popen(
['/home/m5stack/payload/bin/object_recognition', '/home/m5stack/payload/uploads/models/v2model_0b5431c82a9b1d32'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
reconizer.stdin.write("_{\"stream\":0}\r\n".encode('utf-8')) # 不需要获得图片
reconizer.stdin.flush()
def ismask(objdict):
try:
if objdict.get('num') == 2:
obj0 = objdict["obj"][0]
obj1 = objdict["obj"][1]
if obj0["type"] == "person":
res = obj1
else:
res = obj0
for objnode in res.get('obj'):
if objnode.get('prob') and objnode.get('type') :
prob = objnode.get('prob')
if objnode.get('type') == 'Mask' and prob > 0.6:
control_white_led(0)
control_red_led(1)
else:
control_white_led(1)
control_red_led(0)
except json.decoder.JSONDecodeError:
print("数据异常!")
while (True):
objdict = json.loads(reconizer.stdout.readline().decode('utf-8'))
#print(objdict)
if objdict.get('num') and objdict.get('obj') and objdict["running"] == "Object Recognition":
ismask(objdict)
桌面的提醒:
无人时:
未佩戴口罩时:
佩戴口罩时:
txt文本记录:
PC端:
import ctypes
import json
import threading
import time
import urllib.request
import requests
from tkinter import *
ctypes.windll.shcore.SetProcessDpiAwareness(1)
remote = "http://10.254.239.1"
lastTime = 0
status = "NoBody"
count = 0
stoptime = 3000
width = 380
height = 300
screenwidth = 1920
screenheight = 1080
alignstr = '%dx%d+%d+%d' % (width, height, (screenwidth - width) / 2, (screenheight - height) / 2)
def writeDict2File(fileName,dict_data):
js = json.dumps(dict_data)
file = open(fileName, 'a', encoding='utf-8')
file.write('\n'+js)
file.close()
def analyze2(s):
global lastTime, status, count
try: # 处理解析异常的问题
dict_data = json.loads(s) # 字符串转换成字典
print(dict_data)
if dict_data["running"] == "Object Recognition" :
if dict_data["num"] == 1 and dict_data["obj"][0]["type"] == "person":
status = "NoMask"
dict_target = {"date": "", "type": ""}
date = time.strftime("%Y-%m-%d-%H_%M_%S", time.localtime())
dict_target["date"] = date
dict_target["type"] = "NoMask"
writeDict2File('daily_mask_wearing_data.txt', dict_target)
print("NoMask_1")
lastTime = time.time()
elif dict_data["num"] == 1 and dict_data["obj"][0]["type"] != "person":
status = "NoBody"
dict_target = {"date": "", "type": ""}
date = time.strftime("%Y-%m-%d-%H_%M_%S", time.localtime())
dict_target["date"] = date
dict_target["type"] = "NoBody"
writeDict2File('daily_mask_wearing_data.txt', dict_target)
print("NoBody")
lastTime = time.time()
elif dict_data["num"] == 2 :
obj0 = dict_data["obj"][0]
obj1 = dict_data["obj"][1]
if obj0["type"] == "person":
res = obj1
else:
res = obj0
if res["prob"] > 0.6: # 概率值为60%以上才认为识别到口罩
status = "Mask"
dict_target = {"date": "", "type": ""}
date = time.strftime("%Y-%m-%d-%H_%M_%S", time.localtime())
dict_target["date"] = date
if res["type"] == "NoMask":
status = "NoMask"
print("NoMask")
dict_target["type"] = "NoMask"
print("NoMask_2")
writeDict2File('daily_mask_wearing_data.txt', dict_target)
lastTime = time.time()
elif res["type"] == "Mask":
print("Mask")
status = "Mask"
dict_target["type"] = "Mask"
writeDict2File('daily_mask_wearing_data.txt', dict_target)
print("Mask")
lastTime = time.time()
except json.decoder.JSONDecodeError:
print("数据异常!")
def timecalc():
global status, count
while True:
if (time.time() - lastTime > 2):
if status != "NoBody":
status = "NoBody"
count = 0
# 流读取
def stream(url):
bytes = b''
with urllib.request.urlopen(url, data=b'') as f:
while True:
bytes += f.read(100)
str = bytes.decode('utf-8')
strobj = str.split('|')
if len(strobj) > 1:
for i in range(len(strobj)-1):
analyze2(strobj[i])
try:
json.loads(strobj[-1])
analyze2(strobj[-1])
bytes = b''
except:
bytes = strobj[-1].encode('utf-8')
def res(url, data=None, headers=None, json=True):
res = requests.post(url, json=data, headers=headers)
return res.json() if json else res.text
def switchFunction():
return res(remote + "/func", {"type_id": "1", "type_name": "object_recognition", "args": ["v2model_0b5431c82a9b1d32"]})
def getResult():
return stream(remote + "/func/result")
def tixing():
global lastTime, status, count
myWindow = Tk()
myWindow.title('当前口罩佩戴状态')
myWindow.geometry(alignstr)
myWindow.resizable(width=False, height=True)
if status == "NoBody" :
neirong = "暂未识别到人"
elif status == "NoMask" :
neirong = "请尽快佩戴口罩"
else:
neirong = "Good Job!"
Message(myWindow, text=neirong, padx=20, pady=20).pack()
myWindow.after(stoptime, myWindow.destroy)
myWindow.mainloop()
if __name__ == '__main__':
switchFunction()
print("Ready")
while(True):
# 定时器
th1 = threading.Thread(target=timecalc)
th1.start()
# 数据获取
th2 = threading.Thread(target=getResult)
th2.start()
th3 = threading.Thread(target=tixing)
th3.start()
项目心得:
(1)对于json数据格式有了更为明确地掌握
(2)学习了python爬虫获取网页的json数据
(3)在AI模型的训练上还要选取更多更为优质的训练集;
(4)应该适当选取阈值,不然结果会很不理想
困难:
(1)设备与PC的通信
(2)如何免密ssh远程登录
(3)如何将数据上传至云端
(4)提高识别的准确度
未来展望:
(1)更智能化,让拍照到保存图片至SD卡再到上传至V-training平台实现一体化
(2)多多学习ssh远程登录
(3)学习如何利用python实现更为人性化的提醒措施
(4)学习如何得到AI模型