【代码】SenseVoice-Small 专有名词针对性微调 —— 推理评估部分
带标签版本(输出保留模型的 <|…|> 特殊标签):
# python3 myinfer.py \
# --wav_scp "/data/q3_8phone_1600.wav.scp" \
# --output_text "./ppc_fineture2.txt" \
# --device "cuda" \
# --ngpu 1 \
# --ncpu 4
import argparse
import soundfile
import os
import re # 处理空格/Tab混合分隔
from funasr import AutoModel
def parse_args():
    """Define and parse the CLI options for batch wav.scp inference."""
    parser = argparse.ArgumentParser()
    # --- Model configuration (same defaults as before; adapt per environment) ---
    parser.add_argument(
        "--asr_model_online", type=str,
        default="/data/.../1_code_model/funasr/FunASR/examples/industrial_data_pretraining/sense_voice/outputs_finetune2",
        help="Model name (from ModelScope) or local model path",
    )
    parser.add_argument(
        "--asr_model_online_revision", type=str, default="v2.0.4",
        help="Model revision",
    )
    # --- Hardware configuration ---
    parser.add_argument("--ngpu", type=int, default=1, help="0 for CPU, ≥1 for GPU")
    parser.add_argument("--device", type=str, default="cuda", help="Device: cuda / cpu")
    parser.add_argument("--ncpu", type=int, default=4, help="CPU cores for preprocessing")
    # --- Input / output paths (both mandatory) ---
    parser.add_argument(
        "--wav_scp", type=str, required=True,
        help="Input wav.scp path (col1: id, col2: audio path, sep: space/Tab)",
    )
    parser.add_argument(
        "--output_text", type=str, required=True,
        help="Output text path (col1: id, col2: transcript)",
    )
    return parser.parse_args()
def load_wav_scp(wav_scp_path):
    """Parse a wav.scp file into a list of (audio_id, audio_path) tuples.

    Accepts space/Tab separators (mixed runs allowed), skips blank lines and
    '#' comments, and drops entries whose audio file does not exist on disk.
    """
    scp_items = []
    with open(wav_scp_path, "r", encoding="utf-8") as f:
        for line_num, raw in enumerate(f, 1):
            entry = raw.strip()
            # Blank lines and comment lines carry no entry.
            if not entry or entry.startswith("#"):
                continue
            try:
                # Split only once so paths containing spaces stay intact.
                fields = re.split(r'[\t ]+', entry, maxsplit=1)
                if len(fields) < 2:
                    print(f"警告:第{line_num}行格式错误(缺少音频路径),跳过 -> {entry}")
                    continue
                audio_id, audio_path = fields
                if not os.path.exists(audio_path):
                    print(f"警告:第{line_num}行音频路径不存在,跳过 -> {audio_path}")
                    continue
                scp_items.append((audio_id, audio_path))
            except Exception as e:
                print(f"警告:第{line_num}行解析失败,跳过 -> {entry},错误:{e}")
    print(f"成功解析wav.scp:共{len(scp_items)}个有效音频")
    return scp_items
