1. 问题描述

目前做的一个功能是上传多个文件,这个上传文件是整体功能的一部分,文件在上传的过程中需要轮询文件上传的状态的接口,还有要调用整个任务状态的接口,因为我们的Mysql是单例的,有多个大文件上传的时候会出现任务状态不一致的问题,经过调研有WebSocket与SSE两种方式,但是我只需要把信息推给前端,所以使用个更轻量的SSE。

2. SSE介绍

SSE(Server-Sent Events) 是 HTML5 标准中的一项技术,它允许服务器通过单向连接持续地将数据推送给客户端(通常是浏览器)。

  • 与传统的 HTTP 请求响应不同,SSE 是一种服务器主动推送数据的机制。
  • 与 WebSocket 不同,它是单向通信:服务器 -> 客户端。

SSE 使用标准的 HTTP 协议 和 文本/event-stream 格式,通常用于实时消息推送,如股票行情、社交动态通知、在线状态更新等。

2.1 SSE 的工作原理

  1. 客户端通过普通的 HTTP 请求发起连接:
GET /events HTTP/1.1
Accept: text/event-stream
  1. 服务器响应:
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
  1. 事件流
event: message
id: 123
data: 这是要推送的数据
  1. 客户端自动处理每一条数据,并在连接断开时自动重连。

2.2 SSE 的事件格式规范

每条消息由一组字段组成,字段以冒号: 分隔,每条消息之间由两个换行符 \n\n 分隔:
在这里插入图片描述

示例:

id: 1001
event: update
data: {"progress": 50}

2.3 SSE与其他技术对比

在这里插入图片描述

2.4 SSE 的优缺点

✅ 优点:

  • 使用标准 HTTP 协议,兼容性好;
  • 实现简单,开发成本低;
  • 支持自动重连机制;
  • 支持事件类型、多行数据等;
  • 适用于需要实时更新、但无需客户端发送大量消息的场景。

❌ 缺点:

  • 只支持单向通信;
  • 不支持二进制数据;
  • 不支持所有浏览器(如 IE);
  • 有并发连接数限制(部分浏览器每个域名默认最大连接数限制);
  • 需配置好心跳机制、防止代理服务器中断连接。

3. 实战代码

@GetMapping("/file-upload-status")
@Operation(summary = "文件上传状态SSE")
public SseEmitter fileUploadStatus(@RequestParam("taskCode") String taskCode,
                                  @RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {
log.info("文件上传状态SSE, 请求参数: {}, Last-Event-ID: {}", taskCode, lastEventId);

try {
   return dataConfigService.fileUploadStatus(taskCode, lastEventId);
} catch (Exception e) {
 log.error("文件上传状态SSE,失败,错误信息:{}", e.getMessage());
  throw e;
   }
}





/**
     * 用于存储每个任务对应的 SseEmitter 实例。
     * key 为任务唯一标识 taskCode,value 为与该任务关联的 SseEmitter。
     * 使用 ConcurrentHashMap 是为了支持多线程环境下的并发读写,确保线程安全。
     * <p>
     * 场景:
     * - 客户端发起 SSE 连接时,将对应的 SseEmitter 存入此 Map。
     * - 后台定时任务或上传进度更新时可以通过 taskCode 拿到对应的 emitter 推送数据。
     * - 当连接断开(如超时、关闭、出错)时,从 Map 中移除对应的 emitter,避免内存泄漏。
     */
    private final Map<String, SseEmitter> emitterMap = new ConcurrentHashMap<>();

    /**
     * 定时任务线程池,用于定期推送每个任务的上传状态到前端客户端(通过 SseEmitter)。
     * 使用 Runtime.getRuntime().availableProcessors() 获取当前机器的可用处理器核心数,
     * 并以此设置线程池大小,合理利用系统资源。
     * <p>
     * 特点:
     * - ScheduledExecutorService 支持定时或周期性任务调度,适合做周期性状态推送。
     * - 使用线程池而不是单线程可以支持多个任务并发状态推送。
     * - 可根据系统并发量和任务复杂度调整线程池大小。
     */
    private final ScheduledExecutorService scheduledExecutorService =
            Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());

    /**
     * 缓存最近一次推送的状态(用于断线续传)
     */
    private final Map<String, DataFileUploadStatusVO> lastEventDataMap = new ConcurrentHashMap<>();
