Funpack2-5, 使用ESP32-S3 BOX Lite 完成任务一,利用tts及联网功能实现语音播报天气钟
Funpack2-5活动, 使用ESP32-S3 BOX Lite 完成任务一,利用tts及联网功能实现语音播报天气钟。
标签
嵌入式系统
Funpack活动
显示
开发板
小熊熊
更新2023-08-01
1622

项目介绍

本项目利用了ESP32-S3的WiFi和TTS功能,实现一个语音播报系统,并做一个简单的天气钟,来实现时间,天气的播报。

设计思路

由于ESP32-S3支持多平台开发,考虑到会有大量同期的同学使用官方开发框架,因此本次使用的是micropython进行开发。感谢各路开源作者,项目中用到的开源项目有:micropython,pyclock,edge_tts,mpy_st7789,flask。

项目阶段性目标分为两块:

1,实现网络连接,驱动屏幕,完成天气钟。

2,驱动dac,利用tts api完成语音播报。

硬件介绍

BOX%20%E8%A7%86%E9%A2%91%E5%B0%81%E9%9D%A2%20%E6%8B%B7%E8%B4%9D-01.jpg

ESP32-S3-BOX 搭载 ESP32-S3 AI SoC,在芯片内置的 512 KB SRAM 之外,还集成了 16 MB QSPI flash 和 8 MB Octal PSRAM。它板载一块配备电容触摸面板的 2.4 寸显示屏(分辨率 320 x 240),双麦克风,一个扬声器和两个用于硬件拓展的 Pmod™ 兼容接口;采用 Type-C USB 连接器,提供 5 V 电源输入和串口/JTAG 调试接口。

软件流程

软件整体功能分为以下几块:

FmdK5CqWPr2RLjOD13sjfQ7OznTb

首先是wifi配置,先寻找是否存在已有的配置文件,如果有就直接联网,没有的话就启用ap配网:

# 没有WiFi配置文件,出厂模式
while "wifi.txt" not in os.listdir():
    ap.startAP()  # 启动AP配网模式

# 连接WiFi
while not WIFI_Connect():  # 等待wifi连接
    pass

由于ap模块需要一些html代码来搭建简单的网页,因此放在了单独的ap.py文件中,需要在主程序开头进行导入。而ap也同样利用到了屏幕,将一些配置所需的引导信息直接存为图片,并显示在屏幕上,只需要跟随屏幕指示一步一步做就可以了。这里主要配置的信息有两个,一个是wifi连接信息,第二个是城市信息。配置好后会在目录里生成对应txt文件;如果需要重新配置,可以运行中长按中键,或者手动删除生成的txt文件即可。

Fm5gldPLoQK-vuB_qpQTdmxM7t7UFtsgXzW40p7hNnhPZj2LWy9Fv6F_

初始化配置的代码如下,功能是删除配置文件并重启:

print("Factory Mode!")
try:
    os.remove("wifi.txt")
    print("Remove wifi.txt")
except:
    print("no wifi.txt")
reset()  # 重启开发板。

 

接下来金入下一个阶段,实现天气钟。天气钟有两块,一块是天气内容获取,我是使用爬虫直接在网页上获取的,这样不需要去注册api,大家下载就能用。当然使用api的话代码会更加简单。以下是爬虫部分:

