With tags:

# python3 myinfer.py \
#   --wav_scp "/data/q3_8phone_1600.wav.scp" \
#   --output_text "./ppc_fineture2.txt" \
#   --device "cuda" \
#   --ngpu 1 \
#   --ncpu 4


import argparse
import soundfile
import os
import re  # handle mixed space/Tab separators
from funasr import AutoModel


def parse_args():
    parser = argparse.ArgumentParser()
    # Model configuration (original logic kept; adapt to your environment)
    parser.add_argument("--asr_model_online", type=str, 
                        default="/data/.../1_code_model/funasr/FunASR/examples/industrial_data_pretraining/sense_voice/outputs_finetune2",
                        help="Model name (from ModelScope) or local model path")
    parser.add_argument("--asr_model_online_revision", type=str, default="v2.0.4", help="Model revision")
    # Hardware configuration
    parser.add_argument("--ngpu", type=int, default=1, help="0 for CPU, ≥1 for GPU")
    parser.add_argument("--device", type=str, default="cuda", help="Device: cuda / cpu")
    parser.add_argument("--ncpu", type=int, default=4, help="CPU cores for preprocessing")
    # New: input/output configuration (the core change)
    parser.add_argument("--wav_scp", type=str, required=True, help="Input wav.scp path (col1: id, col2: audio path, sep: space/Tab)")
    parser.add_argument("--output_text", type=str, required=True, help="Output text path (col1: id, col2: transcript)")
    return parser.parse_args()


def load_wav_scp(wav_scp_path):
    """解析wav.scp文件,返回(id, audio_path)列表(支持空格/Tab分隔,跳过空行/注释)"""
    scp_items = []
    with open(wav_scp_path, "r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            # Skip blank lines and comment lines (starting with #)
            if not line or line.startswith("#"):
                continue
            # Split on mixed spaces/Tabs at most once, so the path in column 2 stays intact
            try:
                parts = re.split(r'[\t ]+', line, maxsplit=1)
                if len(parts) < 2:
                    print(f"警告:第{line_num}行格式错误(缺少音频路径),跳过 -> {line}")
                    continue
                audio_id, audio_path = parts[0], parts[1]
                # Check that the audio path exists
                if not os.path.exists(audio_path):
                    print(f"Warning: audio path on line {line_num} does not exist, skipping -> {audio_path}")
                    continue
                scp_items.append((audio_id, audio_path))
            except Exception as e:
                print(f"警告:第{line_num}行解析失败,跳过 -> {line},错误:{e}")
    print(f"成功解析wav.scp:共{len(scp_items)}个有效音频")
    return scp_items


def batch_infer_from_scp(scp_items, model):
    """批量推理:输入(id, audio_path)列表,返回(id, transcript)列表"""
    results = []
    # Inference parameters (kept from the original script; adjust as needed)
    chunk_size = [0, 10, 5]  # 10 units x 60 ms = 600 ms per chunk
    encoder_chunk_look_back = 4
    decoder_chunk_look_back = 1
    chunk_stride = chunk_size[1] * 960  # 600 ms stride (at 16 kHz: 960 samples = 16000 * 0.06 s, one 60 ms unit)

    for audio_id, audio_path in scp_items:
        print(f"正在推理:{audio_id} -> {audio_path}")
        try:
            # Read audio (FunASR expects 16 kHz; resample first if the audio uses another rate; original logic kept)
            speech, sample_rate = soundfile.read(audio_path)
            # Chunk-by-chunk streaming inference
            cache = {}
            total_chunk_num = int((len(speech) - 1) / chunk_stride) + 1
            res_txt = []
            for i in range(total_chunk_num):
                speech_chunk = speech[i * chunk_stride : (i + 1) * chunk_stride]
                is_final = (i == total_chunk_num - 1)
                # Model inference
                res = model.generate(
                    input=speech_chunk,
                    cache=cache,
                    is_final=is_final,
                    chunk_size=chunk_size,
                    encoder_chunk_look_back=encoder_chunk_look_back,
                    decoder_chunk_look_back=decoder_chunk_look_back
                )
                res_txt.append(res[0]["text"])
            # Concatenate the final result
            final_transcript = "".join(res_txt).strip()
            results.append((audio_id, final_transcript))
            print(f"推理完成:{audio_id} -> {final_transcript}")
        except Exception as e:
            # Catch exceptions so one failure does not abort the whole run
            error_msg = f"inference_failed: {str(e)[:50]}"  # keep only the first part of the error
            results.append((audio_id, error_msg))
            print(f"推理失败:{audio_id} -> {error_msg}")
    return results


def save_text_file(results, output_text_path):
    """保存结果到text文件(格式:id  transcript)"""
    with open(output_text_path, "w", encoding="utf-8") as f:
        for audio_id, transcript in results:
            f.write(f"{audio_id} {transcript}\n")
    print(f"\n结果已保存到:{output_text_path}")


def main():
    args = parse_args()
    # 1. Initialize the FunASR model (original logic kept; point --asr_model_online at your model path)
    print("Initializing FunASR model...")
    model = AutoModel(
        model=args.asr_model_online,
        model_revision=args.asr_model_online_revision,
        ngpu=args.ngpu,
        ncpu=args.ncpu,
        device=args.device,
        disable_pbar=True,
        disable_log=True,
        disable_update=True
    )
    # 2. Parse wav.scp
    scp_items = load_wav_scp(args.wav_scp)
    if not scp_items:
        print("错误:未解析到有效音频,退出脚本")
        return
    # 3. Batch inference
    results = batch_infer_from_scp(scp_items, model)
    # 4. Save the output text file
    save_text_file(results, args.output_text)


if __name__ == "__main__":
    main()
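
For reference, the wav.scp file this script expects looks like the sketch below (ids and paths are hypothetical; columns may be separated by spaces or a Tab, and lines starting with "#" are skipped):

# utt_0001 /data/audio/utt_0001.wav
# utt_0002 /data/audio/utt_0002.wav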
    

Without tags:

# python3 myinfer.py \
#   --wav_scp "/data/q3_8phone_1600.wav.scp" \
#   --output_text "./ppc_fineture2_2.txt" \
#   --device "cuda" \
#   --ngpu 1 \
#   --ncpu 4


import argparse
import soundfile
import os
import re  # space/Tab splitting + special-tag filtering
from funasr import AutoModel


def parse_args():
    parser = argparse.ArgumentParser()
    # Model configuration (your local fine-tuned model path; no change needed)
    parser.add_argument("--asr_model_online", type=str, 
                        default="/data/。。。/1_code_model/funasr/FunASR/examples/industrial_data_pretraining/sense_voice/outputs_finetune2",
                        help="Model name (from ModelScope) or local model path")
    parser.add_argument("--asr_model_online_revision", type=str, default="v2.0.4", help="Model revision")
    # Hardware configuration
    parser.add_argument("--ngpu", type=int, default=1, help="0 for CPU, ≥1 for GPU")
    parser.add_argument("--device", type=str, default="cuda", help="Device: cuda / cpu")
    parser.add_argument("--ncpu", type=int, default=4, help="CPU cores for preprocessing")
    # Input/output
    parser.add_argument("--wav_scp", type=str, required=True, help="Input wav.scp path (col1: id, col2: audio path)")
    parser.add_argument("--output_text", type=str, required=True, help="Output text path (col1: id, col2: pure transcript)")
    return parser.parse_args()


def load_wav_scp(wav_scp_path):
    """解析wav.scp文件,返回(id, audio_path)列表"""
    scp_items = []
    with open(wav_scp_path, "r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            try:
                parts = re.split(r'[\t ]+', line, maxsplit=1)
                if len(parts) < 2:
                    print(f"警告:第{line_num}行格式错误,跳过 -> {line}")
                    continue
                audio_id, audio_path = parts[0], parts[1]
                if not os.path.exists(audio_path):
                    print(f"警告:第{line_num}行音频路径不存在,跳过 -> {audio_path}")
                    continue
                scp_items.append((audio_id, audio_path))
            except Exception as e:
                print(f"警告:第{line_num}行解析失败,跳过 -> {line},错误:{e}")
    print(f"成功解析wav.scp:共{len(scp_items)}个有效音频")
    return scp_items


def remove_special_tags(text):
    """核心函数:过滤所有 <|*|> 格式的特殊标签,返回纯文本"""
    # 正则匹配规则:匹配 <| 开头、|> 结尾的所有内容(非贪婪匹配,避免多标签连用时漏删)
    tag_pattern = r'<\|.*?\|>'
    # 替换标签为空字符串,最后去除首尾空格(避免标签删除后留空)
    pure_text = re.sub(tag_pattern, "", text).strip()
    # 处理标签删除后可能出现的连续空格(可选,根据需求保留)
    pure_text = re.sub(r'\s+', ' ', pure_text)
    return pure_text
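
# Quick sanity check (the tagged string below is a hypothetical example of
# SenseVoice-style raw output, shown for illustration only):
#   remove_special_tags("<|zh|><|NEUTRAL|><|Speech|><|woitn|>hello world")
#   -> "hello world"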


def batch_infer_from_scp(scp_items, model):
    """批量推理:新增标签过滤,返回纯文本结果"""
    results = []
    chunk_size = [0, 10, 5]
    encoder_chunk_look_back = 4
    decoder_chunk_look_back = 1
    chunk_stride = chunk_size[1] * 960  # 600ms stride

    for audio_id, audio_path in scp_items:
        print(f"正在推理:{audio_id} -> {audio_path}")
        try:
            speech, sample_rate = soundfile.read(audio_path)
            cache = {}
            total_chunk_num = int((len(speech) - 1) / chunk_stride) + 1
            res_txt = []
            for i in range(total_chunk_num):
                speech_chunk = speech[i * chunk_stride : (i + 1) * chunk_stride]
                is_final = (i == total_chunk_num - 1)
                res = model.generate(
                    input=speech_chunk,
                    cache=cache,
                    is_final=is_final,
                    chunk_size=chunk_size,
                    encoder_chunk_look_back=encoder_chunk_look_back,
                    decoder_chunk_look_back=decoder_chunk_look_back
                )
                res_txt.append(res[0]["text"])
            
            # Step 1: concatenate the raw result
            raw_transcript = "".join(res_txt).strip()
            # Step 2: strip special tags (the core change; yields plain text)
            pure_transcript = remove_special_tags(raw_transcript)
            # Step 3: handle empty results (text may become empty once tags are removed)
            if not pure_transcript:
                pure_transcript = "[empty_transcript]"
            
            results.append((audio_id, pure_transcript))
            print(f"推理完成:{audio_id} -> {pure_transcript}")
        except Exception as e:
            error_msg = f"inference_failed: {str(e)[:50]}"
            results.append((audio_id, error_msg))
            print(f"推理失败:{audio_id} -> {error_msg}")
    return results


def save_text_file(results, output_text_path):
    """保存纯文本结果到text文件"""
    with open(output_text_path, "w", encoding="utf-8") as f:
        for audio_id, transcript in results:
            f.write(f"{audio_id} {transcript}\n")
    print(f"\n纯文本结果已保存到:{output_text_path}")


def main():
    args = parse_args()
    print("正在初始化FunASR模型...")
    model = AutoModel(
        model=args.asr_model_online,
        model_revision=args.asr_model_online_revision,
        ngpu=args.ngpu,
        ncpu=args.ncpu,
        device=args.device,
        disable_pbar=True,
        disable_log=True,
        disable_update=True
    )
    scp_items = load_wav_scp(args.wav_scp)
    if not scp_items:
        print("错误:未解析到有效音频,退出脚本")
        return
    results = batch_infer_from_scp(scp_items, model)
    save_text_file(results, args.output_text)


if __name__ == "__main__":
    main()
    
    

Additionally, the original inference scripts:

Local:

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Copyright FunASR (https://github.com/FunAudioLLM/SenseVoice). All Rights Reserved.
#  MIT License  (https://opensource.org/licenses/MIT)

import sys
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"  # 强制仅可见 GPU 0
os.environ["WORLD_SIZE"] = "1"
import torch
import argparse
from funasr import AutoModel
from funasr.utils.postprocess_utils import rich_transcription_postprocess
from tqdm import tqdm

def init_model():
    """初始化单 GPU 模型"""
    global model
    print(f"Using GPU: {os.environ['CUDA_VISIBLE_DEVICES']}")
    
    # Initialize the model
    model_dir = "/data/.../1_code_model/funasr/FunASR/examples/industrial_data_pretraining/sense_voice/outputs_finetune2"
    model = AutoModel(
        model=model_dir,
        trust_remote_code=True,
        vad_model="fsmn-vad",
        vad_kwargs={"max_single_segment_time": 30000},
        device="cuda:0",  # 强制使用 GPU 0
        disable_update=True
    )

def process_audio(line, output_file):
    """处理单行输入数据并写入到输出文件"""
    try:
        id_, audio_path = line.strip().split(' ', 1)
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"Audio file not found: {audio_path}")

        # Run recognition on the audio
        res = model.generate(
            input=audio_path,
            cache={},
            language="auto",  # "zh", "en", "yue", "ja", "ko", "nospeech"
            use_itn=True,
#             use_itn=False,
            batch_size_s=80,
            merge_vad=True,
            merge_length_s=15,
            ban_emo_unk=False,
        )
        text = rich_transcription_postprocess(res[0]["text"])

        # Append the result to the output file
        with open(output_file, 'a', encoding='utf-8') as f_out:
            f_out.write(f"{id_} {text}\n")

        print(f"Processed audio: {audio_path} | Result: {text}")
    except Exception as e:
        # Report the raw line: audio_path is unbound if the split above failed
        print(f"Error processing {line.strip()}: {e}")

def main(input_file, output_file):
    with open(input_file, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    # Single-process loop
    init_model()
    for line in tqdm(lines, desc="Processing audios"):
        process_audio(line, output_file)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run voice recognition using single GPU.")
    parser.add_argument('input_file', type=str, help='Path to input file containing audio paths and IDs.')
    parser.add_argument('output_file', type=str, help='Path to output file for recognized texts.')
    args = parser.parse_args()

    # Truncate the output file
    open(args.output_file, 'w').close()

    main(args.input_file, args.output_file)
    



Online:

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Copyright FunASR (https://github.com/FunAudioLLM/SenseVoice). All Rights Reserved.
#  MIT License  (https://opensource.org/licenses/MIT)

import sys
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"  # 强制仅可见 GPU 0
os.environ["WORLD_SIZE"] = "1"
import torch
import argparse
from funasr import AutoModel
from funasr.utils.postprocess_utils import rich_transcription_postprocess
from tqdm import tqdm

def init_model():
    """初始化单 GPU 模型"""
    global model
    print(f"Using GPU: {os.environ['CUDA_VISIBLE_DEVICES']}")
    
    # Initialize the model
    model_dir = "iic/SenseVoiceSmall"
    model = AutoModel(
        model=model_dir,
        trust_remote_code=True,
        vad_model="fsmn-vad",
        vad_kwargs={"max_single_segment_time": 30000},
        device="cuda:0",  # 强制使用 GPU 0
    )

def process_audio(line, output_file):
    """处理单行输入数据并写入到输出文件"""
    try:
        id_, audio_path = line.strip().split(' ', 1)
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"Audio file not found: {audio_path}")

        # Run recognition on the audio
        res = model.generate(
            input=audio_path,
            cache={},
            language="auto",  # "zh", "en", "yue", "ja", "ko", "nospeech"
            use_itn=True,
#             use_itn=False,
            batch_size_s=80,
            merge_vad=True,
            merge_length_s=15,
            ban_emo_unk=False,
        )
        text = rich_transcription_postprocess(res[0]["text"])

        # Append the result to the output file
        with open(output_file, 'a', encoding='utf-8') as f_out:
            f_out.write(f"{id_} {text}\n")

        print(f"Processed audio: {audio_path} | Result: {text}")
    except Exception as e:
        # Report the raw line: audio_path is unbound if the split above failed
        print(f"Error processing {line.strip()}: {e}")

def main(input_file, output_file):
    with open(input_file, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    # Single-process loop
    init_model()
    for line in tqdm(lines, desc="Processing audios"):
        process_audio(line, output_file)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run voice recognition using single GPU.")
    parser.add_argument('input_file', type=str, help='Path to input file containing audio paths and IDs.')
    parser.add_argument('output_file', type=str, help='Path to output file for recognized texts.')
    args = parser.parse_args()

    # Truncate the output file
    open(args.output_file, 'w').close()

    main(args.input_file, args.output_file)
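
For reference, both original scripts above take two positional arguments; a typical invocation might look like this (the script filename and output path are hypothetical):

# python3 sensevoice_infer.py /data/q3_8phone_1600.wav.scp ./result.txt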
    


 