@Override
    public SseEmi
    tter fileUploadStatus(String taskCode, String lastEventId) {
        // 检查任务代码是否为空
        if (StringUtils.isBlank(taskCode)) {
            throw new ServiceException(400, "请求参数不能为空");
        }

        // 创建 SseEmitter 实例,设置超时时间为 1 小时(单位:毫秒)
        SseEmitter emitter = new SseEmitter(60 * 60 * 1000L);

        // 将当前任务的 emitter 注册到全局 emitterMap 中,用于后续状态推送
        emitterMap.put(taskCode, emitter);

        // 定义清理逻辑(连接关闭时自动移除 emitter,避免内存泄漏)
        Runnable cleanup = () -> {
            emitterMap.remove(taskCode);
            lastEventDataMap.remove(taskCode);
        };

        // 注册回调:客户端主动关闭连接时调用
        emitter.onCompletion(cleanup);

        // 注册回调:连接超时后自动关闭并清理
        emitter.onTimeout(cleanup);

        // 注册回调:出现异常时进行日志记录并清理资源
        emitter.onError(e -> {
            log.error("文件上传状态SSE 错误,taskCode:{}, 错误信息:{}", taskCode, e.getMessage(), e);
            cleanup.run();
        });

        // 尝试断点续传(只在客户端提供 Last-Event-ID 的情况下)
        if (StringUtils.isNotBlank(lastEventId)) {
            DataFileUploadStatusVO lastData = lastEventDataMap.get(taskCode);
            if (lastData != null) {
                try {
                    emitter.send(SseEmitter.event()
                            .id(lastEventId)
                            .name("upload-status")
                            .reconnectTime(5000L)
                            .data(lastData, MediaType.APPLICATION_JSON));
                    log.info("文件上传状态SSE,断点续传成功,taskCode:{}, lastEventId:{}, 数据:{}", taskCode, lastEventId,
                            JSON.toJSONString(lastData));
                } catch (IOException e) {
                    log.error("文件上传状态SSE,断点续传失败,taskCode:{}, 错误:{}", taskCode, e.getMessage(), e);
                }
            }
        }

        // 创建一个原子容器用于持有定时任务引用,以便后续取消任务
        AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<>();

        // 启动定时任务
        ScheduledFuture<?> future = scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                // 从全局映射中获取当前连接的 emitter,若为空说明连接已关闭,停止执行
                SseEmitter currentEmitter = emitterMap.get(taskCode);
                if (currentEmitter == null) {
                    return;
                }

                // 查询文件上传状态
                List<DataConfigRespVO> dataConfigList = getDataConfigList(taskCode);

                // 查询任务信息
                TaskRespVO taskRespVO = taskService.getTask(taskCode);

                // 将数据库实体转换为状态对象列表
                List<DataFileUploadStatusVO.FileStatus> fileStatusList = dataConfigList.stream()
                        .map(item -> {
                            DataFileUploadStatusVO.FileStatus fileStatus = new DataFileUploadStatusVO.FileStatus();
                            fileStatus.setId(item.getId());
                            fileStatus.setCode(item.getCode());
                            fileStatus.setImportMethod(item.getImportMethod());
                            fileStatus.setFilename(item.getFilename());
                            fileStatus.setFileUrl(item.getFileUrl());
                            fileStatus.setSize(item.getSize());
                            fileStatus.setUploadStatusCode(item.getUploadStatusCode());
                            return fileStatus;
                        })
                        .collect(Collectors.toList());

                // 构建完整状态返回对象
                DataFileUploadStatusVO statusVO = new DataFileUploadStatusVO();
                statusVO.setTaskCode(taskCode);
                statusVO.setCurrentTime(TimeUtil.getCurrentTime());
                statusVO.setFileStatusList(fileStatusList);
                statusVO.setName(taskRespVO.getName());
                statusVO.setDataType(taskRespVO.getDataType());
                statusVO.setDataTypeStr(taskRespVO.getDataTypeStr());
                statusVO.setImportType(taskRespVO.getImportType());
                statusVO.setCurrentLinkCode(taskRespVO.getCurrentLinkCode());
                statusVO.setCurrentLink(taskRespVO.getCurrentLink());
                statusVO.setCurrentLinkStatusCode(taskRespVO.getCurrentLinkStatusCode());
                statusVO.setCurrentLinkStatus(taskRespVO.getCurrentLinkStatus());

                String eventId = String.valueOf(System.currentTimeMillis());

                // 推送状态
                try {
                    // 设置状态
                    statusVO.setFinished(fileUploadFished(statusVO));

                    // 保存最后一次推送的数据,用于断线续传
                    lastEventDataMap.put(taskCode, statusVO);

                    currentEmitter.send(SseEmitter.event()
                            // 事件 ID,供断线续传
                            .id(eventId)
                            // 事件名
                            .name("upload-status")
                            // 告诉客户端:断线后5秒再重连
                            .reconnectTime(5000L)
                            // 推送数据为 JSON
                            .data(statusVO, MediaType.APPLICATION_JSON));

                    log.info("文件上传状态SSE,当前时间:{},taskCode:{},推送数据:{}", TimeUtil.getCurrentTime(), taskCode,
                            JSON.toJSONString(statusVO));
                } catch (IOException ioException) {
                    // 客户端断开连接或传输异常,主动清理资源并中止定时任务
                    log.error("文件上传状态SSE,错误:{},taskCode:{}", ioException.getMessage(), taskCode);
                    currentEmitter.completeWithError(ioException);
                    cleanup.run();
                    futureRef.get().cancel(true);
                    return;
                }

                // 判断任务是否完成
                if (fileUploadFished(statusVO)) {
                    // 设置任务完成
                    statusVO.setFinished(true);

                    currentEmitter.send(SseEmitter.event()
                            .name("upload-status")
                            .data(statusVO, MediaType.APPLICATION_JSON));
                    // 主动关闭连接
                    currentEmitter.complete();
                    // 清理资源并取消定时任务
                    cleanup.run();
                    futureRef.get().cancel(true);
                    log.info("文件上传状态SSE,任务已完成,taskCode:{},当前时间:{},emitterMap:{},大小:{}", taskCode,
                            TimeUtil.getCurrentTime(), JSON.toJSONString(emitterMap), emitterMap.size());
                }

            } catch (Exception e) {
                log.error("文件上传状态SSE, SSE推送失败,taskCode:{},错误信息:{}", taskCode, e.getMessage(), e);
                SseEmitter failedEmitter = emitterMap.get(taskCode);
                if (failedEmitter != null) {
                    failedEmitter.completeWithError(e);
                }
                cleanup.run();
                futureRef.get().cancel(true);
            }
            // 每2秒执行一次
        }, 0, 2, TimeUnit.SECONDS);

        // 记录定时任务引用
        futureRef.set(future);

        // 返回 emitter 给前端,保持连接
        return emitter;
    }
event:upload-status
data:{"finished":false,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:27","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":1}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":1,"currentLinkStatus":"执行中"}

event:upload-status
data:{"finished":false,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:29","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":1}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":1,"currentLinkStatus":"执行中"}

event:upload-status
data:{"finished":false,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:31","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":1}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":1,"currentLinkStatus":"执行中"}

event:upload-status
data:{"finished":false,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:33","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":1}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":1,"currentLinkStatus":"执行中"}

event:upload-status
data:{"finished":true,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:35","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":2,"currentLinkStatus":"已完成"}

event:upload-status
data:{"finished":true,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:35","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":2,"currentLinkStatus":"已完成"}


Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