内容介绍
内容介绍
硬件介绍
UnitV2是M5Stack推出的一款高效率的AI识别模块,专为, 采用Sigmstar SSD202D(集成双核Cortex-A7 1.2Ghz处理器)控制核心,集成128MB-DDR3内存,512MB NAND Flash, 1080P摄像头。内嵌Linux操作系统,集成丰富的软硬件资源与开发工具,致力带给用户开箱即用,简洁高效的Ai开发体验。
设计思路
使用 Yolo V3 来识别口罩佩戴情况,将情况反馈至 IoT 平台,利用 IoT 平台自带的数据展示模块进行数据展示。
设计流程 模型生成
可以使用 V-Training (m5stack.com) 提供的在线模型生成,也可以离线生成。
生成后的压缩包可以直接上传到 UnitV2 中进行使用。
数据读取及上传
import requests
import json
import time
import urllib.request
import threading
from mttkinter import mtTkinter as tk
import ctypes
ctypes.windll.shcore.SetProcessDpiAwareness(1)
# 读取配置
with open("config.json", 'r') as f:
config = json.load(f)
deviceId = config["deviceId"]
APIKey = config["APIKey"]
remote = "http://10.254.239.1"
lastTime = 0
status = "NoBody"
count = 0
class comp:
def __init__(self):
self.root = tk.Tk()
self.label = tk.Label(self.root, text="未检测到人脸", bg="yellow", bd=10, fg="black",
font=("微软雅黑", 40), width=1200, height=1200)
def initComponent(self):
self.root.title("口罩检测")
self.root.geometry("1200x1200")
self.root.resizable(width=True, height=True)
self.label.pack()
self.root.mainloop()
def Mask(self):
self.label["text"] = "您已佩戴口罩"
self.label["bg"] = "green"
self.label["fg"] = "white"
def NoMask(self):
self.label["text"] = "您未佩戴口罩"
self.label["bg"] = "red"
self.label["fg"] = "white"
def NoBody(self):
self.label["text"] = "未检测到人脸"
self.label["bg"] = "yellow"
self.label["fg"] = "black"
myComp = comp()
def analyze(s):
global lastTime, status, count
try:
obj = json.loads(s)
print(obj)
if obj["obj"][0]["type"] == "NoMask":
if status == "NoBody":
count += 1
if count > 5:
status = "NoMask"
uploadData("NoMask")
myComp.NoMask()
lastTime = time.time()
elif obj["obj"][0]["type"] == "Mask":
if status == "NoBody":
count -= 1
if count < -5:
status = "Mask"
uploadData("Mask")
myComp.Mask()
lastTime = time.time()
except Exception as e:
print(e)
def timecalc():
global status, count
while True:
if (time.time() - lastTime > 2):
if status != "NoBody":
status = "NoBody"
count = 0
myComp.NoBody()
# 流读取
def stream(url):
bytes = b''
with urllib.request.urlopen(url, data=b'') as f:
while True:
bytes += f.read(100)
str = bytes.decode('utf-8')
strobj = str.split('|')
if len(strobj) > 1:
for i in range(len(strobj)-1):
analyze(strobj[i])
try:
json.loads(strobj[-1])
analyze(strobj[-1])
bytes = b''
except:
bytes = strobj[-1].encode('utf-8')
def res(url, data=None, headers=None, json=True):
res = requests.post(url, json=data, headers=headers)
return res.json() if json else res.text
def uploadData(data):
return res("http://api.heclouds.com/devices/" + deviceId + '/datapoints',
{"datastreams": [
{"id": data, "datapoints": [{"value": ""}]}]},
{'api-key': APIKey})["errno"]
def switchFunction():
return res(remote + "/func", {"type_id": "1", "type_name": "object_recognition", "args": ["mask"]})
def getResult():
return stream(remote + "/func/result")
def main():
switchFunction()
print("功能切换完成")
# 定时器
th1 = threading.Thread(target=timecalc)
th1.start()
# 数据获取
th2 = threading.Thread(target=getResult)
th2.start()
# 界面显示
myComp.initComponent()
if __name__ == '__main__':
main()
云端大屏展示
使用 OneNet 提供的数据上传服务,将数据上传至云端
利用自带的可视化平台进行数据可视化
遇到的困难
为了降低延时,返回的数据是使用 POST 数据流返回。如果使用普通爬虫会导致程序卡死,换用流读取即可解决。
心得体会
感谢电子森林举办这次活动,能够有机会接触到市面上流行的智能摄像头。通过代码可以看出开发者已经多次改良优化,内置的功能丰富实用。尽管离正式开源还有一段路要走,不过能够感受到这款产品对市场带来的影响。
附件下载
mask.tar
可部署模型
monitor.py
主程序
团队介绍
中国计量大学
团队成员
cjmf
评论
0 / 100
查看更多