项目介绍
本项目是基于MAX78000实现的一款带有离线语音识别功能的智能桶装水抽水器。项目采用MAX78000作为主控,识别语音控制命令并通过控制继电器来实现抽水器的开关控制和出水量控制。
系统模块连接图如下所示,其中MAX78000负责语音命令词的识别和控制信号的发出,通过常开继电器来控制水泵电机的开关来实现抽水器的控制。
项目设计思路
MAX78000 的特点是能够超低功耗执行神经网络,这次就打算以MAX78000这个平台为基础实现一个可以通过语音控制的智能桶装水抽水器,实现在不方便手动操作的场景下进行接水。
词条设计列表:
唤醒命令 | 小美小美 |
开关控制 | “开始出水”,“停止出水” |
出水量调节指令 | “倒一杯水” |
语音识别网络的训练与与量化参照官方文档进行操作即可,官方文档整体描述的还是比较清晰的,整体流程就是基于训练存储库ai8x-training进行深度学习模型开发和训练,基于综合存储库ai8x-synthesis对训练后的网络进行量化并使用“izer”工具将训练好的模型转换为C代码。
搜集素材的思路
KWS项目的素材收集主要就是关键词语音数据的收集过程,由于是定制化开发,所以就不能像官方的KWS-Demo一样采用开源的语音数据集用于模型的训练和测试。
这里我主要采用语音录制和语音合成的两种方式进行语音数据的收集。关于语音录制这块有两种思路,第一种是通过录音软件进行关键词的连续多次录音,然后编写脚本进行语音词段的分割;第二种是按照预期语音数据格式进行逐次录制,然后直接生成关键词语音数据。两种方式各有利弊,可以根据自己的喜好和习惯方式进行选择。
我这里采用的是第二种方法。官方是提供了一个语音录制脚本VoiceRecorder.py,但是存在着一些不足的地方,比如录制完之后不能确定录制音频的质量。我这里采用的上次第一季比赛时用到的录音脚本进行录音。
在录音的过程中需要严格按网络模型的数据加载器的预期格式(1秒16 kHz单声道wav音频)进行录制,需要注意的是音量不要太小了。
这次是在第一期的录音脚本基础上对代码进行优化和升级,增加了可视化UI界面使得音频数据采集的过程更高效。
下面是具体的代码实现:
import wave
import pyaudio
import os
import shutil
import time
import numpy
import matplotlib.pyplot as plt
import tkinter as tk
from tkinter import ttk
import sounddevice as sd
import traceback
class AudioRecorderApp:
def __init__(self, master):
self.master = master
self.master.title("Audio Recorder")
self.CHUNK = 1024
self.FORMAT = pyaudio.paInt16
self.CHANNELS = 1
self.RATE = 16000
self.RECORD_SECONDS = 1
self.frames = []
self.p = pyaudio.PyAudio()
self.stream = None
self.create_widgets()
def create_widgets(self):
ttk.Label(self.master, text="Label:").grid(row=0, column=0, padx=5, pady=5)
self.laber_entry = ttk.Entry(self.master)
self.laber_entry.grid(row=0, column=1, padx=5, pady=5)
ttk.Label(self.master, text="Record Time (s):").grid(row=0, column=2, padx=5, pady=5)
self.record_time_entry = ttk.Entry(self.master)
self.record_time_entry.grid(row=0, column=3, padx=5, pady=5)
self.record_time_entry.insert(tk.END, str(self.RECORD_SECONDS))
self.start_button = ttk.Button(self.master, text="Start Recording", command=self.start_recording)
self.start_button.grid(row=1, column=0, columnspan=2, pady=10)
self.play_button = ttk.Button(self.master, text="Play Recording", command=self.play_recording, state=tk.DISABLED)
self.play_button.grid(row=1, column=2, columnspan=2, pady=10)
self.delete_button = ttk.Button(self.master, text="Delete Recording", command=self.delete_recording, state=tk.DISABLED)
self.delete_button.grid(row=2, column=0, columnspan=2, padx=5, pady=10)
self.save_button = ttk.Button(self.master, text="Save Recording", command=self.save_recording, state=tk.DISABLED)
self.save_button.grid(row=2, column=2, columnspan=2, padx=5, pady=10)
def start_recording(self):
# if self.stream and self.stream.is_active():
# # 如果已经有正在录音的流,停止它
# self.stream.stop_stream()
# self.stream.close()
self.frames = []
laber = self.laber_entry.get() if self.laber_entry.get() else "default"
self.RECORD_SECONDS = float(self.record_time_entry.get())
self.stream = self.p.open(format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
input=True,
frames_per_buffer=self.CHUNK)
print("* recording")
OFF_TIME = int(self.RATE / self.CHUNK * 0.4)
for i in range(0, int(self.RATE / self.CHUNK * self.RECORD_SECONDS) + OFF_TIME):
data = self.stream.read(self.CHUNK)
if i >= OFF_TIME:
self.frames.append(data)
print("* done recording")
self.stream.stop_stream()
self.start_button.config(state=tk.NORMAL)
self.play_button.config(state=tk.NORMAL)
self.delete_button.config(state=tk.NORMAL)
self.save_button.config(state=tk.NORMAL)
self.plot_waveform()
def plot_waveform(self):
wave_data = numpy.frombuffer(b''.join(self.frames), dtype=numpy.int16)
time = numpy.arange(0, len(wave_data)) * (1.0 / self.RATE)
plt.figure()
plt.plot(time, wave_data)
plt.title("Waveform")
plt.xlabel("Time (seconds)")
plt.ylabel("Amplitude")
plt.show(block=False)
def delete_recording(self):
self.frames = []
self.play_button.config(state=tk.DISABLED)
self.delete_button.config(state=tk.DISABLED)
self.save_button.config(state=tk.DISABLED)
plt.close()
def save_recording(self):
if self.frames:
laber = self.laber_entry.get() if self.laber_entry.get() else "default"
folder_path = f"./{laber}"
os.makedirs(folder_path, exist_ok=True)
file_path = f"{folder_path}/{laber}_{int(time.mktime(time.localtime()))}.wav"
wf = wave.open(file_path, 'wb')
wf.setnchannels(self.CHANNELS)
wf.setsampwidth(self.p.get_sample_size(self.FORMAT))
wf.setframerate(self.RATE)
wf.writeframes(b''.join(self.frames))
wf.close()
print(f"Recording saved: {file_path}")
plt.close()
def play_recording(self):
try:
if self.frames:
wave_data = numpy.frombuffer(b''.join(self.frames), dtype=numpy.int16)
sd.play(wave_data, self.RATE)
sd.wait() # 等待播放完成
except Exception as e:
print(f"An error occurred during playback: {e}")
traceback.print_exc()
def run(self):
self.master.mainloop()
if __name__ == '__main__':
root = tk.Tk()
app = AudioRecorderApp(root)
app.run()
训练实现过程
在前面的章节已经完成对语音数据的收集工作,在确保语音数据质量达标的前提,就可以逐步进行添加自定义命令词数据,调整数据加载器,修改网络模型参数,训练 ,量化,评估和综合。
1 添加自定义命令词数据
首先需要为新添加的关键词标签创建一个文件夹(例如:'dakaishuilongtou')在ai8x-training/data/KWS/raw/dakaishuilongtou文件夹中复制所有”开灯“语音样本到该文件夹中。
如果ai8x-training/data/KWS/processed存在,则需要删除它,数据加载器才可以重新创建带有其他标签的数据集,也就是每次语音数据集更新或发生变化都需重新生成processed部分。
操作时确保数据集的目录层次结构正确:
data -- KWS
|--raw: contains folders with labels name, each containing 1sec .wav file for that label
2 数据加载器调整
kws20项目的数据加载器脚本是ai8x-training/datasets/kws20.py文件。
添加新的标签(例如:'kaideng')到已排序的字典的正确位置(必须要按照标签搜字母进行排序),且具有唯一的增量值:
class_dict = {'backward': 0, 'bed': 1, 'bird': 2, 'cat': 3, 'daoyibeishui': 4,'dingshi': 5, 'dog': 6, 'down': 7,
'eight': 8, 'five': 9, 'follow': 10, 'forward': 11, 'four': 12, 'go': 13,'guanbichushui': 14,
'happy': 15, 'house': 16, 'kaideng': 17, 'kaishichushui': 18, 'learn': 19, 'left': 20, 'marvin': 21, 'nine': 22,
'no': 23, 'off': 24 ,'on': 25, 'one': 26, 'right': 27, 'seven': 28,
'sheila': 29, 'six': 30, 'stop': 31, 'three': 32,'tiaoanyidian': 33, 'tiaoliangyidian': 34,'tingzhichushui': 35,'tree': 36, 'two': 37,
'up': 38, 'visual': 39, 'wow': 40, 'xiaomeixiaomei': 41,'yes': 42, 'zero': 43}
更新数据集中相关定义的“output”枚举。在“weight”中添加在kws20的基础上增加的词条标,最后一个类表示“未知”类别。
为了使优化器对每个类的样本数量保持平衡,可以将权重与每个标签的样本数量成反比。
具体的公式 weight[i] = (size of smallest label)/(size of label i)
{
'name': 'KWS_25', # 20 keywords
'input': (128, 128),
'output': ('up', 'down', 'left', 'right', 'stop', 'go', 'yes', 'no', 'on', 'off', 'one',
'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'zero','daoyibeishui', 'guanbichushui', 'kaishichushui', 'tingzhichushui','xiaomeixiaomei',
'UNKNOWN'),
'weight': (1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 20, 28, 20, 20, 15,0.14),
'loader': KWS_25_get_datasets,
},
调整KWS_get_datasets函数适配25词的KWS词条列表,通过上面的“output”枚举去自动索引。
if num_classes in (6, 25):
classes = next((e for _, e in enumerate(datasets)
if len(e['output']) - 1 == num_classes))['output'][:-1]
print("classes", classes)
else:
raise ValueError(f'Unsupported num_classes {num_classes}')
创建KWS_25_get_datasets函数以返回新定义的分类类的数量。
def KWS_25_get_datasets(data, load_train=True, load_test=True):
return KWS_get_datasets(data, load_train, load_test, num_classes=25)
3 修改网络模型参数
更新ai8x-training/models/ai85net-kws20.py中初始化部分的num_classes变量默认值,根据自己的分类数进行调整。
# num_classes = n keywords + 1 unknown
def __init__(
self,
num_classes=26, # was 21
num_channels=128,
dimensions=(128, 1),
fc_inputs=7,
bias=False,
**kwargs
):
4 训练
训练环境的搭建是在电脑上安装Win+Ubuntu20.03双系统,然后按照官方训练环境搭建文档进行一步步操作搭建起来的,这部分官方文档描述的比较清楚,这里就不再赘述了。
训练一开始会从所有标记样本中创建处理数据集,大概需要几个小时左右。如果不更新数据集的话,第二次会直接开始训练。
进入ai8x-training目录激活python训练环境后,执行训练命令进行训练
python train.py --epochs 100 --optimizer Adam --lr 0.001 --wd 0 --deterministic --compress policies/schedule_kws20.yaml --model ai85kws20net --dataset KWS_25 --confusion --device MAX78000 "$@"
在这个过程中如果出现问题,可以根据log信息进行定位或者再重新按照文档说明重新梳理下自己的操作流程,一般的问题都可以解决。
这里我遇到的大部分问题是增加词条时操作和修改数据加载器的过程。
训练结束后会在ai8x-training/logs/目录生成对应的训练结果相关文件,后面评估量化评估需要用到。
5 量化
量化采用的qat方式,我这里为方便操作首先把前面训练好的数据拷贝到ai8x-synthesis/proj/目录,然后执行量化命令;
python quantize.py proj/qat_best.pth.tar proj/kws25-qat8-q.pth.tar --device MAX78000 -v "$@"
量化结束后会在ai8x-synthesis/proj/目录生成量化后数据文件。
6 评估和综合
量化完成后可以对量化后结果进行评估,
python train.py --model ai85kws20net --dataset KWS_25 --confusion --evaluate --exp-load-weights-from ../ai8x-synthesis/proj/kws25-qat8-q.pth.tar -8
再回到ai8x-synthesis/目录通过ai8xize.py脚本执行转换命令,输出结果生成在ai8x-synthesis/demo/kws25_v1目录。
python ai8xize.py --test-dir demo --prefix kws25_v2 --checkpoint-file proj/kws25-qat8-q.pth.tar --config-file networks/kws20-hwc.yaml --softmax --device MAX78000 --timer 0 --display-checkpoint --verbose "$@"
7 更新kws20_demo Demo应用程序
将kws20_demo的工程中的cnn.c, cnn.h, weights.h, sampledata.h 4个文件替换为KAT C代码生成的(上一步生成)。
更新main.c的识别词数组
const char keywords[NUM_OUTPUTS][20] = {"up", "down", "left", "right", "stop", "go", "yes", "no", "on", "off", "one","two", "three", "four", "five", "six", "seven", "eight", "nine", "zero","daoyibeishui", "guanbichushui", "kaishichushui", "tingzhichushui","xiaomeixiaomei","UNKNOWN"};
为了防止误识别,只有识别到“小美小美”之后再识别到“倒一杯水”才会执行抽水任务。
volatile int enable_flag = 0;
mxc_gpio_cfg_t gpio_out;
/* Setup output pin. */
gpio_out.port = MXC_GPIO2;
gpio_out.mask = MXC_GPIO_PIN_6;
gpio_out.pad = MXC_GPIO_PAD_NONE;
gpio_out.func = MXC_GPIO_FUNC_OUT;
gpio_out.vssel = MXC_GPIO_VSSEL_VDDIOH;
gpio_out.drvstr = MXC_GPIO_DRVSTR_0;
MXC_GPIO_Config(&gpio_out);
MXC_GPIO_OutSet(gpio_out.port, gpio_out.mask);
PR_INFO("\n*** READY ***\n");
/* find detected class with max probability */
ret = check_inference(ml_softmax, ml_data, &out_class, &probability);
PR_DEBUG("----------------------------------------- \n");
if (!ret) {
PR_DEBUG("LOW CONFIDENCE!: ");
}
else{
if(out_class == 21){// guangdeng
PR_DEBUG("guanbichushui \r\n");
}
else if(out_class == 20){//kaideng
PR_DEBUG("daoyibeishui \r\n");
if(enable_flag == 1){
PR_DEBUG("TASK START \r\n");
MXC_GPIO_OutClr(gpio_out.port, gpio_out.mask);
MXC_Delay(MSEC(200));
MXC_GPIO_OutSet(gpio_out.port, gpio_out.mask);
MXC_Delay(SEC(10));
MXC_GPIO_OutClr(gpio_out.port, gpio_out.mask);
MXC_Delay(MSEC(200));
MXC_GPIO_OutSet(gpio_out.port, gpio_out.mask);
PR_DEBUG("TASK END \r\n");
enable_flag = 0;
}
}
else if(out_class == 20){//dingshi
}
else if(out_class == 23){//liangyidian
PR_DEBUG(" \r\n");
}
else if(out_class == 24){//anyidian
PR_DEBUG("xiaomeixiaomei \r\n");
MXC_GPIO_OutSet(gpio_out.port, gpio_out.mask);
enable_flag = 1;
}
else{
enable_flag = 0;
MXC_GPIO_OutSet(gpio_out.port, gpio_out.mask);
}
}
PR_DEBUG("Detected word:(%d)%s (%0.1f%%)",out_class, keywords[out_class], probability);
PR_DEBUG("\n----------------------------------------- \n");
实验成果
总结:
该项目以MAX78000为核心,实现一款带有离线语音识别功能的智能桶装水抽水器。项目主要经历了硬件选型、语音数据搜集、模型训练、量化与综合等多个关键步骤。通过语音控制命令,用户可以轻松实现抽水器的开关控制和出水量控制。
在语音数据搜集方面,项目采用了两种方式:录音和语音合成。录制过程中要求符合模型数据加载器的预期格式,保证录制音频的质量。模型训练则基于MAX78000平台,通过ai8x-training和ai8x-synthesis库完成,涉及到数据加载器调整、网络模型参数修改、训练、量化、评估和综合等环节。其中,量化采用了qat方式,便于在MAX78000上的高效运行。
整个项目既考验了深度学习的知识,包含了嵌入式设备的开发。通过离线语音识别,提高了抽水器在无需手动操作的场景下的智能程度。这样的综合性项目在全栈开发体验让我对嵌入式AI应用有了更深入的实践经验。