SIM7600 USB Audio实现语音通话
Windows 实现导出音频流到电脑网页播放。
·
树莓派5测试

树莓派接USB声卡. 手机给 SIM7600 拨打电话
import time
import serial
import pyaudio
import numpy as np
import sys
# ====== Serial port configuration ======
AT_PORT = "/dev/ttyUSB2" # adjust to the serial port used on the Raspberry Pi 5
AUDIO_PORT = "/dev/ttyUSB4" # adjust to the serial port used on the Raspberry Pi 5
# ====== Audio parameters ======
SIM_RATE = 8000 # telephone-quality sample rate (Hz)
CHUNK_8K = 320 # frames per buffer: a 40 ms frame at 8 kHz
MIC_RATE = 44100 # default microphone sample rate (Hz)
class AudioDeviceFinder:
    """Enumerate PortAudio devices and choose a microphone/speaker pair.

    Attributes:
        pa: The ``pyaudio.PyAudio`` instance used for enumeration; callers
            also use it to open streams.
        devices: One dict per device with the keys ``index``, ``name``,
            ``input_channels``, ``output_channels`` and
            ``default_sample_rate``.
    """

    def __init__(self):
        self.pa = pyaudio.PyAudio()
        self.devices = []
        self.scan_devices()

    def scan_devices(self):
        """Rebuild ``self.devices`` from PortAudio and print one line per device."""
        print("=" * 60)
        print("扫描音频设备...")
        print("=" * 60)
        # Start from an empty list so calling this twice does not duplicate entries.
        self.devices = []
        for i in range(self.pa.get_device_count()):
            info = self.pa.get_device_info_by_index(i)
            device = {
                'index': i,
                'name': info['name'],
                'input_channels': int(info['maxInputChannels']),
                'output_channels': int(info['maxOutputChannels']),
                'default_sample_rate': int(info['defaultSampleRate']),
            }
            self.devices.append(device)
            input_str = f"Input:{device['input_channels']}"
            output_str = f"Output:{device['output_channels']}"
            print(f"{i:2d}: {device['name'][:40]:<40} {input_str:<10} {output_str:<10}")

    def find_usb_audio_devices(self):
        """Return ``(usb_mics, usb_speakers)``: devices whose name contains "usb".

        A device that has both input and output channels appears in both lists.
        """
        usb_mics = []
        usb_speakers = []
        for device in self.devices:
            # The match is case-insensitive; "usb" also covers "usb audio".
            if 'usb' not in device['name'].lower():
                continue
            if device['input_channels'] > 0:
                usb_mics.append(device)
            if device['output_channels'] > 0:
                usb_speakers.append(device)
        return usb_mics, usb_speakers

    @staticmethod
    def _prefer(candidates, keywords):
        """Return the first candidate whose name contains a keyword, else the first one."""
        for dev in candidates:
            lowered = dev['name'].lower()
            if any(word in lowered for word in keywords):
                return dev
        return candidates[0]

    def auto_select_devices(self):
        """Pick a microphone and a speaker, preferring USB devices.

        Returns:
            ``(mic_device, speaker_device)`` dicts taken from ``self.devices``.
            If no USB device exists for a direction, the PortAudio default
            device for that direction is used and a warning is printed.
        """
        usb_mics, usb_speakers = self.find_usb_audio_devices()

        if usb_mics:
            selected_mic = self._prefer(usb_mics, ('microphone',))
        else:
            default_mic_idx = self.pa.get_default_input_device_info()['index']
            selected_mic = self.devices[default_mic_idx]
            print(f"警告: 未找到USB麦克风,使用默认设备: {selected_mic['name']}")

        if usb_speakers:
            selected_spk = self._prefer(usb_speakers, ('speaker',))
        else:
            default_spk_idx = self.pa.get_default_output_device_info()['index']
            selected_spk = self.devices[default_spk_idx]
            print(f"警告: 未找到USB扬声器,使用默认设备: {selected_spk['name']}")

        return selected_mic, selected_spk

    def terminate(self):
        """Release the underlying PyAudio instance."""
        self.pa.terminate()
def at_cmd(ser, cmd, wait=0.2):
    """Send one AT command and return whatever the modem has answered so far.

    Args:
        ser: Open serial port object providing ``write``, ``in_waiting`` and
            ``read`` (for example ``serial.Serial``).
        cmd: Command text without a line terminator; CR LF is appended.
        wait: Seconds to sleep before the reply is collected.

    Returns:
        The bytes pending on the port after the wait, or ``b""`` if none.
    """
    ser.write((cmd + "\r\n").encode())
    time.sleep(wait)
    n = ser.in_waiting
    return ser.read(n) if n else b""
def resample_44k1_to_8k(pcm, src_rate=None, dst_rate=None):
    """Resample mono 16-bit PCM by picking the nearest source samples.

    No low-pass filtering is applied, so content above half the target rate
    aliases into the result.

    Args:
        pcm: Raw bytes of native-endian int16 mono samples.
        src_rate: Sample rate of ``pcm`` in Hz; defaults to ``MIC_RATE``.
        dst_rate: Target sample rate in Hz; defaults to ``SIM_RATE``.

    Returns:
        The resampled int16 samples as bytes, or ``b""`` when there is
        nothing to output.
    """
    if not pcm:
        return b""
    if src_rate is None:
        src_rate = MIC_RATE
    if dst_rate is None:
        dst_rate = SIM_RATE
    # np.frombuffer rejects buffers that are not a whole number of samples,
    # so drop a dangling odd byte instead of raising.
    usable = len(pcm) - (len(pcm) % 2)
    x = np.frombuffer(pcm[:usable], dtype=np.int16)
    if x.size == 0:
        return b""
    out_len = int(x.size * dst_rate / src_rate)
    if out_len <= 0:
        return b""
    idx = np.linspace(0, x.size - 1, out_len).astype(np.int32)
    return x[idx].astype(np.int16).tobytes()
