目录

一、业务场景介绍

二、分析

(一)该分析哪些数据?

(二)该怎样进行分析?

(三)该什么时候进行分析?

三、核心技术栈

四、具体流程图

五、具体代码实现

(一)实体类(这里只展示用到的字段)

(二)es定时任务

(三)日志分析定时任务

(四)service层


一、业务场景介绍

有一个类似美团的点餐小程序,平台入驻了大量商家店铺。用户访问店铺时,系统会记录进店日志。

当前业务需求:

在首页设置一个个性化推荐模块,为用户精准推荐4家店铺

已知条件:

每月产生约10万条进店日志数据。请设计实现该需求。


二、分析

(一)该分析哪些数据?

实现个性化推荐时,若仅分析短期日志数据,精度难以保证。然而,考虑到每月约10万条日志的累积量,数据库中长期存储的日志数据量级将十分庞大。若对全量历史数据进行实时分析,时间和资源开销将难以承受。

因此,需要确定一个合理的数据分析区间。我们选择最近30天(一个月) 的日志作为分析对象。对于餐饮场景,一个月的用户行为数据通常足以反映近期偏好,且10万量级的数据在可处理范围内。

即使只查询一个月的数据,在存储了数十万乃至数百万条日志的数据库中进行复杂分析,仍会对数据库造成巨大压力并导致响应延迟。为此,需实施冷热数据分离策略:将超过一个月的冷数据归档存储至传统数据库(如MySQL),而将一个月内的热数据存储于高性能搜索引擎(如Elasticsearch)。这样可显著提升查询效率。该方案需要配套一个每日执行的定时任务,用于数据的冷热迁移与同步。

(二)该怎样进行分析?

为提升推荐精度,需进行多维度分析。我们计划考察用户对特定店铺的访问频率、访问店铺的类型偏好以及常访问店铺的地理位置分布等特征。

核心算法:

采用加权评分排序法。具体而言,为上述关键维度(如具体店铺访问频次、类型偏好度、位置偏好度)赋予不同权重,综合计算每个候选店铺对目标用户的推荐得分。最终根据得分对所有店铺进行倒序排序,选取排名前4的店铺作为推荐结果。

(三)该什么时候进行分析?

日志数据量庞大,分析过程耗时较长,因此分析执行的时机至关重要。

  • 方案一(实时分析):
    用户每次进入首页时触发分析。此方案虽能保证推荐实时性,但会对系统性能造成极大负担(高并发时尤为明显),且单次访问产生的新日志数据量极小,对推荐结果的即时影响微乎其微,性价比较低。
  • 方案二(周期性分析 - 如半小时/小时): 
    定期执行分析任务。相比实时分析减轻了部分压力,但仍存在频繁计算的开销问题,且未能充分利用餐饮场景的流量特征。
  • 优化方案(基于场景特征): 
    考虑到点餐小程序用户访问存在明显的高峰时段(早、中、晚三餐时段),可尝试按餐别对店铺进行分类(如早餐店、午餐店、晚餐店)。在高峰之间的流量低谷时段(例如上午10点、下午3点、晚上9点后)分别执行分析任务,并主要推荐对应时段的店铺类型。此方案一天执行三次,能较好地平衡时效性与性能开销。
  • 最终选择(每日分析): 
    鉴于当前平台店铺类型丰富度有限,按餐别分类执行的效果可能受限。因此,我们决定采用每日执行一次分析任务的方案。该方案在保证推荐结果每日更新的基础上,有效控制了计算频率和资源消耗。此方案同样需要一个定时任务来驱动日志分析过程。

三、核心技术栈

由于我们的分析都是依靠于定时任务完成的,所以这里我们要用到spring task,然后冷热数据分离存储需要用到ElasticSearch和MySQL,最后数据缓存需要用到redis。


四、具体流程图

然后当用户端调用获取推荐店铺的接口时,直接查询redis是否有数据,若有则直接返回,反之则采取备用方案(随机4个热门店铺)返回。


五、具体代码实现

(一)实体类(这里只展示用到的字段)

