【案例】Java使用ffmpeg实现rtsp视频流转hls、rtmp流
linux系统直接运行命令ubuntu:centos:windows系统下载包+配置环境变量环境变量配置就在path中加一个你下载下来的包解压后的一直到bin目录的路径地址。
·
1. 安装ffmpeg
linux系统直接运行命令
ubuntu:
sudo apt install ffmpeg
centos:
sudo yum install ffmpeg
windows系统下载包+配置环境变量
https://github.com/BtbN/FFmpeg-Builds/releases
https://github.com/BtbN/FFmpeg-Builds/releases
环境变量配置就在path中加一个你下载下来的包解压后的一直到bin目录的路径地址
2. 配置Nginx
1.1 配置 rtmp
需要与http同级
rtmp {
server {
listen 1935;
chunk_size 4096;
application live {
live on;
record off;
}
}
}
- listen:指定 rtmp 服务监听端口
- chunk_size:设置 rtmp 流的分块大小
- application live:定义一个 rtmp应用程序
- live on:启用实时流媒体功能
- record off:禁用录制功能(如需录制可以设为 record all)
- 这样就可以让 ffmpeg 将 rtsp 转为 rtmp 到 rtmp://your_server_ip/live/stream_key 地址上
1.2 配置hls
添加在http内部
server {
listen 1234;
server_name localhost;
location /hls/ {
alias /home/xxx/hls/;
add_header Cache-Control no-cache;
add_header 'Access-Control-Allow-Origin' '*';
add_header 'Access-Control-Allow-Methods' 'GET, OPTIONS';
add_header 'Access-Control-Allow-Headers' 'Origin, X-Requested-With, Content-Type, Accept';
add_header 'Access-Control-Expose-Headers' 'Content-Length,Content-Range';
if ($request_method = 'OPTIONS') {
add_header 'Access-Control-Allow-Origin' '*';
add_header 'Access-Control-Allow-Methods' 'GET, OPTIONS';
add_header 'Access-Control-Allow-Headers' 'Origin, X-Requested-With, Content-Type, Accept';
add_header 'Access-Control-Max-Age' 1728000;
add_header 'Content-Type' 'text/plain; charset=utf-8';
add_header 'Content-Length' 0;
return 204;
}
}
}
- 使用 ffmpeg 将 rtsp 转为 hls 时需要一个文件夹来存储,这里的 /home/xxx/hls/ 就是你选择用来存储的文件夹
- 下面的 add_header 等,是为了解决前端请求流地址跨域问题的
- 这样前端就可以通过 http://your_server_ip:1234/hls/stream_key.m3u8 来访问流
3. 编写Java代码
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.io.File;
import java.io.IOException;
@Service
@Slf4j
public class RTSPConverter {
private Process ffmpegProcess;
private Thread processMonitorThread;
private String ffmpegPath = "ffmpeg";
private String outputDir = "/home/xxx/hls";
public int startRTMPStreaming(String rtspUrl, String rtmpUrl) {
// FFmpeg 命令
String[] command = {
ffmpegPath,
"-i", rtspUrl,
"-c:v", "copy",
"-c:a", "aac",
"-f", "flv",
rtmpUrl
};
return startStreaming(command);
}
public int startHLSStreaming(String rtspUrl) {
// 确保输出目录存在
File outputDirFile = new File(outputDir);
if (!outputDirFile.exists()) {
outputDirFile.mkdirs();
}
// FFmpeg 命令
String[] command = {
ffmpegPath,
"-i", rtspUrl, // 输入 RTSP 流
"-c:v", "libx264", // 视频流设置为h.264,防止其他格式导致浏览器无法播放
"-an", // 禁用音频处理
"-f", "hls", // 输出格式为 HLS
"-hls_time", "10", // 每个 TS 文件的时长(秒)
"-hls_list_size", "6", // HLS 播放列表中保留的 TS 文件数量
"-hls_flags", "delete_segments", // 自动删除旧的 TS 文件
outputDir + "/" + "hlsStream" + ".m3u8"
};
return startStreaming(command);
}
private int startStreaming(String[] command) {
try {
ProcessBuilder processBuilder = new ProcessBuilder(command);
processBuilder.inheritIO();
// 启动FFmpeg进程
ffmpegProcess = processBuilder.start();
// 创建监控线程处理进程结束
processMonitorThread = new Thread(() -> {
try {
ffmpegProcess.waitFor();
// 添加日志来记录FFmpeg进程的退出码
log.info("FFmpeg process exited with code: {}",exitCode);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
processMonitorThread.start();
return 1;
} catch (IOException e) {
e.printStackTrace();
return 0;
}
}
public int stopStreaming() {
if (ffmpegProcess != null && ffmpegProcess.isAlive()) {
// 销毁FFmpeg进程
ffmpegProcess.destroy();
try {
// 等待监控线程结束
if (processMonitorThread != null) {
processMonitorThread.join(5000); // 最多等待5秒
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return 0;
}
}
return 1;
}
}
- startRTMPStreaming:拼装 ffmpeg 将 rtsp 转 rtmp 的命令;
- startHLSStreaming:拼装 ffmpeg 将 rtsp 转 rtmp 的命令;
- startStreaming:开始运行 ffmpeg 执行命令;
- stopStreaming:停止运行 ffmpeg;
- 其中 "-c:v", "copy", 如果你要编码为 h.264 则需要将 copy 改为 libx264;
- "-an":禁用音频处理,开启的话有时候会导致 ffmpeg 报错;
4.更新优化
上面这种有个问题,由于每次结束请求都会调用停止ffmpeg命令,导致配置了ts文件超过8条删除也不会删除,我们需要增加一个手动删除机制,而且如果过早地将请求返回给前端,由于此时ts文件还未生成,前端如果没有足够的重复验证会导致播放失败;
4.1 增加线程资源管理机制
将线程统一交给线程池管理
private Process ffmpegProcess;
private ExecutorService readerThreadPool; // 用于管理ffmpeg输出流的读取线程
private Future<?> monitorTaskFuture; // 用于存储进程监控任务的Future
private final Object lock = new Object(); // 锁对象,用于同步对 ffmpegProcess、processMonitorThread 的访问
public RTSPToRTMPConverter(){
// 初始化一个固定大小的线程池
readerThreadPool = Executors.newFixedThreadPool(3);
}
/**
* 在 Spring 容器销毁 Bean 之前自动调用此方法,进行资源清理。
* 对应 Spring 的 @PreDestroy 生命周期回调。
*/
@PreDestroy
public void cleanupOnShutdown() {
log.info("@PreDestroy 触发:开始关闭FFmpeg转流服务及清理资源...");
stopStreaming(); // 停止所有可能的FFmpeg进程并清理文件
shutdownReaderThreadPool(); // 关闭 reader 线程池
log.info("FFmpeg转流服务资源清理完成。");
}
// 在应用程序关闭时调用,确保线程池关闭
public void shutdownReaderThreadPool() {
if (readerThreadPool != null && !readerThreadPool.isShutdown()) {
log.info("关闭FFmpeg读取线程池...");
readerThreadPool.shutdownNow(); // 尝试立即关闭所有任务
try {
if (!readerThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
log.warn("FFmpeg读取线程池未能完全关闭或有任务未完成.");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("关闭FFmpeg读取线程池时中断.", e);
}
}
}
4.2 调整 startHLSStreaming 方法
public int startHLSStreaming(String rtspUrl) {
// 确保输出目录存在
File outputDirFile = new File(outputDir);
if (!outputDirFile.exists()) {
outputDirFile.mkdirs();
}
// FFmpeg 命令
String[] command = {
ffmpegPath,
"-i", rtspUrl,
"-c:v", "libx264",
"-an", // 禁用音频处理
"-f", "hls",
"-hls_time", "3",
"-hls_list_size", "10",
"-hls_flags", "delete_segments",
outputDir + "/" + "hlsStream" + ".m3u8"
};
return startStreaming(command);
}
- ts文件每个的长度改为3秒,总数改为10条
4.3 调整 startStreaming 方法
增加了锁来防止多个ffmpeg进程同时启动;
private int startStreaming(String[] command) {
synchronized (lock) { // 同步访问共享变量
try {
// 如果ffmpegProcess已经存在且存活,先尝试停止它,避免启动多个
if (ffmpegProcess != null && ffmpegProcess.isAlive()) {
log.warn("检测到有FFmpeg进程正在运行. 尝试停止旧进程.");
stopStreaming(); // 尝试停止旧的进程
}
ProcessBuilder processBuilder = new ProcessBuilder(command);
log.info("尝试启动FFmpeg,命令: {}", String.join(" ", command));
ffmpegProcess = processBuilder.start();
// 使用线程池管理输出流读取线程
readerThreadPool.submit(() -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(ffmpegProcess.getErrorStream()))) {
String line;
while ((line = reader.readLine()) != null) {
if (Thread.currentThread().isInterrupted()) { // 检查中断状态
log.debug("FFMPEG ERROR Reader Thread Interrupted.");
break;
}
// 区分进度信息和真正的错误
if (line.startsWith("frame=") && line.contains("time=") && line.contains("speed=")) {
log.debug("FFMPEG PROGRESS: {}", line);
} else {
log.error("FFMPEG ERROR: {}", line);
}
}
} catch (IOException e) {
log.debug("读取FFMPEG ERROR流时发生IO错误 (可能由于进程终止或中断): {}", e.getMessage());
}
});
// 提交任务到线程池读取FFmpeg的标准输出流
readerThreadPool.submit(() -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(ffmpegProcess.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null) {
log.info("FFMPEG OUT: {}", line);
}
} catch (IOException e) {
log.error("读取FFMPEG OUT流时发生错误", e);
}
});
// 创建监控线程处理进程结束
monitorTaskFuture = readerThreadPool.submit(() -> {
try {
int exitCode = ffmpegProcess.waitFor();
if (exitCode != 0) {
log.error("FFmpeg 进程异常退出,退出码: {}", exitCode);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重新设置中断状态
log.warn("FFmpeg进程监控任务被中断.", e);
}
});
// --- 等待HLS文件生成逻辑 ---
long startTime = System.currentTimeMillis();
long timeout = 15 * 1000; // 设置一个超时时间,15秒
Path m3u8Path = Paths.get(outputDir, "hlsStream.m3u8");
Path ts0Path = Paths.get(outputDir, "hlsStream0.ts");
boolean m3u8Exists = false;
boolean tsFilesExist = false;
while (System.currentTimeMillis() - startTime < timeout) {
// 在等待文件生成前,检查FFmpeg进程是否已退出,避免长时间无效等待
if (ffmpegProcess != null && !ffmpegProcess.isAlive()) {
log.warn("FFmpeg进程在文件生成完成前已退出,退出码: {}", ffmpegProcess.exitValue());
return 0; // 进程已退出,文件可能不会生成
}
m3u8Exists = Files.exists(m3u8Path);
tsFilesExist = Files.exists(ts0Path);
if (m3u8Exists && tsFilesExist) {
log.info("HLS 流文件 (m3u8和至少两个TS文件) 已成功生成.");
return 1; // 成功,返回
}
// 每隔一段时间检查一次
TimeUnit.MILLISECONDS.sleep(200); // 稍微增加检查间隔
}
// 如果超时仍未生成所需文件
log.error("FFmpeg HLS 流文件生成超时。M3U8存在: {}, TS文件存在: {}", m3u8Exists, tsFilesExist);
stopStreaming(); // 尝试停止FFmpeg进程
return 0; // 失败
} catch (IOException e) {
log.error("启动FFmpeg进程失败。命令: {}", String.join(" ", command), e);
stopStreaming();
return 0;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("等待HLS文件生成时线程中断", e);
// 确保在中断时也尝试停止FFmpeg进程,防止资源泄露
stopStreaming();
return 0;
}
}
}
4.4 调整 stopStreaming 方法
public int stopStreaming() {
synchronized (lock) { // 同步访问共享变量
if (ffmpegProcess != null) {
if (ffmpegProcess.isAlive()) {
log.info("ffmpeg进程运行中,准备销毁");
// 取消监控任务
if (monitorTaskFuture != null){
// 任务如果正在运行,则关闭
monitorTaskFuture.cancel(true);
}
ffmpegProcess.destroy(); // 发送SIGTERM
try {
boolean terminated = ffmpegProcess.waitFor(5, TimeUnit.SECONDS);
if (!terminated) {
log.warn("ffmpeg进程未在5秒内正常终止,强制销毁");
ffmpegProcess.destroyForcibly(); // 强制终止
ffmpegProcess.waitFor(); // 等待强制终止完成
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("等待ffmpeg进程停止时中断", e);
return 0;
}
} else {
log.info("ffmpeg进程未处于活动状态 (可能已自行退出).");
}
ffmpegProcess = null; // 清除引用
} else {
log.info("没有ffmpeg进程在运行 (ffmpegProcess 为 null).");
}
// 清理监控线程
monitorTaskFuture = null;
// --- HLS片段清理逻辑 ---
log.info("开始清理HLS流片段: {}", outputDir);
File hlsDir = new File(outputDir);
if (hlsDir.exists() && hlsDir.isDirectory()) {
try (Stream<Path> walk = Files.walk(hlsDir.toPath())) {
// 过滤出 hlsStream*.ts 和 hlsStream.m3u8 文件
walk.filter(p -> {
String fileName = p.getFileName().toString();
return fileName.startsWith("hlsStream") && (fileName.endsWith(".ts") || fileName.endsWith(".m3u8"));
})
.forEach(p -> {
try {
Files.delete(p);
log.info("已删除HLS片段: {}", p.getFileName());
} catch (IOException e) {
log.warn("未能删除HLS片段: {}. 错误: {}", p.getFileName(), e.getMessage());
}
});
} catch (IOException e) {
log.error("遍历HLS目录时发生错误", e);
}
}
return 1;
}
}更多推荐
所有评论(0)