def main():
    """Answer one incoming SIM7600 call and bridge its audio to USB sound devices.

    Steps: choose a microphone and speaker, open the AT and PCM serial ports,
    probe the modem with ``AT``, wait for ``RING``, answer with ``ATA``,
    enable PCM on the audio port with ``AT+CPCMREG=1``, then move audio in
    both directions until Ctrl+C.  Every resource that was acquired is
    released on every exit path, and the call is hung up whenever it was
    answered.
    """
    print("=" * 60)
    print("USB音频电话系统")
    print("=" * 60)
    finder = AudioDeviceFinder()
    at_ser = None
    au_ser = None
    spk_stream = None
    mic_stream = None
    call_answered = False
    try:
        usb_mics, usb_speakers = finder.find_usb_audio_devices()
        if usb_mics:
            print(f"\n找到 {len(usb_mics)} 个USB麦克风: ")
            for mic in usb_mics:
                print(f" [{mic['index']}] {mic['name']}")
        else:
            print("\n未找到USB麦克风")
        if usb_speakers:
            print(f"\n找到 {len(usb_speakers)} 个USB扬声器: ")
            for spk in usb_speakers:
                print(f" [{spk['index']}] {spk['name']}")
        else:
            print("\n未找到USB扬声器")

        # Automatic device selection.
        mic_device, spk_device = finder.auto_select_devices()
        print("\n已选择设备:")
        print(f" 麦克风: [{mic_device['index']}] {mic_device['name']}")
        print(f" 扬声器: [{spk_device['index']}] {spk_device['name']}")

        print("\n初始化串口...")
        try:
            at_ser = serial.Serial(AT_PORT, 115200, timeout=0)
            au_ser = serial.Serial(AUDIO_PORT, 115200, timeout=0)
            print(f"AT串口: {AT_PORT}, 音频串口: {AUDIO_PORT}")
        except Exception as e:
            print(f"串口初始化失败: {e}")
            return

        print("发送AT命令探活...")
        start_time = time.time()
        while time.time() - start_time < 5:
            if b"OK" in at_cmd(at_ser, "AT", 0.1):
                print("AT OK")
                break
            time.sleep(0.2)
        else:
            print("AT命令无响应")
            return

        print("\n等待来电...")
        tail = b""
        try:
            while True:
                if at_ser.in_waiting:
                    data = at_ser.read(at_ser.in_waiting)
                    # Prepend the end of the previous read so a "RING" that
                    # was split across two reads is still recognised.
                    if b"RING" in tail + data:
                        print("检测到来电!")
                        break
                    print(f"AT返回: {data}")
                    tail = (tail + data)[-3:]
                time.sleep(0.1)
        except KeyboardInterrupt:
            print("\n用户请求挂断...")
            return

        print("接听电话...")
        at_cmd(at_ser, "ATA", 0.5)
        call_answered = True
        print("电话已接听")
        time.sleep(1.0)
        print("配置语音模式...")
        at_cmd(at_ser, "AT+CPCMREG=1", 0.1)

        print("\n初始化音频流...")
        try:
            spk_stream = finder.pa.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=SIM_RATE,
                output=True,
                output_device_index=spk_device['index'],
                frames_per_buffer=CHUNK_8K,
            )
            print(f"扬声器已打开: {spk_device['name']}")
        except Exception as e:
            print(f"打开扬声器失败: {e}")
            return

        # 20 ms of microphone audio per callback.
        mic_frames = int(mic_device['default_sample_rate'] * 0.02)

        def mic_callback(in_data, frame_count, time_info, status):
            """Down-mix, resample and forward one microphone buffer to the modem."""
            try:
                pcm_data = in_data
                if mic_device['input_channels'] > 1:
                    arr = np.frombuffer(pcm_data, dtype=np.int16)
                    arr = arr.reshape(-1, mic_device['input_channels'])
                    arr = arr.mean(axis=1).astype(np.int16)
                    pcm_data = arr.tobytes()
                if mic_device['default_sample_rate'] != SIM_RATE:
                    # NOTE(review): called with one argument the helper scales
                    # by MIC_RATE, not by this device's rate - confirm they match.
                    pcm_data = resample_44k1_to_8k(pcm_data)
                if pcm_data:
                    au_ser.write(pcm_data)
            except Exception as e:
                print(f"麦克风回调错误: {e}")
            return (None, pyaudio.paContinue)

        try:
            mic_stream = finder.pa.open(
                format=pyaudio.paInt16,
                channels=mic_device['input_channels'],
                rate=mic_device['default_sample_rate'],
                input=True,
                input_device_index=mic_device['index'],
                frames_per_buffer=mic_frames,
                stream_callback=mic_callback,
            )
            print(f"麦克风已打开: {mic_device['name']}")
        except Exception as e:
            print(f"打开麦克风失败: {e}")
            return

        mic_stream.start_stream()
        print("\n电话通话中...")
        try:
            # Downlink: copy whatever the modem has sent to the speaker.
            while True:
                if au_ser.in_waiting:
                    audio_data = au_ser.read(au_ser.in_waiting)
                    if audio_data:
                        spk_stream.write(audio_data)
                time.sleep(0.01)
        except KeyboardInterrupt:
            print("\n用户请求挂断...")
    finally:
        if call_answered:
            print("清理资源...")
            try:
                at_cmd(at_ser, "AT+CHUP", 0.2)
                print("电话已挂断")
            except Exception as e:
                print(f"挂断失败: {e}")
        for stream, done_msg in ((mic_stream, "麦克风已关闭"), (spk_stream, "扬声器已关闭")):
            if stream is None:
                continue
            try:
                stream.stop_stream()
                stream.close()
                print(done_msg)
            except Exception:
                # Best effort: one failing stream must not block the rest of the cleanup.
                pass
        if at_ser is not None or au_ser is not None:
            for port in (at_ser, au_ser):
                if port is not None:
                    port.close()
            print("串口已关闭")
        finder.terminate()
        print("音频资源已释放")
    print("\n程序结束")
if __name__ == "__main__":
    # Script entry point: run the call handler and report any failure with a
    # traceback, then wait for Enter so the console output stays readable.
    try:
        main()
    except Exception as e:
        print(f"程序异常: {e}")
        import traceback
        traceback.print_exc()
    input("\n按Enter键退出...")
异常的话,可以查询声卡
# 查看所有音频设备
import pyaudio
# Print every PortAudio device with its index and channel counts.
pa = pyaudio.PyAudio()
try:
    for i in range(pa.get_device_count()):
        info = pa.get_device_info_by_index(i)
        print(f"{i}: {info['name']} - Input:{info['maxInputChannels']} Output:{info['maxOutputChannels']}")
finally:
    # Release PortAudio even if a device query fails.
    pa.terminate()

主机只带USB AUDIO,可以实现通话
PS C:\Users\User\AppData\Local\Programs\Microsoft VS Code> & C:\Users\User\AppData\Local\Programs\Python\Python313\python.exe "d:/2025/1222/import time.py"
0: Microsoft 声音映射器 - Input - Input:2 Output:0
1: 麦克风 (USB Audio and HID) - Input:1 Output:0
2: Microsoft 声音映射器 - Output - Input:0 Output:2
3: 扬声器 (USB Audio and HID) - Input:0 Output:2
4: 耳机 (IN糖3(38:53)) - Input:0 Output:2
5: 主声音捕获驱动程序 - Input:2 Output:0
6: 麦克风 (USB Audio and HID) - Input:1 Output:0
7: 主声音驱动程序 - Input:0 Output:2
8: 扬声器 (USB Audio and HID) - Input:0 Output:2
9: 耳机 (IN糖3(38:53)) - Input:0 Output:2
10: 扬声器 (USB Audio and HID) - Input:0 Output:2
11: 耳机 (IN糖3(38:53)) - Input:0 Output:2
12: 麦克风 (USB Audio and HID) - Input:1 Output:0
13: 麦克风 (USB Audio and HID) - Input:1 Output:0
14: 扬声器 (USB Audio and HID) - Input:0 Output:2
15: 耳机 () - Input:0 Output:2
根据设备列表,我可以看到有几个"USB Audio and HID"设备。从图中和你的设备列表来看,你需要使用的是:
麦克风 (USB Audio and HID) - 索引 1, 6, 12, 13(都有1个输入通道)
扬声器 (USB Audio and HID) - 索引 3, 8, 10, 14(都有2个输出通道)
实现实时通话功能,SIM7600 接听
import time
import serial
import pyaudio
import numpy as np
# ====== Adjust to your environment ======
AT_PORT = "COM15"
AUDIO_PORT = "COM11" # switch to another port if no downlink audio arrives
# Adjust according to the device list
MIC_INDEX = 1 # microphone (USB Audio and HID)
SPK_INDEX = 3 # speaker (USB Audio and HID)
# ==========================
SIM_RATE = 8000
CHUNK_8K = 320
MIC_RATE = 44100
def at_cmd(ser, cmd, wait=0.2):
    """Send one AT command and return whatever the modem has answered so far.

    Args:
        ser: Open serial port object providing ``write``, ``in_waiting`` and
            ``read`` (for example ``serial.Serial``).
        cmd: Command text without a line terminator; CR LF is appended.
        wait: Seconds to sleep before the reply is collected.

    Returns:
        The bytes pending on the port after the wait, or ``b""`` if none.
    """
    ser.write((cmd + "\r\n").encode())
    time.sleep(wait)
    n = ser.in_waiting
    return ser.read(n) if n else b""
