Funpack2-5的基于micropython的天气语音播报系统——TTS语音合成技术
基于esp32boxlite完成任务一,使用micropython编程,且搭建了一个小的文件操作服务器,同时使用了心知天气和讯飞的TTS合成这两个端口,从而实现一个简单的天气语音播报系统。
标签
嵌入式系统
Funpack活动
ESP32-S3-BOX-LITE
TTS
WEB
ZHAO
更新2023-08-02
南京邮电大学
1241

一、项目描述

  • 项目介绍

本次参加的是funpack第二季第五期的活动,所实现的是任务一,一个基于基于esp32boxlite的,使用micropython编程的天气信息语音播报系统。同时搭建了一个小的文件操作服务器,同时使用了心知天气和讯飞的TTS合成这两个端口,从而实现一个简单的天气语音播报系统。

  • 设计思路

本次设计思路基本上是一个硬件结合一个软件部分,具体的一个详细过程如下:

1、首先,我们需要让ESP32连接到Wi-Fi网络。使用ESP32的Wi-Fi联网,可以在代码中设置Wi-Fi连接所需的SSID和密码,然后调用连接函数进行连接。连接成功后,ESP32就可以通过互联网访问其他网络资源。

2、连接成功后,我们可以使用ESP32访问http的功能,向心知天气的API发送HTTP请求,获取天气信息。心知天气的API会返回一个JSON格式的数据,其中包含天气信息如温度、天气状况等。

3、获取到天气信息后,我们需要将这些信息发送给Web服务器。可以使用ESP32的HTTP,将天气信息打包成HTTP POST请求,并发送到Web服务器的特定端点。

4、Web服务器接收到天气信息后,可以使用Python的文件操作,将天气信息写入input.txt文件。可以使用Python的open()函数打开文件,然后使用write()函数将天气信息写入文件中。

5、接下来,Web服务器需要使用讯飞API进行文本转语音(TTS)操作。可以通过向讯飞API发送HTTP POST请求,将文本内容发送给API,并收到返回的音频文件。

6、Web服务器将生成的音频文件存储在特定的位置,供ESP32访问。可以在服务器上设置一个特定的URL来提供音频文件的下载。

7、ESP32连接到Web服务器,通过HTTP请求下载音频文件。下载后的音频文件将保存在ESP32的本地存储中,准备进行音频播放。

8、为了实现音频播放,需要将ESP32与DAC芯片进行连接。根据硬件连接,设置相应的引脚作为I2S总线的时钟、字时钟和数据输出引脚。

9、ESP32将从本地存储中读取音频数据,并通过I2S协议将数据发送给DAC芯片。DAC芯片将数字信号转换为模拟信号,并将其输出到功放电路。

10、通过连接到喇叭或扬声器,功放电路将模拟音频信号转换为声音,从喇叭中播放出来。这样就实现了将天气信息以音频形式播放出来的功能。

总体的过程就是这么一个过程。

  • 硬件介绍

FpiZI77mquTJRTIQ2J3-OKcrjyIE

硬件主要就是esp32的板卡,然后基于他的esp32s3,结合外部的DAC模块和音频功放,也就是ES8156和NS4150。然后还有一个喇叭,我这个功能比较简单,是一个20s自动获取一次天气信息,自动播报一次的功能,所以我没有添加按键的功能,也没有驱动屏幕,也就是st7789的功能。然后这个板卡的硬件还是很丰富的,比如两个麦克风,可以采集音频数据,然后还有外扩的IO口,比如一些串口等等。同时比较有意思的是这个ADC的按键功能,三个按键根据一个adc的电压输入从而去实现不同的按键功能,这个操作我确实是第一次见,虽然很可惜,这一次我没有去完成按键播报的这么一个功能,但是各位都可以试一试这个。

FnNbj7sDtocEaWruer3MZd5I_juzFiiv5RtRHNIvlft3sC4_O44OMBrE


二、软件流程图及各功能对应的主要代码片段及说明

1、流程图(见图)

Fgot1LSFZNsMt5pgBfYRYTWppLEV

