引言
众所周知,构建高用户量的ToC系统时,需要关注:高并发、高可用、低延迟、海量用户、用户体验、快速迭代等。
让我们来一起看看高并发、低延迟思想下的经典代码范例。
本篇涵盖并行调用、高性能计数、批处理、缓存等核心模式。
1 CompletableFuture并行调用
在ToC应用中,一个页面(如“我的主页”)的数据往往来自多个下游服务(用户信息、推荐商品、我的待办)。串行调用会使延迟叠加,是性能杀手。
场景:一个请求需要聚合用户信息、用户积分、用户优惠券三个服务的数据。每个服务耗时约100ms。
@Service
public class AggregationService {
@Autowired
private UserInfoService userInfoService;
@Autowired
private PointService pointService;
@Autowired
private CouponService couponService;
public UserHomePageDTO getHomePageData(Long userId) {
// 1. 串行调用,耗时叠加
UserInfo userInfo = userInfoService.getUserInfo(userId); // 耗时100ms
UserPoint point = pointService.getUserPoint(userId); // 耗时100ms
List coupons = couponService.getCoupons(userId); // 耗时100ms
// 2. 组装数据
return assembleDTO(userInfo, point, coupons);
}
// ... assembleDTO a
}
使用CompletableFuture进行并发操作:
@Service
public class AggregationService {
@Autowired
private UserInfoService userInfoService;
@Autowired
private PointService pointService;
@Autowired
private CouponService couponService;
// 自定义线程池,隔离不同业务,避免使用默认的ForkJoinPool处理I/O密集型任务
@Bean(destroyMethod = "shutdownNow")
public ExecutorService bizExecutor() {
return new ThreadPoolExecutor(
20, 50, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("biz-aggr-pool-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:主线程执行,避免丢任务
);
}
public UserHomePageDTO getHomePageData(Long userId) {
ExecutorService executor = bizExecutor(); // 实际应 @Autowired,不然每次调用都创建新线程池
CompletableFuture userInfoFuture = CompletableFuture
.supplyAsync(() -> userInfoService.getUserInfo(userId), executor)
.orTimeout(150, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log.warn("UserInfoService timeout or error for userId: {}", userId, ex);
return UserInfo.EMPTY; // 定义 EMPTY 常量,避免 null
});
CompletableFuture pointFuture = CompletableFuture
.supplyAsync(() -> pointService.getUserPoint(userId), executor)
.orTimeout(150, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log.warn("PointService error for userId: {}", userId, ex);
return UserPoint.ZERO;
});
CompletableFuture> couponsFuture = CompletableFuture
.supplyAsync(() -> couponService.getCoupons(userId), executor)
.orTimeout(150, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log.warn("CouponService error for userId: {}", userId, ex);
return Collections.emptyList();
});
// allOf 不返回结果,只等待完成
CompletableFuture.allOf(userInfoFuture, pointFuture, couponsFuture).join();
// 此时所有 future 已完成(成功或降级),直接 join() 安全
return assembleDTO(
userInfoFuture.join(),
pointFuture.join(),
couponsFuture.join()
);
}
}
将多个独立的、耗时的I/O操作并行化,用总耗时最长的单个操作时间来近似替代总耗时。
当然,对于对于I/O密集型任务,必须使用独立的、线程数合理的线程池。耗尽默认的ForkJoinPool会导致整个JVM的其他异步任务(如parallelStream)瘫痪。
1.1 注意事项
注意,生产中一定要设置超时+异常处理(降级或补偿)。
// 设置超时 + 异常降级
CompletableFuture userInfoFuture = CompletableFuture
.supplyAsync(() -> userInfoService.getUserInfo(userId), BIZ_EXECUTOR)
.orTimeout(200, TimeUnit.MILLISECONDS) // 超时熔断
.exceptionally(ex -> {
log.warn("UserInfoService failed for userId: {}", userId, ex);
return DEFAULT_USER_INFO; // 降级兜底
});
2 本地高并发计数器:LongAdder
统计一个热门商品详情页的实时浏览次数时,并发量极高。使用AtomicLong和synchronized性能较差:
public class ViewCounter {
// 方案A: synchronized,锁粒度太粗,所有线程串行执行,并发越高,性能越差
private long count = 0;
public synchronized void increment() {
count++;
}
// 方案B: AtomicLong,在高并发下,大量线程CAS自旋失败,消耗CPU
private AtomicLong atomicCount = new AtomicLong(0);
public void incrementWithAtomic() {
atomicCount.incrementAndGet();
}
}
可以使用LongAdder替代AtomicLong和synchronized:
import java.util.concurrent.atomic.LongAdder;
public class ViewCounter {
// LongAdder是分段锁思想的实现,内部维护一个Cell数组
// 并发不高时,直接操作base值,和AtomicLong类似
// 并发变高时,线程会hash到不同的Cell上进行累加,大大减少了冲突
private final LongAdder longAdder = new LongAdder();
public void increment() {
longAdder.increment();
}
public long getCount() {
// sum()方法会累加所有Cell的值和base值,在读多写少的场景下可能会有数据不一致
// 但在计数器这种场景下,最终一致性是可以接受的
return longAdder.sum();
}
}
LongAdder采用了空间换时间,分散热点的思想。 通过将一个热点计数器分散到多个“槽”(Cell)中,让不同的线程去更新不同的槽,最后再汇总,从而极大地降低了锁竞争。
适合读操作(sum())的频率远低于写操作(increment())的场景,用来统计、计数。
2.1 注意事项
LongAdder 内部的 Cell[] 在高并发下会扩容,极端情况下可能占用大量堆内存(比如 10w 线程持续写入)。
因此,LongAdder适合写远多于读的场景,且在堆内存有限的场景,需要监控LongAdder的实例数量。
或者考虑 Striped64 的变种或 Disruptor 环形缓冲。
此外,如果需要分布式的方案,Redis是主流方案,在实际中可以采用LongAdder+Redis的混合方案:
// 1. 本地 LongAdder 缓冲计数(高性能)
private final LongAdder localCounter = new LongAdder();
// 2. 定时/批量 flush 到 Redis(减少网络调用)
@Scheduled(fixedDelay = 1000)
public void flushToRedis() {
long delta = localCounter.sumThenReset();
if (delta > 0) {
redisTemplate.opsForValue().increment("global:counter", delta);
}
}
// sumThenReset() 不是原子的,在多线程调用 flush 时可能重复计数。加锁或使用单线程调度,比如上面的@Scheduled
- 本地计数无锁高性能
- Redis 写入频率大幅降低(比如 1000 次本地计数 → 1 次 Redis INCR)
- 适合 PV/日志类计数(允许短暂不一致)
不过还需要注意,混合方案存在数据丢失风险(未 flush 的 delta 丢失)。如果需要强一致性(如金融计数),应直接使用Redis INCR,或引入本地WAL(Write-Ahead-Log)保证flush前数据可恢复。
3 优先考虑请求批处理
比如现在有一批商品ID。我们需要根据一批商品ID,查询每个商品的库存信息。不好的示例如下:
@Service
public class StockService {
@Autowired
private StockRpcService stockRpcService; // 假设这是一个RPC服务
public Map getStocks(List productIds) {
Map resultMap = new HashMap<>();
if (CollectionUtils.isEmpty(productIds)) {
return resultMap;
}
for (Long productId : productIds) {
// 问题点:在循环中调用RPC,有多少个ID,就发起多少次调用
Integer stock = stockRpcService.getStockByProductId(productId);
resultMap.put(productId, stock);
}
return resultMap;
}
}
这样的处理性能问题严重,在循环中进行网络调用(RPC或DB查询)是严重的反模式。如果productIds列表有100个元素,就会发起100次独立的网络请求。网络开销和下游服务的压力会非常大,导致接口延迟随输入规模线性增长。 且容错性差,任何一次RPC调用失败,都可能导致整个方法抛出异常而中断,已经查询到的数据也丢失了。
需要采取批处理进行优化:
@Service
public class StockService {
@Autowired
private StockRpcService stockRpcService;
public Map getStocks(List productIds) {
if (CollectionUtils.isEmpty(productIds)) {
return Collections.emptyMap();
}
// 核心改动:调用下游服务提供的批量接口
// 假设stockRpcService提供了一个批量查询接口
Map stockMap = stockRpcService.getStocksByProductIds(productIds);
// 如果下游不支持批量接口,也应该在业务层做聚合,然后分组分批调用单次查询接口
// List> partitions = Lists.partition(productIds, 200); // e.g., 每200个ID一批
// for (List partition : partitions) { ... }
return stockMap;
}
}
无论是数据库查询还是RPC调用,都应该将多次单点操作聚合成一次批量操作。
批处理可以大幅减少网络往返次数(RTT),降低数据库或下游服务的连接开销和负载,接口性能通常有数量级的提升。
3.1 注意事项
批量太大可能压垮下游(如 DB 一次查 10w 条),批量太小又失去意义。一般来说根据实际资源情况进行调整。
此外,还需要注意分片失败的处理策略,是否重试?是否跳过?
List> partitions = Lists.partition(productIds, 100); // 经验值:100~500
Map result = new HashMap<>();
for (List batch : partitions) {
try {
Map batchResult = stockRpcService.getStocksByProductIds(batch);
result.putAll(batchResult);
} catch (Exception e) {
// 可选:记录失败 batch,异步重试 or 降级返回默认库存
log.error("Batch stock query failed for: {}", batch, e);
batch.forEach(id -> result.put(id, DEFAULT_STOCK));
}
}
4 缓存:高并发的基石
在商品查询、用户信息等高频读场景中,缓存是降低数据库和下游服务压力的核心手段。往往采用 本地缓存(Caffeine) + 分布式缓存(Redis) 的多级架构。
@Service
public class UserInfoService {
// Caffeine 本地缓存 + Redis 二级缓存
private final LoadingCache localCache = Caffeine.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build(this::loadFromRedis);
private UserInfo loadFromRedis(Long userId) {
// 1. 先查 Redis
UserInfo user = redisTemplate.opsForValue().get("user:info:" + userId);
if (user != null) {
return user;
}
// 2. Redis 未命中,查 DB(防穿透)
user = userMapper.selectById(userId);
if (user != null) {
// 写回 Redis,设置合理过期时间
redisTemplate.opsForValue().set("user:info:" + userId, user, 30, TimeUnit.MINUTES);
} else {
// 3. 防缓存穿透:空值也缓存(短 TTL)
redisTemplate.opsForValue().set("user:info:" + userId, UserInfo.EMPTY, 2, TimeUnit.MINUTES);
}
return user;
}
public UserInfo getUserInfo(Long userId) {
return localCache.get(userId);
}
}
4.1 缓存问题处理
使用缓存需要注意防备缓存穿透、缓存雪崩问题,处理好缓存一致性等...
4.1.1 缓存穿透
查询不存在的数据 → 缓存空值(短 TTL)或使用布隆过滤器过滤非法ID
@Service
public class UserService {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private UserMapper userMapper;
// 布隆过滤器(启动时加载已存在用户 ID)
private volatile BloomFilter userBloomFilter = BloomFilter.create(Funnels.longFunnel(), 10_000_000, 0.01); // 1% 误判率
@PostConstruct
public void initBloomFilter() {
// 实际可从 DB 或离线任务加载活跃用户 ID
// userBloomFilter.putAll(existsUserIds);
}
public User getUserById(Long userId) {
// 1. 布隆过滤器拦截明显非法 ID
if (userId == null || userId <= 0 || !userBloomFilter.mightContain(userId)) {
return User.EMPTY; // 直接返回空对象,不查缓存/DB
}
String key = "user:info:" + userId;
// 2. 查 Redis
User user = (User) redisTemplate.opsForValue().get(key);
if (user != null) {
if (user == User.EMPTY) {
return User.EMPTY; // 空值缓存命中
}
return user;
}
// 3. Redis 未命中,查 DB
user = userMapper.selectById(userId);
if (user != null) {
// 正常数据:缓存 30 分钟
redisTemplate.opsForValue().set(key, user, 30, TimeUnit.MINUTES);
} else {
// 空值:缓存 2 分钟,防止穿透
redisTemplate.opsForValue().set(key, User.EMPTY, 2, TimeUnit.MINUTES);
}
return user;
}
}
注意:布隆过滤器需定期更新(如监听用户注册事件),否则新用户会被误判为“不存在”。
4.1.2 缓存击穿
如“双 11 热门商品”缓存过期,10w 请求同时打到 DB,导致 DB 瞬间过载。
解决方案:
- 本地缓存(Caffeine)天然防击穿:
get(key, callable)内部加锁,只允许一个线程加载 - Redis 互斥锁:用
SET key lock EX 5 NX(SETNX)实现分布式锁,仅一个线程回源
@Service
public class ProductService {
private final LoadingCache localCache = Caffeine.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build(this::loadProductFromRemote); // 自动加锁加载
private Product loadProductFromRemote(Long productId) {
// 1. 查 Redis(Redis 层也可能击穿,但压力远小于 DB)
String redisKey = "product:detail:" + productId;
Product product = (Product) redisTemplate.opsForValue().get(redisKey);
if (product != null) {
return product;
}
// 2. Redis 未命中,尝试加分布式锁回源
String lockKey = "lock:product:" + productId;
Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 3, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(locked)) {
try {
// 双重检查(防止锁释放后又被其他线程加载)
product = (Product) redisTemplate.opsForValue().get(redisKey);
if (product == null) {
product = productMapper.selectById(productId);
if (product != null) {
redisTemplate.opsForValue().set(redisKey, product, 30, TimeUnit.MINUTES);
} else {
redisTemplate.opsForValue().set(redisKey, Product.EMPTY, 2, TimeUnit.MINUTES);
}
}
} finally {
redisTemplate.delete(lockKey); // 释放锁
}
} else {
// 未抢到锁,短暂等待后重试(或直接返回旧数据/降级)
try { Thread.sleep(50); } catch (InterruptedException e) { /* ignore */ }
Product cached = (Product) redisTemplate.opsForValue().get(redisKey);
return cached != null ? cached : Product.EMPTY;
}
return product;
}
public Product getProduct(Long productId) {
return localCache.get(productId); // Caffeine 自动处理并发加载
}
}
Caffeine 本地缓存已解决 90% 的击穿问题,Redis 层锁仅作为兜底。
4.1.3 缓存雪崩
问题:系统重启或缓存预热失败,导致所有 Key TTL 相同,同时失效。
解决方案:
- 随机过期时间:基础 TTL + 随机偏移(如 30 分钟 ± 5 分钟)
- 永不过期 + 后台刷新:适用于核心热点数据(如首页 Banner)
// 设置 30 分钟基础过期 + 0~300 秒随机偏移
long baseTtl = 30 * 60; // 30 分钟
long randomTtl = ThreadLocalRandom.current().nextLong(0, 300); // 0~5 分钟
redisTemplate.opsForValue().set("product:detail:" + productId, product, baseTtl + randomTtl, TimeUnit.SECONDS);
对核心数据使用 Caffeine 的 refreshAfterWrite,后台异步刷新,前台永远读缓存。
4.1.4 缓存一致性
问题:更新用户昵称后,缓存仍是旧值,导致用户看到不一致数据。
解决方案:
采用 “先更新 DB,再删除缓存”(Cache-Aside 模式),并考虑延迟双删应对主从延迟:
- 先更新数据库
- 删除缓存
- 延迟 N 毫秒后,再次删除缓存(应对主从复制延迟)
@Service
@Transactional
public class UserService {
@Autowired
private UserMapper userMapper;
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private TaskExecutor taskExecutor; // 异步线程池
public void updateUser(User user) {
// 1. 更新 DB
userMapper.updateById(user);
String cacheKey = "user:info:" + user.getId();
// 2. 第一次删缓存
redisTemplate.delete(cacheKey);
// 3. 延迟 500ms 再删一次(覆盖主从延迟窗口)
taskExecutor.execute(() -> {
try {
Thread.sleep(500);
redisTemplate.delete(cacheKey);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
不过还需要注意,这种延迟双删的模式是仅适用于中小规模系统、允许短暂不一致的轻量方案。
对于大型项目,需要更可靠的缓存一致性可以采用 Cannal+Binlog监听的中间件级可靠方案。简单流程如下:
- Canal 伪装成 MySQL 从库,实时监听 Binlog
- 解析
UPDATE/DELETE事件,精准提取变更的主键 - 异步发送 MQ 或直接调用,删除对应缓存
// Canal 监听到 Binlog 事件
{
"table": "user",
"type": "UPDATE",
"data": { "id": 123, "name": "new_name" }
}
// 缓存服务消费事件
redis.delete("user:info:123");
上了 Canal,架构复杂性会提升一些,但不需要在业务代码里写延迟双删了。
| 阶段 | 方案 | 特点 |
|---|---|---|
| 初期 | 延迟双删 | 代码简单,快速上线 |
| 中期 | 延迟双删 + 本地缓存降级 | 减少对 Redis 一致性的依赖 |
| 成熟期 | Canal / DTS / MQ + Binlog | 专业中间件保障一致性 |
| 高级 | 缓存预热 + 版本号/逻辑时钟 | 如 user_v2,彻底规避不一致 |
5 监控与可观测性
高并发系统不能只看代码,还要能观测、诊断、预警。
虽然本篇不展开监控细节,但所有高并发接口都应具备:
- 关键路径打点(如各服务调用耗时)
- 异常率、超时率监控
- 线程池活跃度、队列堆积告