def resample_44k1_to_8k(pcm, src_rate=None, dst_rate=None):
    """Resample mono 16-bit PCM by picking the nearest source samples.

    No low-pass filtering is applied, so content above half the target rate
    aliases into the result.

    Args:
        pcm: Raw bytes of native-endian int16 mono samples.
        src_rate: Sample rate of ``pcm`` in Hz; defaults to ``MIC_RATE``.
        dst_rate: Target sample rate in Hz; defaults to ``SIM_RATE``.

    Returns:
        The resampled int16 samples as bytes, or ``b""`` when there is
        nothing to output.
    """
    if not pcm:
        return b""
    if src_rate is None:
        src_rate = MIC_RATE
    if dst_rate is None:
        dst_rate = SIM_RATE
    # np.frombuffer rejects buffers that are not a whole number of samples,
    # so drop a dangling odd byte instead of raising.
    usable = len(pcm) - (len(pcm) % 2)
    x = np.frombuffer(pcm[:usable], dtype=np.int16)
    if x.size == 0:
        return b""
    out_len = int(x.size * dst_rate / src_rate)
    if out_len <= 0:
        return b""
    idx = np.linspace(0, x.size - 1, out_len).astype(np.int32)
    return x[idx].astype(np.int16).tobytes()
def main():
    """Answer one incoming SIM7600 call using the configured audio devices.

    Opens the AT and PCM serial ports, waits for ``RING``, answers with
    ``ATA``, enables PCM with ``AT+CPCMREG=1`` and then moves audio between
    the modem and the ``MIC_INDEX``/``SPK_INDEX`` devices until Ctrl+C.  If a
    configured device index cannot be queried, the system default device is
    used for both the parameters and the stream.  All resources are released
    on every exit path.
    """
    at_ser = None
    au_ser = None
    pa = None
    spk = None
    mic = None
    call_answered = False
    try:
        at_ser = serial.Serial(AT_PORT, 115200, timeout=0)
        au_ser = serial.Serial(AUDIO_PORT, 115200, timeout=0)

        # Probe until the modem answers.
        while b"OK" not in at_cmd(at_ser, "AT", 0.05):
            pass
        print("AT OK")
        print("Waiting for incoming call...")

        # Wait for the incoming call.
        while True:
            if at_ser.in_waiting:
                r = at_ser.read(at_ser.in_waiting)
                if b"RING" in r:
                    print("Incoming call")
                    break
            time.sleep(0.1)

        # Answer.
        at_cmd(at_ser, "ATA", 0.5)
        call_answered = True
        print("Call answered")
        # Give the voice channel time to come up.
        time.sleep(1.0)
        # Key voice configuration: route PCM over the audio serial port.
        at_cmd(at_ser, "AT+CPCMREG=1", 0.1)

        pa = pyaudio.PyAudio()

        # Resolve the microphone, falling back to the default input device.
        mic_index = MIC_INDEX
        try:
            mic_info = pa.get_device_info_by_index(MIC_INDEX)
            mic_ch = int(mic_info["maxInputChannels"])
            mic_rate = int(mic_info["defaultSampleRate"])
            print(f"Microphone: {mic_info['name']}")
            print(f" Channels: {mic_ch}, Sample rate: {mic_rate}Hz")
        except Exception as e:
            print(f"Error opening microphone device {MIC_INDEX}: {e}")
            default_mic = pa.get_default_input_device_info()
            # Open the stream on the device whose parameters are used.
            mic_index = int(default_mic["index"])
            mic_ch = int(default_mic["maxInputChannels"])
            mic_rate = int(default_mic["defaultSampleRate"])
            print(f"Using default microphone: {default_mic['name']}")

        # Resolve the speaker, falling back to the default output device.
        spk_index = SPK_INDEX
        try:
            spk_info = pa.get_device_info_by_index(SPK_INDEX)
            spk_ch = int(spk_info["maxOutputChannels"])
            spk_rate = SIM_RATE  # play back at 8 kHz
            print(f"Speaker: {spk_info['name']}")
            print(f" Channels: {spk_ch}, Sample rate: {spk_rate}Hz")
        except Exception as e:
            print(f"Error opening speaker device {SPK_INDEX}: {e}")
            default_spk = pa.get_default_output_device_info()
            spk_index = int(default_spk["index"])
            spk_rate = SIM_RATE
            print(f"Using default speaker: {default_spk['name']}")

        print("Call connected")
        print(">>> Press Ctrl+C to hang up <<<")

        # Downlink playback stream.
        try:
            spk = pa.open(
                format=pa.get_format_from_width(2),  # 16-bit samples
                channels=1,  # mono output
                rate=spk_rate,
                output=True,
                output_device_index=spk_index,
                frames_per_buffer=CHUNK_8K,
            )
            print("Speaker opened successfully")
        except Exception as e:
            print(f"Error opening speaker stream: {e}")
            return

        # 20 ms of microphone audio per callback.
        frames_mic = int(mic_rate * 0.02)

        def mic_cb(in_data, frame_count, time_info, status):
            """Down-mix, resample and forward one microphone buffer to the modem."""
            try:
                pcm = in_data
                if mic_ch > 1:
                    # Average the channels into mono.
                    arr = np.frombuffer(pcm, dtype=np.int16)
                    arr = arr.reshape(-1, mic_ch)
                    arr = arr.mean(axis=1).astype(np.int16)
                    pcm = arr.tobytes()
                if mic_rate != SIM_RATE:
                    # NOTE(review): called with one argument the helper scales
                    # by MIC_RATE, not by mic_rate - confirm they match.
                    pcm = resample_44k1_to_8k(pcm)
                if pcm:
                    au_ser.write(pcm)
            except Exception as e:
                print(f"Error in mic callback: {e}")
            return (None, pyaudio.paContinue)

        # Uplink capture stream.
        try:
            mic = pa.open(
                format=pa.get_format_from_width(2),
                channels=mic_ch,
                rate=mic_rate,
                input=True,
                input_device_index=mic_index,
                frames_per_buffer=frames_mic,
                stream_callback=mic_cb,
            )
            print("Microphone opened successfully")
        except Exception as e:
            print(f"Error opening microphone stream: {e}")
            return

        mic.start_stream()
        print("Voice call active...")
        while True:
            # Copy downlink audio from the modem to the speaker.
            if au_ser.in_waiting:
                data = au_ser.read(au_ser.in_waiting)
                if data:
                    spk.write(data)
            time.sleep(0.01)
    except KeyboardInterrupt:
        print("\nHang up")
    finally:
        if call_answered:
            try:
                at_cmd(at_ser, "AT+CHUP", 0.1)
            except Exception as e:
                print(f"Error hanging up: {e}")
        for stream in (mic, spk):
            if stream is None:
                continue
            try:
                stream.stop_stream()
                stream.close()
            except Exception:
                # Best effort: keep releasing the remaining resources.
                pass
        if pa is not None:
            try:
                pa.terminate()
            except Exception:
                pass
        for port in (at_ser, au_ser):
            if port is not None:
                port.close()
        print("All resources released")