进店日志类:

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EnterStoreUserLog implements Serializable {

    @TableId(value = "id")
    private Long id;

    @Schema(description = "店铺id")
    private Long storeId;

    @Schema(description = "店铺所属分区id")
    private Integer categoryId;

    @Schema(description = "店铺类型分类id")
    private Integer storeClassificationId;

    @Schema(description = "用户id")
    private Long userId;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @TableField(value = "create_time",fill = FieldFill.INSERT)
    private LocalDateTime createTime;


}

为了节省es的存储空间,这里需要再新建一个实体类,只提取核心字段:

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EnterStoreUserLogUsedTask implements Serializable {

    @Schema(description = "店铺id")
    private Long storeId;

    @Schema(description = "店铺所属分区id")
    private Integer categoryId;

    @Schema(description = "店铺类型分类id")
    private Integer storeClassificationId;

    @Schema(description = "用户id")
    private Long userId;

    @Schema(description = "创建时间时间戳")
    @TableField(value = "create_time",fill = FieldFill.INSERT)
    private Long createTime;
}

为了存储每个店铺对应的得分,我们也需要再新建一个实体类:

/**
 * @author Yilena
 * 用作进店日志得分排序的封装
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class StoreScore {
    @Schema(description = "店铺id")
    private Long storeId;

    @Schema(description = "店铺VO实体")
    private StoreVO storeVO;

    @Schema(description = "得分")
    private Double score;
}

店铺类以及其VO类则不做展示,因为不怎么重要。

(二)es定时任务

/**
 * @author Yilena
 * 定时任务,每天1:10执行一次
 * 将30天内的进店日志全部移动到es做热数据表
 */
@Slf4j
@RequiredArgsConstructor
@Component
public class ESHotAndColdDataSeparationTask {

    private final RestHighLevelClient restHighLevelClient;
    private final EnterStoreUserLogMapper enterStoreUserLogMapper;
    private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");