2、主要代码片段说明(先说一下,代码都有注释,而且我是各个直接的功能单独实现的,可以参考各个功能的文件)

(1)、连接WIFI

#WIFI配置(这里配置成自己的wifi连接)
# ssid = 'zhao'
# password ='66666666'

#WIFI连接函数
def ConnectNet(ssid ,password):
    mynetwork=network.WLAN(network.STA_IF)#先配置
    mynetwork.active(False)#先关闭WiFi
    mynetwork.active(True)#再打开WiFi
    mynetwork.connect(ssid,password)#上诉没有的话,会导致报错,也就是wifi无法识别和连接
 
    while True:
        if(mynetwork.isconnected()):
            break
        else :
            time.sleep(1)
    #print(mynetwork.ifconfig())
    print("WIFI is connect")#串口打印文本监测

(2)、访问心知天气获取信息和解码json——这个打印出来的就是天气信息了

result1=urequests.get('https://api.seniverse.com/v3/weather/now.json?key=这里用自己的心知天气密钥,参考他们的官方温度&location=nanjing(这里是城市,比如你想要南京的就是nanjing,北京就是beijing)&language=zh-Hans&unit=c')#获取api的数据,即为天气的json
        j1=ujson.loads(result1.text)#解析
#         print(j1['results'][0]['location']['name'],end=' ')#城市名字
#         print(j1['results'][0]['now']['text'],end=' ')#天气情况
#         print(j1['results'][0]['now']['temperature'],end='℃ ')#温度
#         print(j1['results'][0]['last_update'])#数据更新时间
        city = j1['results'][0]['location']['name']
        weather = j1['results'][0]['now']['text']
        temp = j1['results'][0]['now']['temperature']
        fresh_time = j1['results'][0]['last_update']
        print(city)
        print(weather)
        print(temp,"度")
        print(fresh_time)

(3)、向服务器发送文本和从服务器下载音频数据

def send_content_to_server(content):
    url = "http://IP地址:端口号/receive_content"  # 请替换为你电脑的IP和端口号
    content_utf8 = content.encode('utf-8')#编码
    headers = {'Content-Type': 'text/plain'}
    response = urequests.post(url, data=content_utf8, headers=headers)
    print("Content sent to server. Response status:", response.status_code)
    response.close()
def download_wav_file(url, save_path):
    response = urequests.get(url)
    with open(save_path, 'wb') as file:
        file.write(response.content)
# 服务器端存放WAV文件的URL
server_wav_url = 'http://跟前面那个一样/get_wav_file'
# ESP32端存放WAV文件的路径,请根据实际情况修改
esp32_wav_path = '/output.wav'

(4)、I2S的初始化和DAC的配置

sck_pin = Pin(17)  # 串行时钟输出,I2S_SCLK
ws_pin = Pin(47)   # 字时钟,I2S_LRCK
sd_pin = Pin(15)   # 串行数据输出,GPIO15=I2S_DAC_SDIN

pa_pin = Pin(46, Pin.OUT)  # 创建用于控制功率放大器的Pin对象
pa_pin.value(1)  # 将引脚的值设置为1(高电平)以启用放大器
print("ADC IS OK")#不用管,删掉,这是我调试数据的,所有的串口打印都是为了调试看的
audio_out = I2S(0,sck=sck_Pin,
            ws=ws_pin,
            sd=sd_pin,
            mode=I2S.TX,
            bits=16,
            format=1,
            rate=16000,
            ibuf=20000)
print("IIS IS OK")#不用管,删掉,这是我调试数据的,所有的串口打印都是为了调试看的

(5)、主函数的解析

result1=urequests.get('https://api.seniverse.com/v3/weather/now.json?key=密钥location=城市&language=zh-Hans&unit=c')#获取api的数据,即为天气的json
        j1=ujson.loads(result1.text)#解析
