【Spring AI实战】第12章 SpringAI 生产级调优
·
1. Token 优化:上下文裁剪、精简Prompt、切片优化
Spring AI 中 Token 优化是控制成本、提升性能的关键。以下是针对上下文裁剪、Prompt 精简和切片优化的系统化方案:
一、上下文裁剪策略
1. 自动上下文窗口管理
/**
 * Trims a context string so that its estimated token count fits a target budget.
 */
@Component
public class ContextWindowManager {

    /** Context-window ceiling; adjust per model. Not read by the methods of this class. */
    private static final int MAX_TOKENS = 8000;

    /**
     * Returns {@code context} unchanged when its estimated size is within
     * {@code targetTokens}; otherwise returns a trimmed version.
     *
     * @param context      text to fit into the budget
     * @param targetTokens maximum number of estimated tokens to keep
     * @return the original text or its trimmed form
     */
    public String trimContext(String context, int targetTokens) {
        TokenCountEstimator estimator = new TokenCountEstimator();
        // TokenCountEstimator declares estimateTokens(String); it has no estimate(String).
        int currentTokens = estimator.estimateTokens(context);
        if (currentTokens <= targetTokens) {
            return context;
        }
        return intelligentTrimming(context, targetTokens);
    }

    /**
     * Delegates to the sliding-window trimmer.
     *
     * <p>The previous version also computed a summary and a per-sentence keep count,
     * but discarded both, so only the sliding-window result was ever returned.
     */
    private String intelligentTrimming(String text, int targetTokens) {
        return slidingWindowTrim(text, targetTokens);
    }
}
2. 基于重要性的内容过滤
/**
 * Scores the sections of a document against a query and keeps the highest-ranked ones.
 * The scoring itself lives in {@code calculateRelevance} (TF-IDF or embedding similarity
 * are the intended options).
 */
public class ContentPrioritizer {

    /**
     * @param document    full document text
     * @param query       query the sections are scored against
     * @param maxSections maximum number of section contents to return
     * @return contents of the top sections, best first
     */
    public List<String> prioritizeSections(String document,
                                           String query,
                                           int maxSections) {
        List<TextSection> scored = splitIntoSections(document);

        // Attach a relevance score to every section before ranking.
        for (TextSection candidate : scored) {
            candidate.setRelevanceScore(calculateRelevance(candidate.getContent(), query));
        }

        // Descending natural order.
        // NOTE(review): this relies on TextSection being Comparable by score - confirm.
        Comparator<TextSection> bestFirst = Comparator.reverseOrder();
        return scored.stream()
                .sorted(bestFirst)
                .limit(maxSections)
                .map(TextSection::getContent)
                .collect(Collectors.toList());
    }
}
二、Prompt 精简优化
1. 结构化Prompt模板
/**
 * Builds compact prompts from a fixed four-part layout:
 * instruction, context summary, requirements, variables.
 */
@Component
public class OptimizedPromptTemplate {

    /**
     * @param userQuery user question, truncated to 200 tokens
     * @param context   supporting context, summarised to 500 tokens
     * @param variables extra variables rendered by {@code formatVariables}
     * @return the filled-in prompt text
     */
    public String createConcisePrompt(String userQuery,
                                      String context,
                                      Map<String, String> variables) {
        // Three %s slots, in order: instruction, context summary, variables.
        String layout = """
                # 指令(简洁)
                %s
                # 上下文(摘要)
                %s
                # 要求
                1. 直接回答核心问题
                2. 避免重复上下文
                3. 使用要点格式
                # 变量
                %s
                """;
        return String.format(
                layout,
                truncateToTokens(userQuery, 200),
                summarizeContext(context, 500),
                formatVariables(variables));
    }

    /** Reduces the context to its key sentences within the given token limit. */
    private String summarizeContext(String context, int tokenLimit) {
        return KeySentenceExtractor.extract(context, tokenLimit);
    }
}
2. 动态Prompt构建
/**
 * Chooses a prompt template according to request complexity and fits it to the
 * request's token budget.
 */
public class DynamicPromptBuilder {

    /**
     * @param request chat request carrying a complexity level and a token budget
     * @return the prompt produced by {@code adjustForTokenBudget}
     */
    public Prompt buildAdaptivePrompt(ChatRequest request) {
        PromptTemplate template = switch (request.getComplexity()) {
            case SIMPLE -> new SimpleQATemplate();      // 50-100 tokens
            case COMPLEX -> new AnalyticalTemplate();   // 200-300 tokens
            default -> new BalancedTemplate();
        };
        // Shrink or expand the chosen template to the available budget.
        return adjustForTokenBudget(template, request.getTokenBudget());
    }
}
三、切片优化策略
1. 智能文档分块
/**
 * Declares the two text splitters used for document chunking.
 */
@Configuration
public class ChunkingConfig {

    /** Token-based splitter that cuts on paragraph breaks. */
    @Bean
    public TextSplitter semanticSplitter() {
        TextSplitter paragraphSplitter = new TokenTextSplitter.Builder()
                .setChunkSize(1000)      // chunk size in tokens
                .setChunkOverlap(100)    // tokens shared between neighbouring chunks
                .setSeparator("\n\n")    // split on paragraphs
                .setKeepSeparator(true)
                .build();
        return paragraphSplitter;
    }

    /** Hierarchical splitter: heading, then paragraph, then line, then word. */
    @Bean
    public TextSplitter hierarchicalSplitter() {
        // Separators are tried in this priority order.
        List<String> separatorsByPriority = List.of("\n## ", "\n\n", "\n", " ");
        TextSplitter layered = new RecursiveCharacterTextSplitter(
                2000,                    // large chunk
                200,                     // overlap
                separatorsByPriority,
                true);
        return layered;
    }
}
2. 语义感知切片
/**
 * Splits text into chunks along detected topic boundaries, subdividing any segment
 * that exceeds the chunk token limit.
 */
@Service
public class SemanticChunkingService {

    /**
     * @param text text to chunk
     * @return chunks in document order
     */
    public List<DocumentChunk> chunkWithSemanticBoundaries(String text) {
        List<DocumentChunk> chunks = new ArrayList<>();
        List<Integer> boundaries = detectTopicBoundaries(text);

        // Each pair of consecutive boundaries delimits one topic segment.
        // NOTE(review): text before the first or after the last boundary is not emitted;
        // presumably the detector returns 0 and text.length() as endpoints - confirm.
        for (int end = 1; end < boundaries.size(); end++) {
            String segment = text.substring(boundaries.get(end - 1), boundaries.get(end));
            boolean oversized = estimateTokens(segment) > MAX_CHUNK_TOKENS;
            if (!oversized) {
                chunks.add(new DocumentChunk(segment));
            } else {
                // Too large for one chunk: split further by sentence.
                chunks.addAll(splitBySentences(segment));
            }
        }
        return chunks;
    }

    /**
     * Locates topic boundaries. Intended signals: headings (#, ##), abrupt changes in
     * paragraph length, connective words, and shifts in embedding similarity.
     */
    private List<Integer> detectTopicBoundaries(String text) {
        return topicDetectionAlgorithm(text);
    }
}
四、Token使用监控与优化
1. Token消耗监控器
/**
 * Records estimated token usage per chat session and warns when a prompt is large.
 */
@Component
@Slf4j
public class TokenUsageMonitor {

    @Autowired
    private TokenCountEstimator tokenEstimator;

    /** Latest usage per session id; a newer event for the same session replaces the older one. */
    private final Map<String, TokenStats> usageStats = new ConcurrentHashMap<>();

    /**
     * Estimates prompt and completion tokens for the event, stores them, and triggers
     * optimisation hints when the prompt exceeds {@code WARNING_THRESHOLD}.
     */
    @EventListener
    public void handleChatEvent(ChatCompletionEvent event) {
        // TokenCountEstimator declares estimateTokens(String); it has no estimate(String).
        int promptTokens = tokenEstimator.estimateTokens(event.getPrompt());
        int completionTokens = tokenEstimator.estimateTokens(event.getResponse());

        TokenStats stats = new TokenStats(promptTokens, completionTokens, LocalDateTime.now());
        usageStats.put(event.getSessionId(), stats);

        if (promptTokens > WARNING_THRESHOLD) {
            log.warn("High token usage detected: {}", promptTokens);
            suggestOptimizations(event.getPrompt());
        }
    }

    /**
     * Collects optimisation hints for a prompt and logs them together with the size of
     * the optimised candidate. Previously both were computed and silently discarded.
     *
     * @param prompt prompt text to analyse
     */
    public void suggestOptimizations(String prompt) {
        List<String> suggestions = new ArrayList<>();
        if (hasRedundantText(prompt)) {
            suggestions.add("移除重复内容");
        }
        if (hasLongExamples(prompt)) {
            suggestions.add("缩短示例或使用摘要");
        }
        if (hasUnnecessaryFormatting(prompt)) {
            suggestions.add("简化格式标记");
        }
        if (suggestions.isEmpty()) {
            return;
        }

        String optimized = applyOptimizations(prompt, suggestions);
        log.info("Token optimization suggestions: {}", suggestions);
        // Only lengths are logged so that prompt contents stay out of the log.
        if (optimized != null) {
            log.info("Optimized prompt length: {} -> {} chars", prompt.length(), optimized.length());
        }
    }
}
2. 自适应Token预算分配
/**
 * Splits a model's token limit into context, prompt and response budgets.
 */
@Service
public class AdaptiveTokenBudget {

    /**
     * @param request request providing the model name and the query
     * @return the three-way budget for this request
     */
    public TokenBudget allocateBudget(ChatRequest request) {
        int totalBudget = getModelLimit(request.getModel());

        // Baseline split: 60% context, 20% prompt, remainder for the response.
        int contextBudget = (int) (totalBudget * 0.6);
        int promptBudget = (int) (totalBudget * 0.2);
        int responseBudget = totalBudget - contextBudget - promptBudget;

        // Complex queries trade context for a larger prompt.
        // NOTE(review): responseBudget is fixed before this adjustment, so afterwards the
        // three budgets add up to less than totalBudget - confirm this is intended.
        if (isComplexQuery(request.getQuery())) {
            contextBudget = (int) (contextBudget * 0.8);
            promptBudget = (int) (promptBudget * 1.2);
        }

        return new TokenBudget(contextBudget, promptBudget, responseBudget);
    }
}
五、实用工具类
1. Token估算器
/**
 * Token counting: a fast local approximation plus an exact count obtained from the model API.
 */
@Component
public class TokenCountEstimator {

    /**
     * Approximates the token count without calling any API.
     * Heuristic: about 2 Han characters per token, about 4 other characters per token.
     *
     * @param text text to measure; {@code null} and empty text count as 0 tokens
     * @return estimated token count, truncated toward zero
     */
    public int estimateTokens(String text) {
        if (text == null || text.isEmpty()) {
            return 0;
        }
        int chineseChars = countChineseCharacters(text);
        // Count code points, not UTF-16 units, so supplementary characters count once.
        int otherChars = text.codePointCount(0, text.length()) - chineseChars;
        return (int) (chineseChars / 2.0 + otherChars / 4.0);
    }

    /**
     * Returns the exact token count reported by the model's token-counting API.
     *
     * @param text  text to measure
     * @param model model whose tokenizer should be used
     */
    public int calculateExactTokens(String text, String model) {
        return aiClient.countTokens(text, model);
    }

    /** Counts the code points of {@code text} that belong to the Han script. */
    private int countChineseCharacters(String text) {
        return (int) text.codePoints()
                .filter(cp -> Character.UnicodeScript.of(cp) == Character.UnicodeScript.HAN)
                .count();
    }
}
2. Prompt压缩器
/**
 * Shrinks a prompt by normalising whitespace, abbreviating common phrases and
 * replacing repeated content with references.
 */
public class PromptCompressor {

    /** Phrase to abbreviation; each phrase is only replaced where it stands as whole words. */
    private static final Map<String, String> ABBREVIATIONS = Map.of(
            "for example", "e.g.",
            "that is", "i.e.",
            "please", "pls",
            "important", "imp"
    );

    /**
     * @param originalPrompt prompt text to compress
     * @return compressed prompt
     */
    public String compressPrompt(String originalPrompt) {
        // 1. Normalise whitespace. Newlines are handled separately from other whitespace:
        //    collapsing "\\s+" first would erase every line break and make the
        //    blank-line rule below unreachable.
        String compressed = originalPrompt
                .replaceAll("[^\\S\\n]+", " ")    // runs of spaces/tabs/CR become one space
                .replaceAll(" ?\\n ?", "\n")      // no spaces hugging a line break
                .replaceAll("\\n{3,}", "\n\n");   // at most one blank line in a row
        // 2. Abbreviate common phrases.
        compressed = abbreviateCommonPhrases(compressed);
        // 3. Replace duplicated content with placeholders.
        compressed = replaceDuplicatesWithReferences(compressed);
        return compressed;
    }

    /**
     * Replaces each known phrase with its abbreviation. Matching is case-sensitive and
     * anchored on word boundaries, so words that merely contain a phrase
     * (for example "pleased" or "unimportant") are left untouched.
     */
    private String abbreviateCommonPhrases(String text) {
        for (Map.Entry<String, String> entry : ABBREVIATIONS.entrySet()) {
            text = text.replaceAll("\\b" + entry.getKey() + "\\b", entry.getValue());
        }
        return text;
    }
}
六、配置建议
application.yml 配置示例:
# Example application.yml: token optimisation limits, chunking, compression and usage monitoring.
# NOTE(review): indentation was lost in this copy, so key nesting cannot be read from
# the text below - restore it before using this file.
spring:
ai:
token:
optimization:
enabled: true
max-context-tokens: 4000
max-prompt-tokens: 1000
chunk:
size: 1000
overlap: 100
strategy: semantic
compression:
enabled: true
min-savings-percent: 20
monitoring:
token-usage:
enabled: true
warning-threshold: 8000
alert-threshold: 12000
最佳实践总结
- 分层裁剪策略 :
- 第一层:移除无关内容
- 第二层:提取关键信息
- 第三层:智能摘要
- Prompt设计原则 :
- 指令先行,上下文后置
- 使用结构化模板
- 避免冗余描述
- 切片优化 :
- 按语义边界分块
- 保持上下文连贯性
- 动态调整块大小
- 监控与调优 :
- 实时监控Token消耗
- A/B测试不同策略
- 根据业务需求调整参数
这些策略可以显著减少Token使用,同时保持模型性能。建议根据具体应用场景调整参数,并进行持续的监控和优化。
2. 接口性能优化:异步、线程池、缓存策略
Spring AI 应用性能优化可以从异步处理、线程池配置和缓存策略三个方面入手。以下是详细的优化方案:
一、异步处理优化
异步调用 AI 接口
/**
 * Asynchronous entry points for AI calls, executed on the {@code aiTaskExecutor} pool.
 */
@Service
public class AIService {

    /**
     * Runs one chat call on the AI executor.
     *
     * @param prompt prompt text
     * @return future completed with the model response
     */
    @Async("aiTaskExecutor")
    public CompletableFuture<String> generateContentAsync(String prompt) {
        String result = chatClient.call(prompt);
        return CompletableFuture.completedFuture(result);
    }

    /**
     * Runs every prompt concurrently on the AI executor and gathers the responses in
     * input order.
     *
     * <p>This method is deliberately not {@code @Async}. It already returns without
     * blocking, and an {@code @Async} wrapper would occupy one pool thread that waits
     * for the sub-tasks queued on the same bounded pool. Under load that can exhaust
     * the pool and deadlock.
     *
     * @param prompts prompts to process
     * @return future completed with one response per prompt, in the same order
     */
    public CompletableFuture<List<String>> batchProcessAsync(List<String> prompts) {
        List<CompletableFuture<String>> futures = prompts.stream()
                .map(prompt -> CompletableFuture.supplyAsync(
                        () -> chatClient.call(prompt), aiTaskExecutor))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                // Every future is complete here, so join() does not block.
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }
}
WebFlux 响应式支持
/**
 * Reactive HTTP endpoints for streaming and batch AI calls.
 */
@RestController
public class AIController {

    /**
     * Streams the generated text for a prompt.
     *
     * @param prompt prompt text
     * @return generated text fragments, spaced at least 50 ms apart
     */
    @GetMapping("/ai/stream")
    public Flux<String> streamResponse(@RequestParam String prompt) {
        return chatClient.stream(prompt)
                .map(Generation::getText)
                .delayElements(Duration.ofMillis(50)); // throttles the streaming output rate
    }

    /**
     * Processes all prompts of the request concurrently.
     *
     * <p>{@code flatMapSequential} still subscribes to the calls eagerly, but emits
     * their results in source order. Plain {@code flatMap} would return results in
     * completion order, so they could no longer be matched to their prompts.
     *
     * @param request batch of prompts
     * @return response entity with one result per prompt, in request order
     */
    @PostMapping("/ai/batch")
    public Mono<ResponseEntity<BatchResult>> batchProcess(
            @RequestBody BatchRequest request) {
        return Flux.fromIterable(request.getPrompts())
                .flatMapSequential(prompt ->
                        // The chat call blocks, so run it off the event loop.
                        Mono.fromCallable(() -> chatClient.call(prompt))
                                .subscribeOn(Schedulers.boundedElastic()))
                .collectList()
                .map(results -> ResponseEntity.ok(new BatchResult(results)));
    }
}
二、线程池优化配置
AI 专用线程池配置
/**
 * Thread pools for AI calls and for general I/O work.
 */
@Configuration
@EnableAsync
public class ThreadPoolConfig {

    /** Upper bound of the AI pool; tune to the latency of the AI endpoint. */
    private static final int AI_MAX_POOL_SIZE = 50;

    /**
     * Dedicated pool for AI calls.
     *
     * <p>The core size is twice the CPU count, capped at the maximum pool size. Without
     * the cap, a host with more than 25 cores would request a core size above the
     * maximum, which the underlying {@link ThreadPoolExecutor} rejects at start-up.
     */
    @Bean("aiTaskExecutor")
    public ThreadPoolTaskExecutor aiTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        int cpuBasedCoreSize = Runtime.getRuntime().availableProcessors() * 2;
        executor.setCorePoolSize(Math.min(cpuBasedCoreSize, AI_MAX_POOL_SIZE));
        executor.setMaxPoolSize(AI_MAX_POOL_SIZE);
        // Keep the queue bounded and modest so a backlog cannot exhaust memory.
        executor.setQueueCapacity(100);
        // Idle time before threads above the core size are released.
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("ai-executor-");
        // When saturated, the submitting thread runs the task itself.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // On shutdown, wait up to 60 s for queued and running tasks.
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }

    /** Larger pool for I/O-bound tasks, which spend most of their time waiting. */
    @Bean("ioTaskExecutor")
    public ThreadPoolTaskExecutor ioTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(20);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(200);
        executor.setThreadNamePrefix("io-executor-");
        executor.initialize();
        return executor;
    }
}
虚拟线程支持(Java 21+)
/**
 * Executors backed by virtual threads (requires Java 21 or later).
 */
@Configuration
public class VirtualThreadConfig {

    /**
     * Executor that starts one virtual thread per task, named {@code ai-virtual-0},
     * {@code ai-virtual-1}, and so on.
     *
     * <p>Built from JDK factories because {@code TaskExecutorAdapter} offers no
     * {@code ofVirtualThreadExecutor} factory method.
     */
    @Bean
    public TaskExecutor virtualThreadExecutor() {
        return new TaskExecutorAdapter(
                Executors.newThreadPerTaskExecutor(
                        Thread.ofVirtual().name("ai-virtual-", 0).factory()));
    }

    /** Unnamed virtual-thread-per-task executor exposed as an {@code AsyncTaskExecutor}. */
    @Bean
    @Qualifier("virtualThreadTaskExecutor")
    public AsyncTaskExecutor virtualThreadTaskExecutor() {
        return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor());
    }
}
三、缓存策略优化
多级缓存配置
@Configuration
@EnableCaching
public class CacheConfig {
// 本地缓存(Caffeine)
@Bean
public CacheManager cacheManager() {
CaffeineCacheManager cacheManager = new CaffeineCacheManager();
cacheManager.setCaffeine(Caffeine.newBuilder()
.maximumSize(1000) // 最大缓存条目数
.expireAfterWrite(10, TimeUnit.MINUTES) // 写入后过期时间
.expireAfterAccess(5, TimeUnit.MINUTES) // 访问后过期时间
.recordStats() // 记录缓存统计
);
return cacheManager;
}
// Redis 分布式缓存
@Bean
public RedisCacheManager redisCacheManager(RedisConnectionFactory factory) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(30))
.serializeKeysWith(RedisSerializationContext.SerializationPair
.fromSerializer(new StringRedisSerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair
.fromSerializer(new GenericJackson2JsonRedisSerializer()))
.disableCachingNullValues();
return RedisCacheManager.builder(factory)
.cacheDefaults(config)
.withCacheConfiguration("ai_responses",
RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofHours(1)))
.transactionAware()
.build();
}
}
AI 响应缓存服务
@Service
public class AICacheService {
@Cacheable(value = "ai_responses",
key = "#prompt.hashCode()",
unless = "#result == null")
public String getCachedResponse(String prompt, String model) {
// 实际调用 AI 接口
return chatClient.call(prompt);
}
// 带参数的缓存
@Cacheable(value = "ai_responses",
key = "T(com.example.util.CacheKeyGenerator)"
+ ".generate(#prompt, #temperature, #maxTokens)")
public String getCachedResponseWithParams(String prompt,
double temperature,
int maxTokens) {
// 构建带参数的请求
return chatClient.call(prompt);
}
// 批量缓存
@Cacheable(value = "ai_batch_responses",
key = "#prompts.hashCode()")
public List<String> batchGetCachedResponses(List<String> prompts) {
return prompts.stream()
.map(this::getCachedResponse)
.collect(Collectors.toList());
}
}
向量相似度缓存
/**
 * Cached embeddings and similarity scores.
 */
@Service
public class VectorCacheService {

    /**
     * Returns the embedding of a text, cached on the text itself.
     * Keying on the text rather than its {@code hashCode()} prevents two texts with
     * equal hashes from sharing an embedding.
     *
     * @param text text to embed
     * @return embedding vector
     */
    @Cacheable(value = "embeddings",
            key = "#text")
    public List<Double> getCachedEmbedding(String text) {
        return embeddingClient.embed(text);
    }

    /**
     * Returns the cosine similarity between a query and a document, cached per pair.
     *
     * <p>NOTE(review): the two embedding lookups below are calls through {@code this},
     * so with proxy-based caching they do not go through the {@code embeddings} cache.
     *
     * @param query    query text
     * @param document document text
     * @return cosine similarity of the two embeddings
     */
    @Cacheable(value = "similarity_scores",
            key = "{#query, #document}")
    public double getCachedSimilarity(String query, String document) {
        List<Double> queryVector = getCachedEmbedding(query);
        List<Double> docVector = getCachedEmbedding(document);
        return calculateCosineSimilarity(queryVector, docVector);
    }
}
四、综合优化策略
请求合并与批处理
@Component
public class AIRequestBatcher {
private final BatchProcessor batchProcessor;
private final LinkedBlockingQueue<Request> requestQueue =
new LinkedBlockingQueue<>(1000);
@PostConstruct
public void init() {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(this::processBatch, 100, 100, TimeUnit.MILLISECONDS);
}
private void processBatch() {
List<Request> batch = new ArrayList<>();
requestQueue.drainTo(batch, 50); // 每次最多处理50个
if (!batch.isEmpty()) {
List<String> prompts = batch.stream()
.map(Request::getPrompt)
.collect(Collectors.toList());
List<String> responses = batchProcessor.processBatch(prompts);
// 分发结果
for (int i = 0; i < batch.size(); i++) {
batch.get(i).getFuture().complete(responses.get(i));
}
}
}
public CompletableFuture<String> submitRequest(String prompt) {
CompletableFuture<String> future = new CompletableFuture<>();
requestQueue.offer(new Request(prompt, future));
return future;
}
@Data
@AllArgsConstructor
private static class Request {
private String prompt;
private CompletableFuture<String> future;
}
}
限流与熔断
/**
 * Resilience4j primitives guarding the AI service: circuit breaker, rate limiter, bulkhead.
 */
@Configuration
public class ResilienceConfig {

    /** Circuit breaker named {@code aiService}. */
    @Bean
    public CircuitBreaker aiCircuitBreaker() {
        return CircuitBreaker.of("aiService",
                CircuitBreakerConfig.custom()
                        .failureRateThreshold(50)                         // failure-rate threshold
                        .waitDurationInOpenState(Duration.ofSeconds(30))  // time spent open
                        .slidingWindowSize(10)                            // sliding window size
                        .permittedNumberOfCallsInHalfOpenState(5)         // calls allowed while half-open
                        .build());
    }

    /** Rate limiter named {@code aiService}: 10 permits per 1 s refresh period. */
    @Bean
    public RateLimiter aiRateLimiter() {
        return RateLimiter.of("aiService",
                RateLimiterConfig.custom()
                        .limitForPeriod(10)
                        .limitRefreshPeriod(Duration.ofSeconds(1))
                        .timeoutDuration(Duration.ofMillis(500))          // maximum wait for a permit
                        .build());
    }

    /** Bulkhead named {@code aiService}. */
    @Bean
    public Bulkhead aiBulkhead() {
        return Bulkhead.of("aiService",
                BulkheadConfig.custom()
                        .maxConcurrentCalls(20)                           // maximum concurrent calls
                        .maxWaitDuration(Duration.ofMillis(100))
                        .build());
    }
}
监控与指标
/**
 * Records duration, success/failure counts and in-flight concurrency of AI calls.
 */
@Component
public class AIMetricsCollector {

    private final MeterRegistry meterRegistry;
    private final Timer aiCallTimer;

    /**
     * Number of calls currently in flight, exposed as gauge {@code ai.api.concurrent.calls}.
     * Held in a field because the gauge is registered once and must keep observing the
     * same counter for the lifetime of this bean.
     */
    private final AtomicInteger concurrentCalls;

    /**
     * @param meterRegistry registry that receives all meters of this collector
     */
    public AIMetricsCollector(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.aiCallTimer = Timer.builder("ai.api.call.duration")
                .description("AI API调用耗时")
                .register(meterRegistry);
        this.concurrentCalls = meterRegistry.gauge("ai.api.concurrent.calls", new AtomicInteger(0));
    }

    /**
     * Calls the chat client while recording metrics.
     *
     * @param prompt prompt text
     * @return model response
     */
    public String callWithMetrics(String prompt) {
        return aiCallTimer.record(() -> {
            concurrentCalls.incrementAndGet();
            try {
                String result = chatClient.call(prompt);
                meterRegistry.counter("ai.api.call.success").increment();
                return result;
            } catch (Exception e) {
                meterRegistry.counter("ai.api.call.failure").increment();
                throw e;
            } finally {
                concurrentCalls.decrementAndGet();
            }
        });
    }
}
五、配置建议
application.yml 配置示例
# Example application.yml: OpenAI chat options, cache, task-execution pool and Resilience4j instances.
# NOTE(review): indentation was lost in this copy, so key nesting cannot be read from
# the text below - restore it before using this file.
spring:
ai:
openai:
api-key: ${OPENAI_API_KEY}
chat:
options:
model: gpt-4
temperature: 0.7
connection-timeout: 30s
read-timeout: 60s
cache:
type: caffeine
caffeine:
spec: maximumSize=1000,expireAfterWrite=10m
redis:
time-to-live: 1h
task:
execution:
pool:
core-size: 8
max-size: 50
queue-capacity: 100
keep-alive: 60s
resilience4j:
circuitbreaker:
instances:
aiService:
failure-rate-threshold: 50
wait-duration-in-open-state: 30s
ratelimiter:
instances:
aiService:
limit-for-period: 10
limit-refresh-period: 1s
六、最佳实践总结
- 异步优先 :所有 AI 调用都应使用异步方式
- 线程池隔离 :AI 调用使用独立线程池,避免影响核心业务
- 多级缓存 :本地缓存 + Redis 分布式缓存
- 请求合并 :对小请求进行批量处理
- 熔断降级 :防止雪崩效应
- 监控告警 :实时监控性能指标
- 超时控制 :设置合理的超时时间
- 重试机制 :对可重试错误进行自动重试
这些优化策略可以根据实际业务场景和 AI 服务特性进行调整组合。
3. 向量检索性能优化:索引优化、参数调优
Spring AI 的向量检索性能优化是一个系统工程,主要涉及索引优化和参数调优两大方向。下面详细梳理优化策略和具体实践。
一、核心优化方向
1. 索引优化
这是性能提升的根本,直接影响查询速度和精度。
索引类型选择
// Index type selection, using Pinecone as the example
public enum IndexType {
// Exact search: slow, 100% accuracy
FLAT,
// Approximate search: fast, high accuracy (recommended for most scenarios)
IVF,
// Graph-based index; works better for very high dimensions
HNSW
}
// Spring AI configuration example
/**
 * Creates the Pinecone-backed vector store, choosing the index type by data volume.
 */
@Configuration
public class VectorStoreConfig {

    /** Above this many stored vectors the HNSW index is selected; otherwise IVF. */
    private static final long HNSW_THRESHOLD = 500_000L;

    /**
     * @param embeddingModel model used to embed documents and queries
     *                       (OpenAI embeddings have 1536 dimensions)
     * @return the configured vector store
     */
    @Bean
    public VectorStore vectorStore(EmbeddingModel embeddingModel) {
        long dataSize = 1_000_000L; // expected number of stored vectors
        PineconeVectorStoreConfig config = PineconeVectorStoreConfig.builder()
                .indexType(dataSize > HNSW_THRESHOLD ? IndexType.HNSW : IndexType.IVF)
                .metric(MetricType.COSINE) // similarity measure
                .build();
        return new PineconeVectorStore(config, embeddingModel);
    }
}
索引构建优化
# Index configuration in application.yml
# NOTE(review): indentation was lost in this copy - restore the key nesting before use.
spring:
ai:
vectorstore:
pinecone:
# Index build parameters
index-config:
pods: 2 # adjust to the data volume
replicas: 2 # number of replicas; raises availability
pod-type: "p2.xlarge" # choose the spec according to requirements
2. 参数调优
查询参数优化
/**
 * Vector retrieval with tuned search parameters.
 */
@Service
public class RetrievalService {

    @Autowired
    private VectorStore vectorStore;

    /**
     * Runs a filtered similarity search for the query.
     *
     * <p>Uses {@code SearchRequest}, the same request type the other search examples
     * in this file build for {@code VectorStore.similaritySearch}.
     *
     * @param query natural-language query
     * @return up to 20 documents with similarity of at least 0.7 in category {@code tech}
     */
    public List<Document> optimizedSearch(String query) {
        SearchRequest request = SearchRequest.builder()
                .query(query)
                .topK(20)                                 // result count: balances accuracy and speed
                .similarityThreshold(0.7)                 // drops low-quality matches
                .filterExpression("category == 'tech'")   // metadata filter narrows the search space
                .build();
        return vectorStore.similaritySearch(request);
    }
}
批处理优化
/**
 * Inserts documents into the vector store in fixed-size batches.
 */
@Component
public class BatchOptimizer {

    /**
     * Adds the documents in batches of 100, pausing 50 ms between batches so the
     * vector store is not rate-limited.
     *
     * @param documents documents to insert
     * @throws IllegalStateException if the thread is interrupted between batches;
     *                               the interrupt flag is restored first
     */
    public void batchInsert(List<Document> documents) {
        int batchSize = 100; // adjust to the vector store in use
        List<List<Document>> batches = partition(documents, batchSize);

        // A plain loop is used because Thread.sleep throws a checked exception,
        // which a forEach lambda cannot propagate.
        for (int i = 0; i < batches.size(); i++) {
            vectorStore.add(batches.get(i));
            boolean moreToCome = i < batches.size() - 1;
            if (moreToCome) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(
                            "Interrupted after inserting " + (i + 1) + " of " + batches.size() + " batches", e);
                }
            }
        }
    }
}
二、分层优化策略
基础层优化
// Checklist of base-layer optimisations; the methods are placeholders with no implementation.
public class BasicOptimizations {
// 1. Dimension reduction
public void dimensionOptimization() {
// Use a dimensionality-reduction technique (such as PCA) to lower the dimension
// Suited to scenarios with more than 2048 dimensions
}
// 2. Vector normalisation
public void normalizeVectors() {
// Make every vector unit length to speed up cosine-similarity computation
// Pre-process before storing into the vector store
}
// 3. Data sharding
public void dataSharding() {
// Shard storage according to business logic
// For example: by time or by category
}
}
查询层优化
public class QueryOptimizations {
// 1. 多阶段检索
public List<Document> multiStageSearch(String query) {
// 第一阶段:粗筛
List<Document> candidates = vectorStore.similaritySearch(
SearchRequest.builder()
.query(query)
.topK(100) // 放宽限制
.build()
);
// 第二阶段:精筛(业务逻辑过滤)
return candidates.stream()
.filter(doc -> meetsBusinessRules(doc))
.limit(10)
.collect(Collectors.toList());
}
// 2. 缓存策略
@Cacheable(value = "vectorCache", key = "#query.hashCode()")
public List<Document> cachedSearch(String query) {
return vectorStore.similaritySearch(query);
}
}
三、具体场景优化方案
场景1:大规模数据集(>100万条)
# Configuration plan
# NOTE(review): indentation was lost in this copy - restore the key nesting before use.
optimization:
large-scale:
index-type: HNSW
hnsw-parameters:
m: 32 # number of inter-layer connections; larger means higher accuracy
ef-construction: 200 # neighbour count at build time
ef-search: 100 # neighbour count at search time
partitioning:
enabled: true
shard-key: "monthly" # shard by month
场景2:低延迟要求(<50ms)
/**
 * Beans for the {@code low-latency} profile: an HNSW vector store with a reduced
 * search scope, plus a local cache.
 */
@Configuration
@Profile("low-latency")
public class LowLatencyConfig {

    /** Vector store tuned for latency rather than recall. */
    @Bean
    public VectorStore vectorStore() {
        VectorStore lowLatencyStore = PineconeVectorStore.builder()
                .indexType(IndexType.HNSW)
                // Dot product is cheaper to compute than cosine.
                // NOTE(review): the two rank identically only for unit-length vectors - confirm.
                .metric(MetricType.DOT_PRODUCT)
                .parameters(Map.of(
                        "ef_search", 64,          // narrower search scope
                        "parallel_queries", 2     // parallel queries
                ))
                .build();
        return lowLatencyStore;
    }

    /** Local Caffeine cache manager exposing the {@code vectorCache} cache. */
    @Bean
    public CacheManager cacheManager() {
        CacheManager localCache = new CaffeineCacheManager("vectorCache");
        return localCache;
    }
}
场景3:高精度要求(>95%召回率)
/**
 * Search settings and a hybrid strategy for recall-sensitive scenarios.
 */
public class HighPrecisionStrategy {

    /**
     * @param query natural-language query
     * @return request with a wider result set and a lower similarity threshold
     */
    public SearchRequest buildPrecisionRequest(String query) {
        SearchRequest precisionRequest = SearchRequest.builder()
                .query(query)
                .topK(50)                   // return more candidates
                .similarityThreshold(0.6)   // lower the threshold
                .includeMetadata(true)      // include the full metadata
                .includeValues(false)       // omit vector values to reduce transfer size
                .build();
        return precisionRequest;
    }

    /**
     * Hybrid search: runs vector search, then keyword search, and merges the
     * re-ranked results.
     *
     * @param query natural-language query
     * @return merged, re-ranked documents
     */
    public List<Document> hybridSearch(String query) {
        return rerankAndMerge(vectorSearch(query), keywordSearch(query));
    }
}
四、监控与调优工具
性能监控
/**
 * Aspect that logs the duration and result count of vector similarity searches.
 */
@Aspect
@Component
@Slf4j
public class VectorStoreMonitor {

    /** Searches slower than this many milliseconds are reported as slow. */
    private static final long SLOW_QUERY_MILLIS = 1000;

    /**
     * Times the intercepted {@code similaritySearch} call and logs the outcome.
     * Calls that throw are not logged; the exception propagates unchanged.
     *
     * @param joinPoint intercepted search invocation
     * @return whatever the intercepted method returned
     * @throws Throwable anything thrown by the intercepted method
     */
    @Around("execution(* org.springframework.ai.vectorstore.*.similaritySearch(..))")
    public Object monitorSearch(ProceedingJoinPoint joinPoint) throws Throwable {
        // nanoTime is monotonic; currentTimeMillis can jump when the wall clock is adjusted.
        long startNanos = System.nanoTime();
        Object result = joinPoint.proceed();
        long duration = (System.nanoTime() - startNanos) / 1_000_000L;

        // Guard the cast: a null or non-List result must not make the monitor itself fail.
        int resultCount = (result instanceof List) ? ((List<?>) result).size() : 0;
        log.info("Vector search took {} ms, results: {}", duration, resultCount);
        if (duration > SLOW_QUERY_MILLIS) {
            log.warn("Slow vector search detected!");
        }
        return result;
    }
}
调优参数表
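(原表格未能导出,以下根据本文示例中出现的取值整理)

| 参数 | 本文示例取值 | 说明 |
| --- | --- | --- |
| topK | 20(常规)/ 100(粗筛)/ 50(高精度) | 返回结果数,越大召回越高、延迟越高 |
| similarityThreshold | 0.7(常规)/ 0.6(高精度) | 相似度阈值,越高过滤越严格 |
| HNSW m | 32 | 层间连接数,越大精度越高 |
| HNSW ef-construction | 200 | 构建时的邻居数 |
| HNSW ef-search | 100(大规模)/ 64(低延迟) | 搜索时的邻居数 |
| 批量插入 batchSize | 100 | 根据向量库调整 |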
五、最佳实践总结
- 数据预处理是关键
- 清洗、去重、标准化文本
- 选择合适的嵌入模型
- 索引选择策略
- 小数据量(<10万):FLAT
- 中等数据量(10万-100万):IVF
- 大数据量(>100万):HNSW
- 查询优化
- 使用过滤条件缩小搜索范围
- 实现多级缓存
- 考虑异步查询
- 资源规划
- 根据QPS选择向量库规格
- 预留20-30%的性能余量
- 持续监控
- 监控P95/P99延迟
- 设置慢查询告警
- 定期重新评估索引策略
六、故障排查清单
// Troubleshooting checklist; the methods are placeholders with no implementation.
public class TroubleshootingGuide {
public void diagnosePerformanceIssues() {
// 1. Check network latency
// 2. Verify that vector dimensions are consistent
// 3. Check whether the index has finished building
// 4. Monitor memory and CPU usage
// 5. Inspect the vector store's slow-query log
}
public void quickFixes() {
// Temporary workarounds:
// 1. Lower the topK value
// 2. Raise the similarity threshold
// 3. Enable query caching
// 4. Reduce the number of concurrent requests
}
}
通过以上优化策略的组合使用,通常可以将向量检索性能提升30%-300%。建议先从索引优化开始,然后进行参数调优,最后考虑架构层面的优化。
4. 高并发场景适配:限流、削峰、批量处理
Spring AI 在处理高并发场景时,需要结合 Spring 生态和分布式系统设计原则。以下是针对限流、削峰和批量处理的适配方案:
一、限流策略
1. API 级限流
// Rate limiting implemented with Resilience4j
/**
 * API-level rate limiting for the chat client.
 */
@Configuration
public class RateLimitConfig {

    /** Limiter named {@code ai-api}: 100 permits per second, waiting at most 100 ms. */
    @Bean
    public RateLimiter aiRateLimiter() {
        return RateLimiter.of("ai-api",
                RateLimiterConfig.custom()
                        .limitForPeriod(100)
                        .limitRefreshPeriod(Duration.ofSeconds(1))
                        .timeoutDuration(Duration.ofMillis(100))
                        .build()
        );
    }

    /**
     * Chat client whose every call must first obtain a permit from the limiter.
     *
     * <p>The previous version decorated a supplier that merely returned the client.
     * That consumed one permit at bean creation and left the actual calls unlimited.
     *
     * <p>NOTE(review): assumes {@code ChatClient} has a single abstract method
     * {@code call(Prompt)}, so a lambda can implement it - confirm against the
     * Spring AI version in use.
     *
     * @param chatClient the underlying client to protect
     * @return client that fails a call when no permit is available within the timeout
     */
    @Bean
    @Qualifier("aiChatClient")
    public ChatClient rateLimitedChatClient(ChatClient chatClient) {
        RateLimiter limiter = aiRateLimiter();
        return prompt -> limiter.executeSupplier(() -> chatClient.call(prompt));
    }
}
2. 用户级限流
@Service
public class UserRateLimitService {
@Cacheable(value = "userRateLimit", key = "#userId")
public boolean allowRequest(String userId) {
// 使用 Redis + Lua 实现分布式限流
String luaScript = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('GET', key)
if current and tonumber(current) >= limit then
return 0
end
redis.call('INCR', key)
if tonumber(current) == 0 then
redis.call('EXPIRE', key, window)
end
return 1
""";
// 执行 Lua 脚本
return true; // 简化实现
}
}
3. 基于令牌桶的全局限流
@Component
public class TokenBucketLimiter {
private final RateLimiter rateLimiter;
private final ScheduledExecutorService scheduler;
public TokenBucketLimiter(int capacity, int refillRate) {
this.rateLimiter = new SmoothRateLimiter.SmoothBursty(
SmoothRateLimiter.SmoothBursty.create(capacity, refillRate)
);
this.scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(this::refillTokens,
0, 1, TimeUnit.SECONDS);
}
public boolean tryAcquire() {
return rateLimiter.tryAcquire();
}
}
二、削峰填谷策略
1. 消息队列缓冲
/**
 * RabbitMQ setup used to buffer AI requests during traffic peaks.
 */
@Configuration
public class QueueConfig {

    /**
     * Durable queue {@code ai.requests}, capped at 10000 messages; further publishes
     * are rejected once the cap is reached.
     */
    @Bean
    public Queue aiRequestQueue() {
        Queue boundedQueue = QueueBuilder.durable("ai.requests")
                .withArgument("x-max-length", 10000)
                .withArgument("x-overflow", "reject-publish")
                .build();
        return boundedQueue;
    }

    /**
     * Rabbit template that converts message bodies to and from JSON.
     *
     * @param connectionFactory broker connection factory
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate jsonTemplate = new RabbitTemplate(connectionFactory);
        jsonTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return jsonTemplate;
    }
}
2. 异步处理服务
/**
 * Consumes queued AI requests and processes them on the AI worker pool.
 */
@Service
public class AsyncAIService {

    /**
     * Handles one request from the {@code ai.requests} queue.
     *
     * <p>{@code @Async} already moves this method onto {@code aiTaskExecutor}, so the
     * AI call runs directly in the method body. Wrapping it in a second
     * {@code supplyAsync} would run the blocking call on the JVM common pool and keep
     * an AI worker thread waiting for it.
     *
     * @param request queued AI request
     * @return future holding the AI response
     */
    @Async("aiTaskExecutor")
    @RabbitListener(queues = "ai.requests")
    public CompletableFuture<AiResponse> processRequest(AiRequest request) {
        return CompletableFuture.completedFuture(aiClient.call(request));
    }

    /** Worker pool for AI requests; when saturated, the submitting thread runs the task. */
    @Bean("aiTaskExecutor")
    public ThreadPoolTaskExecutor aiTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(1000);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("ai-worker-");
        return executor;
    }
}
3. 请求合并窗口
@Component
public class RequestBuffer {
private final LinkedBlockingQueue<BufferedRequest> buffer =
new LinkedBlockingQueue<>(1000);
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
@PostConstruct
public void init() {
scheduler.scheduleAtFixedRate(this::flushBuffer,
100, 100, TimeUnit.MILLISECONDS); // 100ms 窗口
}
public void addRequest(String userId, String prompt) {
buffer.offer(new BufferedRequest(userId, prompt, System.currentTimeMillis()));
}
private void flushBuffer() {
List<BufferedRequest> batch = new ArrayList<>();
buffer.drainTo(batch, 50); // 最多批量50个
if (!batch.isEmpty()) {
processBatch(batch);
}
}
}
三、批量处理优化
1. 批量 API 调用
@Service
public class BatchAIService {
private final BatchChatClient batchChatClient;
private final ExecutorService batchExecutor;
public BatchAIService() {
this.batchExecutor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * 2
);
}
public List<AiResponse> batchProcess(List<AiRequest> requests) {
// 分批处理,每批最多100个
return Lists.partition(requests, 100).stream()
.parallel()
.map(this::processBatch)
.flatMap(List::stream)
.collect(Collectors.toList());
}
private List<AiResponse> processBatch(List<AiRequest> batch) {
List<Prompt> prompts = batch.stream()
.map(req -> new Prompt(req.getContent()))
.collect(Collectors.toList());
return batchChatClient.generate(prompts)
.stream()
.map(Generation::getText)
.map(AiResponse::new)
.collect(Collectors.toList());
}
}
2. 连接池配置
# application.yml
# Chat model selection plus HTTP client timeouts and connection-pool limits.
# NOTE(review): indentation was lost in this copy - restore the key nesting before use.
spring:
ai:
openai:
chat:
options:
model: gpt-4
client:
connection-timeout: 10s
read-timeout: 30s
max-connections: 200
max-connections-per-route: 50
keep-alive: 30s
3. 响应式批量处理
/**
 * Reactive batch endpoint: groups incoming requests into small windows and processes
 * each window as one batch.
 */
@RestController
public class ReactiveAIController {

    /**
     * Buffers requests into groups of up to 50 (or whatever arrives within 100 ms),
     * processes the groups concurrently, and emits the responses in request order.
     *
     * <p>{@code flatMapSequential} keeps the output in source order. Plain
     * {@code flatMap} emits in completion order, so responses could overtake each
     * other and no longer line up with their requests.
     *
     * @param requests stream of AI requests
     * @return stream of responses, ordered like the requests
     */
    @PostMapping("/ai/batch")
    public Flux<AiResponse> batchProcess(@RequestBody Flux<AiRequest> requests) {
        return requests
                .bufferTimeout(50, Duration.ofMillis(100))
                .flatMapSequential(batch ->
                        // Batch processing blocks, so run it off the event loop.
                        Mono.fromCallable(() -> processBatch(batch))
                                .subscribeOn(Schedulers.boundedElastic()))
                .flatMapIterable(list -> list);
    }
}
四、综合配置方案
1. 配置类整合
/**
 * Central AI configuration: chat client, retry policy and circuit-breaker factory.
 */
@Configuration
@EnableAsync
@EnableCaching
@EnableCircuitBreaker
public class AIConfig {

    /**
     * Chat client built with the given options and the shared retry template.
     *
     * @param options       OpenAI chat options
     * @param maxConcurrent value of {@code ai.max-concurrent} (default 100);
     *                      NOTE(review): injected but not used by this method
     */
    @Bean
    public ChatClient chatClient(
            OpenAiChatOptions options,
            @Value("${ai.max-concurrent:100}") int maxConcurrent) {
        ChatClient client = OpenAiChatClient.builder()
                .openAiApi(openAiApi())
                .options(options)
                .retryTemplate(retryTemplate())
                .build();
        return client;
    }

    /**
     * Retry policy: up to 3 attempts on {@code ResourceAccessException}, with
     * exponential backoff from 1000 ms, multiplier 2, capped at 10000 ms.
     */
    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryPolicy = RetryTemplate.builder()
                .maxAttempts(3)
                .exponentialBackoff(1000, 2, 10000)
                .retryOn(ResourceAccessException.class)
                .build();
        return retryPolicy;
    }

    /** Circuit-breaker factory backed by Resilience4j. */
    @Bean
    public CircuitBreakerFactory circuitBreakerFactory() {
        CircuitBreakerFactory factory = new Resilience4JCircuitBreakerFactory();
        return factory;
    }
}
2. 监控与熔断
@Component
public class AIMonitor {
private final MeterRegistry meterRegistry;
private final Counter requestCounter;
private final Timer responseTimer;
public AIMonitor(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.requestCounter = Counter.builder("ai.requests")
.description("AI API requests")
.register(meterRegistry);
this.responseTimer = Timer.builder("ai.response.time")
.register(meterRegistry);
}
@Around("@annotation(AiCall)")
public Object monitor(ProceedingJoinPoint pjp) throws Throwable {
requestCounter.increment();
return responseTimer.record(() -> {
try {
return pjp.proceed();
} catch (Throwable e) {
meterRegistry.counter("ai.errors").increment();
throw e;
}
});
}
}
五、最佳实践建议
- 分级限流策略 :
- 全局 API 限流
- 用户级别限流
- 基于业务优先级限流
- 队列选择原则 :
- 实时性要求高:使用内存队列
- 数据持久化要求:使用 Kafka/RabbitMQ
- 海量数据:使用 Redis Stream
- 批量处理优化 :
- 根据 AI 服务商的最大批量限制设置
- 动态调整批量大小
- 失败重试和部分成功处理
- 监控指标 :
关键监控指标:
- QPS/TPS
- 平均响应时间
- 错误率
- 队列积压长度
- 系统资源使用率
- 降级策略 :
/**
 * AI call protected by the {@code aiService} circuit breaker, with a static fallback reply.
 */
@Service
public class FallbackService {

    /**
     * @param prompt prompt text
     * @return response generated by the AI client
     */
    @CircuitBreaker(name = "aiService", fallbackMethod = "fallback")
    public AiResponse callAI(String prompt) {
        AiResponse generated = aiClient.generate(prompt);
        return generated;
    }

    /**
     * Fallback named by the circuit breaker on {@link #callAI(String)}.
     * A cached result or a simplified answer could be returned here instead.
     *
     * @param prompt prompt of the failed call
     * @param t      failure that triggered the fallback
     * @return fixed "system busy" response
     */
    public AiResponse fallback(String prompt, Throwable t) {
        String busyMessage = "系统繁忙,请稍后重试";
        return new AiResponse(busyMessage);
    }
}
这些方案可以根据实际业务需求进行组合使用,建议先从小规模开始,逐步优化调整参数。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)