    @Scheduled(cron = "0 10 1 * * ?")
    public void processHotAndColdDataSeparation() {
        LocalDateTime start = LocalDateTime.now();
        log.info("ES热数据表和冷数据表分离定时任务开始执行,执行时间:{}", start );

        // 创建获取索引请求,使用通配符匹配索引名
        GetIndexRequest getIndexRequest = new GetIndexRequest();
        getIndexRequest.indices(ESIndexConstant.ENTER_STORE_USER_LOG + "_*");

        boolean isCatch = false;
        String[] indices = new String[0];
        try {
            // 执行请求并获取匹配的索引名称数组
            indices = restHighLevelClient.indices().get(getIndexRequest, RequestOptions.DEFAULT).getIndices();
        } catch (Exception e) {
            log.error("查询ES进店日志索引列表失败,时间: {}", LocalDateTime.now(), e);
            // 第一次失败后重试
            try {
                Thread.sleep(2000);
                indices = restHighLevelClient.indices().get(getIndexRequest, RequestOptions.DEFAULT).getIndices();
            } catch (InterruptedException | IOException ex) {
                log.error("再次查询ES进店日志索引列表失败,时间: {}", LocalDateTime.now(), ex);
                isCatch = true;
            }
        }

        // 获取匹配的索引数量
        int count = indices.length;

        // 如果小于30说明数据出错或者是第一次添加,则需要删除所有数据再插入数据
        if(isCatch || count < 30){
            deleteAllDataAndInsertData();
        }
        // 如果大于30则走正常逻辑
        else{
            // 删除30天前那一天的最旧的全部日志数据
            LocalDate oldestDay = LocalDate.now().minusDays(30);// 因为当前时间是1:10,所以要多减去1天
            DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(ESIndexConstant.ENTER_STORE_USER_LOG + "_" + formatter.format(oldestDay));
            deleteByQueryRequest.setQuery(QueryBuilders.matchAllQuery());
            try {
                restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
            } catch (IOException e) {
                // 删除失败也没事,这里不影响整体业务,所以不需要终止运行
                log.error("删除es进店日志索引库最旧数据失败,失败时间:{}", LocalDateTime.now(),e);
            }

            // 这个任务执行的时候是1:10执行,所以这里需要新增的是当前时间加24h以内的日志数据
            LocalDateTime startOfDay = LocalDate.now().minusDays(1).atStartOfDay();
            LocalDateTime endOfDay = startOfDay.plusDays(1);
            LambdaQueryWrapper<EnterStoreUserLog> queryWrapper = Wrappers.lambdaQuery(EnterStoreUserLog.class)
                    .ge(EnterStoreUserLog::getCreateTime, startOfDay)
                    .lt(EnterStoreUserLog::getCreateTime, endOfDay);
            List<EnterStoreUserLog> enterStoreUserLogs = enterStoreUserLogMapper.selectList(queryWrapper);
            List<EnterStoreUserLogUsedTask> enterStoreUserLogUsedTasks = enterStoreUserLogs.stream().map(enterStoreUserLog -> {
                EnterStoreUserLogUsedTask enterStoreUserLogUsedTask = new EnterStoreUserLogUsedTask();
                enterStoreUserLogUsedTask.setStoreId(enterStoreUserLog.getStoreId());
                enterStoreUserLogUsedTask.setCategoryId(enterStoreUserLog.getCategoryId());
                enterStoreUserLogUsedTask.setStoreClassificationId(enterStoreUserLog.getStoreClassificationId());
                enterStoreUserLogUsedTask.setUserId(enterStoreUserLog.getUserId());
                enterStoreUserLogUsedTask.setCreateTime(enterStoreUserLog.getCreateTime().toInstant(ZoneOffset.UTC).toEpochMilli());
                return enterStoreUserLogUsedTask;
            }).toList();
            if(!enterStoreUserLogUsedTasks.isEmpty()){
                BulkRequest bulkRequest = new BulkRequest();
                for (EnterStoreUserLogUsedTask enterStoreUserLogUsedTask : enterStoreUserLogUsedTasks) {
                    IndexRequest request = new IndexRequest(ESIndexConstant.ENTER_STORE_USER_LOG + "_" + formatter.format(LocalDate.now().minusDays(1)))
                            .id(enterStoreUserLogUsedTask.getUserId() + "_" + formatter.format(LocalDateTime.ofInstant(
                                    Instant.ofEpochMilli(enterStoreUserLogUsedTask.getCreateTime()),
                                    ZoneOffset.UTC
                            )))
                            .source(JSONUtil.toJsonStr(enterStoreUserLogUsedTask), XContentType.JSON);
                    bulkRequest.add(request);
                }

                try {
                    restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                } catch (IOException e) {
                    log.error("插入近30天的数据到es失败,失败时间:{}", LocalDateTime.now(),e);
                }
            }

            log.info("ES热数据表和冷数据表分离定时任务执行完毕,结束时间:{},花费时间:{}s", LocalDateTime.now(), Duration.between(start, LocalDateTime.now()).getSeconds());
        }
    }