#         print(j1['results'][0]['location']['name'],end=' ')#城市名字
#         print(j1['results'][0]['now']['text'],end=' ')#天气情况
#         print(j1['results'][0]['now']['temperature'],end='℃ ')#温度
#         print(j1['results'][0]['last_update'])#数据更新时间
        city = j1['results'][0]['location']['name']
        weather = j1['results'][0]['now']['text']
        temp = j1['results'][0]['now']['temperature']
        fresh_time = j1['results'][0]['last_update']
        print(city)
        print(weather)
        print(temp,"度")
        print(fresh_time)

        # 获取要发送的内容,读取
        content = city + " " + weather + " " + str(temp) + "度 "
        print(content)
        send_content_to_server(content)#向服务器的input写入这个文本

        # 下载WAV文件并保存到ESP32
        download_wav_file(server_wav_url, esp32_wav_path)
        #uart_test
        print("download is ok")#下载成功
        wavtempfile = "output.wav"
        wav = open(wavtempfile,'rb')
        print('播放音频')
        # 播放开始时间
        start_time = time.ticks_us()
        # 读取音频文件的二进制数据
        buf = wav.read()
        # wav文件的头部数据,不是实际的音频数据,是文件信息,所以我们要丢弃这部分数据
        bufhead = 44
        # 实际音频数据大小,等于总大小减去头部信息的大小
        bufsize = len(buf) - bufhead
        # 缓冲区大小,前面初始化的时候设置的最后一个参数
        bufcap = 4096
        # 下面的0.032得来的方法:16000(采样率) x 16(采样位宽,我用的是16位音频,单位bit) x1(通道数,单声道1,立体声n) ÷ 8(1字节=8bit) ÷ 1000000(秒换算成微秒)
        # 音频总时长 us(微秒)
        all_time = bufsize / 0.032
        # 写入DAC的次数
        bunum = 1
        # 要写入的数据 开始位置
        bufstart = bufhead
        # 循环读取
        while bufsize:
            # 读取结束位置,等于读取次数*缓冲区大小
            bufend = bufcap * bunum
            # 当结束位置大于总数据长度的时候,结束位置等于数据最后一位
            if bufend > len(buf):
                bufend = len(buf)
            # 要写入的数据
            bufwrite = buf[bufstart:bufend]
            # 写入数据
            num_written = audio_out.write(bufwrite)
            # 总大小减去每次写入的大小
            bufsize -= len(bufwrite)
            # 重新设置读取位置,为上次结束后一位
            bufstart = bufend
            # 读取次数
            bunum = bunum + 1
            # 等待音频播放完
        while 1:
            # 播放结束时间
            end_time = time.ticks_us()
            # 如果当前时间减去开始播放的时间大于音频时长
            if (end_time - start_time) > all_time:
                # 取消初始化 I2S 总线
                audio_out.deinit()
                # 停止等待
                break
            # 播放完毕
            # 关闭文件
        wav.close()
        print("OVER")#音频播放完毕,进入下一个天气播报循环
        time.sleep(20)

(6)、服务器内容和科大讯飞的TTS功能,这一个是要在pycharm上面运行的

# -*- coding:utf-8 -*-
#
#   author: iflytek
#
#  本demo测试时运行的环境为:Windows + Python3.7
#  本demo测试成功运行时所安装的第三方库及其版本如下:
#   cffi==1.12.3
#   gevent==1.4.0
#   greenlet==0.4.15
#   pycparser==2.19
#   six==1.12.0
#   websocket==0.2.1
#   websocket-client==0.56.0
#   合成小语种需要传输小语种文本、使用小语种发音人vcn、tte=unicode以及修改文本编码方式
#  错误码链接:https://www.xfyun.cn/document/error-code (code返回错误码时必看)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
import websocket
import datetime
import hashlib
import base64
import hmac
import json
from urllib.parse import urlencode
import time
import ssl
from wsgiref.handlers import format_date_time
from datetime import datetime
from time import mktime
import _thread as thread
import os
#pcm转wave
import wave
import struct
STATUS_FIRST_FRAME = 0  # 第一帧的标识
STATUS_CONTINUE_FRAME = 1  # 中间帧标识
STATUS_LAST_FRAME = 2  # 最后一帧的标识


