树莓派5测试

树莓派接USB声卡.  手机给 SIM7600 拨打电话

import time
import serial
import pyaudio
import numpy as np
import sys

# ====== 串口配置 ======
AT_PORT = "/dev/ttyUSB2"  # 根据树莓派5的串口调整
AUDIO_PORT = "/dev/ttyUSB4"  # 根据树莓派5的串口调整

# ====== 音频参数 ======
SIM_RATE = 8000      # 电话音质
CHUNK_8K = 320       # 40ms的帧
MIC_RATE = 44100     # 麦克风默认采样率

class AudioDeviceFinder:
    """音频设备自动查找器"""
    
    def __init__(self):
        self.pa = pyaudio.PyAudio()
        self.devices = []
        self.scan_devices()
    
    def scan_devices(self):
        """扫描所有音频设备"""
        print("=" * 60)
        print("扫描音频设备...")
        print("=" * 60)
        
        for i in range(self.pa.get_device_count()):
            info = self.pa.get_device_info_by_index(i)
            device = {
                'index': i,
                'name': info['name'],
                'input_channels': int(info['maxInputChannels']),
                'output_channels': int(info['maxOutputChannels']),
                'default_sample_rate': int(info['defaultSampleRate'])
            }
            self.devices.append(device)
            
            # 显示设备信息
            input_str = f"Input:{device['input_channels']}" if device['input_channels'] > 0 else "Input:0"
            output_str = f"Output:{device['output_channels']}" if device['output_channels'] > 0 else "Output:0"
            print(f"{i:2d}: {device['name'][:40]:<40} {input_str:<10} {output_str:<10}")
    
    def find_usb_audio_devices(self):
        """查找USB Audio and HID设备"""
        usb_mics = []
        usb_speakers = []
        
        for device in self.devices:
            name = device['name'].lower()
            
            # 查找USB音频设备
            if 'usb audio' in name or 'usb' in name:
                if device['input_channels'] > 0:
                    usb_mics.append(device)
                if device['output_channels'] > 0:
                    usb_speakers.append(device)
        
        return usb_mics, usb_speakers
    
    def auto_select_devices(self):
        """自动选择设备"""
        usb_mics, usb_speakers = self.find_usb_audio_devices()
        
        # 选择麦克风
        if usb_mics:
            for mic in usb_mics:
                if 'microphone' in mic['name'].lower():
                    selected_mic = mic
                    break
            else:
                selected_mic = usb_mics[0]
        else:
            default_mic_idx = self.pa.get_default_input_device_info()['index']
            selected_mic = self.devices[default_mic_idx]
            print(f"警告: 未找到USB麦克风,使用默认设备: {selected_mic['name']}")
        
        # 选择扬声器
        if usb_speakers:
            for spk in usb_speakers:
                if 'speaker' in spk['name'].lower():
                    selected_spk = spk
                    break
            else:
                selected_spk = usb_speakers[0]
        else:
            default_spk_idx = self.pa.get_default_output_device_info()['index']
            selected_spk = self.devices[default_spk_idx]
            print(f"警告: 未找到USB扬声器,使用默认设备: {selected_spk['name']}")
        
        return selected_mic, selected_spk
    
    def terminate(self):
        """清理资源"""
        self.pa.terminate()

def at_cmd(ser, cmd, wait=0.2):
    """发送AT命令"""
    ser.write((cmd + "\r\n").encode())
    time.sleep(wait)
    n = ser.in_waiting
    return ser.read(n) if n else b""


def resample_44k1_to_8k(pcm):
    """将44.1kHz重采样到8kHz"""
    if not pcm:
        return b""
    
    x = np.frombuffer(pcm, dtype=np.int16)
    if x.size == 0:
        return b""
    
    out_len = int(x.size * SIM_RATE / MIC_RATE)
    if out_len <= 0:
        return b""
    
    idx = (np.linspace(0, x.size - 1, out_len)).astype(np.int32)
    return x[idx].astype(np.int16).tobytes()

