M5Stack UnitV2 - 使用AI摄像头 (SSD202D)检查你是否戴口罩
M5Stack UnitV2 AI 摄像头 (SSD202D) 机器学习 数据收集 训练 mqtt linux python 口罩
标签
嵌入式系统
测试
aramy
更新2021-07-12
1279

    硬件介绍
    本次项目使用的是M5Stack UnitV2 - AI摄像头 (SSD202D)。这是一个使用Sigmstar SSD202D做主控芯片,带Neno和FPU的ARM,双核 Cortex-A7 1.2Ghz 处理器,嵌入式128MB DDR3,512MB NAND Flash;使用GC2145 1080P Colored Sensor摄像头,还有麦克风、TF卡插槽、UART端口、USB-C、WiFi 2.4GHz等外设。在次硬件上跑着一个linux操作系统,支持Python。本次任务使用Python来实现。FnDuUdR_vNFGOp5uBdIQVxf0yEi2

    任务介绍:
    按活动要求,我选择了任务4,识别办公室/宿舍出入人员是否有正常佩戴口罩,发现异常立即报警,并将每日的数据上传到PC端,生成报表。分解该任务如图:FnLViylZq2EMB2tOq-WDd7lPYrnu总体逻辑分为两块。一块为AI摄像头端,通过收集数据,训练模型,使用模型判断是否带口罩了,将是否戴口罩的信息上送物联网。另一块通过物联网订阅信息,统计展示信息。

    任务实现:
    1、数据收集。作为机器学习,那么第一步就是收集足够多的数据用了做训练数据。判断是否带口罩,就需要分别收集戴口罩和不带口罩的图片。AI摄像头提供了web页面,可以通过浏览器查看摄像头当前的图像,并且可以手工点击抓取图片。但是这样采集太麻烦了。通过直播,了解到AI摄像头出厂已经带了训练好了的物体识别的模型。这里借助已有的模型,来采集数据。当摄像头识别到人(person),切准确率超过0.95,时间间隔超过0.2秒,就保存图片。

# @File    : AICAM    catchpic.py
# 使用 厂家提供的程序,识别人像,发现人像就保存起来,做为以后学习用的源
from json.decoder import JSONDecodeError
import subprocess
import json
import base64
import time

#从json中读取到图片信息
def readimagefromjson(objdict,image):
    #读取到的第一个json
    if objdict.get('num') and objdict.get('obj') and objdict.get('num') == 1:
        for objnode in objdict.get('obj'):
            if objnode.get('prob'):
                image['prob'] = objnode.get('prob')
            if objnode.get('type'):
                image['type'] = objnode.get('type')
    #读取到的第三个json
    if objdict.get('img'):
        image['img'] = objdict.get('img')

