点餐场景下:分析实现十万级用户日志的店铺推荐方案
有一个类似美团的点餐小程序,平台入驻了大量商家店铺。用户访问店铺时,系统会记录进店日志。在首页设置一个个性化推荐模块,为用户精准推荐4家店铺。每月产生约10万条进店日志数据。请设计实现该需求。
目录
一、业务场景介绍
有一个类似美团的点餐小程序,平台入驻了大量商家店铺。用户访问店铺时,系统会记录进店日志。
当前业务需求:
在首页设置一个个性化推荐模块,为用户精准推荐4家店铺。
已知条件:
每月产生约10万条进店日志数据。请设计实现该需求。
二、分析
(一)该分析哪些数据?
实现个性化推荐时,若仅分析短期日志数据,精度难以保证。然而,考虑到每月约10万条日志的累积量,数据库中长期存储的日志数据量级将十分庞大。若对全量历史数据进行实时分析,时间和资源开销将难以承受。
因此,需要确定一个合理的数据分析区间。我们选择最近30天(一个月) 的日志作为分析对象。对于餐饮场景,一个月的用户行为数据通常足以反映近期偏好,且10万量级的数据在可处理范围内。
即使只查询一个月的数据,在存储了数十万乃至数百万条日志的数据库中进行复杂分析,仍会对数据库造成巨大压力并导致响应延迟。为此,需实施冷热数据分离策略:将超过一个月的冷数据归档存储至传统数据库(如MySQL),而将一个月内的热数据存储于高性能搜索引擎(如Elasticsearch)。这样可显著提升查询效率。该方案需要配套一个每日执行的定时任务,用于数据的冷热迁移与同步。
(二)该怎样进行分析?
为提升推荐精度,需进行多维度分析。我们计划考察用户对特定店铺的访问频率、访问店铺的类型偏好以及常访问店铺的地理位置分布等特征。
核心算法:
采用加权评分排序法。具体而言,为上述关键维度(如具体店铺访问频次、类型偏好度、位置偏好度)赋予不同权重,综合计算每个候选店铺对目标用户的推荐得分。最终根据得分对所有店铺进行倒序排序,选取排名前4的店铺作为推荐结果。
(三)该什么时候进行分析?
日志数据量庞大,分析过程耗时较长,因此分析执行的时机至关重要。
- 方案一(实时分析):
用户每次进入首页时触发分析。此方案虽能保证推荐实时性,但会对系统性能造成极大负担(高并发时尤为明显),且单次访问产生的新日志数据量极小,对推荐结果的即时影响微乎其微,性价比较低。 - 方案二(周期性分析 - 如半小时/小时):
定期执行分析任务。相比实时分析减轻了部分压力,但仍存在频繁计算的开销问题,且未能充分利用餐饮场景的流量特征。 - 优化方案(基于场景特征):
考虑到点餐小程序用户访问存在明显的高峰时段(早、中、晚三餐时段),可尝试按餐别对店铺进行分类(如早餐店、午餐店、晚餐店)。在高峰之间的流量低谷时段(例如上午10点、下午3点、晚上9点后)分别执行分析任务,并主要推荐对应时段的店铺类型。此方案一天执行三次,能较好地平衡时效性与性能开销。 - 最终选择(每日分析):
鉴于当前平台店铺类型丰富度有限,按餐别分类执行的效果可能受限。因此,我们决定采用每日执行一次分析任务的方案。该方案在保证推荐结果每日更新的基础上,有效控制了计算频率和资源消耗。此方案同样需要一个定时任务来驱动日志分析过程。
三、核心技术栈
由于我们的分析都是依靠于定时任务完成的,所以这里我们要用到spring task,然后冷热数据分离存储需要用到ElasticSearch和MySQL,最后数据缓存需要用到redis。
四、具体流程图