def main():
    """主程序"""
    print("=" * 60)
    print("USB音频电话系统")
    print("=" * 60)
    
    finder = AudioDeviceFinder()
    usb_mics, usb_speakers = finder.find_usb_audio_devices()
    
    if usb_mics:
        print(f"\n找到 {len(usb_mics)} 个USB麦克风: ")
        for mic in usb_mics:
            print(f"  [{mic['index']}] {mic['name']}")
    else:
        print("\n未找到USB麦克风")
    
    if usb_speakers:
        print(f"\n找到 {len(usb_speakers)} 个USB扬声器: ")
        for spk in usb_speakers:
            print(f"  [{spk['index']}] {spk['name']}")
    else:
        print("\n未找到USB扬声器")
    
    # 自动选择设备
    mic_device, spk_device = finder.auto_select_devices()
    
    print(f"\n已选择设备:")
    print(f"  麦克风: [{mic_device['index']}] {mic_device['name']}")
    print(f"  扬声器: [{spk_device['index']}] {spk_device['name']}")
    
    print(f"\n初始化串口...")
    try:
        at_ser = serial.Serial(AT_PORT, 115200, timeout=0)
        au_ser = serial.Serial(AUDIO_PORT, 115200, timeout=0)
        print(f"AT串口: {AT_PORT}, 音频串口: {AUDIO_PORT}")
    except Exception as e:
        print(f"串口初始化失败: {e}")
        finder.terminate()
        return
    
    print("发送AT命令探活...")
    start_time = time.time()
    while time.time() - start_time < 5:
        if b"OK" in at_cmd(at_ser, "AT", 0.1):
            print("AT OK")
            break
        time.sleep(0.2)
    else:
        print("AT命令无响应")
        at_ser.close()
        au_ser.close()
        finder.terminate()
        return
    
    print("\n等待来电...")
    ring_detected = False
    while not ring_detected:
        if at_ser.in_waiting:
            data = at_ser.read(at_ser.in_waiting)
            if b"RING" in data:
                print("检测到来电!")
                ring_detected = True
                break
            print(f"AT返回: {data}")
        time.sleep(0.1)
    
    print("接听电话...")
    at_cmd(at_ser, "ATA", 0.5)
    print("电话已接听")
    time.sleep(1.0)
    print("配置语音模式...")
    at_cmd(at_ser, "AT+CPCMREG=1", 0.1)
    print("\n初始化音频流...")
    
    try:
        spk_stream = finder.pa.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=SIM_RATE,
            output=True,
            output_device_index=spk_device['index'],
            frames_per_buffer=CHUNK_8K
        )
        print(f"扬声器已打开: {spk_device['name']}")
    except Exception as e:
        print(f"打开扬声器失败: {e}")
        at_ser.close()
        au_ser.close()
        finder.terminate()
        return
    
    mic_frames = int(mic_device['default_sample_rate'] * 0.02)
    
    def mic_callback(in_data, frame_count, time_info, status):
        try:
            pcm_data = in_data
            if mic_device['input_channels'] > 1:
                arr = np.frombuffer(pcm_data, dtype=np.int16)
                arr = arr.reshape(-1, mic_device['input_channels'])
                arr = arr.mean(axis=1).astype(np.int16)
                pcm_data = arr.tobytes()
            
            if mic_device['default_sample_rate'] != SIM_RATE:
                pcm_data = resample_44k1_to_8k(pcm_data)
            
            if pcm_data:
                au_ser.write(pcm_data)
        except Exception as e:
            print(f"麦克风回调错误: {e}")
        
        return (None, pyaudio.paContinue)
    
    try:
        mic_stream = finder.pa.open(
            format=pyaudio.paInt16,
            channels=mic_device['input_channels'],
            rate=mic_device['default_sample_rate'],
            input=True,
            input_device_index=mic_device['index'],
            frames_per_buffer=mic_frames,
            stream_callback=mic_callback
        )
        print(f"麦克风已打开: {mic_device['name']}")
    except Exception as e:
        print(f"打开麦克风失败: {e}")
        spk_stream.close()
        at_ser.close()
        au_ser.close()
        finder.terminate()
        return
    
    mic_stream.start_stream()
    print("\n电话通话中...")
    
    try:
        while True:
            if au_ser.in_waiting:
                audio_data = au_ser.read(au_ser.in_waiting)
                if audio_data:
                    spk_stream.write(audio_data)
            time.sleep(0.01)
    except KeyboardInterrupt:
        print("\n用户请求挂断...")
    
    finally:
        print("清理资源...")
        at_cmd(at_ser, "AT+CHUP", 0.2)
        print("电话已挂断")
        try:
            mic_stream.stop_stream()
            mic_stream.close()
            print("麦克风已关闭")
        except:
            pass
        
        try:
            spk_stream.stop_stream()
            spk_stream.close()
            print("扬声器已关闭")
        except:
            pass
        
        at_ser.close()
        au_ser.close()
        print("串口已关闭")
        finder.terminate()
        print("音频资源已释放")
        
        print("\n程序结束")
 
if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"程序异常: {e}")
        import traceback
        traceback.print_exc()
        input("\n按Enter键退出...")

异常的话,可以查询声卡

# 查看所有音频设备
import pyaudio
pa = pyaudio.PyAudio()
for i in range(pa.get_device_count()):
    info = pa.get_device_info_by_index(i)
    print(f"{i}: {info['name']} - Input:{info['maxInputChannels']} Output:{info['maxOutputChannels']}")
pa.terminate()

主机只带USB AUDIO,可以实现通话

PS C:\Users\User\AppData\Local\Programs\Microsoft VS Code> & C:\Users\User\AppData\Local\Programs\Python\Python313\python.exe "d:/2025/1222/import time.py"
0: Microsoft 声音映射器 - Input - Input:2 Output:0
1: 麦克风 (USB Audio and HID) - Input:1 Output:0
2: Microsoft 声音映射器 - Output - Input:0 Output:2
3: 扬声器 (USB Audio and HID) - Input:0 Output:2
4: 耳机 (IN糖3(38:53)) - Input:0 Output:2
5: 主声音捕获驱动程序 - Input:2 Output:0
6: 麦克风 (USB Audio and HID) - Input:1 Output:0
7: 主声音驱动程序 - Input:0 Output:2
8: 扬声器 (USB Audio and HID) - Input:0 Output:2
9: 耳机 (IN糖3(38:53)) - Input:0 Output:2
10: 扬声器 (USB Audio and HID) - Input:0 Output:2
11: 耳机 (IN糖3(38:53)) - Input:0 Output:2
12: 麦克风 (USB Audio and HID) - Input:1 Output:0
13: 麦克风 (USB Audio and HID) - Input:1 Output:0
14: 扬声器 (USB Audio and HID) - Input:0 Output:2
15: 鑰虫満 () - Input:0 Output:2


