大家好,我是烤鸭:

​ 上次写过一篇有关的文章,https://blog.csdn.net/Angry_Mills/article/details/146299355。用的是win系统部署的2B模型,这次打算写一篇linux部署7B模型,并且提供完整的使用代码。支持传入图片、视频,内置识别和分镜的prompt。

思路

识别图片或者视频如何使用,在哪些场景比较适合。这里说一个可能应用的场景,如果你有素材,无论是自己拍摄的还是网络上找到的,怎么快速变成自己风格的内容。

具体点的案例,做一个主题是某个热点新闻(苏超之类的)的某红书的图文或者视频内容。

按照以往的思路。

  • 写prompt,让大模型生成脚本

  • 搜索资源,去网络上找图片或者视频

  • 根据脚本匹配资源,生成内容

  • 发布到各个平台

利用媒体识别模型,简化匹配资源/生成内容的部分。当然,使用工作流和其他模型,可以完成整个流程的自动化。不过生成内容的成本和质量需要评估,感觉目前市场上不具备批量生产的能力。

流程

在这里插入图片描述

环境

linux ubuntu

python 3.13

显卡是4090 24G * 3

Qwen/Qwen2.5-VL-7B-Instruct

依赖

requirements.txt

torch
modelscope
transformers
requests
uvicorn
fastapi
pydantic
qwen_vl_utils
torchvision
accelerate
wheel

由于使用了 flash_attention_2,安装特别麻烦,下载总是超时,可以参考这个文章,先下载后本地安装:

https://blog.csdn.net/chongch_wang/article/details/136542877

源码

主要实现了单例模型,并发调用模型方法。

媒体类型是图片/视频,视频的prompt分成了分镜和描述。

from modelscope import Qwen2_5_VLForConditionalGeneration, AutoTokenizer, AutoProcessor
from qwen_vl_utils import process_vision_info
import torch
import gc
import time
from logging_config import KAFKA_LOGGER
from typing import Optional
import torch.multiprocessing as mp

# 全局配置(根据GPU显存调整)
MODEL_NAME = "Qwen/Qwen2.5-VL-7B-Instruct"
BASE_MAX_CONCURRENT = 2 # 基础并发数(压力测试基准值)

class ModelInferenceContext:
    # 静态变量实现模型共享(单例模式)
    _shared_model: Optional[Qwen2_5_VLForConditionalGeneration] = None
    _model_lock = mp.Lock()  # 多进程/线程安全锁
    _current_concurrent = 0  # 当前并发计数(仅用于限流)
    _last_used_time: float = 0.0  # 新增:最后一次使用时间(初始化为0)
    MODEL_IDLE_TIMEOUT = 300  # 新增:模型空闲超时时间(单位秒,默认5分钟)
    def __enter__(self):
        # 1. 并发限流检查(避免超出GPU负载)
        with self._model_lock:
            if self._current_concurrent >= BASE_MAX_CONCURRENT:
                raise RuntimeError(f"当前并发数已达上限{BASE_MAX_CONCURRENT},请等待")
            self._current_concurrent += 1
            # 2. 进入上下文时更新最后使用时间
            self._last_used_time = time.time()

        # 2. 加载或复用模型(仅首次触发加载)
        self._load_model()
        return self._shared_model

    def _load_model(self):
        """线程安全的模型加载逻辑(仅首次执行)"""
        with self._model_lock:
            if self._shared_model is None:  # 仅当模型未加载时执行
                KAFKA_LOGGER.info("开始加载模型...")
                # 显式指定设备(避免auto动态分配导致冲突)
                self._shared_model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
                    MODEL_NAME,
                    torch_dtype=torch.bfloat16,
                    attn_implementation="flash_attention_2",
                    device_map="auto"  # 单卡固定设备,多卡可改为device_map=[0,1]
                ).eval()  # 强制推理模式(关闭Dropout等训练层)
                torch.cuda.empty_cache()  # 加载后清理临时缓存
                KAFKA_LOGGER.info("模型加载完成!")

    def __exit__(self, exc_type, exc_val, exc_tb):
        with self._model_lock:
            self._current_concurrent -= 1  # 仅减少并发计数,不销毁模型
            # 4. 退出上下文时更新最后使用时间
            self._last_used_time = time.time()
            # 5. 新增:无并发且超时未使用时释放模型(避免显存长期占用)
            if (self._current_concurrent == 0
                    and self._shared_model is not None
                    and (time.time() - self._last_used_time) > self.MODEL_IDLE_TIMEOUT):
                KAFKA_LOGGER.info("模型长时间未使用,释放显存")
                del self._shared_model
                self._shared_model = None  # 重置为None,下次使用时重新加载
                gc.collect()  # 清理Python内存
                torch.cuda.empty_cache()  # 清理CUDA显存

        # 异常时记录详细日志(配合CUDA_LAUNCH_BLOCKING定位问题)
        if exc_type is not None:
            err_msg = f"推理异常: {exc_type.__name__} - {exc_val}"
            KAFKA_LOGGER.error(err_msg)
            raise Exception(err_msg)

    def _force_release_model(self):
        """强制释放模型并清理显存"""
        if self._shared_model is not None:
            del self._shared_model
            self._shared_model = None
            gc.collect()
            torch.cuda.empty_cache()
            KAFKA_LOGGER.info("模型已释放,显存清理完成")