if __name__ == "__main__":
    # Run the call handler only when executed as a script.
    main()
查询声卡和接听一起
import time
import serial
import pyaudio
import numpy as np
import sys
# ====== Serial port configuration ======
AT_PORT = "COM15"
AUDIO_PORT = "COM11"
# ====== Audio parameters ======
SIM_RATE = 8000 # telephone-quality sample rate (Hz)
CHUNK_8K = 320 # frames per buffer: a 40 ms frame at 8 kHz
MIC_RATE = 44100 # default microphone sample rate (Hz)
class AudioDeviceFinder:
    """Enumerate PortAudio devices and choose a microphone/speaker pair.

    Attributes:
        pa: The ``pyaudio.PyAudio`` instance used for enumeration; callers
            also use it to open streams.
        devices: One dict per device with the keys ``index``, ``name``,
            ``input_channels``, ``output_channels`` and
            ``default_sample_rate``.
    """

    def __init__(self):
        self.pa = pyaudio.PyAudio()
        self.devices = []
        self.scan_devices()

    def scan_devices(self):
        """Rebuild ``self.devices`` from PortAudio and print one line per device."""
        print("=" * 60)
        print("扫描音频设备...")
        print("=" * 60)
        # Start from an empty list so calling this twice does not duplicate entries.
        self.devices = []
        for i in range(self.pa.get_device_count()):
            info = self.pa.get_device_info_by_index(i)
            device = {
                'index': i,
                'name': info['name'],
                'input_channels': int(info['maxInputChannels']),
                'output_channels': int(info['maxOutputChannels']),
                'default_sample_rate': int(info['defaultSampleRate']),
            }
            self.devices.append(device)
            input_str = f"Input:{device['input_channels']}"
            output_str = f"Output:{device['output_channels']}"
            print(f"{i:2d}: {device['name'][:40]:<40} {input_str:<10} {output_str:<10}")

    def find_usb_audio_devices(self):
        """Return ``(usb_mics, usb_speakers)``: devices whose name contains "usb".

        A device that has both input and output channels appears in both lists.
        """
        usb_mics = []
        usb_speakers = []
        for device in self.devices:
            # The match is case-insensitive; "usb" also covers "usb audio".
            if 'usb' not in device['name'].lower():
                continue
            if device['input_channels'] > 0:
                usb_mics.append(device)
            if device['output_channels'] > 0:
                usb_speakers.append(device)
        return usb_mics, usb_speakers

    @staticmethod
    def _prefer(candidates, keywords):
        """Return the first candidate whose name contains a keyword, else the first one."""
        for dev in candidates:
            lowered = dev['name'].lower()
            if any(word in lowered for word in keywords):
                return dev
        return candidates[0]

    def auto_select_devices(self):
        """Pick a microphone and a speaker, preferring USB devices.

        Among USB devices, names containing the Chinese or English word for
        microphone/speaker win; otherwise the first USB device is used.  If no
        USB device exists for a direction, the PortAudio default device for
        that direction is used and a warning is printed.

        Returns:
            ``(mic_device, speaker_device)`` dicts taken from ``self.devices``.
        """
        usb_mics, usb_speakers = self.find_usb_audio_devices()

        if usb_mics:
            selected_mic = self._prefer(usb_mics, ('麦克风', 'microphone'))
        else:
            default_mic_idx = self.pa.get_default_input_device_info()['index']
            selected_mic = self.devices[default_mic_idx]
            print(f"警告: 未找到USB麦克风,使用默认设备: {selected_mic['name']}")

        if usb_speakers:
            selected_spk = self._prefer(usb_speakers, ('扬声器', 'speaker'))
        else:
            default_spk_idx = self.pa.get_default_output_device_info()['index']
            selected_spk = self.devices[default_spk_idx]
            print(f"警告: 未找到USB扬声器,使用默认设备: {selected_spk['name']}")

        return selected_mic, selected_spk

    def manual_select_devices(self):
        """Ask the user for a microphone and a speaker index.

        An empty answer, a non-numeric answer, or an index that does not
        belong to a device with channels in the required direction falls back
        to the first available device for that direction.

        Returns:
            ``(mic_device, speaker_device)`` dicts taken from ``self.devices``.

        Raises:
            RuntimeError: If there is no input device or no output device.
        """
        print("\n" + "=" * 60)
        print("手动选择音频设备")
        print("=" * 60)
        input_devices = [d for d in self.devices if d['input_channels'] > 0]
        output_devices = [d for d in self.devices if d['output_channels'] > 0]
        if not input_devices or not output_devices:
            raise RuntimeError("未找到可用的麦克风或扬声器设备")

        print("\n可用麦克风设备:")
        for device in input_devices:
            print(f" [{device['index']}] {device['name']} (通道:{device['input_channels']})")
        print("\n可用扬声器设备:")
        for device in output_devices:
            print(f" [{device['index']}] {device['name']} (通道:{device['output_channels']})")

        default_mic = input_devices[0]
        default_spk = output_devices[0]
        try:
            mic_idx = int(input(f"\n选择麦克风设备索引 [默认{default_mic['index']}]: ") or default_mic['index'])
            spk_idx = int(input(f"选择扬声器设备索引 [默认{default_spk['index']}]: ") or default_spk['index'])
        except ValueError:
            print("输入错误,使用默认设备")
            return default_mic, default_spk
        # Search only devices that can serve the requested direction, so an
        # output-only device can never be returned as the microphone.
        selected_mic = next((d for d in input_devices if d['index'] == mic_idx), default_mic)
        selected_spk = next((d for d in output_devices if d['index'] == spk_idx), default_spk)
        return selected_mic, selected_spk

    def terminate(self):
        """Release the underlying PyAudio instance."""
        self.pa.terminate()
def at_cmd(ser, cmd, wait=0.2):
    """Send one AT command and return whatever the modem has answered so far.

    Args:
        ser: Open serial port object providing ``write``, ``in_waiting`` and
            ``read`` (for example ``serial.Serial``).
        cmd: Command text without a line terminator; CR LF is appended.
        wait: Seconds to sleep before the reply is collected.

    Returns:
        The bytes pending on the port after the wait, or ``b""`` if none.
    """
    ser.write((cmd + "\r\n").encode())
    time.sleep(wait)
    n = ser.in_waiting
    return ser.read(n) if n else b""