reconizer = subprocess.Popen(
    ['/home/m5stack/payload/bin/object_recognition', '/home/m5stack/payload/uploads/models/yolo_20class'],
    stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

reconizer.stdin.write("_{\"stream\":1}\r\n".encode('utf-8'))
reconizer.stdin.flush()

i=0
begt = time.time()
while i<100:
    image={}
    objdict = json.loads(reconizer.stdout.readline().decode('utf-8'))
    if objdict.get('num'):      #第一个json
        readimagefromjson(objdict,image)
        objdict = json.loads(reconizer.stdout.readline().decode('utf-8'))       #第二个
        readimagefromjson(objdict, image)
        objdict = json.loads(reconizer.stdout.readline().decode('utf-8'))       #第三个
        readimagefromjson(objdict, image)
        picbytes = base64.b64decode(image.get('img'))
        image.pop('img')
        offtime=time.time()-begt
        if image.get('prob') and image.get('prob')>0.95 and offtime>0.2:
            begt = time.time()
            print(i,image)
            filename='./res/'+str(i)+'.json'
            imagename='./res/'+str(i)+'.png'
            with open(filename,'w') as file_obj:
                json.dump(image,file_obj)
            with open(imagename, 'wb') as fw:
                fw.write(picbytes)
            i+=1

 用这段代码,在AI摄像头上运行,就可以收集到图片啦!FpRspPhTqXDVPCOvK4CFdSjGDBmS收集的图片都是320X240的,收集的数量是100张;并且都保存到了sd卡中了,可以直接拷贝。为了尊重小伙伴们,最后训练的模特都是自己的啦!
    2、训练,机器学习中最难理解的就是AI模型的建立和训练。但是在这次的活动中再也不用为这事担心啦!M5Stack有提供在线的训练模型和工具V-Training在线训练平台,将收集到的图片上传训练平台,然后一张一张图片打标签,由平台进行训练,最后得到一个模型文件。上传到AI摄像头即可。
    3、模型的使用。有了训练好的模型,上传到AI摄像头运行后,每当摄像头中出现戴口罩或者不带口罩的人脸时就能得到这样一个json字符串

{"num": 1, "obj": [{"prob": 0.97629565, "x": 229, "y": 69, "w": 171, "h": 203, "type": "mask"}], "running": "Object Recognition"}

为了简化模型,并没有上传图片信息,毕竟图片信息还是挺大的。物联网我使用的是百度的IOT2,没有使用教程中老师给的方法。FjEogSI5it2t-G3FNJTACjru0j8y
使用了自己申请的物联网服务,在AI摄像头识别到信息后,将json字符串上送给物联网。这里设定了一个阈值,0.8当识别成功率超过0.8时才上送处理。另外每次上送时间间隔控制为大于1秒。Fo4-7pjYWr8-N9aZ00SoBv8WsTns

# @File    : AICAM    judgemask.py
# 使用 训练的模型 判断是否带了口罩
from json.decoder import JSONDecodeError
import subprocess
import json
import time
import paho.mqtt.client as mqtt
from imgdeal.sendwarning import warnlev
from unit.mqttinfo import MQTT_INFO

reconizer = subprocess.Popen(
    ['/home/m5stack/payload/bin/object_recognition', '/home/m5stack/payload/uploads/models/v2model_845dfafaac401530'],
    stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

reconizer.stdin.write("_{\"stream\":0}\r\n".encode('utf-8'))   #不需要获得图片
reconizer.stdin.flush()

def ismask(objdict):
    prob=0
    if objdict.get('num') and objdict.get('obj') and objdict.get('num') == 1:
        for objnode in objdict.get('obj'):
            if objnode.get('type'):
                if objnode.get('type')=='mask':      #戴口罩了
                    warnlev(0)
                else:           #没戴口罩
                    warnlev(1)
            if objnode.get('prob'):
                prob=objnode.get('prob')
    return prob

def on_publish(msg, rc):  # 成功发布消息的操作
    if rc == 0:
        print("publish success, msg = " + msg)


def on_connect(client, userdata, flags, rc):  # 连接后的操作 0为成功
    print("Connection returned " + str(rc))

selfmqtt=MQTT_INFO()

if __name__== "__main__":
    begt = time.time()
    client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    client = mqtt.Client(client_id)  # ClientId不能重复,所以使用当前时间
    client.username_pw_set(selfmqtt.mqtt_name, selfmqtt.mqtt_pwd)  # 设置用户名,密码
    client.connect(selfmqtt.broker_address, selfmqtt.port, 60)  # 连接服务 keepalive=60
    client.on_connect = on_connect  # 连接后的操作
    client.loop_start()
    time.sleep(2)

    while True:
        objdict = json.loads(reconizer.stdout.readline().decode('utf-8'))
        if objdict.get('num') and objdict.get('obj') and objdict.get('num') == 1 and time.time()-begt>1.0:
            begt = time.time()
            prob=ismask(objdict)
            if prob>0.8:
                print(objdict)
                rc, mid = client.publish(selfmqtt.publishtopic[0], payload=json.dumps(objdict), qos=1)  # qos
                on_publish(str(objdict), rc)

额外的,AI摄像头与电脑连接是使用数据线,供电的同时,还作为一个网卡可以与linux通过ssh通讯。但是wifi还不清楚怎么配置。这里写了个简单的shell脚本,在开机后,用root运行一次,就可以联上WiFi了。物联网是需要通过互联网上传的。

wpa_cli -i wlan0 << EOF
set_network 0 ssid "******"
set_network 0 key_mgmt WPA-PSK
set_network 0 psk "******"
enable_network 0
select_network 0
EOF

    4、报警提示。现在出行,到处都是测体温,提示戴口罩的。这里当发现没有戴口罩的简单地使用LED来做提示。当发现没有戴口罩,就亮红色LED灯。带了口罩就灭灯。这个LED灯基本被封在壳子里了,不是太明显,可以考虑使用摄像头上的串口,外接设备,但是那个接口貌似挺贵的,暂时就先用LED灯来提示啦!

    5、PC端,使用python+qt。启用一个线程,订阅物联网消息

# @File    : mqtt_subprocess.py
#用来接收mqtt的线程
import threading

from PyQt5.QtCore import pyqtSignal, QThread, QMutex
import paho.mqtt.client as mqtt

from unit.mqttinfo import MQTT_INFO

class GetMqttMsg(QThread):
    sinGetNewMsg = pyqtSignal(str)        #定义信号
    def __init__(self):
        super(GetMqttMsg, self).__init__()
        self.mqttinfo=MQTT_INFO()
        self.client=mqtt.Client()


    def on_connect(self,client, userdata, flags, rc):
        print("Connected with result code: " + str(rc))

    def on_message(self,client, userdata, msg):
        # print(msg.topic + " " + str(msg.payload))
        self.sinGetNewMsg.emit(str(msg.payload,encoding = "utf-8"))

    def run(self):
        self.client.username_pw_set(self.mqttinfo.mqtt_name, self.mqttinfo.mqtt_pwd)  # 设置用户名,密码
        self.client.connect(self.mqttinfo.broker_address, self.mqttinfo.port, 60)  # 连接服务 keepalive=60
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.subscribe(self.mqttinfo.publishtopic[0], qos=0)
        self.client.loop_forever()  # 保持连接

使用QT做主界面,实时展示当前戴口罩和不带口罩的人数。用饼图方式展示戴口罩和不戴口罩的比例关系。并且将数据记录到文件中(这里为了方便,使用简单的文本文件做数据记录,实际使用中可以考虑入库)
FuGWMhuGe49tzOx01jQhtng0bWARFly-W_Cp8sIZaijJii88ZCci7683FgJVZbqsd1X-z9sAIUnX8nOvhApj

    活动感想:非常感谢电子森林举办的这次活动,能够有机会接触到这样的智能摄像头。关于摄像头的几个想法:1 镜头太小,如果能让出更大空间 换大光圈长焦镜头就会方便很多,做人员出入时,不方便固定到门口,距离远了人就变得很小。2 温度还是偏高,真的很热;但是提供的模型非常好用!非常感谢!

 

 

团队介绍
电子爱好者
团队成员
aramy
单片机业余爱好者,瞎捣鼓小能手。
评论
0 / 100
查看更多
目录
硬禾服务号
关注最新动态
0512-67862536
info@eetree.cn
江苏省苏州市苏州工业园区新平街388号腾飞创新园A2幢815室
苏州硬禾信息科技有限公司
Copyright © 2024 苏州硬禾信息科技有限公司 All Rights Reserved 苏ICP备19040198号