class Ws_Param(object):
    # 初始化
    def __init__(self, APPID, APIKey, APISecret, Text):
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.Text = Text

        # 公共参数(common)
        self.CommonArgs = {"app_id": self.APPID}
        # 业务参数(business),更多个性化参数可在官网查看
        self.BusinessArgs = {"aue": "raw", "auf": "audio/L16;rate=16000", "vcn": "x3_xiaoyue","speed":30, "tte": "utf8"}
        #聆小璇-温柔 x4_lingxiaoxuan_en,聆小瑶-情感 x4_lingxiaoyao_em,四川话x3_yezi_sc,粤语x3_xiaoyue。
        self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-8')), "UTF8")}
        #使用小语种须使用以下方式,此处的unicode指的是 utf16小端的编码方式,即"UTF-16LE"”
        #self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-16')), "UTF8")}

    # 生成url
    def create_url(self):
        url = 'wss://tts-api.xfyun.cn/v2/tts'
        # 生成RFC1123格式的时间戳
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))

        # 拼接字符串
        signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + "/v2/tts " + "HTTP/1.1"
        # 进行hmac-sha256进行加密
        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
        signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')

        authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % (
            self.APIKey, "hmac-sha256", "host date request-line", signature_sha)
        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
        # 将请求的鉴权参数组合为字典
        v = {
            "authorization": authorization,
            "date": date,
            "host": "ws-api.xfyun.cn"
        }
        # 拼接鉴权参数,生成url
        url = url + '?' + urlencode(v)
        # print("date: ",date)
        # print("v: ",v)
        # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
        # print('websocket url :', url)
        return url

def on_message(ws, message):
    try:
        message =json.loads(message)
        code = message["code"]
        sid = message["sid"]
        audio = message["data"]["audio"]
        audio = base64.b64decode(audio)
        status = message["data"]["status"]
        print(message)
        if status == 2:
            print("ws is closed")
            ws.close()
        if code != 0:
            errMsg = message["message"]
            print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
        else:

            with open('./demo.pcm', 'ab') as f:
                f.write(audio)

    except Exception as e:
        print("receive msg,but parse exception:", e)


 #读取文件
def openreadtxt(file_name):
    data = []
    with open(file_name, 'r') as file:
        for row in file:
            # 去掉换行符并合并为一个字符串
            row_data = ' '.join(row.strip().split())
            data.append(row_data)
    return data


# 收到websocket错误的处理
def on_error(ws, error):
    print("### error:", error)


# 收到websocket关闭的处理
def on_close(ws, close_status, close_msg):
    print("### closed ###")
    print("Close status:", close_status)
    print("Close message:", close_msg)


# 收到websocket连接建立的处理
def on_open(ws):
    def run(*args):
        d = {"common": wsParam.CommonArgs,
             "business": wsParam.BusinessArgs,
             "data": wsParam.Data,
             }
        d = json.dumps(d)
        print("------>开始发送文本数据")
        ws.send(d)
        if os.path.exists('./demo.pcm'):
            os.remove('./demo.pcm')

    thread.start_new_thread(run, ())
#转换wav
def pcm_to_wav(pcm_file, wav_file, channels=1, sample_width=2, frame_rate=16000):
    with open(pcm_file, 'rb') as pcm_data:
        pcm_data = pcm_data.read()

    wav = wave.open(wav_file, 'wb')
    wav.setnchannels(channels)
    wav.setsampwidth(sample_width)
    wav.setframerate(frame_rate)

    wav.writeframes(pcm_data)
    wav.close()

if __name__ == "__main__":
    # 测试时候在此处正确填写相关信息即可运行
    data = openreadtxt('input.txt')
    text = ''.join(data)  # 将列表中的元素连接成一个字符串
    print(text)
    print(data)
    wsParam = Ws_Param(APPID=' ', APIKey=' ',
                       APISecret=' ',
                       Text=text)
    websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close)
    ws.on_open = on_open
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
    pcm_file_path = './demo.pcm'  # 你的 PCM 文件路径
    wav_file_path = './output.wav'  # 保存为 WAV 文件的路径
    pcm_to_wav(pcm_file_path, wav_file_path)


三、功能展示及说明(可右上角点“上传”插入图片进行展示并说明)