根据设备列表,我可以看到有几个"USB Audio and HID"设备。从图中和你的设备列表来看,你需要使用的是:

麦克风 (USB Audio and HID) - 索引 1, 6, 12, 13(都有1个输入通道)

扬声器 (USB Audio and HID) - 索引 3, 8, 10, 14(都有2个输出通道)

实现实时通话功能,SIM7600 接听

import time
import serial
import pyaudio
import numpy as np

# ====== 按你当前环境 ======
AT_PORT    = "COM15"
AUDIO_PORT = "COM11"   # 等下如果没下行,再换

# 根据设备列表调整
MIC_INDEX  = 1   # 麦克风 (USB Audio and HID)
SPK_INDEX  = 3   # 扬声器 (USB Audio and HID)
# ==========================

SIM_RATE = 8000
CHUNK_8K = 320
MIC_RATE = 44100

def at_cmd(ser, cmd, wait=0.2):
    ser.write((cmd + "\r\n").encode())
    time.sleep(wait)
    n = ser.in_waiting
    return ser.read(n) if n else b""

def resample_44k1_to_8k(pcm):
    x = np.frombuffer(pcm, dtype=np.int16)
    if x.size == 0:
        return b""
    out_len = int(x.size * SIM_RATE / MIC_RATE)
    if out_len <= 0:
        return b""
    idx = (np.linspace(0, x.size - 1, out_len)).astype(np.int32)
    return x[idx].astype(np.int16).tobytes()

def main():
    at_ser = serial.Serial(AT_PORT, 115200, timeout=0)
    au_ser = serial.Serial(AUDIO_PORT, 115200, timeout=0)

    # 探活
    while b"OK" not in at_cmd(at_ser, "AT", 0.05):
        pass
    print("AT OK")

    print("Waiting for incoming call...")

    # 等来电
    while True:
        if at_ser.in_waiting:
            r = at_ser.read(at_ser.in_waiting)
            if b"RING" in r:
                print("Incoming call")
                break
        time.sleep(0.1)

    # 接听
    at_cmd(at_ser, "ATA", 0.5)
    print("Call answered")

    # 等语音真正建立
    time.sleep(1.0)

    # === 关键语音配置 ===
    at_cmd(at_ser, "AT+CPCMREG=1", 0.1)
    # =====================

    pa = pyaudio.PyAudio()

    # 获取设备信息
    try:
        mic_info = pa.get_device_info_by_index(MIC_INDEX)
        mic_ch = int(mic_info["maxInputChannels"])
        mic_rate = int(mic_info["defaultSampleRate"])
        print(f"Microphone: {mic_info['name']}")
        print(f"  Channels: {mic_ch}, Sample rate: {mic_rate}Hz")
    except Exception as e:
        print(f"Error opening microphone device {MIC_INDEX}: {e}")
        # 使用默认输入设备
        default_mic = pa.get_default_input_device_info()
        mic_ch = int(default_mic["maxInputChannels"])
        mic_rate = int(default_mic["defaultSampleRate"])
        print(f"Using default microphone: {default_mic['name']}")

    try:
        spk_info = pa.get_device_info_by_index(SPK_INDEX)
        spk_ch = int(spk_info["maxOutputChannels"])
        spk_rate = SIM_RATE  # 使用8kHz作为输出采样率
        print(f"Speaker: {spk_info['name']}")
        print(f"  Channels: {spk_ch}, Sample rate: {spk_rate}Hz")
    except Exception as e:
        print(f"Error opening speaker device {SPK_INDEX}: {e}")
        # 使用默认输出设备
        default_spk = pa.get_default_output_device_info()
        spk_ch = int(default_spk["maxOutputChannels"])
        spk_rate = SIM_RATE
        print(f"Using default speaker: {default_spk['name']}")

    print(f"Call connected")
    print(">>> Press Ctrl+C to hang up <<<")

    # 下行播放 - 使用USB扬声器
    try:
        spk = pa.open(
            format=pa.get_format_from_width(2),  # 16位
            channels=1,                          # 单声道输出
            rate=spk_rate,                       # 8000Hz
            output=True,
            output_device_index=SPK_INDEX,
            frames_per_buffer=CHUNK_8K
        )
        print("Speaker opened successfully")
    except Exception as e:
        print(f"Error opening speaker stream: {e}")
        return

    # 计算麦克风缓冲区大小
    frames_mic = int(mic_rate * 0.02)  # 20ms的帧

    # 上行麦克风回调函数
    def mic_cb(in_data, frame_count, time_info, status):
        try:
            pcm = in_data
            # 如果麦克风是多声道,转换为单声道
            if mic_ch > 1:
                arr = np.frombuffer(pcm, dtype=np.int16)
                arr = arr.reshape(-1, mic_ch)
                # 求平均转换为单声道
                arr = arr.mean(axis=1).astype(np.int16)
                pcm = arr.tobytes()
            
            # 重采样到8kHz(如果需要)
            if mic_rate != SIM_RATE:
                pcm = resample_44k1_to_8k(pcm)
            
            # 发送到音频串口
            if pcm:
                au_ser.write(pcm)
        except Exception as e:
            print(f"Error in mic callback: {e}")
        return (None, pyaudio.paContinue)

    # 上行麦克风 - 使用USB麦克风
    try:
        mic = pa.open(
            format=pa.get_format_from_width(2),
            channels=mic_ch,
            rate=mic_rate,
            input=True,
            input_device_index=MIC_INDEX,
            frames_per_buffer=frames_mic,
            stream_callback=mic_cb
        )
        print("Microphone opened successfully")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        return

    mic.start_stream()

    try:
        print("Voice call active...")
        while True:
            # 读取下行音频数据并播放
            if au_ser.in_waiting:
                data = au_ser.read(au_ser.in_waiting)
                if data:
                    spk.write(data)
            time.sleep(0.01)

    except KeyboardInterrupt:
        print("\nHang up")

    finally:
        # 挂断电话
        at_cmd(at_ser, "AT+CHUP", 0.1)
        
        # 清理音频流
        try: 
            mic.stop_stream()
            mic.close()
        except: 
            pass
        try: 
            spk.stop_stream()
            spk.close()
        except: 
            pass
        try: 
            pa.terminate()
        except: 
            pass
        
        # 关闭串口
        at_ser.close()
        au_ser.close()
        print("All resources released")