def resample_44k1_to_8k(pcm, src_rate=None, dst_rate=None):
    """Resample mono 16-bit PCM by picking the nearest source samples.

    No low-pass filtering is applied, so content above half the target rate
    aliases into the result.

    Args:
        pcm: Raw bytes of native-endian int16 mono samples.
        src_rate: Sample rate of ``pcm`` in Hz; defaults to ``MIC_RATE``.
        dst_rate: Target sample rate in Hz; defaults to ``SIM_RATE``.

    Returns:
        The resampled int16 samples as bytes, or ``b""`` when there is
        nothing to output.
    """
    if not pcm:
        return b""
    if src_rate is None:
        src_rate = MIC_RATE
    if dst_rate is None:
        dst_rate = SIM_RATE
    # np.frombuffer rejects buffers that are not a whole number of samples,
    # so drop a dangling odd byte instead of raising.
    usable = len(pcm) - (len(pcm) % 2)
    x = np.frombuffer(pcm[:usable], dtype=np.int16)
    if x.size == 0:
        return b""
    out_len = int(x.size * dst_rate / src_rate)
    if out_len <= 0:
        return b""
    idx = np.linspace(0, x.size - 1, out_len).astype(np.int32)
    return x[idx].astype(np.int16).tobytes()
def main():
    """Interactively choose audio devices, then answer one SIM7600 call.

    Steps: scan audio devices, let the user choose automatic or manual
    selection and confirm it, open the AT and PCM serial ports, probe the
    modem, wait for ``RING``, answer with ``ATA``, enable PCM with
    ``AT+CPCMREG=1``, then move audio in both directions until Ctrl+C.
    Every resource that was acquired is released on every exit path, and the
    call is hung up whenever it was answered.
    """
    print("=" * 60)
    print("USB音频电话系统")
    print("=" * 60)
    # 1. Detect audio devices.
    finder = AudioDeviceFinder()
    at_ser = None
    au_ser = None
    spk_stream = None
    mic_stream = None
    call_answered = False
    try:
        usb_mics, usb_speakers = finder.find_usb_audio_devices()
        if usb_mics:
            print(f"\n找到 {len(usb_mics)} 个USB麦克风:")
            for mic in usb_mics:
                print(f" [{mic['index']}] {mic['name']}")
        else:
            print("\n未找到USB麦克风")
        if usb_speakers:
            print(f"\n找到 {len(usb_speakers)} 个USB扬声器:")
            for spk in usb_speakers:
                print(f" [{spk['index']}] {spk['name']}")
        else:
            print("\n未找到USB扬声器")

        # Automatic or manual selection; anything unparsable means automatic.
        print("\n选择设备配置模式:")
        print(" 1. 自动选择USB设备")
        print(" 2. 手动选择设备")
        try:
            mode = int(input("请选择 [1/2, 默认1]: ") or "1")
        except ValueError:
            mode = 1
        if mode == 1:
            mic_device, spk_device = finder.auto_select_devices()
        else:
            mic_device, spk_device = finder.manual_select_devices()
        print("\n已选择设备:")
        print(f" 麦克风: [{mic_device['index']}] {mic_device['name']}")
        print(f" 扬声器: [{spk_device['index']}] {spk_device['name']}")

        confirm = input("\n是否继续? [Y/n]: ").lower()
        if confirm not in ['', 'y', 'yes']:
            print("程序退出")
            return

        # 2. Open the serial ports.
        print("\n初始化串口...")
        try:
            at_ser = serial.Serial(AT_PORT, 115200, timeout=0)
            au_ser = serial.Serial(AUDIO_PORT, 115200, timeout=0)
            print(f"AT串口: {AT_PORT}, 音频串口: {AUDIO_PORT}")
        except Exception as e:
            print(f"串口初始化失败: {e}")
            print("请检查串口连接")
            return

        # 3. Probe the modem, giving up after 5 seconds.
        print("发送AT命令探活...")
        start_time = time.time()
        while time.time() - start_time < 5:
            if b"OK" in at_cmd(at_ser, "AT", 0.1):
                print("AT OK")
                break
            time.sleep(0.2)
        else:
            print("AT命令无响应,请检查模块连接")
            return

        # 4. Wait for an incoming call.
        print("\n等待来电... (按Ctrl+C退出)")
        tail = b""
        try:
            while True:
                if at_ser.in_waiting:
                    data = at_ser.read(at_ser.in_waiting)
                    # Prepend the end of the previous read so a "RING" that
                    # was split across two reads is still recognised.
                    if b"RING" in tail + data:
                        print("检测到来电!")
                        break
                    print(f"AT返回: {data}")
                    tail = (tail + data)[-3:]
                time.sleep(0.1)
        except KeyboardInterrupt:
            # The prompt promises Ctrl+C exits; do so without a traceback.
            print("程序退出")
            return

        # 5. Answer the call and wait for the voice channel.
        print("接听电话...")
        at_cmd(at_ser, "ATA", 0.5)
        call_answered = True
        print("电话已接听")
        time.sleep(1.0)

        # 6. Route PCM over the audio serial port.
        print("配置语音模式...")
        at_cmd(at_ser, "AT+CPCMREG=1", 0.1)

        # 7. Open the audio streams.
        print("\n初始化音频流...")
        try:
            spk_stream = finder.pa.open(
                format=pyaudio.paInt16,
                channels=1,  # telephone audio is mono
                rate=SIM_RATE,
                output=True,
                output_device_index=spk_device['index'],
                frames_per_buffer=CHUNK_8K,
            )
            print(f"扬声器已打开: {spk_device['name']}")
        except Exception as e:
            print(f"打开扬声器失败: {e}")
            return

        # 20 ms of microphone audio per callback.
        mic_frames = int(mic_device['default_sample_rate'] * 0.02)

        def mic_callback(in_data, frame_count, time_info, status):
            """Down-mix, resample and forward one microphone buffer to the modem."""
            try:
                pcm_data = in_data
                if mic_device['input_channels'] > 1:
                    arr = np.frombuffer(pcm_data, dtype=np.int16)
                    arr = arr.reshape(-1, mic_device['input_channels'])
                    arr = arr.mean(axis=1).astype(np.int16)
                    pcm_data = arr.tobytes()
                if mic_device['default_sample_rate'] != SIM_RATE:
                    # NOTE(review): called with one argument the helper scales
                    # by MIC_RATE, not by this device's rate - confirm they match.
                    pcm_data = resample_44k1_to_8k(pcm_data)
                if pcm_data:
                    au_ser.write(pcm_data)
            except Exception as e:
                print(f"麦克风回调错误: {e}")
            return (None, pyaudio.paContinue)

        try:
            mic_stream = finder.pa.open(
                format=pyaudio.paInt16,
                channels=mic_device['input_channels'],
                rate=mic_device['default_sample_rate'],
                input=True,
                input_device_index=mic_device['index'],
                frames_per_buffer=mic_frames,
                stream_callback=mic_callback,
            )
            print(f"麦克风已打开: {mic_device['name']} (采样率: {mic_device['default_sample_rate']}Hz)")
        except Exception as e:
            print(f"打开麦克风失败: {e}")
            return

        mic_stream.start_stream()
        print("\n" + "=" * 60)
        print("电话通话中...")
        print("按 Ctrl+C 挂断电话")
        print("=" * 60)

        # 8. Main loop: copy downlink audio from the modem to the speaker.
        try:
            while True:
                if au_ser.in_waiting:
                    audio_data = au_ser.read(au_ser.in_waiting)
                    if audio_data:
                        spk_stream.write(audio_data)
                time.sleep(0.01)
        except KeyboardInterrupt:
            print("\n用户请求挂断...")
    finally:
        # 9. Release everything that was acquired.
        if call_answered:
            print("清理资源...")
            try:
                at_cmd(at_ser, "AT+CHUP", 0.2)
                print("电话已挂断")
            except Exception as e:
                print(f"挂断失败: {e}")
        for stream, done_msg in ((mic_stream, "麦克风已关闭"), (spk_stream, "扬声器已关闭")):
            if stream is None:
                continue
            try:
                stream.stop_stream()
                stream.close()
                print(done_msg)
            except Exception:
                # Best effort: one failing stream must not block the rest of the cleanup.
                pass
        if at_ser is not None or au_ser is not None:
            for port in (at_ser, au_ser):
                if port is not None:
                    port.close()
            print("串口已关闭")
        finder.terminate()
        print("音频资源已释放")
    print("\n程序结束")