1、首先是wifi连接,本地串口的信息显示,可以看到显示wifi成功连接,然后也显示了天气信息,然后是服务器的数据发送返回,然后是下载音频,下载成功,播放音频,播放音频之后会完毕wav文件,然后打印OVER

FjNacDzXJmoQQHuRpxZ92GQliSRQ

2、接下来是服务器的访问

这里可以看出,服务器从esp32处获取了一次文件读写,然后又进行了一次音频下载Fm-SwUZDaVwEG43-lASpZX6g6Pi8

3、在电脑端口的TTS,讯飞进行的操作,可以看到成功写入了文本,并且音频数据也改变了

Fmcqx5RNBzpNeBaC2KB6TLeaMaTp

4、附带讯飞端口的TTS执行

FqRIFox4Ln4nLvGuiF-qpeLi-d1Y

5、具体的实物操作,因为是音频,而且我没用这个按键和显示屏幕,所以还是得看视频,见谅


四、对本活动的心得体会(包括意见或建议)

1、本次因为乐鑫的idf不是很好配置,使用c编程的时候,挺麻烦的,不过有广大群友和大佬们进行了帮助,所以这个问题不是很大。

2、个人原因:不知道为什么我配完IDF,然后配置vscode的时候,怎么样都不成功,反而是elicpse的那个编译器可以正常工作,但是命令行的操作也会报错,最后我也只是跑通了连接wifi,获取心知天气的天气信息,然后在乐鑫官方的TTS例程那里卡住了

3、因为临时卡住了,加上我使用过arduino来编程,然后学过python,就突发奇想想试一试micropyhon,结果这个确实要方便一些,但是对于很多细节的操作还是不够,可以的话还是建议使用乐鑫的idf和vscode配置去编程,命令行操作也很不错。

4、本次学习收获最大的还是web服务器,因为我的思路,导致了要在服务器上面进行操作,首先声明我不确定乐鑫的TTS是怎么实现的,但是理论上来讲,确实可以把讯飞在pycharm的移植到esp32的micropython里面,但是我没有去做,然后我也不确定行不行得通。

5、web服务器的知识还是很多的,我说实在没办法在文章中三言两语说清楚,而且我之前学习python是做课设,只会使用一些基本的数据处理,像这一次的服务器的知识我是现学的,同时这一次的技术点也在这,学会搭建了服务器之后,就可以实现esp32和pc端直接的互相操作,进而完成一个完整的项目。

6、不建议大家学习micropyhon去编程esp32,虽然乐鑫环境确实不太友好,但是那个是从原理出发的,这个micropython很多东西是做不到像c那样的,还是建议学习乐鑫的官方环境

叠甲:上述全是个人主观感受,不代表我吹什么什么好,也不是我说什么什么不行。每个人感觉不一样,同时这一次活动有很多大佬,他们所做的项目都比我要高级和精细,我只是一个刚开始使用乐鑫idf的一个新手,很多东西都做的不够好,还有很多要学习的地方,希望各位体谅。

 

最后:我所操作的调试文件全部在后续的文件附录里面,代码一般都有注释。

附带:百度网盘下载文件:链接:https://pan.baidu.com/s/1tZJ0ds8Kctv1yGf9KHf5OA?pwd=6666 
提取码:6666

附件下载
完整文件太大了,从百度网盘下载吧,code只是一部分代码.txt
链接:https://pan.baidu.com/s/1tZJ0ds8Kctv1yGf9KHf5OA?pwd=6666 提取码:6666
code.zip
链接:https://pan.baidu.com/s/1tZJ0ds8Kctv1yGf9KHf5OA?pwd=6666 提取码:6666
团队介绍
逸!误!
团队成员
ZHAO
评论
0 / 100
查看更多
目录
硬禾服务号
关注最新动态
0512-67862536
info@eetree.cn
江苏省苏州市苏州工业园区新平街388号腾飞创新园A2幢815室
苏州硬禾信息科技有限公司
Copyright © 2024 苏州硬禾信息科技有限公司 All Rights Reserved 苏ICP备19040198号