if __name__ == "__main__":
    main()

查询声卡和接听一起

import time
import serial
import pyaudio
import numpy as np
import sys

# ====== 串口配置 ======
AT_PORT    = "COM15"
AUDIO_PORT = "COM11"

# ====== 音频参数 ======
SIM_RATE = 8000      # 电话音质
CHUNK_8K = 320       # 40ms的帧
MIC_RATE = 44100     # 麦克风默认采样率

class AudioDeviceFinder:
    """音频设备自动查找器"""
    
    def __init__(self):
        self.pa = pyaudio.PyAudio()
        self.devices = []
        self.scan_devices()
    
    def scan_devices(self):
        """扫描所有音频设备"""
        print("=" * 60)
        print("扫描音频设备...")
        print("=" * 60)
        
        for i in range(self.pa.get_device_count()):
            info = self.pa.get_device_info_by_index(i)
            device = {
                'index': i,
                'name': info['name'],
                'input_channels': int(info['maxInputChannels']),
                'output_channels': int(info['maxOutputChannels']),
                'default_sample_rate': int(info['defaultSampleRate'])
            }
            self.devices.append(device)
            
            # 显示设备信息
            input_str = f"Input:{device['input_channels']}" if device['input_channels'] > 0 else "Input:0"
            output_str = f"Output:{device['output_channels']}" if device['output_channels'] > 0 else "Output:0"
            print(f"{i:2d}: {device['name'][:40]:<40} {input_str:<10} {output_str:<10}")
    
    def find_usb_audio_devices(self):
        """查找USB Audio and HID设备"""
        usb_mics = []
        usb_speakers = []
        
        for device in self.devices:
            name = device['name'].lower()
            
            # 查找USB音频设备
            if 'usb audio' in name or 'usb' in name:
                if device['input_channels'] > 0:
                    usb_mics.append(device)
                if device['output_channels'] > 0:
                    usb_speakers.append(device)
        
        return usb_mics, usb_speakers
    
    def auto_select_devices(self):
        """自动选择设备"""
        usb_mics, usb_speakers = self.find_usb_audio_devices()
        
        # 选择麦克风
        if usb_mics:
            # 优先选择名称中有"麦克风"的设备
            for mic in usb_mics:
                if '麦克风' in mic['name'] or 'microphone' in mic['name'].lower():
                    selected_mic = mic
                    break
            else:
                selected_mic = usb_mics[0]  # 否则选择第一个
        else:
            # 没有USB麦克风,使用默认输入设备
            default_mic_idx = self.pa.get_default_input_device_info()['index']
            selected_mic = self.devices[default_mic_idx]
            print(f"警告: 未找到USB麦克风,使用默认设备: {selected_mic['name']}")
        
        # 选择扬声器
        if usb_speakers:
            # 优先选择名称中有"扬声器"或"speaker"的设备
            for spk in usb_speakers:
                if '扬声器' in spk['name'] or 'speaker' in spk['name'].lower():
                    selected_spk = spk
                    break
            else:
                selected_spk = usb_speakers[0]  # 否则选择第一个
        else:
            # 没有USB扬声器,使用默认输出设备
            default_spk_idx = self.pa.get_default_output_device_info()['index']
            selected_spk = self.devices[default_spk_idx]
            print(f"警告: 未找到USB扬声器,使用默认设备: {selected_spk['name']}")
        
        return selected_mic, selected_spk
    
    def manual_select_devices(self):
        """手动选择设备"""
        print("\n" + "=" * 60)
        print("手动选择音频设备")
        print("=" * 60)
        
        # 显示所有输入设备
        print("\n可用麦克风设备:")
        input_devices = [d for d in self.devices if d['input_channels'] > 0]
        for device in input_devices:
            print(f"  [{device['index']}] {device['name']} (通道:{device['input_channels']})")
        
        # 显示所有输出设备
        print("\n可用扬声器设备:")
        output_devices = [d for d in self.devices if d['output_channels'] > 0]
        for device in output_devices:
            print(f"  [{device['index']}] {device['name']} (通道:{device['output_channels']})")
        
        # 用户选择
        try:
            mic_idx = int(input(f"\n选择麦克风设备索引 [默认{input_devices[0]['index']}]: ") or input_devices[0]['index'])
            spk_idx = int(input(f"选择扬声器设备索引 [默认{output_devices[0]['index']}]: ") or output_devices[0]['index'])
            
            selected_mic = next((d for d in self.devices if d['index'] == mic_idx), input_devices[0])
            selected_spk = next((d for d in self.devices if d['index'] == spk_idx), output_devices[0])
            
            return selected_mic, selected_spk
        except ValueError:
            print("输入错误,使用默认设备")
            return input_devices[0], output_devices[0]
    
    def terminate(self):
        """清理资源"""
        self.pa.terminate()

