内容介绍
内容介绍
硬件介绍:
本次项目使用的是M5Stack UnitV2 - AI摄像头 (SSD202D)。这是一个使用Sigmstar SSD202D做主控芯片,带Neno和FPU的ARM,双核 Cortex-A7 1.2Ghz 处理器,嵌入式128MB DDR3,512MB NAND Flash;使用GC2145 1080P Colored Sensor摄像头,还有麦克风、TF卡插槽、UART端口、USB-C、WiFi 2.4GHz等外设。在次硬件上跑着一个linux操作系统,支持Python。
规格参数
Sigmstar SSD202D | Dual Cortex-A7 1.2Ghz Processor |
Flash | 512MB NAND |
RAM | 128MB-DDR3 |
Camera | GC2145 1080P Colored Sensor |
镜头 | FOV 68° , DOF= 60cm- ∞ |
输入电压 | 5V @ 500mA |
硬件外设 | TypeC x1, UART x1, TFCard x1, Button x1, Microphone x1, Fan x1 |
指示灯 | 红, 白 |
WiFi | 150Mbps 2.4GHz 802.11 b/g/n |
工作温度 | 32°F to 104°F ( 0°C to 40°C ) |
净重 | 18g |
毛重 | 62g |
产品尺寸 | 48*18.5*24mm |
包装尺寸 | 157*38*38mm |
外壳材质 | Plastic ( PC ) |
任务介绍:
本次活动我选择了口罩这个任务,在检测摄像头监控范围内识别人脸并判断是否佩戴口罩,如果没带口罩就报警,报警的办法有多种,选择适当合理的就行
任务实现:
1、在内嵌网址中的照相功能直接照相就行,然后下载下来
2、
最复杂的训练部分,本次活动公司已经有了官方的训练网址 http://v-training.m5stack.com/build/index.html
在这里可以进行傻瓜式操作
然后训练好了界面如图,直接下载下来就可了
3、然后下载回来就可以进行Python部分的事情了
采用插件自己打开服务器然后输入跳转至目标网址
'''这个需要插件,自己下载相应的'''
driver = webdriver.Chrome('C:\Program Files\Google\Chrome\Application\chrome.exe')
# get 方法 打开指定网址
driver.get('http://10.254.239.1/')
html = driver.page_source
time.sleep(10)
driver.find_element_by_id("object_recognition")
driver.find_element_by_id("object_recognition").click()
time.sleep(10)
driver.find_element_by_id("yolo_run")
driver.find_element_by_id("yolo_run").click()
time.sleep(10)
然后上传到云端
productKey = '自己找,阿里云有'
deviceName = '自己找,阿里云有'
deviceSecret = '自己找,阿里云有'
# MQTT - 合成connect报文中使用的 ClientID、Username、Password
mqttClientId = deviceName + '|securemode=3,signmethod=hmacsha1|'
mqttUsername = deviceName + '&' + productKey
content = 'clientId' + deviceName + 'deviceName' + deviceName + 'productKey' + productKey
mqttPassword = hmac.new(deviceSecret.encode(), content.encode(), sha1).hexdigest()
# 接入的服务器地址
regionId = '阿里云左上角那个填这里'
# MQTT 接入点域名
brokerUrl = productKey + '.iot-as-mqtt.' + regionId + '.aliyuncs.com'
# Topic,post,客户端向服务器上报消息
topic_post = '/sys/' + productKey + '/' + deviceName + '/thing/event/property/post'
# Topic,set,服务器向客户端下发消息
topic_set = '/sys/' + productKey + '/' + deviceName + '/thing/service/property/set'
# 物模型名称的前缀(去除后缀的数字)
modelName = 'mark_person_'
# json合成上报开关状态的报文
def json_switch_set(num, status):
switch_info = {}
switch_data = json.loads(json.dumps(switch_info))
switch_data['method'] = '/thing/event/property/post'
switch_data['id'] = random.randint(0000, 9999)
switch_status = {modelName + num: status}
switch_data['params'] = switch_status
return json.dumps(switch_data, ensure_ascii=False)
# 戴口罩初始值0
mark_person: int = 0
nomark_person = 0
# 建立mqtt连接对象
client = mqtt.Client(mqttClientId, protocol=mqtt.MQTTv311, clean_session=True)
def on_log(client, userdata, level, buf):
if level == MQTT_LOG_INFO:
head = 'INFO'
elif level == MQTT_LOG_NOTICE:
head = 'NOTICE'
elif level == MQTT_LOG_WARNING:
head = 'WARN'
elif level == MQTT_LOG_ERR:
head = 'ERR'
elif level == MQTT_LOG_DEBUG:
head = 'DEBUG'
else:
head = level
print('%s: %s' % (head, buf))
# MQTT成功连接到服务器的回调处理函数
def on_connect(client, userdata, flags, rc):
print('Connected with result code ' + str(rc))
# 与MQTT服务器连接成功,之后订阅主题
client.subscribe(topic_post, qos=0)
client.subscribe(topic_set, qos=0)
# 向服务器发布测试消息
client.publish(topic_post, payload='test msg', qos=0)
# MQTT接收到服务器消息的回调处理函数
def on_message(client, userdata, msg):
print('recv:', msg.topic + ' ' + str(msg.payload))
def on_disconnect(client, userdata, rc):
if rc != 0:
print('Unexpected disconnection %s' % rc)
def mqtt_connect_aliyun_iot_platform():
client.on_log = on_log
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.username_pw_set(mqttUsername, mqttPassword)
print('clientId:', mqttClientId)
print('userName:', mqttUsername)
print('password:', mqttPassword)
print('brokerUrl:', brokerUrl)
# ssl设置,并且port=8883 client.tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED,
# tls_version=ssl.PROTOCOL_TLS, ciphers=None)
try:
client.connect(brokerUrl, 1883, 60)
except:
print('阿里云物联网平台MQTT服务器连接错误,请检查设备证书三元组、及接入点的域名!')
client.loop_forever()
然后进行判断步骤,判断完直接报警
def publish_loop():
global mark_person, nomark_person
mark_person = 0
while 1:
element_keyword = driver.find_element_by_id('func-result-pre')
json_date = element_keyword.text
json_jiexi = json.loads(json_date)
try:
person_num: object = json_jiexi['num']
assert isinstance(person_num, object)
print(person_num)
if person_num == 1:
nomark_person += 1
print('1000')
##这里选择你喜欢的报警模式
elif person_num == 2:
mark_person += 1
print('1001')
##这里选择你喜欢的报警模式
time.sleep(5)
switchPost = json_switch_set('1', mark_person)
client.publish(topic_post, payload=switchPost, qos=0)
except:
time.sleep(1)
if __name__ == '__main__':
# 建立线程t1:mqtt连接阿里云物联网平台
# 建立线程t2:定时向阿里云发布消息
t1: Thread = threading.Thread(target=mqtt_connect_aliyun_iot_platform, )
t2 = threading.Thread(target=publish_loop, )
t1.start()
t2.start()
因为我没有多个摄像头,后来表达报警方式改成了钉钉机器人报警,设置的是关键词加密
我就单po出来了
import requests
import json
def getDingMes():
baseUrl = "https://oapi.dingtalk.com/robot/send?access_token=你自己的"
# please set charset= utf-8
HEADERS = {
"Content-Type": "application/json ;charset=utf-8"
}
# 这里的message是你想要推送的文字消息
message = "有人没带口罩"
stringBody ={
"msgtype": "text",
"text": {"content": message},
"at": {
"atMobiles": [""],
"isAtAll":1 #@所有人 时为true,上面的atMobiles就失效了
}
}
MessageBody = json.dumps(stringBody)
result = requests.post(url=baseUrl, data=MessageBody, headers=HEADERS)
print(result.text)
if __name__ == '__main__':
getDingMes()
下面是网址识别时的界面
然后下面是云端接受到的数据
报警
非常nice
活动感想:
非常感谢电子森林举办的这次活动,能够有机会接触到这样的智能摄像头。
也是我第一次接触Python语言的事情
不过就是这个摄像头发热太严重了,建议改一下散热方案
附件下载
unitv2.py
新建文件夹 (2).rar
训练的模型
团队介绍
评论
0 / 100
查看更多