    private void deleteAllDataAndInsertData() throws RuntimeException {
       // 删除索引库所有数据
        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(ESIndexConstant.ENTER_STORE_USER_LOG + "_*");
        deleteByQueryRequest.setQuery(QueryBuilders.matchAllQuery());
        try {
            restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            // 如果这里还出错的话就直接抛出异常,阻止程序向下执行
            throw new RuntimeException("删除es进店日志索引库所有数据失败,失败时间:" + LocalDateTime.now());
        }

        // 插入近30天的数据
        // 先从db中查数据
        // 原本使用分页查询,因为sharding-JDBC不支持,所以这里采用游标查询
        LocalDateTime startTime = LocalDateTime.now().minusDays(30);
        int pageSize = 1000;
        Long lastId = null;

        LambdaQueryWrapper<EnterStoreUserLog> queryWrapper = Wrappers.lambdaQuery(EnterStoreUserLog.class)
                .ge(EnterStoreUserLog::getCreateTime,startTime)
                .orderByAsc(EnterStoreUserLog::getId);
        while (true) {
            // 每次循环创建新Wrapper, 防止多次叠加查询条件
            LambdaQueryWrapper<EnterStoreUserLog> currentWrapper = queryWrapper.clone();

            // 添加分页条件
            if (lastId != null) {
                currentWrapper.gt(EnterStoreUserLog::getId, lastId);
            }
            currentWrapper.last("LIMIT " + pageSize);

            // 查询数据
            List<EnterStoreUserLog> records = enterStoreUserLogMapper.selectList(queryWrapper);

            if (records.isEmpty()) break;

            List<EnterStoreUserLogUsedTask> convertedList = records.stream()
                    .map(log -> {
                        EnterStoreUserLogUsedTask task = new EnterStoreUserLogUsedTask();
                        task.setStoreId(log.getStoreId());
                        task.setCategoryId(log.getCategoryId());
                        task.setStoreClassificationId(log.getStoreClassificationId());
                        task.setUserId(log.getUserId());
                        task.setCreateTime(log.getCreateTime().toInstant(ZoneOffset.UTC).toEpochMilli());
                        return task;
                    })
                    .toList();


            BulkRequest bulkRequest = new BulkRequest();

            // 批量插入es
            for (EnterStoreUserLogUsedTask enterStoreUserLogUsedTask : convertedList) {
                LocalDate date = LocalDateTime.ofInstant(
                        Instant.ofEpochMilli(enterStoreUserLogUsedTask.getCreateTime()),
                        ZoneOffset.UTC
                ).toLocalDate();
                IndexRequest request = new IndexRequest(ESIndexConstant.ENTER_STORE_USER_LOG + "_" + formatter.format(date))
                        .id(enterStoreUserLogUsedTask.getUserId() + "_" + formatter.format(LocalDateTime.ofInstant(
                                Instant.ofEpochMilli(enterStoreUserLogUsedTask.getCreateTime()),
                                ZoneOffset.UTC
                        )))
                        .source(JSONUtil.toJsonStr(enterStoreUserLogUsedTask), XContentType.JSON);
                bulkRequest.add(request);
            }

            try {
                restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            } catch (IOException e) {
                // 如果这里还出错的话就直接抛出异常,阻止程序向下执行
                throw new RuntimeException("插入近30天的数据到es失败,失败时间:" + LocalDateTime.now());
            }

            lastId = records.get(records.size() - 1).getId();
        }
    }
}

(三)日志分析定时任务

/**
 * @author Yilena
 *   每天1:30执行一次
 *  分析用户进店日志并提供个性化推送
 */
@RequiredArgsConstructor
@Slf4j
@Component
public class ReportStoreRecommendTask {

    private final RestHighLevelClient restHighLevelClient;
    private final StringRedisTemplate stringRedisTemplate;
    private final Top10MarkStoreMapper top10MarkStoreMapper;
    private final StoreMapper storeMapper;
    private final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