def batch_infer_from_scp(scp_items, model):
    """Run streaming inference over (id, audio_path) pairs.

    Returns a list of (id, transcript) pairs; failures are recorded as a
    truncated "inference_failed: ..." marker instead of aborting the run.
    """
    results = []
    # Streaming-decoding parameters (kept identical to the original setup).
    chunk_size = [0, 10, 5]  # 600ms chunk
    encoder_chunk_look_back = 4
    decoder_chunk_look_back = 1
    # 600 ms stride: 960 samples = 16000 * 0.06 s per chunk unit at 16 kHz.
    # NOTE(review): assumes 16 kHz input; other sample rates are not resampled here.
    chunk_stride = chunk_size[1] * 960
    for audio_id, audio_path in scp_items:
        print(f"正在推理:{audio_id} -> {audio_path}")
        try:
            speech, sample_rate = soundfile.read(audio_path)
            cache = {}  # model-side streaming state, shared across chunks
            n_chunks = int((len(speech) - 1) / chunk_stride) + 1
            pieces = []
            for idx in range(n_chunks):
                segment = speech[idx * chunk_stride:(idx + 1) * chunk_stride]
                res = model.generate(
                    input=segment,
                    cache=cache,
                    is_final=(idx == n_chunks - 1),
                    chunk_size=chunk_size,
                    encoder_chunk_look_back=encoder_chunk_look_back,
                    decoder_chunk_look_back=decoder_chunk_look_back,
                )
                pieces.append(res[0]["text"])
            # Concatenate all chunk outputs into the final transcript.
            final_transcript = "".join(pieces).strip()
            results.append((audio_id, final_transcript))
            print(f"推理完成:{audio_id} -> {final_transcript}")
        except Exception as e:
            # Swallow the exception so one bad file does not stop the batch.
            error_msg = f"inference_failed: {str(e)[:50]}"
            results.append((audio_id, error_msg))
            print(f"推理失败:{audio_id} -> {error_msg}")
    return results
def save_text_file(results, output_text_path):
    """Write results as UTF-8 'id transcript' lines, one entry per line."""
    lines = [f"{audio_id} {transcript}\n" for audio_id, transcript in results]
    with open(output_text_path, "w", encoding="utf-8") as f:
        f.writelines(lines)
    print(f"\n结果已保存到:{output_text_path}")
def main():
    """Entry point: init model -> parse wav.scp -> batch inference -> save output."""
    args = parse_args()
    # 1. Initialize the FunASR model (adjust the model path for your environment).
    print("正在初始化FunASR模型...")
    model = AutoModel(
        model=args.asr_model_online,
        model_revision=args.asr_model_online_revision,
        ngpu=args.ngpu,
        ncpu=args.ncpu,
        device=args.device,
        disable_pbar=True,
        disable_log=True,
        disable_update=True,
    )
    # 2. Parse wav.scp; abort early when nothing usable was found.
    scp_items = load_wav_scp(args.wav_scp)
    if not scp_items:
        print("错误:未解析到有效音频,退出脚本")
        return
    # 3. Batch inference, then 4. persist the 'id transcript' text file.
    save_text_file(batch_infer_from_scp(scp_items, model), args.output_text)
# Script entry point.
if __name__ == "__main__":
    main()
不带标签版本(过滤 <|…|> 特殊标签,输出纯文本):
# python3 myinfer.py \
# --wav_scp "/data/q3_8phone_1600.wav.scp" \
# --output_text "./ppc_fineture2_2.txt" \
# --device "cuda" \
# --ngpu 1 \
# --ncpu 4
import argparse
import soundfile
import os
import re # 处理空格/Tab分隔 + 过滤特殊标签
from funasr import AutoModel
def parse_args():
    """Define and parse the CLI options for the tag-filtering inference script."""
    parser = argparse.ArgumentParser()
    # --- Model configuration (local fine-tuned model path) ---
    parser.add_argument(
        "--asr_model_online", type=str,
        default="/data/。。。/1_code_model/funasr/FunASR/examples/industrial_data_pretraining/sense_voice/outputs_finetune2",
        help="Model name (from ModelScope) or local model path",
    )
    parser.add_argument(
        "--asr_model_online_revision", type=str, default="v2.0.4",
        help="Model revision",
    )
    # --- Hardware configuration ---
    parser.add_argument("--ngpu", type=int, default=1, help="0 for CPU, ≥1 for GPU")
    parser.add_argument("--device", type=str, default="cuda", help="Device: cuda / cpu")
    parser.add_argument("--ncpu", type=int, default=4, help="CPU cores for preprocessing")
    # --- Input / output paths (both mandatory) ---
    parser.add_argument(
        "--wav_scp", type=str, required=True,
        help="Input wav.scp path (col1: id, col2: audio path)",
    )
    parser.add_argument(
        "--output_text", type=str, required=True,
        help="Output text path (col1: id, col2: pure transcript)",
    )
    return parser.parse_args()