# 网页获取天气数据
def weather_get(datetime):
    global weather, lost, total, city

    for i in range(5):  # 失败会重试,最多5次
        try:
            _url = "https://www.tianqi.com/" + city[1] + "/"
            myURL = urequest.urlopen(_url)
            text = myURL.read(15000 + 1000 * i).decode("utf-8")  # 抓取约前4W个字符,节省内存。
            text1=re.search('<p class="now">' + "(.*?)" + "<h6>", text).group(1)
            weather[0] = re.search("<span><b>" + "(.*?)" + "</b>", text1).group(1)  # 当日天气
            weather[1] = re.search(weather[0] +"</b>" + "(.*?)" + " ~ ", text1).group(1)  # 当天最低温
            weather[2] = re.search(" ~ " + "(.*?)" + "\u2103", text1).group(1)  # 当天最高温
            weather[3] = weather[0]  # 实时天气
            weather[4] = re.search("\u7a7a\u6c14\u8d28\u91cf\uff1a" + "(.*?)" + "</h5>", text1).group(1)  # 空气质量
            weather[5] = re.search("\u98ce\u5411\uff1a" + "(.*?)" + " ", text1).group(1)  # 实时风向
            weather[6] = re.search(weather[5] + " " + "(.*?)" + "</b><b>", text1).group(1)  # 实时风力级数
            weather[7] = re.search("<b>" + "(.*?)" + "</b><i>", text1).group(1)  # 温度
            weather[8] = re.search("\u6e7f\u5ea6\uff1a" + "(.*?)" + "%</b><b>", text1).group(1)  # 相对湿度

            total = total + 1

            return None

        except:
            print("Can not get weather!", i)
            lost = lost + 1
            gc.collect()  # 内存回收

        time.sleep_ms(1000)

网站可能会对爬虫做一系列的限制。如果大家发现无法获取到数据了,那可能是网站做了限制,或者是修改了html结构。大家只需要按照上面的样式修改搜索方法即可。

时间的获取就比较简单,可以用自带的ntp方法,只需要修改下服务器即可。需要注意的是网上使用delta方法直接修改时区的做法我实际测试并没有任何作用,因此在这里我是手动修改的时区:

ntptime.host = "ntp1.aliyun.com"

    
# 获取网络时间
def ntp_get():
    for i in range(10):  # 最多尝试获取10次
        try:
            ntptime.settime()  # 获取网络时间
            _t=rtc.datetime()
            rtc.datetime((_t[0],_t[1],_t[2],_t[3],_t[4]+8,_t[5],_t[6],_t[7]))
            print("ntp time(BeiJing): ", rtc.datetime())
            return True

        except:
            print("Can not get time!")

        time.sleep_ms(500)

 

到此我们已经得到了所有需要的数据。接下来需要驱动屏幕并显示数据。

屏幕驱动这一块对于mpy来说稍微有点麻烦,因为官方驱动里并不包含这一块,需要自己把驱动编译进固件。我已经编译好了包含st7789的 micropython 1.20固件,一并放在了项目中。大家需要刷这个固件,才能正常使用屏幕。当然这个驱动驱动其他的一些屏幕也是可以的,但是需要手动设置初始化寄存器,具体使用方法可以参考mpy_7789开源项目。

由于ap和主程序,还有ui页面都需要使用到屏幕,而且很多场景都需要使用到中文显示,因此屏幕初始化与object创建,以及中文显示方法也单独写了一个库,tft_config.py,方便调用:

from machine import Pin, SPI
import st7789

def config(rotation=3, buffer_size=0, options=0):
    _bl = Pin(45,mode=Pin.OUT,value=0)
    _tft = st7789.ST7789(
        spi = SPI(1, baudrate=31250000, sck=Pin(7), mosi=Pin(6)),
        width = 240,
        height = 320,
        dc=Pin(4, Pin.OUT),
        reset = Pin(48, Pin.OUT),
        cs=Pin(5, Pin.OUT),
        rotation=rotation,
        color_order = st7789.RGB,
        options=options,
        buffer_size=buffer_size
        )
    _tft.init()
    return _tft, _bl

