M5Stack UnitV2 口罩佩戴检测
M5Stack UnitV2 口罩佩戴检测,使用Yolo模型对口罩进行检测。
标签
嵌入式系统
cjmf
更新2021-07-13
1182

硬件介绍

UnitV2是M5Stack推出的一款高效率的AI识别模块,专为, 采用Sigmstar SSD202D(集成双核Cortex-A7 1.2Ghz处理器)控制核心,集成128MB-DDR3内存,512MB NAND Flash, 1080P摄像头。内嵌Linux操作系统,集成丰富的软硬件资源与开发工具,致力带给用户开箱即用,简洁高效的Ai开发体验。

设计思路

使用 Yolo V3 来识别口罩佩戴情况,将情况反馈至 IoT 平台,利用 IoT 平台自带的数据展示模块进行数据展示。

设计流程 模型生成

可以使用 V-Training (m5stack.com) 提供的在线模型生成,也可以离线生成。

Fibkt1ebbe4J_AwzSJl3Mvc9fnZH

生成后的压缩包可以直接上传到 UnitV2 中进行使用。

FhCkrgydBwdEn30DOa0mGBjh8JPN

数据读取及上传

import requests
import json
import time
import urllib.request
import threading
from mttkinter import mtTkinter as tk
import ctypes
ctypes.windll.shcore.SetProcessDpiAwareness(1)

# 读取配置
with open("config.json", 'r') as f:
    config = json.load(f)
    deviceId = config["deviceId"]
    APIKey = config["APIKey"]
remote = "http://10.254.239.1"
lastTime = 0

status = "NoBody"
count = 0


class comp:

    def __init__(self):
        self.root = tk.Tk()
        self.label = tk.Label(self.root, text="未检测到人脸", bg="yellow", bd=10, fg="black",
                              font=("微软雅黑", 40), width=1200, height=1200)

    def initComponent(self):
        self.root.title("口罩检测")
        self.root.geometry("1200x1200")
        self.root.resizable(width=True, height=True)
        self.label.pack()
        self.root.mainloop()

    def Mask(self):
        self.label["text"] = "您已佩戴口罩"
        self.label["bg"] = "green"
        self.label["fg"] = "white"

    def NoMask(self):
        self.label["text"] = "您未佩戴口罩"
        self.label["bg"] = "red"
        self.label["fg"] = "white"

    def NoBody(self):
        self.label["text"] = "未检测到人脸"
        self.label["bg"] = "yellow"
        self.label["fg"] = "black"


myComp = comp()


def analyze(s):
    global lastTime, status, count
    try:
        obj = json.loads(s)
        print(obj)

        if obj["obj"][0]["type"] == "NoMask":
            if status == "NoBody":
                count += 1
                if count > 5:
                    status = "NoMask"
                    uploadData("NoMask")
            myComp.NoMask()
            lastTime = time.time()
        elif obj["obj"][0]["type"] == "Mask":
            if status == "NoBody":
                count -= 1
                if count < -5:
                    status = "Mask"
                    uploadData("Mask")
            myComp.Mask()
            lastTime = time.time()
    except Exception as e:
        print(e)


def timecalc():
    global status, count
    while True:
        if (time.time() - lastTime > 2):
            if status != "NoBody":
                status = "NoBody"
                count = 0
                myComp.NoBody()


# 流读取
def stream(url):
    bytes = b''

    with urllib.request.urlopen(url, data=b'') as f:
        while True:
            bytes += f.read(100)
            str = bytes.decode('utf-8')
            strobj = str.split('|')
            if len(strobj) > 1:
                for i in range(len(strobj)-1):
                    analyze(strobj[i])
            try:
                json.loads(strobj[-1])
                analyze(strobj[-1])
                bytes = b''
            except:
                bytes = strobj[-1].encode('utf-8')


def res(url, data=None, headers=None, json=True):
    res = requests.post(url, json=data, headers=headers)
    return res.json() if json else res.text


def uploadData(data):
    return res("http://api.heclouds.com/devices/" + deviceId + '/datapoints',
               {"datastreams": [
                   {"id": data, "datapoints": [{"value": ""}]}]},
               {'api-key': APIKey})["errno"]


def switchFunction():
    return res(remote + "/func", {"type_id": "1", "type_name": "object_recognition", "args": ["mask"]})


def getResult():
    return stream(remote + "/func/result")


def main():
    switchFunction()
    print("功能切换完成")
    # 定时器
    th1 = threading.Thread(target=timecalc)
    th1.start()

    # 数据获取
    th2 = threading.Thread(target=getResult)
    th2.start()

    # 界面显示
    myComp.initComponent()


if __name__ == '__main__':
    main()

FkrsDkV6RtbUR3OSzaF8jHbSJ-mv

Ft5TyVG32E4sEHnf9FTgtTtq8HPK

FnCof6jk1pI21mpx_dFQ6rUx6vac

云端大屏展示

使用 OneNet 提供的数据上传服务,将数据上传至云端

Fk-1QLLnCN_x9lefuCE7IUkih9wQ

利用自带的可视化平台进行数据可视化

FqRb6DZQcRxdFk8Mdu83m3XYrrYh

FgkRwFCYHUWL_Ds3F8uV2R2a4k70

大屏分享

遇到的困难

为了降低延时,返回的数据是使用 POST 数据流返回。如果使用普通爬虫会导致程序卡死,换用流读取即可解决。

心得体会

感谢电子森林举办这次活动,能够有机会接触到市面上流行的智能摄像头。通过代码可以看出开发者已经多次改良优化,内置的功能丰富实用。尽管离正式开源还有一段路要走,不过能够感受到这款产品对市场带来的影响。

附件下载
mask.tar
可部署模型
monitor.py
主程序
团队介绍
中国计量大学
团队成员
cjmf
评论
0 / 100
查看更多
目录
硬禾服务号
关注最新动态
0512-67862536
info@eetree.cn
江苏省苏州市苏州工业园区新平街388号腾飞创新园A2幢815室
苏州硬禾信息科技有限公司
Copyright © 2024 苏州硬禾信息科技有限公司 All Rights Reserved 苏ICP备19040198号