    // 销毁线程池
    @PreDestroy
    public void destroy() {
        // 禁止新任务提交,等待已提交任务执行完毕
        executor.shutdown();
        try {
            // 最多再等待60秒,如果超时直接强制关闭线程池
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    @Scheduled(cron = "0 30 1 * * ?")
    public void processStoreRecommend() {
        LocalDateTime start = LocalDateTime.now();
        log.info("开始执行用户进店日志分析,执行时间:{}", start);

        // 查询es索引中近3天的进店日志,获取用户id
        long endDate = LocalDateTime.now()
                .minusDays(1)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
        long startDate = LocalDateTime.ofEpochSecond(endDate / 1000, 0, ZoneOffset.UTC)
                .minusDays(2)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
        String indexPattern = ESIndexConstant.ENTER_STORE_USER_LOG + "_*";
        Set<Long> userIds = queryUserIdsByDateRange(indexPattern, startDate, endDate);

        // 查询es索引中近30天的进店日志
        Map<Long, List<EnterStoreUserLogUsedTask>> userLogs = queryESBy30Days(indexPattern,endDate, userIds);

        // 获取热门榜的作品,只拿前4个
        List<Top10MarkStore> top4MarkStoreList = top10MarkStoreMapper.selectList(new QueryWrapper<Top10MarkStore>()
                .orderByDesc("mark_score")
                .last("LIMIT 4"));

        if (top4MarkStoreList == null || top4MarkStoreList.isEmpty()){
            throw new Top10MarkStoreException(MessageConstant.TOP10_MARK_IS_NULL);
        }
        List<StoreVO> top4MarkStoreVOList = new ArrayList<>();
        top4MarkStoreList.forEach(top10MarkStore -> {
            Long storeId = top10MarkStore.getStoreId();
            Store store = storeMapper.selectById(storeId);
            StoreVO storeVO=new StoreVO();
            BeanUtils.copyProperties(store,storeVO);
            top4MarkStoreVOList.add(storeVO);
        });
        // 打乱顺序
        Collections.shuffle(top4MarkStoreVOList);

        // 备用方案
        String cacheAllKey = STORE_RECOMMEND_ONE + "all";

        // 删除所有旧数据(包括用户的推荐数据)
        String all = STORE_RECOMMEND_ONE + "*";
        Set<String> keys = stringRedisTemplate.keys(all);
        if (keys != null && !keys.isEmpty()) {
            stringRedisTemplate.delete(keys);
        }


        // 缓存数据
        List<String> allStoreJsons = top4MarkStoreVOList.stream()
                .map(store -> JSON.toJSONString(store, SerializerFeature.WriteDateUseDateFormat))
                .toList();
        stringRedisTemplate.opsForList().rightPushAll(cacheAllKey, allStoreJsons);

        // 权重系数配置
        final double STORE_WEIGHT = 0.5;
        final double CATEGORY_WEIGHT = 0.3;
        final double CLASSIFICATION_WEIGHT = 0.2;

        // 将用户数据分片(每500用户一批)
        List<Map.Entry<Long, List<EnterStoreUserLogUsedTask>>> userEntries =
                new ArrayList<>(userLogs.entrySet());
        List<List<Map.Entry<Long, List<EnterStoreUserLogUsedTask>>>> batches =
                ThreadPoolUtil.slicingData(userEntries, 500);

        // 批次并行处理
        batches.forEach(batch -> {
            List<CompletableFuture<Void>> futures = new ArrayList<>();

            batch.forEach(entry -> {
                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                    Long userId = entry.getKey();
                    List<EnterStoreUserLogUsedTask> logs = entry.getValue();
                    try {
                        analyseLog(userId, logs, STORE_WEIGHT, CATEGORY_WEIGHT, CLASSIFICATION_WEIGHT, top4MarkStoreVOList);
                    }catch (Exception e){
                        // 即使有一个用户进店日志分析失败,不影响其他用户的分析
                        log.error("用户id为{}的进店日志分析失败,失败时间:{}",userId,LocalDateTime.now(),e);
                    }
                }, executor);
                futures.add(future);
            });

            // 等待当前批次完成
            ThreadPoolUtil.allFuturesWait(futures);
        });

        log.info("用户进店日志分析结束,结束时间:{},花费时间:{}s", LocalDateTime.now(), Duration.between(start, LocalDateTime.now()).getSeconds());
    }