def load_wav_scp(wav_scp_path):
    """Parse a wav.scp file into a list of (audio_id, audio_path) tuples.

    Skips blank lines, '#' comments, malformed rows and rows whose audio
    file does not exist on disk.
    """
    scp_items = []
    with open(wav_scp_path, "r", encoding="utf-8") as f:
        for line_num, raw in enumerate(f, 1):
            entry = raw.strip()
            if not entry or entry.startswith("#"):
                continue
            try:
                # Split only once so paths containing spaces stay intact.
                fields = re.split(r'[\t ]+', entry, maxsplit=1)
                if len(fields) < 2:
                    print(f"警告:第{line_num}行格式错误,跳过 -> {entry}")
                    continue
                audio_id, audio_path = fields
                if not os.path.exists(audio_path):
                    print(f"警告:第{line_num}行音频路径不存在,跳过 -> {audio_path}")
                    continue
                scp_items.append((audio_id, audio_path))
            except Exception as e:
                print(f"警告:第{line_num}行解析失败,跳过 -> {entry},错误:{e}")
    print(f"成功解析wav.scp:共{len(scp_items)}个有效音频")
    return scp_items
def remove_special_tags(text):
    """Strip every <|...|> special token from *text* and normalize whitespace."""
    # Non-greedy match so adjacent tags like <|zh|><|NEUTRAL|> are each removed
    # instead of being swallowed as one giant match.
    without_tags = re.sub(r'<\|.*?\|>', "", text)
    # Trim the ends, then collapse whitespace runs left behind by deleted tags.
    return re.sub(r'\s+', ' ', without_tags.strip())
def batch_infer_from_scp(scp_items, model):
    """Run streaming inference and strip special tags; returns (id, pure_text) pairs."""
    results = []
    # Streaming-decoding parameters (kept identical to the original setup).
    chunk_size = [0, 10, 5]
    encoder_chunk_look_back = 4
    decoder_chunk_look_back = 1
    chunk_stride = chunk_size[1] * 960  # 600ms stride
    for audio_id, audio_path in scp_items:
        print(f"正在推理:{audio_id} -> {audio_path}")
        try:
            speech, sample_rate = soundfile.read(audio_path)
            cache = {}  # model-side streaming state, shared across chunks
            n_chunks = int((len(speech) - 1) / chunk_stride) + 1
            pieces = []
            for idx in range(n_chunks):
                res = model.generate(
                    input=speech[idx * chunk_stride:(idx + 1) * chunk_stride],
                    cache=cache,
                    is_final=(idx == n_chunks - 1),
                    chunk_size=chunk_size,
                    encoder_chunk_look_back=encoder_chunk_look_back,
                    decoder_chunk_look_back=decoder_chunk_look_back,
                )
                pieces.append(res[0]["text"])
            # Join chunk outputs, then drop the model's <|...|> special tags.
            pure_transcript = remove_special_tags("".join(pieces).strip())
            if not pure_transcript:
                # Tag removal can leave nothing behind; emit a placeholder instead of "".
                pure_transcript = "[empty_transcript]"
            results.append((audio_id, pure_transcript))
            print(f"推理完成:{audio_id} -> {pure_transcript}")
        except Exception as e:
            # Swallow the exception so one bad file does not stop the batch.
            error_msg = f"inference_failed: {str(e)[:50]}"
            results.append((audio_id, error_msg))
            print(f"推理失败:{audio_id} -> {error_msg}")
    return results
def save_text_file(results, output_text_path):
    """Write tag-free results as UTF-8 'id transcript' lines."""
    lines = [f"{audio_id} {transcript}\n" for audio_id, transcript in results]
    with open(output_text_path, "w", encoding="utf-8") as f:
        f.writelines(lines)
    print(f"\n纯文本结果已保存到:{output_text_path}")
