一、项目描述
- 项目介绍
本次参加的是funpack第二季第五期的活动,所实现的是任务一,一个基于基于esp32boxlite的,使用micropython编程的天气信息语音播报系统。同时搭建了一个小的文件操作服务器,同时使用了心知天气和讯飞的TTS合成这两个端口,从而实现一个简单的天气语音播报系统。
- 设计思路
本次设计思路基本上是一个硬件结合一个软件部分,具体的一个详细过程如下:
1、首先,我们需要让ESP32连接到Wi-Fi网络。使用ESP32的Wi-Fi联网,可以在代码中设置Wi-Fi连接所需的SSID和密码,然后调用连接函数进行连接。连接成功后,ESP32就可以通过互联网访问其他网络资源。
2、连接成功后,我们可以使用ESP32访问http的功能,向心知天气的API发送HTTP请求,获取天气信息。心知天气的API会返回一个JSON格式的数据,其中包含天气信息如温度、天气状况等。
3、获取到天气信息后,我们需要将这些信息发送给Web服务器。可以使用ESP32的HTTP,将天气信息打包成HTTP POST请求,并发送到Web服务器的特定端点。
4、Web服务器接收到天气信息后,可以使用Python的文件操作,将天气信息写入input.txt文件。可以使用Python的open()函数打开文件,然后使用write()函数将天气信息写入文件中。
5、接下来,Web服务器需要使用讯飞API进行文本转语音(TTS)操作。可以通过向讯飞API发送HTTP POST请求,将文本内容发送给API,并收到返回的音频文件。
6、Web服务器将生成的音频文件存储在特定的位置,供ESP32访问。可以在服务器上设置一个特定的URL来提供音频文件的下载。
7、ESP32连接到Web服务器,通过HTTP请求下载音频文件。下载后的音频文件将保存在ESP32的本地存储中,准备进行音频播放。
8、为了实现音频播放,需要将ESP32与DAC芯片进行连接。根据硬件连接,设置相应的引脚作为I2S总线的时钟、字时钟和数据输出引脚。
9、ESP32将从本地存储中读取音频数据,并通过I2S协议将数据发送给DAC芯片。DAC芯片将数字信号转换为模拟信号,并将其输出到功放电路。
10、通过连接到喇叭或扬声器,功放电路将模拟音频信号转换为声音,从喇叭中播放出来。这样就实现了将天气信息以音频形式播放出来的功能。
总体的过程就是这么一个过程。
- 硬件介绍
硬件主要就是esp32的板卡,然后基于他的esp32s3,结合外部的DAC模块和音频功放,也就是ES8156和NS4150。然后还有一个喇叭,我这个功能比较简单,是一个20s自动获取一次天气信息,自动播报一次的功能,所以我没有添加按键的功能,也没有驱动屏幕,也就是st7789的功能。然后这个板卡的硬件还是很丰富的,比如两个麦克风,可以采集音频数据,然后还有外扩的IO口,比如一些串口等等。同时比较有意思的是这个ADC的按键功能,三个按键根据一个adc的电压输入从而去实现不同的按键功能,这个操作我确实是第一次见,虽然很可惜,这一次我没有去完成按键播报的这么一个功能,但是各位都可以试一试这个。
二、软件流程图及各功能对应的主要代码片段及说明
1、流程图(见图)
2、主要代码片段说明(先说一下,代码都有注释,而且我是各个直接的功能单独实现的,可以参考各个功能的文件)
(1)、连接WIFI
#WIFI配置(这里配置成自己的wifi连接)
# ssid = 'zhao'
# password ='66666666'
#WIFI连接函数
def ConnectNet(ssid ,password):
mynetwork=network.WLAN(network.STA_IF)#先配置
mynetwork.active(False)#先关闭WiFi
mynetwork.active(True)#再打开WiFi
mynetwork.connect(ssid,password)#上诉没有的话,会导致报错,也就是wifi无法识别和连接
while True:
if(mynetwork.isconnected()):
break
else :
time.sleep(1)
#print(mynetwork.ifconfig())
print("WIFI is connect")#串口打印文本监测
(2)、访问心知天气获取信息和解码json——这个打印出来的就是天气信息了
result1=urequests.get('https://api.seniverse.com/v3/weather/now.json?key=这里用自己的心知天气密钥,参考他们的官方温度&location=nanjing(这里是城市,比如你想要南京的就是nanjing,北京就是beijing)&language=zh-Hans&unit=c')#获取api的数据,即为天气的json
j1=ujson.loads(result1.text)#解析
# print(j1['results'][0]['location']['name'],end=' ')#城市名字
# print(j1['results'][0]['now']['text'],end=' ')#天气情况
# print(j1['results'][0]['now']['temperature'],end='℃ ')#温度
# print(j1['results'][0]['last_update'])#数据更新时间
city = j1['results'][0]['location']['name']
weather = j1['results'][0]['now']['text']
temp = j1['results'][0]['now']['temperature']
fresh_time = j1['results'][0]['last_update']
print(city)
print(weather)
print(temp,"度")
print(fresh_time)
(3)、向服务器发送文本和从服务器下载音频数据
def send_content_to_server(content):
url = "http://IP地址:端口号/receive_content" # 请替换为你电脑的IP和端口号
content_utf8 = content.encode('utf-8')#编码
headers = {'Content-Type': 'text/plain'}
response = urequests.post(url, data=content_utf8, headers=headers)
print("Content sent to server. Response status:", response.status_code)
response.close()
def download_wav_file(url, save_path):
response = urequests.get(url)
with open(save_path, 'wb') as file:
file.write(response.content)
# 服务器端存放WAV文件的URL
server_wav_url = 'http://跟前面那个一样/get_wav_file'
# ESP32端存放WAV文件的路径,请根据实际情况修改
esp32_wav_path = '/output.wav'
(4)、I2S的初始化和DAC的配置
sck_pin = Pin(17) # 串行时钟输出,I2S_SCLK
ws_pin = Pin(47) # 字时钟,I2S_LRCK
sd_pin = Pin(15) # 串行数据输出,GPIO15=I2S_DAC_SDIN
pa_pin = Pin(46, Pin.OUT) # 创建用于控制功率放大器的Pin对象
pa_pin.value(1) # 将引脚的值设置为1(高电平)以启用放大器
print("ADC IS OK")#不用管,删掉,这是我调试数据的,所有的串口打印都是为了调试看的
audio_out = I2S(0,sck=sck_Pin,
ws=ws_pin,
sd=sd_pin,
mode=I2S.TX,
bits=16,
format=1,
rate=16000,
ibuf=20000)
print("IIS IS OK")#不用管,删掉,这是我调试数据的,所有的串口打印都是为了调试看的
(5)、主函数的解析
result1=urequests.get('https://api.seniverse.com/v3/weather/now.json?key=密钥location=城市&language=zh-Hans&unit=c')#获取api的数据,即为天气的json
j1=ujson.loads(result1.text)#解析
# print(j1['results'][0]['location']['name'],end=' ')#城市名字
# print(j1['results'][0]['now']['text'],end=' ')#天气情况
# print(j1['results'][0]['now']['temperature'],end='℃ ')#温度
# print(j1['results'][0]['last_update'])#数据更新时间
city = j1['results'][0]['location']['name']
weather = j1['results'][0]['now']['text']
temp = j1['results'][0]['now']['temperature']
fresh_time = j1['results'][0]['last_update']
print(city)
print(weather)
print(temp,"度")
print(fresh_time)
# 获取要发送的内容,读取
content = city + " " + weather + " " + str(temp) + "度 "
print(content)
send_content_to_server(content)#向服务器的input写入这个文本
# 下载WAV文件并保存到ESP32
download_wav_file(server_wav_url, esp32_wav_path)
#uart_test
print("download is ok")#下载成功
wavtempfile = "output.wav"
wav = open(wavtempfile,'rb')
print('播放音频')
# 播放开始时间
start_time = time.ticks_us()
# 读取音频文件的二进制数据
buf = wav.read()
# wav文件的头部数据,不是实际的音频数据,是文件信息,所以我们要丢弃这部分数据
bufhead = 44
# 实际音频数据大小,等于总大小减去头部信息的大小
bufsize = len(buf) - bufhead
# 缓冲区大小,前面初始化的时候设置的最后一个参数
bufcap = 4096
# 下面的0.032得来的方法:16000(采样率) x 16(采样位宽,我用的是16位音频,单位bit) x1(通道数,单声道1,立体声n) ÷ 8(1字节=8bit) ÷ 1000000(秒换算成微秒)
# 音频总时长 us(微秒)
all_time = bufsize / 0.032
# 写入DAC的次数
bunum = 1
# 要写入的数据 开始位置
bufstart = bufhead
# 循环读取
while bufsize:
# 读取结束位置,等于读取次数*缓冲区大小
bufend = bufcap * bunum
# 当结束位置大于总数据长度的时候,结束位置等于数据最后一位
if bufend > len(buf):
bufend = len(buf)
# 要写入的数据
bufwrite = buf[bufstart:bufend]
# 写入数据
num_written = audio_out.write(bufwrite)
# 总大小减去每次写入的大小
bufsize -= len(bufwrite)
# 重新设置读取位置,为上次结束后一位
bufstart = bufend
# 读取次数
bunum = bunum + 1
# 等待音频播放完
while 1:
# 播放结束时间
end_time = time.ticks_us()
# 如果当前时间减去开始播放的时间大于音频时长
if (end_time - start_time) > all_time:
# 取消初始化 I2S 总线
audio_out.deinit()
# 停止等待
break
# 播放完毕
# 关闭文件
wav.close()
print("OVER")#音频播放完毕,进入下一个天气播报循环
time.sleep(20)
(6)、服务器内容和科大讯飞的TTS功能,这一个是要在pycharm上面运行的
# -*- coding:utf-8 -*-
#
# author: iflytek
#
# 本demo测试时运行的环境为:Windows + Python3.7
# 本demo测试成功运行时所安装的第三方库及其版本如下:
# cffi==1.12.3
# gevent==1.4.0
# greenlet==0.4.15
# pycparser==2.19
# six==1.12.0
# websocket==0.2.1
# websocket-client==0.56.0
# 合成小语种需要传输小语种文本、使用小语种发音人vcn、tte=unicode以及修改文本编码方式
# 错误码链接:https://www.xfyun.cn/document/error-code (code返回错误码时必看)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
import websocket
import datetime
import hashlib
import base64
import hmac
import json
from urllib.parse import urlencode
import time
import ssl
from wsgiref.handlers import format_date_time
from datetime import datetime
from time import mktime
import _thread as thread
import os
#pcm转wave
import wave
import struct
STATUS_FIRST_FRAME = 0 # 第一帧的标识
STATUS_CONTINUE_FRAME = 1 # 中间帧标识
STATUS_LAST_FRAME = 2 # 最后一帧的标识
class Ws_Param(object):
# 初始化
def __init__(self, APPID, APIKey, APISecret, Text):
self.APPID = APPID
self.APIKey = APIKey
self.APISecret = APISecret
self.Text = Text
# 公共参数(common)
self.CommonArgs = {"app_id": self.APPID}
# 业务参数(business),更多个性化参数可在官网查看
self.BusinessArgs = {"aue": "raw", "auf": "audio/L16;rate=16000", "vcn": "x3_xiaoyue","speed":30, "tte": "utf8"}
#聆小璇-温柔 x4_lingxiaoxuan_en,聆小瑶-情感 x4_lingxiaoyao_em,四川话x3_yezi_sc,粤语x3_xiaoyue。
self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-8')), "UTF8")}
#使用小语种须使用以下方式,此处的unicode指的是 utf16小端的编码方式,即"UTF-16LE"”
#self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-16')), "UTF8")}
# 生成url
def create_url(self):
url = 'wss://tts-api.xfyun.cn/v2/tts'
# 生成RFC1123格式的时间戳
now = datetime.now()
date = format_date_time(mktime(now.timetuple()))
# 拼接字符串
signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
signature_origin += "date: " + date + "\n"
signature_origin += "GET " + "/v2/tts " + "HTTP/1.1"
# 进行hmac-sha256进行加密
signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
digestmod=hashlib.sha256).digest()
signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % (
self.APIKey, "hmac-sha256", "host date request-line", signature_sha)
authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
# 将请求的鉴权参数组合为字典
v = {
"authorization": authorization,
"date": date,
"host": "ws-api.xfyun.cn"
}
# 拼接鉴权参数,生成url
url = url + '?' + urlencode(v)
# print("date: ",date)
# print("v: ",v)
# 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
# print('websocket url :', url)
return url
def on_message(ws, message):
try:
message =json.loads(message)
code = message["code"]
sid = message["sid"]
audio = message["data"]["audio"]
audio = base64.b64decode(audio)
status = message["data"]["status"]
print(message)
if status == 2:
print("ws is closed")
ws.close()
if code != 0:
errMsg = message["message"]
print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
else:
with open('./demo.pcm', 'ab') as f:
f.write(audio)
except Exception as e:
print("receive msg,but parse exception:", e)
#读取文件
def openreadtxt(file_name):
data = []
with open(file_name, 'r') as file:
for row in file:
# 去掉换行符并合并为一个字符串
row_data = ' '.join(row.strip().split())
data.append(row_data)
return data
# 收到websocket错误的处理
def on_error(ws, error):
print("### error:", error)
# 收到websocket关闭的处理
def on_close(ws, close_status, close_msg):
print("### closed ###")
print("Close status:", close_status)
print("Close message:", close_msg)
# 收到websocket连接建立的处理
def on_open(ws):
def run(*args):
d = {"common": wsParam.CommonArgs,
"business": wsParam.BusinessArgs,
"data": wsParam.Data,
}
d = json.dumps(d)
print("------>开始发送文本数据")
ws.send(d)
if os.path.exists('./demo.pcm'):
os.remove('./demo.pcm')
thread.start_new_thread(run, ())
#转换wav
def pcm_to_wav(pcm_file, wav_file, channels=1, sample_width=2, frame_rate=16000):
with open(pcm_file, 'rb') as pcm_data:
pcm_data = pcm_data.read()
wav = wave.open(wav_file, 'wb')
wav.setnchannels(channels)
wav.setsampwidth(sample_width)
wav.setframerate(frame_rate)
wav.writeframes(pcm_data)
wav.close()
if __name__ == "__main__":
# 测试时候在此处正确填写相关信息即可运行
data = openreadtxt('input.txt')
text = ''.join(data) # 将列表中的元素连接成一个字符串
print(text)
print(data)
wsParam = Ws_Param(APPID=' ', APIKey=' ',
APISecret=' ',
Text=text)
websocket.enableTrace(False)
wsUrl = wsParam.create_url()
ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close)
ws.on_open = on_open
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
pcm_file_path = './demo.pcm' # 你的 PCM 文件路径
wav_file_path = './output.wav' # 保存为 WAV 文件的路径
pcm_to_wav(pcm_file_path, wav_file_path)
三、功能展示及说明(可右上角点“上传”插入图片进行展示并说明)
1、首先是wifi连接,本地串口的信息显示,可以看到显示wifi成功连接,然后也显示了天气信息,然后是服务器的数据发送返回,然后是下载音频,下载成功,播放音频,播放音频之后会完毕wav文件,然后打印OVER
2、接下来是服务器的访问
这里可以看出,服务器从esp32处获取了一次文件读写,然后又进行了一次音频下载
3、在电脑端口的TTS,讯飞进行的操作,可以看到成功写入了文本,并且音频数据也改变了
4、附带讯飞端口的TTS执行
5、具体的实物操作,因为是音频,而且我没用这个按键和显示屏幕,所以还是得看视频,见谅
四、对本活动的心得体会(包括意见或建议)
1、本次因为乐鑫的idf不是很好配置,使用c编程的时候,挺麻烦的,不过有广大群友和大佬们进行了帮助,所以这个问题不是很大。
2、个人原因:不知道为什么我配完IDF,然后配置vscode的时候,怎么样都不成功,反而是elicpse的那个编译器可以正常工作,但是命令行的操作也会报错,最后我也只是跑通了连接wifi,获取心知天气的天气信息,然后在乐鑫官方的TTS例程那里卡住了
3、因为临时卡住了,加上我使用过arduino来编程,然后学过python,就突发奇想想试一试micropyhon,结果这个确实要方便一些,但是对于很多细节的操作还是不够,可以的话还是建议使用乐鑫的idf和vscode配置去编程,命令行操作也很不错。
4、本次学习收获最大的还是web服务器,因为我的思路,导致了要在服务器上面进行操作,首先声明我不确定乐鑫的TTS是怎么实现的,但是理论上来讲,确实可以把讯飞在pycharm的移植到esp32的micropython里面,但是我没有去做,然后我也不确定行不行得通。
5、web服务器的知识还是很多的,我说实在没办法在文章中三言两语说清楚,而且我之前学习python是做课设,只会使用一些基本的数据处理,像这一次的服务器的知识我是现学的,同时这一次的技术点也在这,学会搭建了服务器之后,就可以实现esp32和pc端直接的互相操作,进而完成一个完整的项目。
6、不建议大家学习micropyhon去编程esp32,虽然乐鑫环境确实不太友好,但是那个是从原理出发的,这个micropython很多东西是做不到像c那样的,还是建议学习乐鑫的官方环境
叠甲:上述全是个人主观感受,不代表我吹什么什么好,也不是我说什么什么不行。每个人感觉不一样,同时这一次活动有很多大佬,他们所做的项目都比我要高级和精细,我只是一个刚开始使用乐鑫idf的一个新手,很多东西都做的不够好,还有很多要学习的地方,希望各位体谅。
最后:我所操作的调试文件全部在后续的文件附录里面,代码一般都有注释。
附带:百度网盘下载文件:链接:https://pan.baidu.com/s/1tZJ0ds8Kctv1yGf9KHf5OA?pwd=6666
提取码:6666