#中文显示
def printChinese(tft,fonts,text,x,y,color=st7789.BLACK,backcolor=st7789.WHITE,size=1):
    
    font_size = [0,16,24,32,40,48] #分别对应size=1,2,3,4,5的字体尺寸,0无效。
    
    chinese_dict = {}
    
    #获取对应的字模
    if size==1:
        chinese_dict = fonts.hanzi_16x16_dict
        
    elif size==2:
        chinese_dict = fonts.hanzi_24x24_dict
        
    elif size==3:
        chinese_dict = fonts.hanzi_32x32_dict
        
    elif size==4:
        chinese_dict = fonts.hanzi_40x40_dict
        
    elif size==5:
        chinese_dict = fonts.hanzi_48x48_dict    
    
    xs = x
    ys = y
    
    #定义字体颜色,RGB888转RGB565
    # fc = ((color[0]>>3)<<11) + ((color[1]>>2)<<5) + (color[2]>>3)  # 字体
    # bc = ((backcolor[0]>>3)<<11) + ((backcolor[1]>>2)<<5) + (backcolor[2]>>3)  # 字体背景颜色
    fc = color
    bc = backcolor

    for i in range(0, len(text)):
        
        ch_buf =chinese_dict[text[i]] #汉子对应码表
        
        rgb_buf = []
        
        t1 = font_size[size] // 8
        t2 = font_size[size] % 8

        for i in range(0, len(ch_buf)):
            
            for j in range(0, 8):
                if (ch_buf[i] << j) & 0x80 == 0x00:
                    rgb_buf.append(bc >> 8)
                    rgb_buf.append(bc & 0xff)
                else:
                    rgb_buf.append(fc >> 8)
                    rgb_buf.append(fc & 0xff)
                    

        tft.blit_buffer(bytearray(rgb_buf),xs,y,font_size[size],font_size[size])
        
        xs += font_size[size]

主要的屏幕ui显示都写在了default.py中,使用这种方法可以设置多种不同的显示页面,并且在主程序中进行切换。在本项目中仅有天气钟一个功能,因此仅显示default.py的ui。

至此天气钟部分完成,接下来讲讲tts部分:

首先是硬件驱动。本项目仅使用一个按键,就是中间的按键,按下触发tts,长按触发重置。adc按键的读取需要靠主程序在每一次循环中查询adc的读数来判断:

# 按键
KEY = ADC(Pin(1))
KEY.atten(ADC.ATTN_11DB)

# 按键触发
def key(KEY):
    if (KEY.read() > (2395 - 250)) and (KEY.read() < (2395 + 250)):
        time.sleep_ms(10)  # 消除抖动
        if (KEY.read() > (2395 - 250)) and (KEY.read() < (2395 + 250)):# 确认按键被按下
            # 出厂模式
            start = time.ticks_ms()
            while (KEY.read() > (2395 - 250)) and (KEY.read() < (2395 + 250)):
                if time.ticks_ms() - start > 5000:  # 长按按键5秒
                    WIFI_LED.value(1)  # 指示灯亮
                    print("Factory Mode!")
                    try:
                        os.remove("wifi.txt")
                        print("Remove wifi.txt")
                    except:
                        print("no wifi.txt")

                    reset()  # 重启开发板。
            # 不是长按
            try:
                tts()
            except:
                print("connecting tts server failed")

这里消除抖动用了10ms的延迟,长按的阈值设置为5秒。按下后开始计时,如果在5秒内检测到按键弹起,那么就触发tts,否则运行重置程序。

最后一块是tts部分的实现,首先是服务器端的代码,这里用开源的flask搭建了一个简易http服务器,使用post方法将需要转为语音的文本发送给服务器,服务器返回wav数据流:

from flask import Flask, request
import json
import asyncio
import edge_tts
import pydub
import io

async def tts(text):
    _voices = await edge_tts.VoicesManager.create()
    _voices = _voices.find(ShortName="zh-CN-XiaoyiNeural")
    _communicate = edge_tts.Communicate(text, _voices[0]["Name"])
    _out = bytes()
    async for _chunk in _communicate.stream():
        if _chunk["type"] == "audio":
            _out += _chunk["data"]
        elif _chunk["type"] == "WordBoundary":
            pass
    _raw = pydub.AudioSegment.from_file(io.BytesIO(_out))
    _wav = io.BytesIO()
    _raw.export(_wav, format="wav")
    return _wav.getvalue()