if __name__ == "__main__":
    # Script entry point: run the call handler and report any failure with a
    # traceback, then wait for Enter so the console window stays open.
    try:
        main()
    except Exception as e:
        print(f"程序异常: {e}")
        import traceback
        traceback.print_exc()
    input("\n按Enter键退出...")
在台式电脑上接蓝牙耳机,实现语音通话(手机没声音)
SIM7600 拨打:
import time
import serial
import pyaudio
import numpy as np
import sys
# ====== Adjust to your environment ======
AT_PORT = "COM8"
AUDIO_PORT = "COM11"
MIC_INDEX = 8
SPK_INDEX = 12
NUMBER = "13143366816"
# ==========================
SIM_RATE = 8000
CHUNK_8K = 320
MIC_RATE = 44100
def at_cmd(ser, cmd, wait=0.2):
    """Send one AT command and return the modem's reply to it.

    Pending input is discarded first, so the result contains only bytes that
    arrived after this command was written.

    Args:
        ser: Open serial port object providing ``reset_input_buffer``,
            ``write``, ``in_waiting`` and ``read`` (for example
            ``serial.Serial``).
        cmd: Command text without a line terminator; CR LF is appended.
        wait: Seconds to sleep before the reply is collected.

    Returns:
        The bytes pending on the port after the wait, or ``b""`` if none.
    """
    ser.reset_input_buffer()
    ser.write((cmd + "\r\n").encode())
    time.sleep(wait)
    n = ser.in_waiting
    return ser.read(n) if n else b""
def resample_44k1_to_8k(pcm, src_rate=None, dst_rate=None):
    """Resample mono 16-bit PCM by picking the nearest source samples.

    No low-pass filtering is applied, so content above half the target rate
    aliases into the result.

    Args:
        pcm: Raw bytes of native-endian int16 mono samples.
        src_rate: Sample rate of ``pcm`` in Hz; defaults to ``MIC_RATE``.
        dst_rate: Target sample rate in Hz; defaults to ``SIM_RATE``.

    Returns:
        The resampled int16 samples as bytes, or ``b""`` when there is
        nothing to output.
    """
    if not pcm:
        return b""
    if src_rate is None:
        src_rate = MIC_RATE
    if dst_rate is None:
        dst_rate = SIM_RATE
    # np.frombuffer rejects buffers that are not a whole number of samples,
    # so drop a dangling odd byte instead of raising.
    usable = len(pcm) - (len(pcm) % 2)
    x = np.frombuffer(pcm[:usable], dtype=np.int16)
    if x.size == 0:
        return b""
    out_len = int(x.size * dst_rate / src_rate)
    if out_len <= 0:
        return b""
    idx = np.linspace(0, x.size - 1, out_len).astype(np.int32)
    return x[idx].astype(np.int16).tobytes()
def main():
    """Dial ``NUMBER`` through the SIM7600 and bridge the call audio.

    Opens the AT and PCM serial ports, dials with ``ATD<number>;``, enables
    PCM with ``AT+CPCMREG=1`` and then moves audio between the modem and the
    ``MIC_INDEX``/``SPK_INDEX`` devices until Ctrl+C.  The call is hung up and
    all resources are released on every exit path.
    """
    at_ser = None
    au_ser = None
    pa = None
    spk = None
    mic = None
    call_started = False
    try:
        at_ser = serial.Serial(AT_PORT, 115200, timeout=0)
        au_ser = serial.Serial(AUDIO_PORT, 115200, timeout=0)

        # Probe until the modem answers.
        while b"OK" not in at_cmd(at_ser, "AT", 0.05):
            pass
        print("AT OK")

        print("Dialing...")
        at_cmd(at_ser, f"ATD{NUMBER};", 0.5)
        call_started = True
        # Wait one second so the network can bring up the voice channel.
        time.sleep(1.0)
        at_cmd(at_ser, "AT+CPCMREG=1", 0.1)

        pa = pyaudio.PyAudio()
        mic_info = pa.get_device_info_by_index(MIC_INDEX)
        mic_ch = max(1, int(mic_info["maxInputChannels"]))
        mic_rate = int(mic_info["defaultSampleRate"])
        print(f"Call connected (Bluetooth HFP @ {mic_rate/1000:.1f}kHz)")
        print(">>> Press Ctrl+C to hang up <<<")

        # Downlink playback stream.
        spk = pa.open(
            format=pa.get_format_from_width(2),
            channels=1,
            rate=SIM_RATE,
            output=True,
            output_device_index=SPK_INDEX,
            frames_per_buffer=CHUNK_8K,
        )

        # 20 ms of microphone audio per callback.
        frames_mic = int(mic_rate * 0.02)

        def mic_cb(in_data, frame_count, time_info, status):
            """Down-mix, resample and forward one microphone buffer to the modem."""
            try:
                pcm = in_data
                if mic_ch > 1:
                    arr = np.frombuffer(pcm, dtype=np.int16)
                    arr = arr.reshape(-1, mic_ch).mean(axis=1).astype(np.int16)
                    pcm = arr.tobytes()
                if mic_rate != SIM_RATE:
                    # NOTE(review): called with one argument the helper scales
                    # by MIC_RATE, not by mic_rate - confirm they match.
                    pcm = resample_44k1_to_8k(pcm)
                au_ser.write(pcm)
            except Exception:
                # Deliberate best effort: dropping one frame is preferable to
                # stopping the audio callback thread.
                pass
            return (None, pyaudio.paContinue)

        # Uplink capture stream.
        mic = pa.open(
            format=pa.get_format_from_width(2),
            channels=mic_ch,
            rate=mic_rate,
            input=True,
            input_device_index=MIC_INDEX,
            frames_per_buffer=frames_mic,
            stream_callback=mic_cb,
        )
        mic.start_stream()

        while True:
            if au_ser.in_waiting:
                spk.write(au_ser.read(au_ser.in_waiting))
            time.sleep(0.01)
    except KeyboardInterrupt:
        print("\nUser hang up")
    finally:
        if call_started:
            try:
                at_cmd(at_ser, "AT+CHUP", 0.1)
            except Exception as e:
                print(f"Error hanging up: {e}")
        for stream in (mic, spk):
            if stream is None:
                continue
            try:
                stream.stop_stream()
                stream.close()
            except Exception:
                # Best effort: keep releasing the remaining resources.
                pass
        if pa is not None:
            try:
                pa.terminate()
            except Exception:
                pass
        for port in (at_ser, au_ser):
            if port is not None:
                port.close()