def at_cmd(ser, cmd, wait=0.2):
    """发送AT命令"""
    ser.write((cmd + "\r\n").encode())
    time.sleep(wait)
    n = ser.in_waiting
    return ser.read(n) if n else b""

def resample_44k1_to_8k(pcm):
    """将44.1kHz重采样到8kHz"""
    if not pcm:
        return b""
    
    x = np.frombuffer(pcm, dtype=np.int16)
    if x.size == 0:
        return b""
    
    out_len = int(x.size * SIM_RATE / MIC_RATE)
    if out_len <= 0:
        return b""
    
    idx = (np.linspace(0, x.size - 1, out_len)).astype(np.int32)
    return x[idx].astype(np.int16).tobytes()

def main():
    """主程序"""
    print("=" * 60)
    print("USB音频电话系统")
    print("=" * 60)
    
    # 1. 自动检测音频设备
    finder = AudioDeviceFinder()
    usb_mics, usb_speakers = finder.find_usb_audio_devices()
    
    # 显示找到的USB设备
    if usb_mics:
        print(f"\n找到 {len(usb_mics)} 个USB麦克风:")
        for mic in usb_mics:
            print(f"  [{mic['index']}] {mic['name']}")
    else:
        print("\n未找到USB麦克风")
    
    if usb_speakers:
        print(f"\n找到 {len(usb_speakers)} 个USB扬声器:")
        for spk in usb_speakers:
            print(f"  [{spk['index']}] {spk['name']}")
    else:
        print("\n未找到USB扬声器")
    
    # 选择自动或手动模式
    print("\n选择设备配置模式:")
    print("  1. 自动选择USB设备")
    print("  2. 手动选择设备")
    
    try:
        mode = int(input("请选择 [1/2, 默认1]: ") or "1")
    except ValueError:
        mode = 1
    
    if mode == 1:
        mic_device, spk_device = finder.auto_select_devices()
    else:
        mic_device, spk_device = finder.manual_select_devices()
    
    print(f"\n已选择设备:")
    print(f"  麦克风: [{mic_device['index']}] {mic_device['name']}")
    print(f"  扬声器: [{spk_device['index']}] {spk_device['name']}")
    
    # 确认选择
    confirm = input("\n是否继续? [Y/n]: ").lower()
    if confirm not in ['', 'y', 'yes']:
        print("程序退出")
        finder.terminate()
        return
    
    # 2. 初始化串口
    print(f"\n初始化串口...")
    try:
        at_ser = serial.Serial(AT_PORT, 115200, timeout=0)
        au_ser = serial.Serial(AUDIO_PORT, 115200, timeout=0)
        print(f"AT串口: {AT_PORT}, 音频串口: {AUDIO_PORT}")
    except Exception as e:
        print(f"串口初始化失败: {e}")
        print("请检查串口连接")
        finder.terminate()
        return
    
    # 3. AT命令探活
    print("发送AT命令探活...")
    start_time = time.time()
    while time.time() - start_time < 5:  # 5秒超时
        if b"OK" in at_cmd(at_ser, "AT", 0.1):
            print("AT OK")
            break
        time.sleep(0.2)
    else:
        print("AT命令无响应,请检查模块连接")
        at_ser.close()
        au_ser.close()
        finder.terminate()
        return
    
    # 4. 等待来电
    print("\n等待来电... (按Ctrl+C退出)")
    
    ring_detected = False
    while not ring_detected:
        if at_ser.in_waiting:
            data = at_ser.read(at_ser.in_waiting)
            if b"RING" in data:
                print("检测到来电!")
                ring_detected = True
                break
            print(f"AT返回: {data}")
        time.sleep(0.1)
    
    # 5. 接听电话
    print("接听电话...")
    at_cmd(at_ser, "ATA", 0.5)
    print("电话已接听")
    
    # 等待语音建立
    time.sleep(1.0)
    
    # 6. 配置语音模式
    print("配置语音模式...")
    at_cmd(at_ser, "AT+CPCMREG=1", 0.1)
    
    # 7. 初始化音频流
    print("\n初始化音频流...")
    
    # 打开扬声器(输出)
    try:
        spk_stream = finder.pa.open(
            format=pyaudio.paInt16,
            channels=1,  # 电话使用单声道
            rate=SIM_RATE,
            output=True,
            output_device_index=spk_device['index'],
            frames_per_buffer=CHUNK_8K
        )
        print(f"扬声器已打开: {spk_device['name']}")
    except Exception as e:
        print(f"打开扬声器失败: {e}")
        at_ser.close()
        au_ser.close()
        finder.terminate()
        return
    
    # 计算麦克风缓冲区大小
    mic_frames = int(mic_device['default_sample_rate'] * 0.02)  # 20ms
    
    # 定义麦克风回调函数
    def mic_callback(in_data, frame_count, time_info, status):
        try:
            pcm_data = in_data
            
            # 多声道转单声道
            if mic_device['input_channels'] > 1:
                arr = np.frombuffer(pcm_data, dtype=np.int16)
                arr = arr.reshape(-1, mic_device['input_channels'])
                arr = arr.mean(axis=1).astype(np.int16)
                pcm_data = arr.tobytes()
            
            # 重采样到8kHz
            if mic_device['default_sample_rate'] != SIM_RATE:
                pcm_data = resample_44k1_to_8k(pcm_data)
            
            # 发送到音频串口
            if pcm_data:
                au_ser.write(pcm_data)
                
        except Exception as e:
            print(f"麦克风回调错误: {e}")
        
        return (None, pyaudio.paContinue)
    
    # 打开麦克风(输入)
    try:
        mic_stream = finder.pa.open(
            format=pyaudio.paInt16,
            channels=mic_device['input_channels'],
            rate=mic_device['default_sample_rate'],
            input=True,
            input_device_index=mic_device['index'],
            frames_per_buffer=mic_frames,
            stream_callback=mic_callback
        )
        print(f"麦克风已打开: {mic_device['name']} (采样率: {mic_device['default_sample_rate']}Hz)")
    except Exception as e:
        print(f"打开麦克风失败: {e}")
        spk_stream.close()
        at_ser.close()
        au_ser.close()
        finder.terminate()
        return
    
    # 启动麦克风流
    mic_stream.start_stream()
    
    print("\n" + "=" * 60)
    print("电话通话中...")
    print("按 Ctrl+C 挂断电话")
    print("=" * 60)
    
    # 8. 主循环 - 音频传输
    try:
        while True:
            # 处理下行音频(从模块到扬声器)
            if au_ser.in_waiting:
                audio_data = au_ser.read(au_ser.in_waiting)
                if audio_data:
                    spk_stream.write(audio_data)
            
            time.sleep(0.01)
            
    except KeyboardInterrupt:
        print("\n用户请求挂断...")
    
    # 9. 清理资源
    finally:
        print("清理资源...")
        
        # 挂断电话
        at_cmd(at_ser, "AT+CHUP", 0.2)
        print("电话已挂断")
        
        # 停止音频流
        try:
            mic_stream.stop_stream()
            mic_stream.close()
            print("麦克风已关闭")
        except:
            pass
        
        try:
            spk_stream.stop_stream()
            spk_stream.close()
            print("扬声器已关闭")
        except:
            pass
        
        # 关闭串口
        at_ser.close()
        au_ser.close()
        print("串口已关闭")
        
        # 终止PyAudio
        finder.terminate()
        print("音频资源已释放")
        
        print("\n程序结束")

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"程序异常: {e}")
        import traceback
        traceback.print_exc()
        input("\n按Enter键退出...")