app = Flask(__name__)
@app.route("/", methods=["POST"])
def handle_tts():
    _data = json.loads(request.get_data().decode("utf-8"))
    _text = _data["text"]
    print(_text)
    _wavdata = asyncio.run(tts(_text))
    return _wavdata

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080, debug=True)

这里安装pydub时还需要同步安装解码器并设置环境变量,具体步骤如下:

  1. 去FFmpeg官网找到对应的系统版本下载。
  2. 将压缩包解压到指定的目录。
  3. 将安装安装目录下的bin文件夹添加到系统的Path环境变量中。

接下来再回到micropython,首先初始化dac设备,由于es8156是需要主时钟信号的,而且在原理图中也连接了主时钟信号,但micropython的i2s驱动中却不使用主时钟。因此需要先初始化一个i2s设备,产生mclk时钟后,再去初始化我们真正使用的设备,同时不要忘记打开功放使能:

sck_pin = Pin(17) 
ws_pin = Pin(47)
sdo_pin = Pin(15)
sdi_pin = Pin(16)

scl_pin = Pin(18)
sda_pin = Pin(8)

pa_pin = Pin(46, Pin.OUT)
pa_pin.value(1)
audio_out = I2S(0,
                sck=Pin(2), ws=ws_pin, sd=sdo_pin,
                mode=I2S.TX,
                bits=16,
                format=1,
                rate=24000,
                ibuf=81920)
audio_out.deinit()
wp = WavPlayer(
    id=0,
    sck_pin=sck_pin,
    ws_pin=ws_pin,
    sd_pin=sdo_pin,
    ibuf=20000
)

最后,使用post方法从我们自己搭建的api服务器中获得音频数据,再放入wp播放就可以了。注意由于发送的数据中含有中文字符,所以并不能直接使用json方法,否则会导致数据发送不完整;在这里要先编码后再发送,然后由服务端解码:

def tts():
    while wp.isplaying() == True:
        return
    global audio_out,datetime
    if datetime[5]<10:
        minuate = "0" + str(datetime[5])
    else:
        minuate = str(datetime[5])
    _text = "现在时间:" + str(datetime[4]) +"点"+minuate+"分。"+city[0]+"天气:" + weather[3]+"。温度:"+ weather[7]+"摄氏度。湿度:百分之"+ weather[8] + "。空气质量:"+ weather[4]
    _data = {"text":_text}
    print(_data)
    _url = "http://192.168.50.100:8080/"
    _myURL = urequests.post(_url, data=json.dumps(_data).encode("utf-8"))
    _raw = _myURL.content
    print("wav length",len(_raw))
    wav = io.BytesIO(_raw)
    wp.play(wav, loop=False)

至此,所有功能完成。

 

功能展示

上电后按照屏幕指示配网,完成后机器会自动重启并开始正常工作,工作界面如下:

Fn7wKt5s1sIjpTz_aYxOKIV2JD6A

按中键可实现语音播报,长按中键初始化设置。

 

具体演示参考视频后半演示部分。

 

心得体会

不得不说,使用python方法来开发嵌入式系统完全是一种全新的体验,单从开发角度来讲舒适度吊打传统的C语言,这也是这次我选用micropython开发的原因,也是想尝试一下这种全新的开发方式。但不可忽视的是嵌入式python系统目前依旧非常不完善,bug比较多,容易死机,而且在很多库和方法上和python并不完全兼容(尤其比较坑爹的是datetime格式居然不一样,太异类了)。可以看到随着单片机性能的提升,脚本式语言的全面发展已经势不可挡,但还需要一些时间和耐心。

附件下载
Funpack2-5_xxxmj.rar
团队介绍
小熊熊的小马甲
团队成员
小熊熊
评论
0 / 100
查看更多
目录
硬禾服务号
关注最新动态
0512-67862536
info@eetree.cn
江苏省苏州市苏州工业园区新平街388号腾飞创新园A2幢815室
苏州硬禾信息科技有限公司
Copyright © 2024 苏州硬禾信息科技有限公司 All Rights Reserved 苏ICP备19040198号