任务介绍:
参加funpack第八期活动,获得Arduino Nano 33 BLE Sense开发板。这是一款非常小巧的开发板,集成了非常多的传感器,并且tinyml提供了对她的支持。一直觉着机器学习很好玩,所以拿起这个板子又做了个语音控制LED颜色的项目,细致学习一下机器学习。
硬件说明:这个任务中使用的开发板是Arduino Nano 33 BLE Sense,利用板子上的麦克风,做为收集模块;用面包板接了个WS2812灯。用语音控制 WS2812灯显示不同的颜色。
任务实现:
机器学习的三个步骤:收集数据——>训练模型——>使用模型。接下来从这三个方面介绍制作过程。
1、收集数据:通过语音控制LED灯的颜色。则首先就是要收集声音信号了。 Nano33板子上集成了麦克风模块,通过Arduino的PDM库,很容易就读取到了麦克风的信号。这里需要注意两个要素:采样频率和量化位数。采样率就是将时间进行切片。不去采集所有的时间点上的所有数据,因为根本采集不完,而是隔一段时间采集一次,将连续的数据变为离散的。只要采集的次数够多,就可以逼近真实场景。CD的采样率是44100次/秒。采样率为所容纳的频率的两倍,也就是说CD最高记录频率为22.05khz的声音,这个到达了人耳的上限了。普通说话的声音都在3khz以内,所以老的电话通讯采用率用的是8000,传送语音足够了。这里使用PDM库里,最低可以使用的16000。使用单声道。
量化位数:是指将模拟信号经过采样转换为数字的过程。下图就是使用3bit来对波进行量化。量化位数越多,则采样越细腻。PDM的库是使用16位量化深度。
#include <PDM.h>
//麦克风设置
const short BLOCK_SIZE = 512; //声音缓冲区大小
short sampleBuffer[BLOCK_SIZE];
volatile int samplesRead;
void setup() {
Serial.begin(115200);
//麦克风设置
PDM.setBufferSize(1024); //设置缓冲区
PDM.onReceive(onPDMdata); //采集到声音后的处理函数
PDM.begin(1, 1600); //声道1 ,采样率16000
delay(100);
}
void loop() {
int maxval = 0;
digitalWrite(redled, 0);
if (samplesRead) { //每次读取512个声音数据 16000的采样率
Serial.write((char*)sampleBuffer, samplesRead * 2); //送到串口
samplesRead = 0;
}
}
//读取麦克风数据
void onPDMdata() {
short bytesAvailable = PDM.available();
// Read into the sample buffer
PDM.read(sampleBuffer, bytesAvailable);
// 16-bit, 2 bytes per sample
samplesRead = bytesAvailable / 2;
}
通过串口,将麦克风的信号,送到串口验证是否读取的声音是否正常。
从麦克风获得的声音的波形信息。在github上看见有直接将这个波形丢给神经网络做训练的。但是觉着,波形信息受输入的影响干扰太大了,频域信息能更加真切地表现声音携带的信息,而且更多的大神也是使用频域信息来做训练的。
//傅里叶变换
void FFT_Operation() {
double vReal[BLOCK_SIZE];
double vImag[BLOCK_SIZE];
for (uint16_t i = 0; i < BLOCK_SIZE; i++) {
vReal[i] = sampleBuffer[i] << 8;
vImag[i] = 0.0;
}
FFT.Windowing(vReal, BLOCK_SIZE, FFT_WIN_TYP_HAMMING, FFT_FORWARD);
FFT.Compute(vReal, vImag, BLOCK_SIZE, FFT_FORWARD);
FFT.ComplexToMagnitude(vReal, vImag, BLOCK_SIZE);
for (int i = 0; i < ttfnums; i++) {
bands[i] = 0;
}
//0赫兹不存在,另外由于FFT结果的对称性,通常我们只使用前半部分的结果,即小于采样频率一半的结果。 并且对数据做缩小处理
for (int i = 1; i < (BLOCK_SIZE / 2); i++) {
if (vReal[i] > 2000) {
if (i <= 4 ) bands[0] = max(bands[0], (int)(vReal[i] / amplitude));
if (i > 4 && i <= 8 ) bands[1] = max(bands[1], (int)(vReal[i] / amplitude));
if (i > 8 && i <= 12 ) bands[2] = max(bands[2], (int)(vReal[i] / amplitude));
if (i > 12 && i <= 16 ) bands[3] = max(bands[3], (int)(vReal[i] / amplitude));
if (i > 16 && i <= 20 ) bands[4] = max(bands[4], (int)(vReal[i] / amplitude));
if (i > 20 && i <= 25 ) bands[5] = max(bands[5], (int)(vReal[i] / amplitude));
if (i > 25 && i <= 30 ) bands[6] = max(bands[6], (int)(vReal[i] / amplitude));
if (i > 30 && i <= 40 ) bands[7] = max(bands[7], (int)(vReal[i] / amplitude));
if (i > 40 && i <= 60 ) bands[8] = max(bands[8], (int)(vReal[i] / amplitude));
if (i > 60 ) bands[9] = max(bands[9], (int)(vReal[i] / amplitude));
}
}
}
这里使用快速傅里叶变换,将512个声波数据转换到频域上。转换后依然是512个字节的数据,代表着不同频域上的数值。频域范围为0~8k。由于FFT结果的对称性,所以0~8k的范围落256个数上。就是每个数据代表着31.25hz的范围。
每说一个命令,比如“翠绿”大概需要0.8秒。0.8秒就意味着要收集25组,每组512的数据。做了FFT变换后,依然后25x256=6400个数据,用字节来表示,也就是6400个字节,对单片机压力太大了。所以将每次fft变换的结果映射到10个数里边去。因为人声大多数频率在1Khz左右,所以这里没有采用均匀分布的方式。而是将1~60(32hz~1875hz)做了9个映射,60以上单独归类(其实也可以抛弃)。
每512个数据为一组,理论上1秒应该有16000/512=31.25组数据,但是实测差不多15组数据就有1秒钟了,猜测是收集数据和傅里叶变换拉慢了单片机的处理。这里使用的是12个时间片。这样每个命令字就是由12*512个short数字构成,然后通过快速傅里叶变换,转换为120个字节长度。
接下来就是为神经网络准备训练数据了。将麦克风收集到的数据,转换为120个字节后,写入串口,使用串口工具接收并写入文件。
//将收集到的数据写给串口
void writeRecInfo() {
for (int i = 0; i < soundnums; i++) {
for (int j = 0; j < ttfnums; j++) {
Serial.print(recband[i][j]);
Serial.print(',');
}
}
Serial.println();
}
每个命令字至少收集30组数据。越多的数据,后期训练的越好。命令字之间最好有明显的区别,这里设定了四个颜色,对应命令字分别为:'翠绿': 0, '粉红': 1, '深蓝': 2, '金黄': 3。收集命令字过程中最好与最终使用环境要一致,即如果使用环境中有背景噪音,那么收集训练数据最好也有背景噪音。如果多个命令字都对应一个颜色,比如:“深蓝、湖蓝、天蓝……”都对应蓝色,那么训练数据就需要都包含进去,并且有一定的数量。(收集命令字很累、很无聊,如果有兴趣可以多收集些命令字)
2、训练数据。收集完数据后,得到了4个csv的文件。每一行就是一次命令的特征值,每个值从0~255。
soundnums=12 #采集多少个时间片长
ttfnums=10 #将fft的结果映射到矩阵的个数
labdic = {'翠绿': 0, '深蓝': 1, '金黄': 2, '纯白': 3}
def processDate(file, y_val):
openD = pd.read_csv(file, header=None)
xdata = np.array(openD)
if xdata.shape[1] > soundnums*ttfnums:
xdata = xdata[:, :-1]
xdata=xdata/256
dataY = np.ones((xdata.shape[0],)) * y_val
return xdata, dataY
使用python对数据进行预处理。将csv文件转换为numpy的矩阵数据。因为串口传出的数据,每一行最尾端多了一个“,“所以读取到的矩阵,每行都是121列了,对多出的一列做切片处理。然后将数据归一化,既每个数都除以256.这样每个数据都映射到了【0~1】。对不同命令字给予不同的标签。
def loaddata():
dataX0, dataY0 = processDate('../res/green.csv', 0)
dataX1, dataY1 = processDate('../res/blue.csv', 1)
dataX2, dataY2 = processDate('../res/gold.csv', 2)
dataX3, dataY3 = processDate('../res/white.csv', 3)
dataX = np.concatenate((dataX0, dataX1, dataX2, dataX3), axis=0)
dataY = np.append(dataY0, dataY1)
dataY = np.append(dataY, dataY2)
dataY = np.append(dataY, dataY3)
# dataY = np.append(dataY, dataY4)
# print(dataX,dataY)
permutationTrain = np.random.permutation(dataX.shape[0])
# print(permutationTrain)
dataX = dataX[permutationTrain]
dataY = dataY[permutationTrain]
vfoldSize = int(dataX.shape[0] / 100 * 20)
xTest = dataX[0:vfoldSize]
yTest = dataY[0:vfoldSize]
xTrain = dataX[vfoldSize:dataX.shape[0]]
yTrain = dataY[vfoldSize:dataY.shape[0]]
return xTest, yTest, xTrain, yTrain
然后将获得的4个命令字的矩阵合并起来,使用随机数打乱,取20%作为验证数据集,80%作为训练数据集。
model = keras.Sequential()
model.add(keras.layers.Dense(32, input_shape=(soundnums*ttfnums,), activation='relu'))
model.add(keras.layers.Dense(16, activation='relu'))
model.add(keras.layers.Dense(4, activation='softmax'))
adam = keras.optimizers.Adam(0.0000005)
model.compile(loss='sparse_categorical_crossentropy', optimizer=adam, metrics=['sparse_categorical_accuracy'])
model.summary()
history = model.fit(xTrain, yTrain, batch_size=1, validation_data=(xTest, yTest), epochs=1500, verbose=1)
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
open("model", "wb").write(tflite_model)
数据准备好后,就用keras生成一个模型。这里使用的是Sequential模型。模型具体的工作原理我也没搞太清楚,基本理解就是使用矩阵相乘,逐层降低矩阵的宽度,最后形成1X4的结果矩阵。这里使用的是3层模型。第一层入口尺寸是120,出口是32;第二层入口尺寸32,出口为16;最后一层就是需要的结果了,尺寸为4。1、2层用rule作为激活函数。使用Adam优化器,优化器的参数用的0.0000005。学习速率越低,训练的时长越长,但是学习效果越好。这里我的机器没有GPU,但是输入矩阵比较小,所以计算速度都还能忍受。使用epochs=1500次就能获得不错的效果。训练结束后,将结果写入model文件中,这实际上是一个二进制保存的矩阵。需要注意的是,神经网络的层数越多,结果文件就越大;分类种数越多,结果文件也会变大。当结果文件model超过一定值时,单片机那边就没法载入使用了。再使用命令xxd -i> model.h就可以得到Arduino可以使用的头文件了。
3、使用模型。有了训练好的模型就可以在单片机上调用模型进行推理了。基本流程:这里推理使用的数据要和训练用的数据处理方法一致。
#include <PDM.h>
#include <arduinoFFT.h>
#include <Adafruit_NeoPixel.h>
#include <TensorFlowLite.h>
#include <tensorflow/lite/micro/all_ops_resolver.h>
#include <tensorflow/lite/micro/micro_error_reporter.h>
#include <tensorflow/lite/micro/micro_interpreter.h>
#include <tensorflow/lite/schema/schema_generated.h>
#include <tensorflow/lite/version.h>
#include "model.h"
const int redled = LEDR; //红色led灯
const int soundnums = 12; //采集多少个时间片长
const short ttfnums = 10; //将fft的结果映射到矩阵的个数
const int BLOCK_SIZE = 512;
const uint8_t amplitude = 150;
short sampleBuffer[BLOCK_SIZE];
volatile int samplesRead;
arduinoFFT FFT = arduinoFFT();
int bands[ttfnums] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
int recband[soundnums][ttfnums]; //,用来记录关键字信息
uint8_t recflag = 0;
int smooth_count = 0;
int record_count = 0;
//控制ws2812的颜色
Adafruit_NeoPixel strip = Adafruit_NeoPixel(2, A7, NEO_GRB + NEO_KHZ800);
tflite::MicroErrorReporter tflErrorReporter;
tflite::AllOpsResolver tflOpsResolver;
const tflite::Model* tflModel = nullptr;
tflite::MicroInterpreter* tflInterpreter = nullptr;
TfLiteTensor* tflInputTensor = nullptr;
TfLiteTensor* tflOutputTensor = nullptr;
constexpr int tensorArenaSize = 40 * 1024;
byte tensorArena[tensorArenaSize];
const char* VOICES[] = {
"green",
"pink",
"blue",
"gold"
};
#define NUM_VOICES (sizeof(VOICES) / sizeof(VOICES[0]))
void setup() {
Serial.begin(115200);
tflModel = tflite::GetModel(model);
if (tflModel->version() != TFLITE_SCHEMA_VERSION) {
Serial.println("Model schema mismatch!");
while (1);
}
tflInterpreter = new tflite::MicroInterpreter(tflModel, tflOpsResolver, tensorArena, tensorArenaSize, &tflErrorReporter);
tflInterpreter->AllocateTensors();
tflInputTensor = tflInterpreter->input(0);
tflOutputTensor = tflInterpreter->output(0);
delay(1000);
pinMode(redled, OUTPUT); //初始化LED
PDM.setBufferSize(1024); //设置缓冲区
PDM.onReceive(onPDMdata); //采集到声音后的处理函数
PDM.begin(1, 16000); //声道1 ,采样率16000
}
在Arduino中引入TensorFlowLite的库,载入刚才训练好的模型文件。然后需要预先为输入、输出以及中间数组分配一定的内存。这里用常量表达式进行定义,要预先将TF的运算空间分配好。该预分配的内存是一个大小为 tensorArenaSize 的 byte 数组。这个数组的大小是根据模型的大小来定义的,一般为1024的倍数。在这里我申请了40*1024长度的空间。过大有可能编译不通过。
//收到,进行推理
void judgeSound() {
float max = 0;
char pos;
for (int i = 0; i < soundnums; i++) {
for (int j = 0; j < ttfnums; j++) {
tflInputTensor->data.f[i * ttfnums + j] = recband[i][j] / 256.0;
}
}
TfLiteStatus invokeStatus = tflInterpreter->Invoke();
if (invokeStatus != kTfLiteOk) {
Serial.println("Invoke failed!");
while (1);
return;
}
for (int i = 0; i < NUM_VOICES; i++) {
if (max < tflOutputTensor->data.f[i]) {
max = tflOutputTensor->data.f[i];
pos = i;
}
Serial.print(VOICES[i]);
Serial.print(": ");
Serial.print(tflOutputTensor->data.f[i], 6);
Serial.print(" ");
}
Serial.println();
//控制灯
当收集到声音命令的120个字节的数据后,就喂给张量去进行推理。推理的结果是个长度为分类个数的数组,数组每个变量就是对声音命令在该分类的概率值,范围【0~1】。寻找最大值就是最有可能的命令分类。根据概率最大的分类就可以做自己想要的操作啦!
体会:
随着单片机算力的增强,在单片机上通过机器学习来解决实际问题的方法也越来越多。这是用与传统解决问题方式不同的方式来解决问题,非常有意思!很多网站也提供了机器学习完整的解决方案,但是自己从头完整地做完一个项目,还是能学习到很多东西的。