Java 集合框架高級(jí)應(yīng)用與架構(gòu)設(shè)計(jì)方案
一、章節(jié)學(xué)習(xí)目標(biāo)與重點(diǎn)
1.1 學(xué)習(xí)目標(biāo)
- 掌握集合框架在復(fù)雜架構(gòu)場(chǎng)景中的高級(jí)應(yīng)用(緩存設(shè)計(jì)、分層存儲(chǔ)、數(shù)據(jù)分片)
- 理解集合與設(shè)計(jì)模式的結(jié)合實(shí)踐(享元模式、裝飾器模式、迭代器模式等)
- 精通高并發(fā)、大數(shù)據(jù)量場(chǎng)景下集合的架構(gòu)優(yōu)化方案
- 解決集合在分布式、微服務(wù)架構(gòu)中的適配問(wèn)題
- 能夠基于集合框架設(shè)計(jì)可擴(kuò)展、高性能的核心業(yè)務(wù)組件
1.2 學(xué)習(xí)重點(diǎn)
- 基于集合的緩存架構(gòu)設(shè)計(jì)(本地緩存、多級(jí)緩存)
- 集合與設(shè)計(jì)模式的深度融合實(shí)踐
- 大數(shù)據(jù)量下集合的分片存儲(chǔ)與并行處理架構(gòu)
- 分布式場(chǎng)景中集合的數(shù)據(jù)一致性與傳輸優(yōu)化
- 集合框架驅(qū)動(dòng)的業(yè)務(wù)組件設(shè)計(jì)(配置中心、規(guī)則引擎)
二、基于集合的緩存架構(gòu)設(shè)計(jì)實(shí)戰(zhàn)
?? 緩存是提升系統(tǒng)性能的核心手段,而 Java 集合框架是實(shí)現(xiàn)本地緩存的基礎(chǔ)。基于集合的緩存設(shè)計(jì)需兼顧查詢(xún)效率、內(nèi)存占用、過(guò)期策略、線程安全四大核心訴求,常用集合包括 HashMap、LinkedHashMap、WeakHashMap 等。
2.1 本地緩存設(shè)計(jì)核心要素
- 存儲(chǔ)結(jié)構(gòu):選擇合適的集合實(shí)現(xiàn)類(lèi)(如 HashMap 用于高效查詢(xún),LinkedHashMap 用于 LRU 過(guò)期策略)
- 過(guò)期策略:支持時(shí)間過(guò)期(TTL)、空間淘汰(LRU/LFU)
- 線程安全:高并發(fā)場(chǎng)景需保證讀寫(xiě)安全
- 內(nèi)存控制:避免緩存膨脹,支持自動(dòng)回收無(wú)用數(shù)據(jù)
2.2 基于 LinkedHashMap 的 LRU 緩存實(shí)現(xiàn)(固定容量+TTL)
LinkedHashMap 天然支持訪問(wèn)順序維護(hù),結(jié)合 removeEldestEntry() 方法可實(shí)現(xiàn) LRU 淘汰,擴(kuò)展后支持 TTL 過(guò)期策略:
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
/**
* 基于 LinkedHashMap 的 LRU+TTL 本地緩存
* 特性:固定容量(LRU 淘汰)、時(shí)間過(guò)期(TTL)、線程安全
*/
public class LruTtlCache<K, V> extends LinkedHashMap<K, CacheEntry<V>> {
private final int maxCapacity; // 最大容量
private final long ttlMillis; // 過(guò)期時(shí)間(毫秒)
private final ReentrantLock lock = new ReentrantLock(); // 可重入鎖保證線程安全
// 緩存條目:包裝值和過(guò)期時(shí)間
private static class CacheEntry<V> {
V value;
long expireTime; // 過(guò)期時(shí)間戳(毫秒)
CacheEntry(V value, long expireTime) {
this.value = value;
this.expireTime = expireTime;
}
// 判斷是否過(guò)期
boolean isExpired() {
return System.currentTimeMillis() > expireTime;
}
}
// 構(gòu)造函數(shù):指定最大容量和 TTL
public LruTtlCache(int maxCapacity, long ttlMillis) {
super(maxCapacity, 0.75f, true); // accessOrder=true(訪問(wèn)順序)
this.maxCapacity = maxCapacity;
this.ttlMillis = ttlMillis;
}
/**
* 重寫(xiě) removeEldestEntry:達(dá)到最大容量時(shí)刪除最久未使用的條目
*/
@Override
protected boolean removeEldestEntry(Map.Entry<K, CacheEntry<V>> eldest) {
// 先清理過(guò)期條目
if (eldest.getValue().isExpired()) {
return true;
}
// 未過(guò)期則判斷是否超容量
return size() > maxCapacity;
}
/**
* 存緩存:線程安全
*/
public void put(K key, V value) {
Objects.requireNonNull(key);
Objects.requireNonNull(value);
lock.lock();
try {
// 計(jì)算過(guò)期時(shí)間戳
long expireTime = System.currentTimeMillis() + ttlMillis;
super.put(key, new CacheEntry<>(value, expireTime));
} finally {
lock.unlock();
}
}
/**
* 取緩存:線程安全,自動(dòng)過(guò)濾過(guò)期條目
*/
public V get(Object key) {
lock.lock();
try {
CacheEntry<V> entry = super.get(key);
if (entry == null) {
return null;
}
// 過(guò)期則刪除并返回 null
if (entry.isExpired()) {
super.remove(key);
return null;
}
return entry.value;
} finally {
lock.unlock();
}
}
/**
* 批量清理過(guò)期條目
*/
public void cleanExpired() {
lock.lock();
try {
keySet().removeIf(key -> super.get(key).isExpired());
} finally {
lock.unlock();
}
}
}2.3 多級(jí)緩存架構(gòu)設(shè)計(jì)(本地緩存+分布式緩存)
在微服務(wù)架構(gòu)中,單一本地緩存無(wú)法滿足分布式部署需求,需設(shè)計(jì)“本地緩存+分布式緩存”的多級(jí)架構(gòu):
2.3.1 架構(gòu)示意圖
應(yīng)用服務(wù) A 應(yīng)用服務(wù) B
┌───────────────┐ ┌───────────────┐
│ 本地緩存 │ │ 本地緩存 │
│ (LruTtlCache)│ │ (LruTtlCache)│
└───────┬───────┘ └───────┬───────┘
│ │
└───────────┬───────────┘
│
┌───────▼───────┐
│ 分布式緩存 │
│ (Redis) │
└───────┬───────┘
│
┌───────▼───────┐
│ 數(shù)據(jù)庫(kù) │
└───────────────┘2.3.2 核心邏輯
- 讀取數(shù)據(jù):優(yōu)先查本地緩存 → 本地未命中查分布式緩存 → 分布式未命中查數(shù)據(jù)庫(kù),查詢(xún)結(jié)果回寫(xiě)兩級(jí)緩存
- 更新數(shù)據(jù):更新數(shù)據(jù)庫(kù) → 淘汰分布式緩存 → 淘汰所有應(yīng)用節(jié)點(diǎn)的本地緩存(通過(guò)消息通知)
- 優(yōu)勢(shì):本地緩存提升響應(yīng)速度,分布式緩存保證分布式一致性,數(shù)據(jù)庫(kù)保證數(shù)據(jù)持久化
2.3.3 代碼簡(jiǎn)化實(shí)現(xiàn)(多級(jí)緩存客戶(hù)端)
import redis.clients.jedis.Jedis;
import java.util.function.Supplier;
/**
* 多級(jí)緩存客戶(hù)端(本地緩存+Redis)
*/
public class MultiLevelCache<K, V> {
private final LruTtlCache<K, V> localCache;
private final Jedis redisClient;
private final String prefix; // Redis 鍵前綴
private final long redisTtlSeconds; // Redis 過(guò)期時(shí)間(秒)
// 構(gòu)造函數(shù):初始化各級(jí)緩存
public MultiLevelCache(int localMaxCapacity, long localTtlMillis,
Jedis redisClient, String prefix, long redisTtlSeconds) {
this.localCache = new LruTtlCache<>(localMaxCapacity, localTtlMillis);
this.redisClient = redisClient;
this.prefix = prefix;
this.redisTtlSeconds = redisTtlSeconds;
}
/**
* 讀取緩存:自動(dòng)降級(jí)查詢(xún)
* @param key 緩存鍵
* @param loader 數(shù)據(jù)庫(kù)加載器(緩存未命中時(shí)執(zhí)行)
* @return 緩存值
*/
public V get(K key, Supplier<V> loader) {
// 1. 查本地緩存
V value = localCache.get(key);
if (value != null) {
return value;
}
// 2. 查 Redis 緩存
String redisKey = prefix + key;
String redisValue = redisClient.get(redisKey);
if (redisValue != null) {
V deserialized = deserialize(redisValue); // 反序列化
localCache.put(key, deserialized); // 回寫(xiě)本地緩存
return deserialized;
}
// 3. 查數(shù)據(jù)庫(kù)并回寫(xiě)緩存
value = loader.get();
if (value != null) {
localCache.put(key, value);
redisClient.setex(redisKey, redisTtlSeconds, serialize(value)); // 序列化并設(shè)置過(guò)期時(shí)間
}
return value;
}
/**
* 淘汰緩存:更新數(shù)據(jù)時(shí)調(diào)用
*/
public void evict(K key) {
localCache.remove(key); // 淘汰本地緩存
redisClient.del(prefix + key); // 淘汰 Redis 緩存
// 發(fā)送消息通知其他節(jié)點(diǎn)淘汰本地緩存(如 RocketMQ/Kafka)
sendEvictMessage(key);
}
// 序列化/反序列化(簡(jiǎn)化實(shí)現(xiàn),實(shí)際可使用 Jackson)
private String serialize(V value) {
return value.toString();
}
private V deserialize(String value) {
return (V) value;
}
// 發(fā)送緩存淘汰消息(簡(jiǎn)化實(shí)現(xiàn))
private void sendEvictMessage(K key) {
System.out.println("發(fā)送緩存淘汰消息:" + key);
}
}2.4 緩存架構(gòu)優(yōu)化要點(diǎn)
- 本地緩存:使用 LruTtlCache 控制內(nèi)存,定期清理過(guò)期數(shù)據(jù),避免內(nèi)存泄漏
- 分布式緩存:選擇 Redis 等高性能組件,設(shè)置合理 TTL,避免緩存雪崩
- 一致性保障:更新數(shù)據(jù)時(shí)采用“更新數(shù)據(jù)庫(kù)→淘汰緩存”順序,結(jié)合消息通知實(shí)現(xiàn)分布式緩存一致性
- 降級(jí)策略:分布式緩存不可用時(shí),僅依賴(lài)本地緩存+數(shù)據(jù)庫(kù),保證系統(tǒng)可用性
三、集合與設(shè)計(jì)模式的深度融合實(shí)踐
Java 集合框架本身大量運(yùn)用設(shè)計(jì)模式(如迭代器模式、裝飾器模式),在實(shí)際開(kāi)發(fā)中,結(jié)合設(shè)計(jì)模式使用集合可大幅提升代碼的擴(kuò)展性和可維護(hù)性。
3.1 裝飾器模式+集合:增強(qiáng)集合功能
裝飾器模式通過(guò)包裝原集合,在不修改原代碼的前提下增強(qiáng)功能(如日志記錄、權(quán)限控制、數(shù)據(jù)校驗(yàn))。以下實(shí)現(xiàn)一個(gè)“帶訪問(wèn)日志的 List”:
import java.util.*;
/**
* 裝飾器模式:帶訪問(wèn)日志的 List
*/
public class LoggingList<E> implements List<E> {
// 被裝飾的原 List
private final List<E> target;
// 日志記錄器(簡(jiǎn)化實(shí)現(xiàn))
private final Logger logger = new Logger();
// 構(gòu)造函數(shù):傳入原 List
public LoggingList(List<E> target) {
this.target = Objects.requireNonNull(target);
}
/**
* 增強(qiáng) add 方法:記錄添加日志
*/
@Override
public boolean add(E e) {
logger.log("添加元素:" + e);
return target.add(e);
}
/**
* 增強(qiáng) get 方法:記錄訪問(wèn)日志
*/
@Override
public E get(int index) {
E e = target.get(index);
logger.log("訪問(wèn)索引 " + index + " 的元素:" + e);
return e;
}
/**
* 其他方法直接委托給原 List(省略重復(fù)代碼,實(shí)際開(kāi)發(fā)可通過(guò) IDE 自動(dòng)生成)
*/
@Override
public int size() {
return target.size();
}
@Override
public boolean isEmpty() {
return target.isEmpty();
}
// ... 其他 List 接口方法(均委托給 target)
/**
* 簡(jiǎn)化日志類(lèi)
*/
private static class Logger {
public void log(String message) {
System.out.println("[LoggingList] " + message);
}
}
// 測(cè)試
public static void main(String[] args) {
List<String> list = new LoggingList<>(new ArrayList<>());
list.add("Java");
list.add("集合");
list.get(0);
// 輸出:
// [LoggingList] 添加元素:Java
// [LoggingList] 添加元素:集合
// [LoggingList] 訪問(wèn)索引 0 的元素:Java
}
}3.2 享元模式+集合:復(fù)用重復(fù)對(duì)象
享元模式通過(guò)緩存重復(fù)對(duì)象減少內(nèi)存占用,適用于大量相似對(duì)象場(chǎng)景(如配置項(xiàng)、常量、商品規(guī)格)。以下基于 HashMap 實(shí)現(xiàn)享元池:
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* 享元模式:商品規(guī)格享元池(復(fù)用重復(fù)的規(guī)格對(duì)象)
*/
public class SpecificationFlyweightPool {
// 享元池:緩存規(guī)格對(duì)象(key=規(guī)格編碼,value=規(guī)格對(duì)象)
private static final Map<String, Specification> POOL = new HashMap<>();
/**
* 獲取規(guī)格對(duì)象:存在則復(fù)用,不存在則創(chuàng)建并緩存
*/
public static Specification getSpecification(String color, String size) {
Objects.requireNonNull(color);
Objects.requireNonNull(size);
// 生成唯一編碼(作為享元池 key)
String key = color + "_" + size;
// 雙重檢查鎖定(DCL)保證線程安全
if (!POOL.containsKey(key)) {
synchronized (SpecificationFlyweightPool.class) {
if (!POOL.containsKey(key)) {
POOL.put(key, new Specification(color, size));
}
}
}
return POOL.get(key);
}
/**
* 商品規(guī)格類(lèi)(不可變,確保享元安全復(fù)用)
*/
public static class Specification {
private final String color;
private final String size;
private Specification(String color, String size) {
this.color = color;
this.size = size;
}
// getter(無(wú) setter,確保不可變)
public String getColor() {
return color;
}
public String getSize() {
return size;
}
// 重寫(xiě) equals 和 hashCode(確保 key 唯一性)
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Specification that = (Specification) o;
return Objects.equals(color, that.color) && Objects.equals(size, that.size);
}
@Override
public int hashCode() {
return Objects.hash(color, size);
}
}
// 測(cè)試:復(fù)用對(duì)象
public static void main(String[] args) {
Specification spec1 = getSpecification("紅色", "M");
Specification spec2 = getSpecification("紅色", "M");
System.out.println(spec1 == spec2); // true(復(fù)用同一對(duì)象)
}
}3.3 迭代器模式+集合:自定義遍歷邏輯
迭代器模式隔離集合的存儲(chǔ)結(jié)構(gòu)與遍歷邏輯,支持自定義遍歷規(guī)則(如過(guò)濾、分頁(yè)、排序)。以下實(shí)現(xiàn)一個(gè)“分頁(yè)迭代器”:
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
/**
* 迭代器模式:分頁(yè)迭代器(按頁(yè)遍歷集合)
*/
public class PagingIterator<E> implements Iterator<List<E>> {
private final List<E> source; // 源集合
private final int pageSize; // 每頁(yè)大小
private int currentPage; // 當(dāng)前頁(yè)碼(從 0 開(kāi)始)
private final int totalPages; // 總頁(yè)數(shù)
/**
* 構(gòu)造函數(shù):傳入源集合和每頁(yè)大小
*/
public PagingIterator(List<E> source, int pageSize) {
this.source = Objects.requireNonNull(source);
if (pageSize <= 0) {
throw new IllegalArgumentException("每頁(yè)大小必須大于 0");
}
this.pageSize = pageSize;
this.currentPage = 0;
// 計(jì)算總頁(yè)數(shù)
this.totalPages = (source.size() + pageSize - 1) / pageSize;
}
/**
* 是否還有下一頁(yè)
*/
@Override
public boolean hasNext() {
return currentPage < totalPages;
}
/**
* 獲取下一頁(yè)數(shù)據(jù)
*/
@Override
public List<E> next() {
if (!hasNext()) {
throw new NoSuchElementException("沒(méi)有更多頁(yè)面");
}
// 計(jì)算當(dāng)前頁(yè)的起始索引和結(jié)束索引
int start = currentPage * pageSize;
int end = Math.min(start + pageSize, source.size());
List<E> pageData = source.subList(start, end);
currentPage++;
return pageData;
}
// 測(cè)試:分頁(yè)遍歷 10 條數(shù)據(jù)(每頁(yè) 3 條)
public static void main(String[] args) {
List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
PagingIterator<Integer> iterator = new PagingIterator<>(list, 3);
int pageNum = 1;
while (iterator.hasNext()) {
List<Integer> page = iterator.next();
System.out.println("第 " + pageNum + " 頁(yè):" + page);
pageNum++;
}
// 輸出:
// 第 1 頁(yè):[1, 2, 3]
// 第 2 頁(yè):[4, 5, 6]
// 第 3 頁(yè):[7, 8, 9]
// 第 4 頁(yè):[10]
}
}四、大數(shù)據(jù)量下集合的分片存儲(chǔ)與并行處理
當(dāng)數(shù)據(jù)量達(dá)到百萬(wàn)、千萬(wàn)級(jí)別時(shí),單一集合會(huì)面臨內(nèi)存溢出、遍歷效率低等問(wèn)題,需采用“分片存儲(chǔ)+并行處理”架構(gòu),利用多線程和分布式資源提升處理能力。
4.1 集合分片存儲(chǔ)設(shè)計(jì)(基于 List 的分片)
將大數(shù)據(jù)量集合拆分為多個(gè)小分片(Shard),每個(gè)分片存儲(chǔ)部分?jǐn)?shù)據(jù),便于并行處理和內(nèi)存控制:
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* 集合分片工具類(lèi):將大 List 拆分為多個(gè)小分片
*/
public class ListSharder<T> {
private final List<T> source; // 源集合
private final int shardSize; // 每個(gè)分片的最大大小
public ListSharder(List<T> source, int shardSize) {
this.source = Objects.requireNonNull(source);
if (shardSize <= 0) {
throw new IllegalArgumentException("分片大小必須大于 0");
}
this.shardSize = shardSize;
}
/**
* 執(zhí)行分片:返回分片列表
*/
public List<List<T>> shard() {
List<List<T>> shards = new ArrayList<>();
int totalSize = source.size();
if (totalSize == 0) {
return shards;
}
// 計(jì)算分片數(shù)量
int shardCount = (totalSize + shardSize - 1) / shardSize;
for (int i = 0; i < shardCount; i++) {
// 計(jì)算當(dāng)前分片的起始和結(jié)束索引
int start = i * shardSize;
int end = Math.min(start + shardSize, totalSize);
// 截取分片并添加到結(jié)果集(ArrayList 是線程不安全的,并行處理時(shí)需注意)
List<T> shard = new ArrayList<>(source.subList(start, end));
shards.add(shard);
}
return shards;
}
// 測(cè)試:將 100 萬(wàn)條數(shù)據(jù)拆分為每個(gè)分片 1 萬(wàn)條
public static void main(String[] args) {
// 生成 100 萬(wàn)條測(cè)試數(shù)據(jù)
List<Integer> bigList = new ArrayList<>(1_000_000);
for (int i = 0; i < 1_000_000; i++) {
bigList.add(i);
}
// 分片(每個(gè)分片 10000 條)
ListSharder<Integer> sharder = new ListSharder<>(bigList, 10_000);
List<List<Integer>> shards = sharder.shard();
System.out.println("總數(shù)據(jù)量:" + bigList.size());
System.out.println("分片數(shù)量:" + shards.size());
System.out.println("每個(gè)分片大小:" + shards.get(0).size());
// 輸出:
// 總數(shù)據(jù)量:1000000
// 分片數(shù)量:100
// 每個(gè)分片大小:10000
}
}4.2 分片并行處理(基于線程池)
將分片分配給線程池并行處理,利用多核 CPU 資源提升處理效率,適用于數(shù)據(jù)篩選、統(tǒng)計(jì)、轉(zhuǎn)換等場(chǎng)景:
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 分片并行處理器:基于線程池處理分片數(shù)據(jù)
*/
public class ShardParallelProcessor<T, R> {
private final ExecutorService executor; // 線程池
private final int threadCount; // 線程數(shù)(默認(rèn) CPU 核心數(shù))
// 構(gòu)造函數(shù):默認(rèn)線程數(shù)為 CPU 核心數(shù)
public ShardParallelProcessor() {
this.threadCount = Runtime.getRuntime().availableProcessors();
this.executor = Executors.newFixedThreadPool(threadCount);
}
// 構(gòu)造函數(shù):自定義線程數(shù)
public ShardParallelProcessor(int threadCount) {
this.threadCount = threadCount;
this.executor = Executors.newFixedThreadPool(threadCount);
}
/**
* 并行處理分片:每個(gè)分片執(zhí)行 processor 邏輯
* @param shards 分片列表
* @param processor 分片處理邏輯(函數(shù)式接口)
* @return 合并后的處理結(jié)果
*/
public List<R> process(List<List<T>> shards, ShardProcessor<T, R> processor) {
try {
// 提交所有分片任務(wù),并行執(zhí)行
return shards.stream()
.map(shard -> executor.submit(() -> processor.process(shard)))
.map(future -> {
try {
return future.get(); // 獲取分片處理結(jié)果
} catch (Exception e) {
throw new RuntimeException("分片處理失敗", e);
}
})
.flatMap(List::stream) // 合并所有分片結(jié)果
.collect(Collectors.toList());
} finally {
// 關(guān)閉線程池
executor.shutdown();
try {
if (!executor.awaitTermination(1, TimeUnit.HOURS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
}
}
/**
* 分片處理函數(shù)式接口
*/
@FunctionalInterface
public interface ShardProcessor<T, R> {
List<R> process(List<T> shard);
}
// 測(cè)試:并行統(tǒng)計(jì)分片數(shù)據(jù)中偶數(shù)的個(gè)數(shù)
public static void main(String[] args) {
// 1. 生成 100 萬(wàn)條數(shù)據(jù)并分片
List<Integer> bigList = new ArrayList<>(1_000_000);
for (int i = 0; i < 1_000_000; i++) {
bigList.add(i);
}
ListSharder<Integer> sharder = new ListSharder<>(bigList, 10_000);
List<List<Integer>> shards = sharder.shard();
// 2. 并行處理:統(tǒng)計(jì)每個(gè)分片的偶數(shù)個(gè)數(shù)
ShardParallelProcessor<Integer, Integer> processor = new ShardParallelProcessor<>();
List<Integer> shardEvenCounts = processor.process(shards, shard -> {
// 每個(gè)分片統(tǒng)計(jì)偶數(shù)個(gè)數(shù)
long count = shard.stream().filter(num -> num % 2 == 0).count();
return List.of((int) count);
});
// 3. 合并結(jié)果
int totalEvenCount = shardEvenCounts.stream().mapToInt(Integer::intValue).sum();
System.out.println("100 萬(wàn)條數(shù)據(jù)中偶數(shù)的個(gè)數(shù):" + totalEvenCount); // 輸出:500000
}
}4.3 分布式分片處理(基于 MapReduce 思想)
當(dāng)數(shù)據(jù)量達(dá)到億級(jí)別時(shí),單機(jī)分片已無(wú)法滿足需求,需采用分布式分片架構(gòu)(如 Hadoop MapReduce、Spark),核心思想:
- 分片:將分布式存儲(chǔ)(如 HDFS、MySQL 分庫(kù)分表)中的數(shù)據(jù)拆分為多個(gè)分片
- 映射(Map):多個(gè)節(jié)點(diǎn)并行處理各自分片,輸出中間結(jié)果
- 歸約(Reduce):匯總所有節(jié)點(diǎn)的中間結(jié)果,得到最終結(jié)果
Java 集合可作為分布式分片的“本地處理單元”,配合分布式框架實(shí)現(xiàn)大規(guī)模數(shù)據(jù)處理。
五、分布式場(chǎng)景中集合的數(shù)據(jù)一致性與傳輸優(yōu)化
在分布式系統(tǒng)中,集合數(shù)據(jù)的傳輸和一致性維護(hù)是核心挑戰(zhàn),需解決“數(shù)據(jù)序列化、網(wǎng)絡(luò)傳輸效率、分布式一致性”三大問(wèn)題。
5.1 集合數(shù)據(jù)序列化優(yōu)化
集合數(shù)據(jù)在網(wǎng)絡(luò)中傳輸時(shí)需序列化,選擇高效的序列化框架可降低傳輸開(kāi)銷(xiāo),常用框架包括:
- Jackson:JSON 序列化,可讀性強(qiáng),適用于中小數(shù)據(jù)量
- Protostuff:二進(jìn)制序列化,效率高、體積小,適用于大數(shù)據(jù)量
- Kryo:高性能二進(jìn)制序列化,適用于分布式緩存、RPC 傳輸
以下是基于 Protostuff 的集合序列化工具類(lèi):
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import java.util.List;
import java.util.Map;
/**
* 集合序列化工具類(lèi)(基于 Protostuff)
*/
public class CollectionSerializer {
// 線程局部變量:避免緩沖區(qū)競(jìng)爭(zhēng)
private static final ThreadLocal<LinkedBuffer> BUFFER = ThreadLocal.withInitial(() -> LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));
/**
* 序列化 List
*/
public static <T> byte[] serializeList(List<T> list, Class<T> clazz) {
if (list == null || list.isEmpty()) {
return new byte[0];
}
Schema<T> schema = RuntimeSchema.getSchema(clazz);
LinkedBuffer buffer = BUFFER.get();
try {
return ProtostuffIOUtil.toByteArray(list, schema, buffer);
} finally {
buffer.clear(); // 清空緩沖區(qū)
}
}
/**
* 反序列化 List
*/
public static <T> List<T> deserializeList(byte[] data, Class<T> clazz) {
if (data == null || data.length == 0) {
return List.of();
}
Schema<T> schema = RuntimeSchema.getSchema(clazz);
List<T> list = schema.newMessage().getClass().isAssignableFrom(List.class) ?
(List<T>) schema.newMessage() : new ArrayList<>();
ProtostuffIOUtil.mergeFrom(data, (T) list, schema);
return list;
}
/**
* 序列化 Map(簡(jiǎn)化實(shí)現(xiàn),實(shí)際需自定義 Schema)
*/
public static <K, V> byte[] serializeMap(Map<K, V> map, Class<K> keyClazz, Class<V> valueClazz) {
// 實(shí)際開(kāi)發(fā)中需為 Map 自定義 Schema,此處簡(jiǎn)化為 JSON 序列化(僅作示例)
try {
return new com.alibaba.fastjson.JSONObject().toJSONBytes(map);
} catch (Exception e) {
throw new RuntimeException("Map 序列化失敗", e);
}
}
/**
* 反序列化 Map
*/
public static <K, V> Map<K, V> deserializeMap(byte[] data, Class<K> keyClazz, Class<V> valueClazz) {
if (data == null || data.length == 0) {
return Map.of();
}
// 對(duì)應(yīng)序列化邏輯,簡(jiǎn)化為 JSON 反序列化
try {
return com.alibaba.fastjson.JSONObject.parseObject(data, new com.alibaba.fastjson.TypeReference<Map<K, V>>() {});
} catch (Exception e) {
throw new RuntimeException("Map 反序列化失敗", e);
}
}
}5.2 分布式集合數(shù)據(jù)一致性保障
分布式場(chǎng)景中,多個(gè)節(jié)點(diǎn)同時(shí)操作集合數(shù)據(jù)會(huì)導(dǎo)致一致性問(wèn)題,常用解決方案:
- 分布式鎖:通過(guò) Redis 分布式鎖、ZooKeeper 鎖等,保證同一時(shí)間只有一個(gè)節(jié)點(diǎn)修改數(shù)據(jù)
- 版本控制:為集合數(shù)據(jù)添加版本號(hào),修改時(shí)校驗(yàn)版本,避免覆蓋過(guò)期數(shù)據(jù)
- 最終一致性:基于消息隊(duì)列異步同步數(shù)據(jù),允許短時(shí)間不一致,最終達(dá)到一致
以下是基于 Redis 分布式鎖的集合修改示例:
import redis.clients.jedis.Jedis;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* 分布式集合修改:基于 Redis 分布式鎖保證一致性
*/
public class DistributedListModifier<T> {
private final Jedis jedis;
private final String listKey; // Redis 中 List 的 key
private final String lockKey; // 分布式鎖 key
private final long lockTimeout; // 鎖超時(shí)時(shí)間(毫秒)
public DistributedListModifier(Jedis jedis, String listKey, long lockTimeout) {
this.jedis = jedis;
this.listKey = listKey;
this.lockKey = "lock:" + listKey;
this.lockTimeout = lockTimeout;
}
/**
* 向分布式 List 中添加元素(線程安全)
*/
public boolean addElement(T element) {
String lockValue = UUID.randomUUID().toString();
try {
// 1. 獲取分布式鎖
boolean locked = tryLock(lockValue);
if (!locked) {
return false; // 獲取鎖失敗,返回重試
}
// 2. 序列化元素并添加到 Redis List
byte[] data = CollectionSerializer.serializeList(List.of(element), (Class<T>) element.getClass());
jedis.rpush(listKey.getBytes(), data);
return true;
} finally {
// 3. 釋放鎖
releaseLock(lockValue);
}
}
/**
* 獲取分布式鎖
*/
private boolean tryLock(String lockValue) {
String result = jedis.set(lockKey, lockValue, "NX", "PX", lockTimeout);
return "OK".equals(result);
}
/**
* 釋放分布式鎖(防止誤釋放)
*/
private void releaseLock(String lockValue) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
jedis.eval(script, List.of(lockKey), List.of(lockValue));
}
}5.3 集合傳輸優(yōu)化技巧
- 分片傳輸:大數(shù)據(jù)量集合拆分后分批傳輸,避免單次傳輸過(guò)大導(dǎo)致超時(shí)
- 壓縮傳輸:序列化后對(duì)數(shù)據(jù)進(jìn)行壓縮(如 GZIP),減少網(wǎng)絡(luò)帶寬占用
- 增量傳輸:僅傳輸新增/修改的元素,而非整個(gè)集合,降低傳輸開(kāi)銷(xiāo)
- 延遲加載:分布式場(chǎng)景中,先傳輸集合元數(shù)據(jù)(如大小、分片信息),按需加載具體數(shù)據(jù)
六、實(shí)戰(zhàn)案例:基于集合的規(guī)則引擎設(shè)計(jì)
規(guī)則引擎是業(yè)務(wù)系統(tǒng)的核心組件,用于動(dòng)態(tài)管理業(yè)務(wù)規(guī)則(如風(fēng)控規(guī)則、促銷(xiāo)規(guī)則),基于集合框架可實(shí)現(xiàn)輕量級(jí)規(guī)則引擎,支持規(guī)則的動(dòng)態(tài)添加、匹配和執(zhí)行。
6.1 需求分析
設(shè)計(jì)一個(gè)促銷(xiāo)規(guī)則引擎,支持以下功能:
- 動(dòng)態(tài)添加促銷(xiāo)規(guī)則(如滿減、折扣、贈(zèng)品)
- 根據(jù)訂單信息匹配符合條件的規(guī)則
- 執(zhí)行規(guī)則并返回促銷(xiāo)結(jié)果
- 支持規(guī)則優(yōu)先級(jí)(高優(yōu)先級(jí)規(guī)則先執(zhí)行)
6.2 設(shè)計(jì)思路
- 規(guī)則接口(PromotionRule):定義規(guī)則的匹配條件和執(zhí)行邏輯
- 規(guī)則引擎(RuleEngine):使用 TreeSet 存儲(chǔ)規(guī)則(按優(yōu)先級(jí)排序),提供規(guī)則添加、匹配、執(zhí)行方法
- 訂單上下文(OrderContext):封裝訂單信息,作為規(guī)則匹配和執(zhí)行的入?yún)?/li>
6.3 代碼實(shí)現(xiàn)
6.3.1 訂單上下文類(lèi)(OrderContext.java)
import java.math.BigDecimal;
import java.util.List;
import java.util.Objects;
/**
* 訂單上下文:封裝規(guī)則匹配所需的訂單信息
*/
public class OrderContext {
private final String orderId;
private final BigDecimal totalAmount; // 訂單總金額
private final List<String> productIds; // 商品 ID 列表
private final int memberLevel; // 會(huì)員等級(jí)
// 構(gòu)造函數(shù)
public OrderContext(String orderId, BigDecimal totalAmount, List<String> productIds, int memberLevel) {
this.orderId = Objects.requireNonNull(orderId);
this.totalAmount = Objects.requireNonNull(totalAmount);
this.productIds = Objects.requireNonNull(productIds);
this.memberLevel = memberLevel;
}
// getter
public String getOrderId() {
return orderId;
}
public BigDecimal getTotalAmount() {
return totalAmount;
}
public List<String> getProductIds() {
return productIds;
}
public int getMemberLevel() {
return memberLevel;
}
}6.3.2 促銷(xiāo)規(guī)則接口(PromotionRule.java)
/**
* 促銷(xiāo)規(guī)則接口:所有規(guī)則需實(shí)現(xiàn)此接口
*/
public interface PromotionRule {
/**
* 規(guī)則優(yōu)先級(jí):數(shù)值越大,優(yōu)先級(jí)越高
*/
int getPriority();
/**
* 規(guī)則匹配:判斷訂單是否滿足規(guī)則條件
*/
boolean match(OrderContext context);
/**
* 執(zhí)行規(guī)則:返回促銷(xiāo)結(jié)果
*/
PromotionResult execute(OrderContext context);
/**
* 促銷(xiāo)結(jié)果封裝
*/
record PromotionResult(String ruleName, BigDecimal discountAmount, String gift) {}
}6.3.3 具體規(guī)則實(shí)現(xiàn)(滿減規(guī)則、折扣規(guī)則)
import java.math.BigDecimal;
/**
* 滿減規(guī)則:滿 1000 減 200
*/
public class FullReduceRule implements PromotionRule {
@Override
public int getPriority() {
return 2; // 優(yōu)先級(jí):2
}
@Override
public boolean match(OrderContext context) {
// 訂單金額 ≥1000 元
return context.getTotalAmount().compareTo(new BigDecimal("1000")) >= 0;
}
@Override
public PromotionResult execute(OrderContext context) {
return new PromotionResult("滿1000減200", new BigDecimal("200"), "無(wú)");
}
}
/**
* 會(huì)員折扣規(guī)則:VIP 會(huì)員 9 折
*/
public class MemberDiscountRule implements PromotionRule {
@Override
public int getPriority() {
return 1; // 優(yōu)先級(jí):1(低于滿減規(guī)則)
}
@Override
public boolean match(OrderContext context) {
// 會(huì)員等級(jí) ≥3(VIP 會(huì)員)
return context.getMemberLevel() >= 3;
}
@Override
public PromotionResult execute(OrderContext context) {
BigDecimal discount = context.getTotalAmount().multiply(new BigDecimal("0.1"));
return new PromotionResult("VIP 9折", discount, "無(wú)");
}
}
/**
* 贈(zèng)品規(guī)則:購(gòu)買(mǎi)指定商品送贈(zèng)品
*/
public class GiftRule implements PromotionRule {
private final String targetProductId;
private final String gift;
public GiftRule(String targetProductId, String gift) {
this.targetProductId = targetProductId;
this.gift = gift;
}
@Override
public int getPriority() {
return 3; // 優(yōu)先級(jí):3(最高)
}
@Override
public boolean match(OrderContext context) {
// 包含指定商品
return context.getProductIds().contains(targetProductId);
}
@Override
public PromotionResult execute(OrderContext context) {
return new PromotionResult("購(gòu)買(mǎi)指定商品送贈(zèng)品", BigDecimal.ZERO, gift);
}
}6.3.4 規(guī)則引擎類(lèi)(RuleEngine.java)
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;
/**
* 規(guī)則引擎:基于 TreeSet 按優(yōu)先級(jí)管理規(guī)則
*/
public class RuleEngine {
// 存儲(chǔ)規(guī)則:TreeSet 按優(yōu)先級(jí)降序排序(優(yōu)先級(jí)越高越先執(zhí)行)
private final TreeSet<PromotionRule> rules = new TreeSet<>(Comparator.comparingInt(PromotionRule::getPriority).reversed());
/**
* 添加規(guī)則
*/
public void addRule(PromotionRule rule) {
rules.add(rule);
}
/**
* 匹配并執(zhí)行規(guī)則:返回所有符合條件的規(guī)則結(jié)果
*/
public List<PromotionResult> executeRules(OrderContext context) {
List<PromotionResult> results = new ArrayList<>();
for (PromotionRule rule : rules) {
if (rule.match(context)) {
results.add(rule.execute(context));
// 若為排他規(guī)則,可在此處 break(本案例支持多規(guī)則疊加)
}
}
return results;
}
// 測(cè)試
public static void main(String[] args) {
// 1. 初始化規(guī)則引擎并添加規(guī)則
RuleEngine engine = new RuleEngine();
engine.addRule(new FullReduceRule());
engine.addRule(new MemberDiscountRule());
engine.addRule(new GiftRule("P001", "保溫杯"));
// 2. 構(gòu)造訂單上下文(滿 1000 元、VIP 會(huì)員、包含 P001 商品)
OrderContext context = new OrderContext(
"ORDER001",
new BigDecimal("1500"),
List.of("P001", "P002"),
3
);
// 3. 執(zhí)行規(guī)則
List<PromotionResult> results = engine.executeRules(context);
// 4. 輸出結(jié)果(按優(yōu)先級(jí)排序:贈(zèng)品規(guī)則 → 滿減規(guī)則 → 折扣規(guī)則)
System.out.println("訂單 " + context.getOrderId() + " 匹配的促銷(xiāo)規(guī)則:");
for (PromotionResult result : results) {
System.out.printf("規(guī)則:%s,折扣金額:%s,贈(zèng)品:%s%n",
result.ruleName(), result.discountAmount(), result.gift());
}
}
}6.4 測(cè)試結(jié)果與案例總結(jié)
6.4.1 測(cè)試輸出
訂單 ORDER001 匹配的促銷(xiāo)規(guī)則:
規(guī)則:購(gòu)買(mǎi)指定商品送贈(zèng)品,折扣金額:0,贈(zèng)品:保溫杯
規(guī)則:滿1000減200,折扣金額:200,贈(zèng)品:無(wú)
規(guī)則:VIP 9折,折扣金額:150.00,贈(zèng)品:無(wú)
6.4.2 案例總結(jié)
? 核心技術(shù)亮點(diǎn):
- 集合選型:使用 TreeSet 存儲(chǔ)規(guī)則,自動(dòng)按優(yōu)先級(jí)排序,無(wú)需手動(dòng)維護(hù)順序
- 接口抽象:定義 PromotionRule 接口,支持規(guī)則擴(kuò)展(新增規(guī)則無(wú)需修改引擎代碼)
- 靈活性:支持多規(guī)則疊加執(zhí)行,可通過(guò)修改比較器或添加排他標(biāo)記支持排他規(guī)則
- 可擴(kuò)展性:規(guī)則可動(dòng)態(tài)添加/刪除,適用于業(yè)務(wù)規(guī)則頻繁變化的場(chǎng)景
? 擴(kuò)展方向:
- 規(guī)則持久化:將規(guī)則存儲(chǔ)到數(shù)據(jù)庫(kù),支持動(dòng)態(tài)配置和熱更新
- 規(guī)則條件表達(dá)式:使用 EL 表達(dá)式或腳本語(yǔ)言(如 Groovy)定義規(guī)則條件,更靈活
- 規(guī)則執(zhí)行監(jiān)控:記錄規(guī)則執(zhí)行日志和結(jié)果,便于排查問(wèn)題和優(yōu)化規(guī)則
七、本章小結(jié)
本章聚焦 Java 集合框架的高級(jí)應(yīng)用與架構(gòu)設(shè)計(jì),從緩存架構(gòu)、設(shè)計(jì)模式融合、大數(shù)據(jù)量處理、分布式適配到業(yè)務(wù)組件設(shè)計(jì),全面覆蓋復(fù)雜場(chǎng)景的實(shí)踐方案,核心要點(diǎn)回顧如下:
- 緩存架構(gòu):基于 LinkedHashMap 實(shí)現(xiàn) LRU+TTL 本地緩存,結(jié)合分布式緩存設(shè)計(jì)多級(jí)緩存架構(gòu),兼顧性能和一致性。
- 設(shè)計(jì)模式融合:裝飾器模式增強(qiáng)集合功能,享元模式復(fù)用重復(fù)對(duì)象,迭代器模式自定義遍歷邏輯,提升代碼擴(kuò)展性。
- 大數(shù)據(jù)量處理:通過(guò)集合分片存儲(chǔ)控制內(nèi)存,結(jié)合線程池并行處理提升效率,分布式場(chǎng)景下可擴(kuò)展為 MapReduce 架構(gòu)。
- 分布式適配:優(yōu)化集合序列化和傳輸效率,基于分布式鎖保證數(shù)據(jù)一致性,適用于微服務(wù)和分布式系統(tǒng)。
- 業(yè)務(wù)組件設(shè)計(jì):基于集合框架實(shí)現(xiàn)輕量級(jí)規(guī)則引擎,支持規(guī)則動(dòng)態(tài)管理和靈活擴(kuò)展,可復(fù)用至各類(lèi)業(yè)務(wù)系統(tǒng)。
通過(guò)本章學(xué)習(xí),讀者應(yīng)能跳出“基礎(chǔ) API 使用”的層面,站在架構(gòu)設(shè)計(jì)的角度運(yùn)用集合框架,解決高并發(fā)、大數(shù)據(jù)量、分布式等復(fù)雜場(chǎng)景的問(wèn)題,設(shè)計(jì)出高性能、可擴(kuò)展、易維護(hù)的 Java 應(yīng)用。集合框架作為 Java 開(kāi)發(fā)的基礎(chǔ)工具,其靈活運(yùn)用是高級(jí)開(kāi)發(fā)工程師的核心能力之一,后續(xù)可結(jié)合具體業(yè)務(wù)場(chǎng)景持續(xù)深化實(shí)踐。
到此這篇關(guān)于Java 集合框架高級(jí)應(yīng)用與架構(gòu)設(shè)計(jì)方案的文章就介紹到這了,更多相關(guān)Java 集合框架實(shí)戰(zhàn)應(yīng)用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用Spring Data MongoDB進(jìn)行地理位置相關(guān)查詢(xún)的步驟和示例
SpringData MongoDB是SpringData技術(shù)封裝了mongodb-driver技術(shù)之后的產(chǎn)物,它可以用更加簡(jiǎn)單的方式操作MongoDB,本文給大家介紹了如何使用Spring Data MongoDB進(jìn)行地理位置相關(guān)查詢(xún)的步驟和示例,需要的朋友可以參考下2025-05-05
springcloud-feign調(diào)用報(bào)錯(cuò)問(wèn)題
這篇文章主要介紹了springcloud-feign調(diào)用報(bào)錯(cuò)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08
SpringBoot+thymeleaf+Echarts+Mysql 實(shí)現(xiàn)數(shù)據(jù)可視化讀取的示例
本文主要介紹了SpringBoot+thymeleaf+Echarts+Mysql 實(shí)現(xiàn)數(shù)據(jù)可視化讀取的示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04
nacos將服務(wù)注冊(cè)到不同的命名空間下問(wèn)題
Nacos是SpringCloudAlibaba架構(gòu)中最重要的組件,提供注冊(cè)中心、配置中心和動(dòng)態(tài)DNS服務(wù)三大功能,如果需要配置多個(gè)數(shù)據(jù)庫(kù)適配的環(huán)境,啟動(dòng)服務(wù)時(shí)需要將服務(wù)注冊(cè)到不同的命名空間下,并配置新部署的網(wǎng)關(guān)服務(wù)ip和端口或者域名2024-12-12
java如何用Processing生成馬賽克風(fēng)格的圖像
這篇文章主要介紹了如何用java如何用Processing生成馬賽克風(fēng)格的圖像,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-03-03
IDEA插件開(kāi)發(fā)之環(huán)境搭建過(guò)程圖文詳解
這篇文章主要介紹了IDEA插件開(kāi)發(fā)之環(huán)境搭建過(guò)程,本文通過(guò)圖文并茂實(shí)例代碼相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-05-05