    private void analyseLog(Long userId, List<EnterStoreUserLogUsedTask> logs, double STORE_WEIGHT, double CATEGORY_WEIGHT, double CLASSIFICATION_WEIGHT, List<StoreVO> top4MarkStoreVOList) throws Exception {
        // 记录店铺、分区、分类的次数
        Map<Long, Integer> storeCountMap = new HashMap<>();
        Map<Integer, Integer> categoryCountMap = new HashMap<>();
        Map<Integer, Integer> classificationCountMap = new HashMap<>();

        // 记录店铺对应的分类和分区
        Map<Long, Integer> storeCategoryMap = new HashMap<>();
        Map<Long, Integer> storeClassificationMap = new HashMap<>();

        // 统计基础数据
        logs.forEach(log -> {
            Long storeId = log.getStoreId();
            Integer categoryId = log.getCategoryId();
            Integer classificationId = log.getStoreClassificationId();

            // 统计次数
            storeCountMap.merge(storeId, 1, Integer::sum);
            categoryCountMap.merge(categoryId, 1, Integer::sum);
            classificationCountMap.merge(classificationId, 1, Integer::sum);

            // 记录店铺元数据(首次出现时记录)
            storeCategoryMap.putIfAbsent(storeId, categoryId);
            storeClassificationMap.putIfAbsent(storeId, classificationId);
        });

        // 计算店铺得分
        List<StoreScore> scoreList = new ArrayList<>();
        storeCountMap.forEach((storeId, count) -> {
            // 获取店铺对应的分类和分区
            Integer categoryId = storeCategoryMap.get(storeId);
            Integer classificationId = storeClassificationMap.get(storeId);

            //获取店铺实体
            Store store = storeMapper.selectById(storeId);
            StoreVO storeVO = new StoreVO();
            BeanUtils.copyProperties(store, storeVO);

            // 获取对应计数
            int categoryCount = categoryCountMap.getOrDefault(categoryId, 0);
            int classificationCount = classificationCountMap.getOrDefault(classificationId, 0);

            // 计算加权得分
            double score = STORE_WEIGHT * count
                    + CATEGORY_WEIGHT * categoryCount
                    + CLASSIFICATION_WEIGHT * classificationCount;

            scoreList.add(new StoreScore(storeId, storeVO, score));
        });

        // 按得分降序排序
        scoreList.sort((a, b) -> Double.compare(b.getScore(), a.getScore()));

        // 生成推荐列表
        List<StoreVO> recommendedStores = new ArrayList<>(scoreList.stream()
                .limit(4)
                .map(StoreScore::getStoreVO)
                .toList());

        // 去重(当个性化推荐和热门榜补充时可能会出现重复店铺)
        Set<Integer> existStoreIds = recommendedStores.stream()
                .map(StoreVO::getId)
                .collect(Collectors.toSet());

        // 补充热门店铺
        int count = 4 - recommendedStores.size();
        for (int i = 0; i < top4MarkStoreVOList.size() && count > 0; i++) {
            StoreVO candidate = top4MarkStoreVOList.get(i);
            if (!existStoreIds.contains(candidate.getId())) {
                recommendedStores.add(candidate);
                count--;
            }
        }
        String cacheKey = STORE_RECOMMEND_ONE + userId;

        // 缓存数据
        List<String> storeJsons = recommendedStores.stream()
                .map(store -> JSON.toJSONString(store, SerializerFeature.WriteDateUseDateFormat))
                .toList();
        stringRedisTemplate.opsForList().rightPushAll(cacheKey, storeJsons);
    }

    private Set<Long> queryUserIdsByDateRange(String indexPattern, long startDate, long endDate) {
        // 构建查询请求
        SearchRequest request = new SearchRequest(indexPattern);
        request.source().query(QueryBuilders.rangeQuery("createTime")
                .gte(startDate)
                .lte(LocalDateTime.ofEpochSecond(endDate / 1000, 0, ZoneOffset.UTC)
                        .plusDays(1)
                        .toInstant(ZoneOffset.UTC)
                        .toEpochMilli()))
                .size(10000);

        Set<Long> userIds = new HashSet<>();
        try {
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            // 遍历搜索结果,获取用户id
            for (SearchHit hit : response.getHits()) {
                userIds.add(Long.parseLong(hit.getSourceAsMap().get("userId").toString()));
            }
        } catch (IOException e) {
            // 这里抛出异常,中止代码向下执行
            throw new RuntimeException("获取近三天进店用户id失败,失败原因:",e);
        }
        return userIds;
    }