def main():
    """Entry point: model init -> wav.scp parsing -> tag-filtered inference -> save."""
    args = parse_args()
    print("正在初始化FunASR模型...")
    model = AutoModel(
        model=args.asr_model_online,
        model_revision=args.asr_model_online_revision,
        ngpu=args.ngpu,
        ncpu=args.ncpu,
        device=args.device,
        disable_pbar=True,
        disable_log=True,
        disable_update=True,
    )
    # Abort early when wav.scp yielded no usable entries.
    scp_items = load_wav_scp(args.wav_scp)
    if not scp_items:
        print("错误:未解析到有效音频,退出脚本")
        return
    save_text_file(batch_infer_from_scp(scp_items, model), args.output_text)
# Script entry point.
if __name__ == "__main__":
    main()
另外附上原始(非流式、整段)推理脚本:
本地版本(使用本地微调后的模型):
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Copyright FunASR (https://github.com/FunAudioLLM/SenseVoice). All Rights Reserved.
# MIT License (https://opensource.org/licenses/MIT)
import sys
import os
# Pin CUDA visibility before torch/funasr are imported below.
os.environ["CUDA_VISIBLE_DEVICES"] = "0" # force only GPU 0 to be visible
os.environ["WORLD_SIZE"] = "1"  # single-process (non-distributed) execution
import torch
import argparse
from funasr import AutoModel
from funasr.utils.postprocess_utils import rich_transcription_postprocess
from functools import partial
from tqdm import tqdm
def init_model():
    """Create the global SenseVoice model (local fine-tuned checkpoint) on GPU 0."""
    global model
    print(f"Using GPU: {os.environ['CUDA_VISIBLE_DEVICES']}")
    # Path to the locally fine-tuned SenseVoice-Small checkpoint.
    checkpoint_dir = "/data/。。。/1_code_model/funasr/FunASR/examples/industrial_data_pretraining/sense_voice/outputs_finetune2"
    model = AutoModel(
        model=checkpoint_dir,
        trust_remote_code=True,
        vad_model="fsmn-vad",
        vad_kwargs={"max_single_segment_time": 30000},
        device="cuda:0",  # forced to GPU 0
        disable_update=True,
    )
def process_audio(line, output_file):
    """Process one 'utt_id audio_path' line: run ASR and append 'utt_id text' to output_file.

    Errors (malformed line, missing file, inference failure) are logged and
    swallowed so a single bad entry does not abort the whole run.
    """
    audio_path = None  # bound up front so the except handler can always reference it
    try:
        id_, audio_path = line.strip().split(' ', 1)
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"Audio file not found: {audio_path}")
        # Run recognition on the whole file; VAD segmentation/merging is
        # handled inside the model pipeline.
        res = model.generate(
            input=audio_path,
            cache={},
            language="auto",  # "zh", "en", "yue", "ja", "ko", "nospeech"
            use_itn=True,
            batch_size_s=80,
            merge_vad=True,
            merge_length_s=15,
            ban_emo_unk=False,
        )
        text = rich_transcription_postprocess(res[0]["text"])
        # Append immediately so partial progress survives an interrupted run.
        with open(output_file, 'a', encoding='utf-8') as f_out:
            f_out.write(f"{id_} {text}\n")
        print(f"Processed audio: {audio_path} | Result: {text}")
    except Exception as e:
        # BUG FIX: when the line had no separator, `audio_path` used to be
        # unbound here, so the log line itself raised NameError.
        print(f"Error processing {audio_path if audio_path is not None else line.strip()}: {e}")
def main(input_file, output_file):
    """Transcribe every 'id path' entry in input_file, appending results to output_file."""
    with open(input_file, 'r', encoding='utf-8') as f:
        entries = f.readlines()
    # Single-process pipeline: load the model once, then iterate sequentially.
    init_model()
    for entry in tqdm(entries, desc="Processing audios"):
        process_audio(entry, output_file)