# 预加载处理器(全局复用)
processor = AutoProcessor.from_pretrained(MODEL_NAME)

def process_media(url: str, media_type: str, parse_model: str = "default") -> str:
    """处理媒体链接并返回解析后的JSON结果(优化版)"""
    start_time_all = time.time()
    inputs = None  # 显式声明输入变量,避免finally块未定义

    try:
        # 1. 输入合法性校验
        KAFKA_LOGGER.info(f"process_media start 1.0:url={url}, type={media_type}, model={parse_model}")
        supported_types = ["image", "video"]
        if media_type not in supported_types:
            err_msg = f"不支持的媒体类型: {media_type},仅支持{supported_types}"
            KAFKA_LOGGER.error(err_msg)
            raise Exception(err_msg)

        KAFKA_LOGGER.info(f"process_media start 2.0:url={url}, type={media_type}, model={parse_model}")
        # 2. 构建输入内容及提示词
        content = {"type": media_type, media_type: url}
        user_prompt = _build_prompt(media_type, parse_model)

        KAFKA_LOGGER.info(f"process_media start 3.0:url={url}, type={media_type}, model={parse_model}")
        # 3. 预处理输入(带校验)
        messages = [{"role": "user", "content": [content, {"type": "text", "text": user_prompt.strip()}]}]
        text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        image_inputs, video_inputs = process_vision_info(messages)

        KAFKA_LOGGER.info(f"process_media start 4.0:url={url}, type={media_type}, model={parse_model}")
        # 输入张量构建(显式指定设备)
        inputs = processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt"
        ).to("cuda" if torch.cuda.is_available() else "cpu")

        KAFKA_LOGGER.info(f"process_media start 5.0:url={url}, type={media_type}, model={parse_model}")
        # 4. 模型推理(带显存优化)
        with ModelInferenceContext() as model:
            with torch.no_grad():
                generated_ids = model.generate(**inputs, max_new_tokens=4096)
                generated_ids_trimmed = [out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)]
                output_text = processor.batch_decode(
                    generated_ids_trimmed,
                    skip_special_tokens=True,
                    clean_up_tokenization_spaces=False
                )

        KAFKA_LOGGER.info(f"process_media start 6.0:url={url}, type={media_type}, model={parse_model}")
        # 5. 结果处理
        result = output_text[0].replace('\\n', ' ').replace('  ', '')
        KAFKA_LOGGER.info(f"url: {url}, cost:{time.time()-start_time_all:.2f}s, result:{result[:100]}...")
        return result

    except RuntimeError as e:
        err_msg = f"并发限流或显存不足: {str(e)}"
        KAFKA_LOGGER.error(err_msg)
        raise Exception(err_msg)
    except Exception as e:
        err_msg = f"处理失败: {str(e)}"
        KAFKA_LOGGER.error(err_msg)
        raise Exception(err_msg)
    finally:
        # 显式释放输入张量
        if inputs is not None:
            del inputs
        gc.collect()
        torch.cuda.empty_cache()

def _build_prompt(media_type: str, parse_model: str) -> str:
    """构建用户提示词(按类型分离逻辑)"""
    if media_type == "image":
        return "请使用中文详细描述这张图片的内容。"
    elif media_type == "video":
        if parse_model == "slice":
            return """
            你是一个视频内容分析专家。请根据视频内容生成详细的分镜头描述,要求:
            1. 每个分镜头包括时间范围(如00:01-00:03)和内容描述;
            2. 描述简洁准确,涵盖画面主体、动作或事件;
            3. 以JSON数组格式输出,示例:[{"time":"00:00-00:02","desc":"紫色车行驶"}]
            """
        else:
            return "请使用中文详细描述这个视频的内容。"