在台式电脑上接蓝牙耳机,实现语音通话(手机没声音)

SIM7600 拨打:

import time
import serial
import pyaudio
import numpy as np
import sys

# ====== 按你当前环境 ======
AT_PORT    = "COM8"
AUDIO_PORT = "COM11"
MIC_INDEX  = 8
SPK_INDEX  = 12
NUMBER     = "13143366816"
# ==========================

SIM_RATE = 8000
CHUNK_8K = 320
MIC_RATE = 44100

def at_cmd(ser, cmd, wait=0.2):
    ser.reset_input_buffer()
    ser.write((cmd + "\r\n").encode())
    time.sleep(wait)
    n = ser.in_waiting
    return ser.read(n) if n else b""

def resample_44k1_to_8k(pcm):
    x = np.frombuffer(pcm, dtype=np.int16)
    if x.size == 0:
        return b""
    out_len = int(x.size * SIM_RATE / MIC_RATE)
    idx = (np.linspace(0, x.size - 1, out_len)).astype(np.int32)
    return x[idx].astype(np.int16).tobytes()

def main():
    at_ser = serial.Serial(AT_PORT, 115200, timeout=0)
    au_ser = serial.Serial(AUDIO_PORT, 115200, timeout=0)

    while b"OK" not in at_cmd(at_ser, "AT", 0.05):
        pass
    print("AT OK")

    print("Dialing...")
    at_cmd(at_ser, f"ATD{NUMBER};", 0.5)

    # 等 1 秒让网络把语音通道真正拉起来
    time.sleep(1.0)

    at_cmd(at_ser, "AT+CPCMREG=1", 0.1)

    pa = pyaudio.PyAudio()
    mic_info = pa.get_device_info_by_index(MIC_INDEX)
    mic_ch   = max(1, int(mic_info["maxInputChannels"]))
    mic_rate = int(mic_info["defaultSampleRate"])

    print(f"Call connected (Bluetooth HFP @ {mic_rate/1000:.1f}kHz)")
    print(">>> Press Ctrl+C to hang up <<<")

    spk = pa.open(
        format=pa.get_format_from_width(2),
        channels=1,
        rate=SIM_RATE,
        output=True,
        output_device_index=SPK_INDEX,
        frames_per_buffer=CHUNK_8K
    )

    frames_mic = int(mic_rate * 0.02)

    def mic_cb(in_data, frame_count, time_info, status):
        try:
            pcm = in_data
            if mic_ch > 1:
                arr = np.frombuffer(pcm, dtype=np.int16)
                arr = arr.reshape(-1, mic_ch).mean(axis=1).astype(np.int16)
                pcm = arr.tobytes()
            if mic_rate != SIM_RATE:
                pcm = resample_44k1_to_8k(pcm)
            au_ser.write(pcm)
        except:
            pass
        return (None, pyaudio.paContinue)

    mic = pa.open(
        format=pa.get_format_from_width(2),
        channels=mic_ch,
        rate=mic_rate,
        input=True,
        input_device_index=MIC_INDEX,
        frames_per_buffer=frames_mic,
        stream_callback=mic_cb
    )

    mic.start_stream()

    try:
        while True:
            if au_ser.in_waiting:
                spk.write(au_ser.read(au_ser.in_waiting))
            time.sleep(0.01)

    except KeyboardInterrupt:
        print("\nUser hang up")

    finally:
        at_cmd(at_ser, "AT+CHUP", 0.1)
        try: mic.stop_stream(); mic.close()
        except: pass
        try: spk.stop_stream(); spk.close()
        except: pass
        try: pa.terminate()
        except: pass
        at_ser.close()
        au_ser.close()