    private Map<Long, List<EnterStoreUserLogUsedTask>> queryESBy30Days(String indexPattern, long end, Set<Long> userIds) {
        Map<Long, List<EnterStoreUserLogUsedTask>> results = new HashMap<>();

        // 构建批量查询条件
        SearchRequest request = new SearchRequest(indexPattern);
        SearchSourceBuilder query = new SearchSourceBuilder();

        query.query(QueryBuilders.boolQuery()
                        .must(QueryBuilders.termsQuery("userId", userIds))
                        .must(QueryBuilders.rangeQuery("createTime")
                                .gte(LocalDateTime.ofEpochSecond(end / 1000, 0, ZoneOffset.UTC)
                                        .minusDays(30)
                                        .toInstant(ZoneOffset.UTC)
                                        .toEpochMilli())
                                .lte(LocalDateTime.ofEpochSecond(end / 1000, 0, ZoneOffset.UTC)
                                        .plusDays(1)
                                        .toInstant(ZoneOffset.UTC)
                                        .toEpochMilli())))
                .size(5000);

        // 配置Scroll滚动查询,设置3分钟超时
        final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(3L));
        request.scroll(scroll);
        request.source(query);

        String scrollId = null;
        try {
            // 初始查询
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            scrollId = response.getScrollId();

            // 循环处理所有分页
            while (response.getHits().getHits().length > 0) {
                // 处理当前页结果
                Arrays.stream(response.getHits().getHits())
                        // 解析对象
                        .map(hit -> {
                            EnterStoreUserLogUsedTask enterStoreUserLogUsedTask = JSON.parseObject(
                                    hit.getSourceAsString(),
                                    EnterStoreUserLogUsedTask.class
                            );
                            // 校验无效数据
                            if (enterStoreUserLogUsedTask.getUserId() == null || enterStoreUserLogUsedTask.getStoreId() == null) {
                                log.error("无效日志记录: {}", hit.getId());
                                // 返回null值稍后进行过滤
                                return null;
                            }
                            return enterStoreUserLogUsedTask;
                        })
                        .filter(Objects::nonNull)
                        // 添加到结果中
                        .forEach(log -> results.computeIfAbsent(log.getUserId(), k -> new ArrayList<>())
                                .add(log)
                        );

                // 获取下一页
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId)
                        .scroll(scroll);
                response = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
            }
        } catch (IOException e) {
            // 抛出异常,中止代码向下执行
            throw new RuntimeException("ES查询失败", e);
        } finally {
            // 清理Scroll资源
            if (scrollId != null) {
                ClearScrollRequest clearRequest = new ClearScrollRequest();
                clearRequest.addScrollId(scrollId);
                try {
                    restHighLevelClient.clearScroll(clearRequest, RequestOptions.DEFAULT);
                } catch (IOException e) {
                    log.error("Scroll清理失败", e);
                }
            }
        }

        return results;
    }
}

(四)service层

   @Override
    public List<StoreVO> guessLike() {
        // 获取当前用户ID
        Long userId = BaseContext.getCurrentId();

        List<StoreVO> result = new ArrayList<>();

        String cacheAllKey = STORE_RECOMMEND_ONE + "all";

        // 如果未登录,则采用备用方案
        if(userId == null){
            List<String> allStoreStr = stringRedisTemplate.opsForList().range(cacheAllKey, 0, -1);
            allStoreStr.forEach(s -> {
                StoreVO storeVO = JSON.parseObject(s, StoreVO.class);
                result.add(storeVO);
            });
        }else{
            String cacheKey = STORE_RECOMMEND_ONE + userId;
            List<String> userStoreStr = stringRedisTemplate.opsForList().range(cacheKey, 0, -1);
            // 如果缓存为空,则采用备用方案
            if(userStoreStr == null || userStoreStr.isEmpty()){
                List<String> allStoreStr = stringRedisTemplate.opsForList().range(cacheAllKey, 0, -1);
                allStoreStr.forEach(s -> {
                    StoreVO storeVO = JSON.parseObject(s, StoreVO.class);
                    result.add(storeVO);
                });
            }else{
                userStoreStr.forEach(s -> {
                    StoreVO storeVO = JSON.parseObject(s, StoreVO.class);
                    result.add(storeVO);
                });
            }
        }
        return result;
    }

我提供的这个方案肯定不是最佳实践,因为本人的水平并不是很高,如果你有更好的方案,请在评论区告诉我!

~码文不易,留个赞再走吧~

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