if __name__ == "__main__":
    # Run the dialer only when executed as a script.
    main()
SIM7600 接听
import time
import serial
import pyaudio
import numpy as np
# ====== Adjust to your environment ======
AT_PORT = "COM8"
AUDIO_PORT = "COM11" # switch to another port if no downlink audio arrives
MIC_INDEX = 8
SPK_INDEX = 12
# ==========================
SIM_RATE = 8000
CHUNK_8K = 320
MIC_RATE = 44100
def at_cmd(ser, cmd, wait=0.2):
    """Send one AT command and return whatever the modem has answered so far.

    Args:
        ser: Open serial port object providing ``write``, ``in_waiting`` and
            ``read`` (for example ``serial.Serial``).
        cmd: Command text without a line terminator; CR LF is appended.
        wait: Seconds to sleep before the reply is collected.

    Returns:
        The bytes pending on the port after the wait, or ``b""`` if none.
    """
    ser.write((cmd + "\r\n").encode())
    time.sleep(wait)
    n = ser.in_waiting
    return ser.read(n) if n else b""
def resample_44k1_to_8k(pcm, src_rate=None, dst_rate=None):
    """Resample mono 16-bit PCM by picking the nearest source samples.

    No low-pass filtering is applied, so content above half the target rate
    aliases into the result.

    Args:
        pcm: Raw bytes of native-endian int16 mono samples.
        src_rate: Sample rate of ``pcm`` in Hz; defaults to ``MIC_RATE``.
        dst_rate: Target sample rate in Hz; defaults to ``SIM_RATE``.

    Returns:
        The resampled int16 samples as bytes, or ``b""`` when there is
        nothing to output.
    """
    if not pcm:
        return b""
    if src_rate is None:
        src_rate = MIC_RATE
    if dst_rate is None:
        dst_rate = SIM_RATE
    # np.frombuffer rejects buffers that are not a whole number of samples,
    # so drop a dangling odd byte instead of raising.
    usable = len(pcm) - (len(pcm) % 2)
    x = np.frombuffer(pcm[:usable], dtype=np.int16)
    if x.size == 0:
        return b""
    out_len = int(x.size * dst_rate / src_rate)
    if out_len <= 0:
        return b""
    idx = np.linspace(0, x.size - 1, out_len).astype(np.int32)
    return x[idx].astype(np.int16).tobytes()
def main():
    """Answer one incoming SIM7600 call and bridge its audio, logging downlink sizes.

    Opens the AT and PCM serial ports, waits for ``RING``, answers with
    ``ATA``, enables PCM with ``AT+CPCMREG=1`` and then moves audio between
    the modem and the ``MIC_INDEX``/``SPK_INDEX`` devices until Ctrl+C.  The
    size of every downlink read is printed.  The call is hung up and all
    resources are released on every exit path.
    """
    at_ser = None
    au_ser = None
    pa = None
    spk = None
    mic = None
    call_answered = False
    try:
        at_ser = serial.Serial(AT_PORT, 115200, timeout=0)
        au_ser = serial.Serial(AUDIO_PORT, 115200, timeout=0)

        # Probe until the modem answers.
        while b"OK" not in at_cmd(at_ser, "AT", 0.05):
            pass
        print("AT OK")
        print("Waiting for incoming call...")

        # Wait for the incoming call.
        while True:
            if at_ser.in_waiting:
                r = at_ser.read(at_ser.in_waiting)
                if b"RING" in r:
                    print("Incoming call")
                    break
            time.sleep(0.1)

        # Answer.
        at_cmd(at_ser, "ATA", 0.5)
        call_answered = True
        print("Call answered")
        # Give the voice channel time to come up.
        time.sleep(1.0)
        # Key voice configuration: route PCM over the audio serial port.
        at_cmd(at_ser, "AT+CPCMREG=1", 0.1)

        pa = pyaudio.PyAudio()
        mic_info = pa.get_device_info_by_index(MIC_INDEX)
        mic_ch = max(1, int(mic_info["maxInputChannels"]))
        mic_rate = int(mic_info["defaultSampleRate"])
        print(f"Call connected (Bluetooth HFP @ {mic_rate/1000:.1f}kHz)")
        print(">>> Press Ctrl+C to hang up <<<")

        # Downlink playback stream.
        spk = pa.open(
            format=pa.get_format_from_width(2),
            channels=1,
            rate=SIM_RATE,
            output=True,
            output_device_index=SPK_INDEX,
            frames_per_buffer=CHUNK_8K,
        )

        # 20 ms of microphone audio per callback.
        frames_mic = int(mic_rate * 0.02)

        def mic_cb(in_data, frame_count, time_info, status):
            """Down-mix, resample and forward one microphone buffer to the modem."""
            try:
                pcm = in_data
                if mic_ch > 1:
                    arr = np.frombuffer(pcm, dtype=np.int16)
                    arr = arr.reshape(-1, mic_ch).mean(axis=1).astype(np.int16)
                    pcm = arr.tobytes()
                if mic_rate != SIM_RATE:
                    # NOTE(review): called with one argument the helper scales
                    # by MIC_RATE, not by mic_rate - confirm they match.
                    pcm = resample_44k1_to_8k(pcm)
                au_ser.write(pcm)
            except Exception:
                # Deliberate best effort: dropping one frame is preferable to
                # stopping the audio callback thread.
                pass
            return (None, pyaudio.paContinue)

        # Uplink capture stream.
        mic = pa.open(
            format=pa.get_format_from_width(2),
            channels=mic_ch,
            rate=mic_rate,
            input=True,
            input_device_index=MIC_INDEX,
            frames_per_buffer=frames_mic,
            stream_callback=mic_cb,
        )
        mic.start_stream()

        while True:
            if au_ser.in_waiting:
                data = au_ser.read(au_ser.in_waiting)
                # Diagnostic output: shows whether this port carries downlink audio.
                print("DOWNLINK BYTES =", len(data))
                spk.write(data)
            time.sleep(0.01)
    except KeyboardInterrupt:
        print("Hang up")
    finally:
        if call_answered:
            try:
                at_cmd(at_ser, "AT+CHUP", 0.1)
            except Exception as e:
                print(f"Error hanging up: {e}")
        for stream in (mic, spk):
            if stream is None:
                continue
            try:
                stream.stop_stream()
                stream.close()
            except Exception:
                # Best effort: keep releasing the remaining resources.
                pass
        if pa is not None:
            try:
                pa.terminate()
            except Exception:
                pass
        for port in (at_ser, au_ser):
            if port is not None:
                port.close()
if __name__ == "__main__":
    # Run the call handler only when executed as a script.
    main()