if __name__ == "__main__":
    main()

SIM7600 接听

import time
import serial
import pyaudio
import numpy as np

# ====== 按你当前环境 ======
AT_PORT    = "COM8"
AUDIO_PORT = "COM11"   # 等下如果没下行,再换
MIC_INDEX  = 8
SPK_INDEX  = 12
# ==========================

SIM_RATE = 8000
CHUNK_8K = 320
MIC_RATE = 44100

def at_cmd(ser, cmd, wait=0.2):
    ser.write((cmd + "\r\n").encode())
    time.sleep(wait)
    n = ser.in_waiting
    return ser.read(n) if n else b""

def resample_44k1_to_8k(pcm):
    x = np.frombuffer(pcm, dtype=np.int16)
    if x.size == 0:
        return b""
    out_len = int(x.size * SIM_RATE / MIC_RATE)
    if out_len <= 0:
        return b""
    idx = (np.linspace(0, x.size - 1, out_len)).astype(np.int32)
    return x[idx].astype(np.int16).tobytes()

def main():
    at_ser = serial.Serial(AT_PORT, 115200, timeout=0)
    au_ser = serial.Serial(AUDIO_PORT, 115200, timeout=0)

    # 探活
    while b"OK" not in at_cmd(at_ser, "AT", 0.05):
        pass
    print("AT OK")

    print("Waiting for incoming call...")

    # 等来电
    while True:
        if at_ser.in_waiting:
            r = at_ser.read(at_ser.in_waiting)
            if b"RING" in r:
                print("Incoming call")
                break
        time.sleep(0.1)

    # 接听
    at_cmd(at_ser, "ATA", 0.5)
    print("Call answered")

    # 等语音真正建立
    time.sleep(1.0)

    # === 关键语音配置 ===
    at_cmd(at_ser, "AT+CPCMREG=1", 0.1)
    # =====================

    pa = pyaudio.PyAudio()

    mic_info = pa.get_device_info_by_index(MIC_INDEX)
    mic_ch   = max(1, int(mic_info["maxInputChannels"]))
    mic_rate = int(mic_info["defaultSampleRate"])

    print(f"Call connected (Bluetooth HFP @ {mic_rate/1000:.1f}kHz)")
    print(">>> Press Ctrl+C to hang up <<<")

    # 下行播放
    spk = pa.open(
        format=pa.get_format_from_width(2),
        channels=1,
        rate=SIM_RATE,
        output=True,
        output_device_index=SPK_INDEX,
        frames_per_buffer=CHUNK_8K
    )

    frames_mic = int(mic_rate * 0.02)

    # 上行麦克风
    def mic_cb(in_data, frame_count, time_info, status):
        try:
            pcm = in_data
            if mic_ch > 1:
                arr = np.frombuffer(pcm, dtype=np.int16)
                arr = arr.reshape(-1, mic_ch).mean(axis=1).astype(np.int16)
                pcm = arr.tobytes()
            if mic_rate != SIM_RATE:
                pcm = resample_44k1_to_8k(pcm)
            au_ser.write(pcm)
        except:
            pass
        return (None, pyaudio.paContinue)

    mic = pa.open(
        format=pa.get_format_from_width(2),
        channels=mic_ch,
        rate=mic_rate,
        input=True,
        input_device_index=MIC_INDEX,
        frames_per_buffer=frames_mic,
        stream_callback=mic_cb
    )

    mic.start_stream()

    try:
        while True:
            if au_ser.in_waiting:
                data = au_ser.read(au_ser.in_waiting)
                print("DOWNLINK BYTES =", len(data))
                spk.write(data)
            time.sleep(0.01)

    except KeyboardInterrupt:
        print("Hang up")

    finally:
        at_cmd(at_ser, "AT+CHUP", 0.1)
        try: mic.stop_stream(); mic.close()
        except: pass
        try: spk.stop_stream(); spk.close()
        except: pass
        try: pa.terminate()
        except: pass
        at_ser.close()
        au_ser.close()

if __name__ == "__main__":
    main()

windows 实现导出音频流到电脑网页播放