if __name__ == "__main__":
    import threading

    def test_task(url):
        result = process_media(url, "video", "desc")
        print(f"任务结果: {result[:100]}...")

    urls = [
        "https://vc14-al1-pl-agv.autohome.com.cn/video-36/E3BD4E39114FD258/2024-03-30/F5DB035773E23A736F15C4841F4F2CE2-400-wm.mp4?key=8CB6EF506C2C0E2CF1E5A3206BA2DDFE&time=1748938336"
    ]
    for url in urls:
        t = threading.Thread(target=test_task, args=(url,))
        t.start()

测试结果

描述结果没什么问题,基本都描述出来了。不过分镜结果有点差强人意,20s的视频结果显示是32s,可能跟prompt写的有关系,不过不建议用,不如直接用ffmpeg来分镜。

耗时方面,20s的视频单线程使用的时候差不多要30-40s。

参考页面地址:

https://v.autohome.com.cn/v-2244454.html

描述结果:
这段视频展示了一场汽车性能测试,对比了两款车型:小米SU7和特斯拉Model S。视频开始时,展示了两辆停在赛道上的汽车,一辆是蓝色的,另一辆是红色的。赛道上设置了多个交通锥,用于测试车辆的操控性和加速能力。

接下来的画面中,两名赛车手穿着防护装备,包括头盔和手套,准备驾驶各自的车辆。他们坐在车内,专注地等待发令。

随后,视频切换到赛道上,两辆车分别从起点出发,进行加速测试。可以看到,赛道两侧有观众和其他工作人员在观看比赛。背景中可以看到一些树木和建筑物,天气晴朗,阳光明媚。

在加速过程中,视频通过慢动作镜头捕捉到了车辆的动态,显示了它们的速度和性能。最后,视频显示了两辆车完成加速测试后的成绩,小米SU7的成绩为3.2秒,特斯拉Model S的成绩为2.95秒。这些数据表明特斯拉Model S在0-100公里/小时的加速测试中表现更优。

整个视频通过详细的镜头切换和清晰的字幕,向观众展示了这两款车在赛道上的表现,并提供了具体的加速时间对比。
分镜结果:
[{
	"time": "00:00-00:02",
	"desc": "一辆红色的特斯拉Model S停在赛道上,旁边有一辆蓝色的小米SU7。赛道上有橙色的交通锥。"
}, {
	"time": "00:02-00:04",
	"desc": "两名穿着赛车服和头盔的人坐在车内,准备驾驶。"
}, {
	"time": "00:04-00:06",
	"desc": "赛道上的观众站在一旁观看比赛。"
}, {
	"time": "00:06-00:10",
	"desc": "两辆车从起点线出发,开始绕过赛道上的交通锥。"
}, {
	"time": "00:10-00:12",
	"desc": "赛道上的观众继续观看比赛。"
}, {
	"time": "00:12-00:14",
	"desc": "车内视角显示驾驶员正在操作方向盘。"
}, {
	"time": "00:14-00:16",
	"desc": "一名身穿黄色安全背心的工作人员在赛道上指挥。"
}, {
	"time": "00:16-00:18",
	"desc": "两辆车继续绕过赛道上的交通锥。"
}, {
	"time": "00:18-00:20",
	"desc": "赛道上的观众继续观看比赛。"
}, {
	"time": "00:20-00:22",
	"desc": "两辆车完成绕行后,速度表显示小米SU7用时3.2秒,特斯拉Model S用时2.95秒。"
}, {
	"time": "00:22-00:24",
	"desc": "赛道上的观众继续观看比赛。"
}, {
	"time": "00:24-00:26",
	"desc": "两辆车再次绕过赛道上的交通锥。"
}, {
	"time": "00:26-00:28",
	"desc": "赛道上的观众继续观看比赛。"
}, {
	"time": "00:28-00:30",
	"desc": "两辆车完成绕行后,速度表显示小米SU7用时3.17秒,特斯拉Model S用时2.84秒。"
}, {
	"time": "00:30-00:32",
	"desc": "赛道上的观众继续观看比赛。"
}]

后续处理

需要对每个分镜片段进行关键词提取,相当于一个预打标的过程,再将打标结果进行向量化存储。

这个是为了后期根据指定文本进行检索最合适的片段使用的。但是片段本身还需要进行复杂的清理。

比如过滤文字过多的、或者第一人称视角等。

在这里插入图片描述

贴一个合成后的视频展示:

https://live.csdn.net/v/485594

总结

这其实是对上次的文章做的一个补充。

展示了下模型在实际场景的使用。当然Qwen-Vl 模型是其中的一环。针对整个应用,肯定还有更好的实现方式。

后续可以把应用中其他用到的模型专门写一篇文章介绍一下使用场景。

文章参考

https://blog.csdn.net/Angry_Mills/article/details/146299355

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