Windows 实现导出音频流到电脑网页播放
<!doctype html>
<html>
<head>
<meta charset="utf-8">
<title>SIM7600 PCM 音频播放器(无 AT)</title>
<style>
body {font-family:sans-serif; max-width:800px; margin:20px auto;}
button {padding:8px 16px; margin:4px;}
#log {white-space:pre-wrap; background:#111; color:#0f0; padding:8px; height:150px; overflow:auto;}
</style>
<style>
.download-btn-container {
position: fixed;
bottom: 20px;
left: 50%;
transform: translateX(-50%);
z-index: 1000;
}
.download-btn {
background-color: #2196F3;
color: white;
padding: 10px 20px;
border: none;
border-radius: 5px;
cursor: pointer;
box-shadow: 0 2px 5px rgba(0,0,0,0.2);
position: relative;
padding-right: 35px;
}
.download-btn:hover {
background-color: #1976D2;
}
.close-btn {
background-color: #9e9e9e;
color: white;
width: 20px;
height: 20px;
border: none;
border-radius: 50%;
cursor: pointer;
display: flex;
align-items: center;
justify-content: center;
position: absolute;
right: 8px;
top: 50%;
transform: translateY(-50%);
font-size: 14px;
line-height: 1;
}
.close-btn:hover {
background-color: #757575;
}
.hidden {
display: none !important;
}
</style>
</head>
<body>
<h2>SIM7600 PCM 实时音频播放器</h2>
采样率:
<select id="rate">
<option value="8000">8 kHz</option>
<option value="16000">16 kHz</option>
</select>
<br><br>
<button id="openBtn">打开 PCM 串口并播放</button>
<button id="closeBtn" disabled>停止并断开</button>
<p>状态:<span id="status">未连接</span></p>
<div id="log"></div>
<script>
// State shared by the open and close button handlers.
let port = null; // serial port returned by navigator.serial.requestPort()
let reader = null; // reader obtained from port.readable
let audioCtx = null; // AudioContext created at the selected sample rate
let processor = null; // ScriptProcessorNode connected to the audio output
let running = false; // read-loop flag; cleared by the close handler
// Temporary buffer used for the odd-byte case: holds a trailing byte between reads
let leftover = null;
// Append a timestamped line to the #log panel and scroll it to the bottom.
function log(msg) {
  const panel = document.getElementById("log");
  const stamp = new Date().toLocaleTimeString();
  panel.textContent += `[${stamp}] ${msg}\n`;
  panel.scrollTop = panel.scrollHeight;
}
// Open the serial port chosen by the user and play the incoming bytes as
// 16-bit little-endian mono PCM through a ring buffer that a
// ScriptProcessorNode drains.
document.getElementById("openBtn").onclick = async () => {
  try {
    // Fail with a clear message where the Web Serial API is unavailable.
    if (!("serial" in navigator)) {
      throw new Error("当前浏览器不支持 Web Serial API");
    }
    const sampleRate = parseInt(document.getElementById("rate").value, 10);
    port = await navigator.serial.requestPort();
    await port.open({ baudRate: 115200 });
    audioCtx = new AudioContext({ sampleRate });
    processor = audioCtx.createScriptProcessor(2048, 1, 1);

    // Ring buffer (float32) sized for two seconds of audio.
    const bufferSize = sampleRate * 2;
    const audioBuffer = new Float32Array(bufferSize);
    let writePtr = 0, readPtr = 0;

    processor.onaudioprocess = (e) => {
      const output = e.outputBuffer.getChannelData(0);
      for (let i = 0; i < output.length; i++) {
        if (readPtr !== writePtr) {
          output[i] = audioBuffer[readPtr];
          readPtr = (readPtr + 1) % bufferSize;
        } else {
          // Buffer empty: output silence.
          output[i] = 0;
        }
      }
    };
    processor.connect(audioCtx.destination);

    running = true;
    document.getElementById("openBtn").disabled = true;
    document.getElementById("closeBtn").disabled = false;
    document.getElementById("status").textContent = "已连接(等待音频)";
    log("串口已打开。");

    const activeReader = port.readable.getReader();
    reader = activeReader;
    leftover = new Uint8Array(0);
    try {
      while (running) {
        const { value, done } = await activeReader.read();
        if (done) break;
        if (!value) continue;

        // Prepend the byte left over from the previous read, if any.
        const bytes = new Uint8Array(leftover.length + value.length);
        bytes.set(leftover, 0);
        bytes.set(value, leftover.length);

        // Keep an even number of bytes so samples stay 16-bit aligned.
        const alignedLength = bytes.length - (bytes.length % 2);
        const pcm = bytes.slice(0, alignedLength);
        leftover = bytes.slice(alignedLength);

        const dv = new DataView(pcm.buffer, pcm.byteOffset, pcm.byteLength);
        const count = pcm.length / 2;
        for (let i = 0; i < count; i++) {
          const next = (writePtr + 1) % bufferSize;
          // When the writer catches up with the reader, drop the oldest
          // sample; otherwise a full buffer would look like an empty one.
          if (next === readPtr) {
            readPtr = (readPtr + 1) % bufferSize;
          }
          audioBuffer[writePtr] = dv.getInt16(i * 2, true) / 32768;
          writePtr = next;
        }
      }
    } finally {
      // The port cannot be closed while its readable stream is locked.
      activeReader.releaseLock();
    }
  } catch (e) {
    console.error(e);
    log("错误:" + e);
    document.getElementById("status").textContent = "错误";
  }
};
// Stop playback and release the serial port and audio resources.
// Each step is best effort so one failure does not block the others.
document.getElementById("closeBtn").onclick = async () => {
  running = false;
  try {
    if (reader) {
      await reader.cancel();
      // port.close() fails while the readable stream is still locked.
      reader.releaseLock();
    }
  } catch (e) { console.warn(e); }
  try { port && await port.close(); } catch (e) { console.warn(e); }
  try { processor && processor.disconnect(); } catch (e) { console.warn(e); }
  try { audioCtx && await audioCtx.close(); } catch (e) { console.warn(e); }
  // Drop stale references so a second click does not act on closed objects.
  reader = null;
  port = null;
  processor = null;
  audioCtx = null;
  document.getElementById("openBtn").disabled = false;
  document.getElementById("closeBtn").disabled = true;
  document.getElementById("status").textContent = "未连接";
  log("已关闭。");
};
</script>
<div class="download-btn-container" id="downloadContainer">
<button class="download-btn" onclick="downloadHTML()">
下载HTML文件
<button class="close-btn" onclick="closeDownloadBtn(event)" title="关闭">×</button>
</button>
</div>
<script>
// Navigate to the download URL for the page currently being viewed.
function downloadHTML() {
  const segments = window.location.pathname.split('/');
  const fileName = segments.pop();
  window.location.href = '/html2images/download/' + fileName;
}
// Hide the download widget without triggering the download itself.
function closeDownloadBtn(event) {
  // Stop the click from bubbling to the enclosing download button.
  event.stopPropagation();
  const container = document.getElementById('downloadContainer');
  container.classList.add('hidden');
}
// Hide the download widget for local files and for any host other than
// api.xupeidong.cn.
{
  const loc = window.location;
  const servedFromApiHost = loc.protocol !== 'file:' && loc.hostname === 'api.xupeidong.cn';
  if (!servedFromApiHost) {
    document.getElementById('downloadContainer').classList.add('hidden');
  }
}
</script>
</body>
</html>
更多推荐






所有评论(0)