然后当用户端调用获取推荐店铺的接口时,直接查询redis是否有数据,若有则直接返回,反之则采取备用方案(随机4个热门店铺)返回。
五、具体代码实现
(一)实体类(这里只展示用到的字段)
进店日志类:
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EnterStoreUserLog implements Serializable {
@TableId(value = "id")
private Long id;
@Schema(description = "店铺id")
private Long storeId;
@Schema(description = "店铺所属分区id")
private Integer categoryId;
@Schema(description = "店铺类型分类id")
private Integer storeClassificationId;
@Schema(description = "用户id")
private Long userId;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@TableField(value = "create_time",fill = FieldFill.INSERT)
private LocalDateTime createTime;
}
为了节省es的存储空间,这里需要再新建一个实体类,只提取核心字段:
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EnterStoreUserLogUsedTask implements Serializable {
@Schema(description = "店铺id")
private Long storeId;
@Schema(description = "店铺所属分区id")
private Integer categoryId;
@Schema(description = "店铺类型分类id")
private Integer storeClassificationId;
@Schema(description = "用户id")
private Long userId;
@Schema(description = "创建时间时间戳")
@TableField(value = "create_time",fill = FieldFill.INSERT)
private Long createTime;
}
为了存储每个店铺对应的得分,我们也需要再新建一个实体类:
/**
* @author Yilena
* 用作进店日志得分排序的封装
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class StoreScore {
@Schema(description = "店铺id")
private Long storeId;
@Schema(description = "店铺VO实体")
private StoreVO storeVO;
@Schema(description = "得分")
private Double score;
}
店铺类以及其VO类则不做展示,因为不怎么重要。
(二)es定时任务
/**
* @author Yilena
* 定时任务,每天1:10执行一次
* 将30天内的进店日志全部移动到es做热数据表
*/
@Slf4j
@RequiredArgsConstructor
@Component
public class ESHotAndColdDataSeparationTask {
private final RestHighLevelClient restHighLevelClient;
private final EnterStoreUserLogMapper enterStoreUserLogMapper;
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");
@Scheduled(cron = "0 10 1 * * ?")
public void processHotAndColdDataSeparation() {
LocalDateTime start = LocalDateTime.now();
log.info("ES热数据表和冷数据表分离定时任务开始执行,执行时间:{}", start );
// 创建获取索引请求,使用通配符匹配索引名
GetIndexRequest getIndexRequest = new GetIndexRequest();
getIndexRequest.indices(ESIndexConstant.ENTER_STORE_USER_LOG + "_*");
boolean isCatch = false;
String[] indices = new String[0];
try {
// 执行请求并获取匹配的索引名称数组
indices = restHighLevelClient.indices().get(getIndexRequest, RequestOptions.DEFAULT).getIndices();
} catch (Exception e) {
log.error("查询ES进店日志索引列表失败,时间: {}", LocalDateTime.now(), e);
// 第一次失败后重试
try {
Thread.sleep(2000);
indices = restHighLevelClient.indices().get(getIndexRequest, RequestOptions.DEFAULT).getIndices();
} catch (InterruptedException | IOException ex) {
log.error("再次查询ES进店日志索引列表失败,时间: {}", LocalDateTime.now(), ex);
isCatch = true;
}
}
// 获取匹配的索引数量
int count = indices.length;
// 如果小于30说明数据出错或者是第一次添加,则需要删除所有数据再插入数据
if(isCatch || count < 30){
deleteAllDataAndInsertData();
}
// 如果大于30则走正常逻辑
else{
// 删除30天前那一天的最旧的全部日志数据
LocalDate oldestDay = LocalDate.now().minusDays(30);// 因为当前时间是1:10,所以要多减去1天
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(ESIndexConstant.ENTER_STORE_USER_LOG + "_" + formatter.format(oldestDay));
deleteByQueryRequest.setQuery(QueryBuilders.matchAllQuery());
try {
restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
// 删除失败也没事,这里不影响整体业务,所以不需要终止运行
log.error("删除es进店日志索引库最旧数据失败,失败时间:{}", LocalDateTime.now(),e);
}
// 这个任务执行的时候是1:10执行,所以这里需要新增的是当前时间加24h以内的日志数据
LocalDateTime startOfDay = LocalDate.now().minusDays(1).atStartOfDay();
LocalDateTime endOfDay = startOfDay.plusDays(1);
LambdaQueryWrapper<EnterStoreUserLog> queryWrapper = Wrappers.lambdaQuery(EnterStoreUserLog.class)
.ge(EnterStoreUserLog::getCreateTime, startOfDay)
.lt(EnterStoreUserLog::getCreateTime, endOfDay);
List<EnterStoreUserLog> enterStoreUserLogs = enterStoreUserLogMapper.selectList(queryWrapper);
List<EnterStoreUserLogUsedTask> enterStoreUserLogUsedTasks = enterStoreUserLogs.stream().map(enterStoreUserLog -> {
EnterStoreUserLogUsedTask enterStoreUserLogUsedTask = new EnterStoreUserLogUsedTask();
enterStoreUserLogUsedTask.setStoreId(enterStoreUserLog.getStoreId());
enterStoreUserLogUsedTask.setCategoryId(enterStoreUserLog.getCategoryId());
enterStoreUserLogUsedTask.setStoreClassificationId(enterStoreUserLog.getStoreClassificationId());
enterStoreUserLogUsedTask.setUserId(enterStoreUserLog.getUserId());
enterStoreUserLogUsedTask.setCreateTime(enterStoreUserLog.getCreateTime().toInstant(ZoneOffset.UTC).toEpochMilli());
return enterStoreUserLogUsedTask;
}).toList();
if(!enterStoreUserLogUsedTasks.isEmpty()){
BulkRequest bulkRequest = new BulkRequest();
for (EnterStoreUserLogUsedTask enterStoreUserLogUsedTask : enterStoreUserLogUsedTasks) {
IndexRequest request = new IndexRequest(ESIndexConstant.ENTER_STORE_USER_LOG + "_" + formatter.format(LocalDate.now().minusDays(1)))
.id(enterStoreUserLogUsedTask.getUserId() + "_" + formatter.format(LocalDateTime.ofInstant(
Instant.ofEpochMilli(enterStoreUserLogUsedTask.getCreateTime()),
ZoneOffset.UTC
)))
.source(JSONUtil.toJsonStr(enterStoreUserLogUsedTask), XContentType.JSON);
bulkRequest.add(request);
}
try {
restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("插入近30天的数据到es失败,失败时间:{}", LocalDateTime.now(),e);
}
}
log.info("ES热数据表和冷数据表分离定时任务执行完毕,结束时间:{},花费时间:{}s", LocalDateTime.now(), Duration.between(start, LocalDateTime.now()).getSeconds());
}
}
private void deleteAllDataAndInsertData() throws RuntimeException {
// 删除索引库所有数据
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(ESIndexConstant.ENTER_STORE_USER_LOG + "_*");
deleteByQueryRequest.setQuery(QueryBuilders.matchAllQuery());
try {
restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
// 如果这里还出错的话就直接抛出异常,阻止程序向下执行
throw new RuntimeException("删除es进店日志索引库所有数据失败,失败时间:" + LocalDateTime.now());
}
// 插入近30天的数据
// 先从db中查数据
// 原本使用分页查询,因为sharding-JDBC不支持,所以这里采用游标查询
LocalDateTime startTime = LocalDateTime.now().minusDays(30);
int pageSize = 1000;
Long lastId = null;
LambdaQueryWrapper<EnterStoreUserLog> queryWrapper = Wrappers.lambdaQuery(EnterStoreUserLog.class)
.ge(EnterStoreUserLog::getCreateTime,startTime)
.orderByAsc(EnterStoreUserLog::getId);
while (true) {
// 每次循环创建新Wrapper, 防止多次叠加查询条件
LambdaQueryWrapper<EnterStoreUserLog> currentWrapper = queryWrapper.clone();
// 添加分页条件
if (lastId != null) {
currentWrapper.gt(EnterStoreUserLog::getId, lastId);
}
currentWrapper.last("LIMIT " + pageSize);
// 查询数据
List<EnterStoreUserLog> records = enterStoreUserLogMapper.selectList(queryWrapper);
if (records.isEmpty()) break;
List<EnterStoreUserLogUsedTask> convertedList = records.stream()
.map(log -> {
EnterStoreUserLogUsedTask task = new EnterStoreUserLogUsedTask();
task.setStoreId(log.getStoreId());
task.setCategoryId(log.getCategoryId());
task.setStoreClassificationId(log.getStoreClassificationId());
task.setUserId(log.getUserId());
task.setCreateTime(log.getCreateTime().toInstant(ZoneOffset.UTC).toEpochMilli());
return task;
})
.toList();
BulkRequest bulkRequest = new BulkRequest();
// 批量插入es
for (EnterStoreUserLogUsedTask enterStoreUserLogUsedTask : convertedList) {
LocalDate date = LocalDateTime.ofInstant(
Instant.ofEpochMilli(enterStoreUserLogUsedTask.getCreateTime()),
ZoneOffset.UTC
).toLocalDate();
IndexRequest request = new IndexRequest(ESIndexConstant.ENTER_STORE_USER_LOG + "_" + formatter.format(date))
.id(enterStoreUserLogUsedTask.getUserId() + "_" + formatter.format(LocalDateTime.ofInstant(
Instant.ofEpochMilli(enterStoreUserLogUsedTask.getCreateTime()),
ZoneOffset.UTC
)))
.source(JSONUtil.toJsonStr(enterStoreUserLogUsedTask), XContentType.JSON);
bulkRequest.add(request);
}
try {
restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
// 如果这里还出错的话就直接抛出异常,阻止程序向下执行
throw new RuntimeException("插入近30天的数据到es失败,失败时间:" + LocalDateTime.now());
}
lastId = records.get(records.size() - 1).getId();
}
}
}
(三)日志分析定时任务
/**
* @author Yilena
* 每天1:30执行一次
* 分析用户进店日志并提供个性化推送
*/
@RequiredArgsConstructor
@Slf4j
@Component
public class ReportStoreRecommendTask {
private final RestHighLevelClient restHighLevelClient;
private final StringRedisTemplate stringRedisTemplate;
private final Top10MarkStoreMapper top10MarkStoreMapper;
private final StoreMapper storeMapper;
private final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
// 销毁线程池
@PreDestroy
public void destroy() {
// 禁止新任务提交,等待已提交任务执行完毕
executor.shutdown();
try {
// 最多再等待60秒,如果超时直接强制关闭线程池
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
@Scheduled(cron = "0 30 1 * * ?")
public void processStoreRecommend() {
LocalDateTime start = LocalDateTime.now();
log.info("开始执行用户进店日志分析,执行时间:{}", start);
// 查询es索引中近3天的进店日志,获取用户id
long endDate = LocalDateTime.now()
.minusDays(1)
.toInstant(ZoneOffset.UTC)
.toEpochMilli();
long startDate = LocalDateTime.ofEpochSecond(endDate / 1000, 0, ZoneOffset.UTC)
.minusDays(2)
.toInstant(ZoneOffset.UTC)
.toEpochMilli();
String indexPattern = ESIndexConstant.ENTER_STORE_USER_LOG + "_*";
Set<Long> userIds = queryUserIdsByDateRange(indexPattern, startDate, endDate);
// 查询es索引中近30天的进店日志
Map<Long, List<EnterStoreUserLogUsedTask>> userLogs = queryESBy30Days(indexPattern,endDate, userIds);
// 获取热门榜的作品,只拿前4个
List<Top10MarkStore> top4MarkStoreList = top10MarkStoreMapper.selectList(new QueryWrapper<Top10MarkStore>()
.orderByDesc("mark_score")
.last("LIMIT 4"));
if (top4MarkStoreList == null || top4MarkStoreList.isEmpty()){
throw new Top10MarkStoreException(MessageConstant.TOP10_MARK_IS_NULL);
}
List<StoreVO> top4MarkStoreVOList = new ArrayList<>();
top4MarkStoreList.forEach(top10MarkStore -> {
Long storeId = top10MarkStore.getStoreId();
Store store = storeMapper.selectById(storeId);
StoreVO storeVO=new StoreVO();
BeanUtils.copyProperties(store,storeVO);
top4MarkStoreVOList.add(storeVO);
});
// 打乱顺序
Collections.shuffle(top4MarkStoreVOList);
// 备用方案
String cacheAllKey = STORE_RECOMMEND_ONE + "all";
// 删除所有旧数据(包括用户的推荐数据)
String all = STORE_RECOMMEND_ONE + "*";
Set<String> keys = stringRedisTemplate.keys(all);
if (keys != null && !keys.isEmpty()) {
stringRedisTemplate.delete(keys);
}
// 缓存数据
List<String> allStoreJsons = top4MarkStoreVOList.stream()
.map(store -> JSON.toJSONString(store, SerializerFeature.WriteDateUseDateFormat))
.toList();
stringRedisTemplate.opsForList().rightPushAll(cacheAllKey, allStoreJsons);
// 权重系数配置
final double STORE_WEIGHT = 0.5;
final double CATEGORY_WEIGHT = 0.3;
final double CLASSIFICATION_WEIGHT = 0.2;
// 将用户数据分片(每500用户一批)
List<Map.Entry<Long, List<EnterStoreUserLogUsedTask>>> userEntries =
new ArrayList<>(userLogs.entrySet());
List<List<Map.Entry<Long, List<EnterStoreUserLogUsedTask>>>> batches =
ThreadPoolUtil.slicingData(userEntries, 500);
// 批次并行处理
batches.forEach(batch -> {
List<CompletableFuture<Void>> futures = new ArrayList<>();
batch.forEach(entry -> {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
Long userId = entry.getKey();
List<EnterStoreUserLogUsedTask> logs = entry.getValue();
try {
analyseLog(userId, logs, STORE_WEIGHT, CATEGORY_WEIGHT, CLASSIFICATION_WEIGHT, top4MarkStoreVOList);
}catch (Exception e){
// 即使有一个用户进店日志分析失败,不影响其他用户的分析
log.error("用户id为{}的进店日志分析失败,失败时间:{}",userId,LocalDateTime.now(),e);
}
}, executor);
futures.add(future);
});
// 等待当前批次完成
ThreadPoolUtil.allFuturesWait(futures);
});
log.info("用户进店日志分析结束,结束时间:{},花费时间:{}s", LocalDateTime.now(), Duration.between(start, LocalDateTime.now()).getSeconds());
}
private void analyseLog(Long userId, List<EnterStoreUserLogUsedTask> logs, double STORE_WEIGHT, double CATEGORY_WEIGHT, double CLASSIFICATION_WEIGHT, List<StoreVO> top4MarkStoreVOList) throws Exception {
// 记录店铺、分区、分类的次数
Map<Long, Integer> storeCountMap = new HashMap<>();
Map<Integer, Integer> categoryCountMap = new HashMap<>();
Map<Integer, Integer> classificationCountMap = new HashMap<>();
// 记录店铺对应的分类和分区
Map<Long, Integer> storeCategoryMap = new HashMap<>();
Map<Long, Integer> storeClassificationMap = new HashMap<>();
// 统计基础数据
logs.forEach(log -> {
Long storeId = log.getStoreId();
Integer categoryId = log.getCategoryId();
Integer classificationId = log.getStoreClassificationId();
// 统计次数
storeCountMap.merge(storeId, 1, Integer::sum);
categoryCountMap.merge(categoryId, 1, Integer::sum);
classificationCountMap.merge(classificationId, 1, Integer::sum);
// 记录店铺元数据(首次出现时记录)
storeCategoryMap.putIfAbsent(storeId, categoryId);
storeClassificationMap.putIfAbsent(storeId, classificationId);
});
// 计算店铺得分
List<StoreScore> scoreList = new ArrayList<>();
storeCountMap.forEach((storeId, count) -> {
// 获取店铺对应的分类和分区
Integer categoryId = storeCategoryMap.get(storeId);
Integer classificationId = storeClassificationMap.get(storeId);
//获取店铺实体
Store store = storeMapper.selectById(storeId);
StoreVO storeVO = new StoreVO();
BeanUtils.copyProperties(store, storeVO);
// 获取对应计数
int categoryCount = categoryCountMap.getOrDefault(categoryId, 0);
int classificationCount = classificationCountMap.getOrDefault(classificationId, 0);
// 计算加权得分
double score = STORE_WEIGHT * count
+ CATEGORY_WEIGHT * categoryCount
+ CLASSIFICATION_WEIGHT * classificationCount;
scoreList.add(new StoreScore(storeId, storeVO, score));
});
// 按得分降序排序
scoreList.sort((a, b) -> Double.compare(b.getScore(), a.getScore()));
// 生成推荐列表
List<StoreVO> recommendedStores = new ArrayList<>(scoreList.stream()
.limit(4)
.map(StoreScore::getStoreVO)
.toList());
// 去重(当个性化推荐和热门榜补充时可能会出现重复店铺)
Set<Integer> existStoreIds = recommendedStores.stream()
.map(StoreVO::getId)
.collect(Collectors.toSet());
// 补充热门店铺
int count = 4 - recommendedStores.size();
for (int i = 0; i < top4MarkStoreVOList.size() && count > 0; i++) {
StoreVO candidate = top4MarkStoreVOList.get(i);
if (!existStoreIds.contains(candidate.getId())) {
recommendedStores.add(candidate);
count--;
}
}
String cacheKey = STORE_RECOMMEND_ONE + userId;
// 缓存数据
List<String> storeJsons = recommendedStores.stream()
.map(store -> JSON.toJSONString(store, SerializerFeature.WriteDateUseDateFormat))
.toList();
stringRedisTemplate.opsForList().rightPushAll(cacheKey, storeJsons);
}
private Set<Long> queryUserIdsByDateRange(String indexPattern, long startDate, long endDate) {
// 构建查询请求
SearchRequest request = new SearchRequest(indexPattern);
request.source().query(QueryBuilders.rangeQuery("createTime")
.gte(startDate)
.lte(LocalDateTime.ofEpochSecond(endDate / 1000, 0, ZoneOffset.UTC)
.plusDays(1)
.toInstant(ZoneOffset.UTC)
.toEpochMilli()))
.size(10000);
Set<Long> userIds = new HashSet<>();
try {
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
// 遍历搜索结果,获取用户id
for (SearchHit hit : response.getHits()) {
userIds.add(Long.parseLong(hit.getSourceAsMap().get("userId").toString()));
}
} catch (IOException e) {
// 这里抛出异常,中止代码向下执行
throw new RuntimeException("获取近三天进店用户id失败,失败原因:",e);
}
return userIds;
}
private Map<Long, List<EnterStoreUserLogUsedTask>> queryESBy30Days(String indexPattern, long end, Set<Long> userIds) {
Map<Long, List<EnterStoreUserLogUsedTask>> results = new HashMap<>();
// 构建批量查询条件
SearchRequest request = new SearchRequest(indexPattern);
SearchSourceBuilder query = new SearchSourceBuilder();
query.query(QueryBuilders.boolQuery()
.must(QueryBuilders.termsQuery("userId", userIds))
.must(QueryBuilders.rangeQuery("createTime")
.gte(LocalDateTime.ofEpochSecond(end / 1000, 0, ZoneOffset.UTC)
.minusDays(30)
.toInstant(ZoneOffset.UTC)
.toEpochMilli())
.lte(LocalDateTime.ofEpochSecond(end / 1000, 0, ZoneOffset.UTC)
.plusDays(1)
.toInstant(ZoneOffset.UTC)
.toEpochMilli())))
.size(5000);
// 配置Scroll滚动查询,设置3分钟超时
final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(3L));
request.scroll(scroll);
request.source(query);
String scrollId = null;
try {
// 初始查询
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
scrollId = response.getScrollId();
// 循环处理所有分页
while (response.getHits().getHits().length > 0) {
// 处理当前页结果
Arrays.stream(response.getHits().getHits())
// 解析对象
.map(hit -> {
EnterStoreUserLogUsedTask enterStoreUserLogUsedTask = JSON.parseObject(
hit.getSourceAsString(),
EnterStoreUserLogUsedTask.class
);
// 校验无效数据
if (enterStoreUserLogUsedTask.getUserId() == null || enterStoreUserLogUsedTask.getStoreId() == null) {
log.error("无效日志记录: {}", hit.getId());
// 返回null值稍后进行过滤
return null;
}
return enterStoreUserLogUsedTask;
})
.filter(Objects::nonNull)
// 添加到结果中
.forEach(log -> results.computeIfAbsent(log.getUserId(), k -> new ArrayList<>())
.add(log)
);
// 获取下一页
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId)
.scroll(scroll);
response = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
}
} catch (IOException e) {
// 抛出异常,中止代码向下执行
throw new RuntimeException("ES查询失败", e);
} finally {
// 清理Scroll资源
if (scrollId != null) {
ClearScrollRequest clearRequest = new ClearScrollRequest();
clearRequest.addScrollId(scrollId);
try {
restHighLevelClient.clearScroll(clearRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("Scroll清理失败", e);
}
}
}
return results;
}
}
(四)service层
@Override
public List<StoreVO> guessLike() {
// 获取当前用户ID
Long userId = BaseContext.getCurrentId();
List<StoreVO> result = new ArrayList<>();
String cacheAllKey = STORE_RECOMMEND_ONE + "all";
// 如果未登录,则采用备用方案
if(userId == null){
List<String> allStoreStr = stringRedisTemplate.opsForList().range(cacheAllKey, 0, -1);
allStoreStr.forEach(s -> {
StoreVO storeVO = JSON.parseObject(s, StoreVO.class);
result.add(storeVO);
});
}else{
String cacheKey = STORE_RECOMMEND_ONE + userId;
List<String> userStoreStr = stringRedisTemplate.opsForList().range(cacheKey, 0, -1);
// 如果缓存为空,则采用备用方案
if(userStoreStr == null || userStoreStr.isEmpty()){
List<String> allStoreStr = stringRedisTemplate.opsForList().range(cacheAllKey, 0, -1);
allStoreStr.forEach(s -> {
StoreVO storeVO = JSON.parseObject(s, StoreVO.class);
result.add(storeVO);
});
}else{
userStoreStr.forEach(s -> {
StoreVO storeVO = JSON.parseObject(s, StoreVO.class);
result.add(storeVO);
});
}
}
return result;
}
我提供的这个方案肯定不是最佳实践,因为本人的水平并不是很高,如果你有更好的方案,请在评论区告诉我!

~码文不易,留个赞再走吧~
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)