# CLI entry: two positional paths; results are appended per utterance by
# process_audio(), so the output file is truncated once before the run.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run voice recognition using single GPU.")
    parser.add_argument('input_file', type=str, help='Path to input file containing audio paths and IDs.')
    parser.add_argument('output_file', type=str, help='Path to output file for recognized texts.')
    args = parser.parse_args()
    # Clear the output file before processing starts.
    open(args.output_file, 'w').close()
    main(args.input_file, args.output_file)
线上版本(使用 ModelScope 官方 iic/SenseVoiceSmall 模型):
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Copyright FunASR (https://github.com/FunAudioLLM/SenseVoice). All Rights Reserved.
# MIT License (https://opensource.org/licenses/MIT)
import sys
import os
# Pin CUDA visibility before torch/funasr are imported below.
os.environ["CUDA_VISIBLE_DEVICES"] = "0" # force only GPU 0 to be visible
os.environ["WORLD_SIZE"] = "1"  # single-process (non-distributed) execution
import torch
import argparse
from funasr import AutoModel
from funasr.utils.postprocess_utils import rich_transcription_postprocess
from functools import partial
from tqdm import tqdm
def init_model():
    """Create the global SenseVoiceSmall model (ModelScope hub checkpoint) on GPU 0."""
    global model
    print(f"Using GPU: {os.environ['CUDA_VISIBLE_DEVICES']}")
    # Official SenseVoiceSmall checkpoint identifier on ModelScope.
    hub_id = "iic/SenseVoiceSmall"
    model = AutoModel(
        model=hub_id,
        trust_remote_code=True,
        vad_model="fsmn-vad",
        vad_kwargs={"max_single_segment_time": 30000},
        device="cuda:0",  # forced to GPU 0
    )
def process_audio(line, output_file):
    """Process one 'utt_id audio_path' line: run ASR and append 'utt_id text' to output_file.

    Errors (malformed line, missing file, inference failure) are logged and
    swallowed so a single bad entry does not abort the whole run.
    """
    audio_path = None  # bound up front so the except handler can always reference it
    try:
        id_, audio_path = line.strip().split(' ', 1)
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"Audio file not found: {audio_path}")
        # Run recognition on the whole file; VAD segmentation/merging is
        # handled inside the model pipeline.
        res = model.generate(
            input=audio_path,
            cache={},
            language="auto",  # "zh", "en", "yue", "ja", "ko", "nospeech"
            use_itn=True,
            batch_size_s=80,
            merge_vad=True,
            merge_length_s=15,
            ban_emo_unk=False,
        )
        text = rich_transcription_postprocess(res[0]["text"])
        # Append immediately so partial progress survives an interrupted run.
        with open(output_file, 'a', encoding='utf-8') as f_out:
            f_out.write(f"{id_} {text}\n")
        print(f"Processed audio: {audio_path} | Result: {text}")
    except Exception as e:
        # BUG FIX: when the line had no separator, `audio_path` used to be
        # unbound here, so the log line itself raised NameError.
        print(f"Error processing {audio_path if audio_path is not None else line.strip()}: {e}")
def main(input_file, output_file):
    """Transcribe every 'id path' entry in input_file, appending results to output_file."""
    with open(input_file, 'r', encoding='utf-8') as f:
        entries = f.readlines()
    # Single-process pipeline: load the model once, then iterate sequentially.
    init_model()
    for entry in tqdm(entries, desc="Processing audios"):
        process_audio(entry, output_file)
# CLI entry: two positional paths; results are appended per utterance by
# process_audio(), so the output file is truncated once before the run.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run voice recognition using single GPU.")
    parser.add_argument('input_file', type=str, help='Path to input file containing audio paths and IDs.')
    parser.add_argument('output_file', type=str, help='Path to output file for recognized texts.')
    args = parser.parse_args()
    # Clear the output file before processing starts.
    open(args.output_file, 'w').close()
    main(args.input_file, args.output_file)
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)