<!doctype html>
<html>
<head>
<meta charset="utf-8">
<title>SIM7600 PCM 音频播放器(无 AT)</title>
<style>
body {font-family:sans-serif; max-width:800px; margin:20px auto;}
button {padding:8px 16px; margin:4px;}
#log {white-space:pre-wrap; background:#111; color:#0f0; padding:8px; height:150px; overflow:auto;}
</style>

	<style>
	.download-btn-container {
		position: fixed;
		bottom: 20px;
		left: 50%;
		transform: translateX(-50%);
		z-index: 1000;
	}
	.download-btn {
		background-color: #2196F3;
		color: white;
		padding: 10px 20px;
		border: none;
		border-radius: 5px;
		cursor: pointer;
		box-shadow: 0 2px 5px rgba(0,0,0,0.2);
		position: relative;
		padding-right: 35px;
	}
	.download-btn:hover {
		background-color: #1976D2;
	}
	.close-btn {
		background-color: #9e9e9e;
		color: white;
		width: 20px;
		height: 20px;
		border: none;
		border-radius: 50%;
		cursor: pointer;
		display: flex;
		align-items: center;
		justify-content: center;
		position: absolute;
		right: 8px;
		top: 50%;
		transform: translateY(-50%);
		font-size: 14px;
		line-height: 1;
	}
	.close-btn:hover {
		background-color: #757575;
	}
	.hidden {
		display: none !important;
	}
	</style>
	</head>
<body>

<h2>SIM7600 PCM 实时音频播放器</h2>

采样率:
<select id="rate">
  <option value="8000">8 kHz</option>
  <option value="16000">16 kHz</option>
</select>

<br><br>
<button id="openBtn">打开 PCM 串口并播放</button>
<button id="closeBtn" disabled>停止并断开</button>

<p>状态:<span id="status">未连接</span></p>
<div id="log"></div>

<script>
let port = null;
let reader = null;
let audioCtx = null;
let processor = null;
let running = false;

// 临时缓存,专门用于处理奇数字节情况
let leftover = null;

function log(msg) {
  let box = document.getElementById("log");
  let t = new Date().toLocaleTimeString();
  box.textContent += `[${t}] ${msg}\n`;
  box.scrollTop = box.scrollHeight;
}

document.getElementById("openBtn").onclick = async () => {
  try {
    const sampleRate = parseInt(document.getElementById("rate").value);

    port = await navigator.serial.requestPort();
    await port.open({ baudRate:115200 });

    audioCtx = new AudioContext({sampleRate});
    processor = audioCtx.createScriptProcessor(2048, 1, 1);

    // 环形缓冲区(float32)
    const bufferSize = sampleRate * 2; 
    const audioBuffer = new Float32Array(bufferSize);
    let writePtr = 0, readPtr = 0;

    processor.onaudioprocess = (e)=>{
      let output = e.outputBuffer.getChannelData(0);
      for (let i=0; i<output.length; i++) {
        if (readPtr !== writePtr) {
          output[i] = audioBuffer[readPtr];
          readPtr = (readPtr + 1) % bufferSize;
        } else {
          output[i] = 0;
        }
      }
    };
    processor.connect(audioCtx.destination);

    running = true;
    document.getElementById("openBtn").disabled = true;
    document.getElementById("closeBtn").disabled = false;
    document.getElementById("status").textContent = "已连接(等待音频)";
    log("串口已打开。");

    reader = port.readable.getReader();

    leftover = new Uint8Array(0);

    while (running) {
      const { value, done } = await reader.read();
      if (done) break;
      if (!value) continue;

      // 拼接 leftover(奇数字节时用)
      let bytes = new Uint8Array(leftover.length + value.length);
      bytes.set(leftover, 0);
      bytes.set(value, leftover.length);

      // 保证偶数字节(16bit 对齐)
      let alignedLength = bytes.length - (bytes.length % 2);
      let pcm = bytes.slice(0, alignedLength);
      leftover = bytes.slice(alignedLength);  

      const dv = new DataView(pcm.buffer, pcm.byteOffset, pcm.byteLength);
      const count = pcm.length / 2;

      for (let i=0;i<count;i++) {
        let s = dv.getInt16(i*2, true);
        let f = s / 32768;
        audioBuffer[writePtr] = f;
        writePtr = (writePtr + 1) % bufferSize;
      }
    }

  } catch(e) {
    console.error(e);
    log("错误:" + e);
    document.getElementById("status").textContent = "错误";
  }
};

document.getElementById("closeBtn").onclick = async () => {
  running = false;
  try { reader && await reader.cancel(); } catch(e){}
  try { port && await port.close(); } catch(e){}
  try { processor && processor.disconnect(); } catch(e){}
  try { audioCtx && await audioCtx.close(); } catch(e){}

  document.getElementById("openBtn").disabled = false;
  document.getElementById("closeBtn").disabled = true;
  document.getElementById("status").textContent = "未连接";
  log("已关闭。");
};
</script>


	<div class="download-btn-container" id="downloadContainer">
		<button class="download-btn" onclick="downloadHTML()">
			下载HTML文件
			<button class="close-btn" onclick="closeDownloadBtn(event)" title="关闭">×</button>
		</button>
	</div>
	<script>
	function downloadHTML() {
		window.location.href = '/html2images/download/' + window.location.pathname.split('/').pop();
	}
	function closeDownloadBtn(event) {
		event.stopPropagation(); // 阻止事件冒泡,防止触发下载
		document.getElementById('downloadContainer').classList.add('hidden');
	}
	// 检查是否为本地文件
	if (window.location.protocol === 'file:' || window.location.hostname !== 'api.xupeidong.cn') {
		document.getElementById('downloadContainer').classList.add('hidden');
	}
	</script>
	</body>
</html>

Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