1. Token optimization: context trimming, prompt slimming, chunking optimization

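In Spring AI, token optimization is key to controlling cost and improving performance. Below is a systematic approach covering context trimming, prompt slimming, and chunking optimization: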

I. Context trimming strategies

1. Automatic context window management
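@Component
public class ContextWindowManager {
    private static final int MAX_TOKENS = 8000; // adjust to the model's context window

    private final TokenCountEstimator estimator; // the utility class in section V

    public ContextWindowManager(TokenCountEstimator estimator) {
        this.estimator = estimator;
    }

    public String trimContext(String context, int targetTokens) {
        // Never trim to more than the model's window allows
        int budget = Math.min(targetTokens, MAX_TOKENS);
        // Approximate token count (fast, no API call)
        int currentTokens = estimator.estimateTokens(context);

        if (currentTokens <= budget) {
            return context;
        }

        // Smart trimming: keep the most important parts
        return intelligentTrimming(context, budget);
    }

    private String intelligentTrimming(String text, int targetTokens) {
        // 1. Extract a summary using a quarter of the budget
        String summary = extractSummary(text, targetTokens / 4);

        // 2. Spend the rest on the beginning and end of the text (they usually carry
        //    the key information), selected with a sliding window over sentences
        int remaining = targetTokens - estimator.estimateTokens(summary);
        String core = slidingWindowTrim(text, remaining);

        return summary + "\n" + core;
    }

    // extractSummary(...) and slidingWindowTrim(...) are application-specific helpers (not shown)
}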
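The helpers in `intelligentTrimming` are left abstract. As a minimal, self-contained sketch of the "keep the beginning and the end" idea, the hypothetical `HeadTailTrimmer` below cuts out the middle of a text. It assumes the rough 4-characters-per-token heuristic instead of a real tokenizer, and the 60/40 head/tail split is an arbitrary choice.

```java
/** Keeps the beginning and end of a text within an approximate token budget. */
final class HeadTailTrimmer {

    // Assumed heuristic: about 4 characters per token (English text)
    static final int CHARS_PER_TOKEN = 4;
    // Marks where the middle of the text was cut out
    static final String MARKER = "\n...\n";

    private HeadTailTrimmer() {}

    static String trim(String text, int targetTokens) {
        int maxChars = targetTokens * CHARS_PER_TOKEN;
        if (text.length() <= maxChars) {
            return text; // already within budget
        }
        // Reserve room for the marker, then give 60% of the rest to the head, 40% to the tail
        int available = Math.max(0, maxChars - MARKER.length());
        int headChars = available * 3 / 5;
        int tailChars = available - headChars;
        return text.substring(0, headChars)
            + MARKER
            + text.substring(text.length() - tailChars);
    }
}
```

For example, `HeadTailTrimmer.trim(doc, 10)` on a 200-character string keeps the first 21 and the last 14 characters around the 5-character marker, 40 characters in total.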
2. Importance-based content filtering
public class ContentPrioritizer {
    // Score importance with TF-IDF or embedding similarity
    public List<String> prioritizeSections(String document,
                                           String query,
                                           int maxSections) {
        List<TextSection> sections = splitIntoSections(document);

        // Compute each section's relevance to the query
        sections.forEach(section ->
            section.setRelevanceScore(
                calculateRelevance(section.getContent(), query)
            ));

        // Sort by relevance (highest first) and keep the top K.
        // TextSection is not Comparable, so compare on the score explicitly
        return sections.stream()
            .sorted(Comparator.comparingDouble(TextSection::getRelevanceScore).reversed())
            .limit(maxSections)
            .map(TextSection::getContent)
            .collect(Collectors.toList());
    }
}
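To make the top-K selection concrete without an embedding model, the sketch below swaps in a deliberately simple relevance score in place of TF-IDF or embedding similarity: the fraction of distinct query words that appear in a paragraph. `SectionRanker` and its scoring are illustrative assumptions. Words are split on non-word characters, so this only suits space-separated languages such as English.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

/** Ranks paragraphs by query-word overlap and keeps the top K, most relevant first. */
final class SectionRanker {

    private SectionRanker() {}

    // Stand-in relevance score: share of distinct query words present in the section
    static double relevance(String section, String query) {
        Set<String> sectionTerms = terms(section);
        Set<String> queryTerms = terms(query);
        if (queryTerms.isEmpty()) {
            return 0.0;
        }
        long hits = queryTerms.stream().filter(sectionTerms::contains).count();
        return (double) hits / queryTerms.size();
    }

    private static Set<String> terms(String text) {
        return Arrays.stream(text.toLowerCase(Locale.ROOT).split("\\W+"))
            .filter(t -> !t.isEmpty())
            .collect(Collectors.toSet());
    }

    static List<String> topSections(String document, String query, int maxSections) {
        // Sections = paragraphs separated by blank lines; the sort is stable,
        // so equally relevant paragraphs keep their document order
        return Arrays.stream(document.split("\\n\\s*\\n"))
            .sorted(Comparator.comparingDouble((String s) -> relevance(s, query)).reversed())
            .limit(maxSections)
            .collect(Collectors.toList());
    }
}
```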

II. Prompt slimming

1. Structured prompt template
@Component
public class OptimizedPromptTemplate {

    public String createConcisePrompt(String userQuery,
                                      String context,
                                      Map<String, String> variables) {
        return """
            # Instruction (concise)
            %s

            # Context (summary)
            %s

            # Requirements
            1. Answer the core question directly
            2. Do not repeat the context
            3. Use bullet points

            # Variables
            %s
            """.formatted(
                truncateToTokens(userQuery, 200),
                summarizeContext(context, 500),
                formatVariables(variables)
            );
    }

    private String summarizeContext(String context, int tokenLimit) {
        // Extract key sentences (KeySentenceExtractor is an application helper)
        return KeySentenceExtractor.extract(context, tokenLimit);
    }

    // truncateToTokens(...) and formatVariables(...) are helpers (not shown)
}
2. Dynamic prompt construction
public class DynamicPromptBuilder {
    
    public Prompt buildAdaptivePrompt(ChatRequest request) {
        PromptTemplate template;
        
        switch (request.getComplexity()) {
            case SIMPLE:
                template = new SimpleQATemplate(); // 50-100 tokens
                break;
            case COMPLEX:
                template = new AnalyticalTemplate(); // 200-300 tokens
                break;
            default:
                template = new BalancedTemplate();
        }
        
        // Trim or expand the chosen template to fit the request's token budget
        return adjustForTokenBudget(template, request.getTokenBudget());
    }
}

III. Chunking optimization

1. Smart document chunking
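@Configuration
public class ChunkingConfig {

    @Bean
    public TextSplitter semanticSplitter() {
        // Spring AI's TokenTextSplitter measures chunks in tokens. It has no overlap or
        // custom-separator option; it cuts near punctuation/line breaks instead
        return new TokenTextSplitter(
            1000,   // target chunk size (tokens)
            350,    // minimum chunk size (characters) before cutting at punctuation
            5,      // drop chunks shorter than this (characters)
            10000,  // maximum number of chunks per document
            true    // keep separators (line breaks)
        );
    }

    @Bean
    public TextSplitter hierarchicalSplitter() {
        // Hierarchical splitting: section -> paragraph -> line -> word.
        // RecursiveCharacterTextSplitter is not a Spring AI class (the name comes from
        // LangChain); this assumes a custom TextSplitter subclass with this constructor
        return new RecursiveCharacterTextSplitter(
            2000,  // large chunks (characters)
            200,   // overlap
            List.of("\n## ", "\n\n", "\n", " "), // separators, in priority order
            true
        );
    }
}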
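Spring AI's `TokenTextSplitter` has no overlap setting, so a fixed-size window with overlap has to be written by hand. The hypothetical `OverlappingChunker` below is a minimal sketch of that technique. It treats whitespace-separated words as a stand-in for tokens, which is an assumption: real token counts differ, and Chinese text has no spaces to split on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Fixed-size chunks with overlap; whitespace-separated words stand in for tokens. */
final class OverlappingChunker {

    private OverlappingChunker() {}

    static List<String> chunk(String text, int chunkSize, int overlap) {
        if (overlap < 0 || overlap >= chunkSize) {
            throw new IllegalArgumentException("need 0 <= overlap < chunkSize");
        }
        List<String> chunks = new ArrayList<>();
        if (text.isBlank()) {
            return chunks;
        }
        String[] words = text.trim().split("\\s+");
        int step = chunkSize - overlap; // how far the window advances each time
        for (int start = 0; start < words.length; start += step) {
            int end = Math.min(start + chunkSize, words.length);
            chunks.add(String.join(" ", Arrays.copyOfRange(words, start, end)));
            if (end == words.length) {
                break; // this window already reached the end of the text
            }
        }
        return chunks;
    }
}
```

With `chunkSize = 4` and `overlap = 1`, the seven words `a b c d e f g` become `a b c d` and `d e f g`: the window advances by 3 words and each chunk repeats the last word of the previous one.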
2. Semantic-aware chunking
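@Service
public class SemanticChunkingService {

    private static final int MAX_CHUNK_TOKENS = 1000; // matches the chunk size above

    public List<DocumentChunk> chunkWithSemanticBoundaries(String text) {
        List<DocumentChunk> chunks = new ArrayList<>();

        // 1. Detect topic boundaries (character offsets; must include 0 and text.length())
        List<Integer> boundaries = detectTopicBoundaries(text);

        // 2. Chunk at token level within each topic segment
        for (int i = 0; i < boundaries.size() - 1; i++) {
            String segment = text.substring(boundaries.get(i), boundaries.get(i + 1));

            if (estimateTokens(segment) > MAX_CHUNK_TOKENS) {
                // Segment too long: split it further at sentence boundaries
                chunks.addAll(splitBySentences(segment));
            } else {
                chunks.add(new DocumentChunk(segment));
            }
        }

        return chunks;
    }

    private List<Integer> detectTopicBoundaries(String text) {
        // Boundary signals:
        // - headings (#, ##)
        // - sudden changes in paragraph length
        // - transition words (however, in addition, in summary)
        // - drops in embedding similarity between adjacent paragraphs
        return topicDetectionAlgorithm(text);
    }
}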

IV. Token usage monitoring and optimization

1. Token consumption monitor
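@Component
@Slf4j
public class TokenUsageMonitor {

    private static final int WARNING_THRESHOLD = 8000; // same as warning-threshold in the config below

    private final TokenCountEstimator tokenEstimator;
    private final Map<String, TokenStats> usageStats = new ConcurrentHashMap<>();

    public TokenUsageMonitor(TokenCountEstimator tokenEstimator) {
        this.tokenEstimator = tokenEstimator;
    }

    // ChatCompletionEvent is an application-defined event published after each model call.
    // When the provider reports usage (ChatResponse metadata), prefer those exact numbers.
    @EventListener
    public void handleChatEvent(ChatCompletionEvent event) {
        int promptTokens = tokenEstimator.estimateTokens(event.getPrompt());
        int completionTokens = tokenEstimator.estimateTokens(event.getResponse());

        // Accumulate per session instead of overwriting the previous call's numbers
        // (assumes TokenStats has an add(TokenStats) method that sums the counts)
        usageStats.merge(event.getSessionId(),
            new TokenStats(promptTokens, completionTokens, LocalDateTime.now()),
            TokenStats::add);

        // Warning mechanism
        if (promptTokens > WARNING_THRESHOLD) {
            log.warn("High token usage detected: {} prompt tokens", promptTokens);
            log.info("Suggested optimizations: {}", suggestOptimizations(event.getPrompt()));
        }
    }

    public List<String> suggestOptimizations(String prompt) {
        List<String> suggestions = new ArrayList<>();

        if (hasRedundantText(prompt)) {
            suggestions.add("Remove duplicated content");
        }
        if (hasLongExamples(prompt)) {
            suggestions.add("Shorten examples or replace them with a summary");
        }
        if (hasUnnecessaryFormatting(prompt)) {
            suggestions.add("Simplify formatting markup");
        }

        // applyOptimizations(prompt, suggestions) can then build an optimized prompt
        return suggestions;
    }
}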
2. Adaptive token budget allocation
@Service
public class AdaptiveTokenBudget {

    public TokenBudget allocateBudget(ChatRequest request) {
        int totalBudget = getModelLimit(request.getModel()); // model's context window

        // Dynamic allocation
        int contextBudget = totalBudget * 60 / 100;  // 60% for context
        int promptBudget = totalBudget * 20 / 100;   // 20% for the prompt
        int responseBudget = totalBudget - contextBudget - promptBudget; // rest for the answer

        // Adjust for query complexity: move 20% of the context budget to the prompt,
        // so the three parts still add up to the model limit
        if (isComplexQuery(request.getQuery())) {
            int shift = contextBudget / 5;
            contextBudget -= shift;
            promptBudget += shift;
        }

        return new TokenBudget(contextBudget, promptBudget, responseBudget);
    }
}
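Here is a worked version of the 60/20/20 split as a small self-contained sketch (`TokenBudgetAllocator` is a hypothetical name). Integer arithmetic and an explicit transfer from context to prompt keep the three budgets summing exactly to the model limit.

```java
/** Splits a model's context window into context / prompt / response budgets. */
final class TokenBudgetAllocator {

    record TokenBudget(int context, int prompt, int response) {}

    private TokenBudgetAllocator() {}

    static TokenBudget allocate(int totalBudget, boolean complexQuery) {
        int context = totalBudget * 60 / 100;          // 60% for retrieved context
        int prompt = totalBudget * 20 / 100;           // 20% for instructions
        int response = totalBudget - context - prompt; // remainder for the answer

        if (complexQuery) {
            // Move 20% of the context budget to the prompt; the total is unchanged
            int shift = context / 5;
            context -= shift;
            prompt += shift;
        }
        return new TokenBudget(context, prompt, response);
    }
}
```

For an 8,000-token window this gives 4,800 / 1,600 / 1,600 tokens. For a complex query, 960 tokens move from context to prompt, giving 3,840 / 2,560 / 1,600.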

V. Utility classes

1. Token estimator
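@Component
public class TokenCountEstimator {

    // Exact counting uses the model's tokenizer (jtokkit library; Spring AI's
    // JTokkitTokenCountEstimator is built on it). CL100K_BASE is the GPT-4/GPT-3.5 encoding
    private final Encoding encoding =
        Encodings.newDefaultEncodingRegistry().getEncoding(EncodingType.CL100K_BASE);

    // Rough estimate (much faster than tokenizing or calling an API)
    public int estimateTokens(String text) {
        // Heuristics that vary by tokenizer: English ~4 characters per token;
        // Chinese ~1 token per character with GPT-style BPE tokenizers
        // (Chinese-optimized tokenizers pack more characters into each token)
        long chineseChars = text.codePoints()
            .filter(cp -> Character.UnicodeScript.of(cp) == Character.UnicodeScript.HAN)
            .count();
        long otherChars = text.codePoints().count() - chineseChars;

        return (int) Math.ceil(chineseChars + otherChars / 4.0);
    }

    // Exact count for models that use the cl100k_base encoding
    public int calculateExactTokens(String text) {
        return encoding.countTokens(text);
    }
}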
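Below is a self-contained version of the rough estimator that depends only on the JDK. It classifies code points by Unicode script instead of a hand-written range check. The per-script ratios (about 1 token per CJK character, about 4 other characters per token) are assumptions that vary by tokenizer, so use a real tokenizer wherever budgets are tight. `RoughTokenEstimator` is a hypothetical name.

```java
/** Rough token estimate without a tokenizer: CJK vs. other characters. */
final class RoughTokenEstimator {

    // Heuristic ratios (assumptions; they vary by tokenizer)
    static final double CJK_TOKENS_PER_CHAR = 1.0;
    static final double OTHER_CHARS_PER_TOKEN = 4.0;

    private RoughTokenEstimator() {}

    static boolean isCjk(int codePoint) {
        Character.UnicodeScript script = Character.UnicodeScript.of(codePoint);
        return script == Character.UnicodeScript.HAN
            || script == Character.UnicodeScript.HIRAGANA
            || script == Character.UnicodeScript.KATAKANA
            || script == Character.UnicodeScript.HANGUL;
    }

    static int estimate(String text) {
        // Count code points, not chars, so surrogate pairs are counted once
        long cjk = text.codePoints().filter(RoughTokenEstimator::isCjk).count();
        long other = text.codePoints().count() - cjk;
        return (int) Math.ceil(cjk * CJK_TOKENS_PER_CHAR + other / OTHER_CHARS_PER_TOKEN);
    }
}
```

`estimate("你好abcd")` counts 2 CJK characters and 4 others, giving 2 + 1 = 3 tokens.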
2. Prompt compressor
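public class PromptCompressor {

    private static final Map<String, String> ABBREVIATIONS = Map.of(
        "for example", "e.g.",
        "that is", "i.e.",
        "please", "pls",
        "important", "imp"
    );

    public String compressPrompt(String originalPrompt) {
        // 1. Remove redundant whitespace. Collapse spaces/tabs but keep line breaks,
        //    then cap blank lines; collapsing \s+ first would erase every newline
        String compressed = originalPrompt
            .replaceAll("[ \\t]+", " ")
            .replaceAll("\n{3,}", "\n\n");

        // 2. Abbreviate common phrases
        compressed = abbreviateCommonPhrases(compressed);

        // 3. Replace repeated content with placeholders/references
        compressed = replaceDuplicatesWithReferences(compressed);

        return compressed;
    }

    private String abbreviateCommonPhrases(String text) {
        // Whole-word, case-insensitive matching, so "unimportant" is left alone.
        // Common words such as "please" are often a single token already, so
        // abbreviating them saves little and can make the prompt less clear
        for (Map.Entry<String, String> entry : ABBREVIATIONS.entrySet()) {
            text = text.replaceAll("(?i)\\b" + Pattern.quote(entry.getKey()) + "\\b",
                                   Matcher.quoteReplacement(entry.getValue()));
        }
        return text;
    }
}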
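A runnable sketch of the first two compression steps, depending only on the JDK (`PromptCompressorSketch` is a hypothetical name). Spaces and tabs are collapsed while line breaks are kept, blank lines are capped at one, and abbreviations match whole words only, case-insensitively.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Whitespace normalization plus whole-word phrase abbreviation. */
final class PromptCompressorSketch {

    private static final Map<String, String> ABBREVIATIONS = new LinkedHashMap<>();
    static {
        ABBREVIATIONS.put("for example", "e.g.");
        ABBREVIATIONS.put("that is", "i.e.");
    }

    private PromptCompressorSketch() {}

    static String compress(String prompt) {
        String s = prompt
            .replaceAll("[ \\t]+", " ")     // collapse runs of spaces/tabs, keep newlines
            .replaceAll(" ?\\n ?", "\n")    // drop spaces around line breaks
            .replaceAll("\\n{3,}", "\n\n")  // at most one blank line in a row
            .strip();
        for (Map.Entry<String, String> e : ABBREVIATIONS.entrySet()) {
            // (?i) = case-insensitive; \b on both sides = whole words only
            Pattern p = Pattern.compile("(?i)\\b" + Pattern.quote(e.getKey()) + "\\b");
            s = p.matcher(s).replaceAll(Matcher.quoteReplacement(e.getValue()));
        }
        return s;
    }
}
```

`that is it` becomes `i.e. it`, while `that isotope` is left unchanged because the match must end at a word boundary.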

VI. Configuration recommendations

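application.yml example (the spring.ai.token.* and spring.ai.monitoring.* keys are custom application properties, e.g. bound with @ConfigurationProperties; Spring AI does not define them):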
spring:
  ai:
    token:
      optimization:
        enabled: true
        max-context-tokens: 4000
        max-prompt-tokens: 1000
        chunk:
          size: 1000
          overlap: 100
          strategy: semantic
        compression:
          enabled: true
          min-savings-percent: 20
    monitoring:
      token-usage:
        enabled: true
        warning-threshold: 8000
        alert-threshold: 12000

Best practices summary

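  1. Layered trimming strategy:
     1. Layer 1: remove irrelevant content
     2. Layer 2: extract key information
     3. Layer 3: smart summarization
  2. Prompt design principles:
     1. Put instructions first and context after
     2. Use structured templates
     3. Avoid redundant descriptions
  3. Chunking optimization:
     1. Split at semantic boundaries
     2. Preserve contextual coherence
     3. Adjust chunk size dynamically
  4. Monitoring and tuning:
     1. Monitor token consumption in real time
     2. A/B test different strategies
     3. Tune parameters to business requirements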

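These strategies can substantially reduce token usage while preserving answer quality. Tune the parameters to your application, and keep monitoring and optimizing.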

2. API performance optimization: async processing, thread pools, caching

Spring AI application performance can be improved along three lines: asynchronous processing, thread pool configuration, and caching. Details below:

I. Asynchronous processing

Calling the AI API asynchronously
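@Service
public class AIService {

    private final ChatModel chatModel;
    private final Executor aiTaskExecutor;

    public AIService(ChatModel chatModel,
                     @Qualifier("aiTaskExecutor") Executor aiTaskExecutor) {
        this.chatModel = chatModel;
        this.aiTaskExecutor = aiTaskExecutor;
    }

    // Runs on aiTaskExecutor, so the slow model call does not block the caller
    @Async("aiTaskExecutor")
    public CompletableFuture<String> generateContentAsync(String prompt) {
        String result = chatModel.call(prompt);
        return CompletableFuture.completedFuture(result);
    }

    // Batch asynchronous processing: one task per prompt on the AI pool.
    // Deliberately not @Async: the method only schedules work, and an @Async wrapper
    // would occupy a pool thread waiting for tasks queued on that same pool
    public CompletableFuture<List<String>> batchProcessAsync(List<String> prompts) {
        List<CompletableFuture<String>> futures = prompts.stream()
            .map(prompt -> CompletableFuture.supplyAsync(
                () -> chatModel.call(prompt), aiTaskExecutor))
            .collect(Collectors.toList());

        // Completes when all calls finish; results keep the order of the prompts
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }
}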
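The fan-out pattern in `batchProcessAsync` can be exercised without Spring or a model. In this sketch `modelCall` is a stand-in function (an assumption) for the blocking AI call, and each call gets its own timeout via `CompletableFuture.orTimeout` (Java 9+), so one stuck call cannot hold up the batch indefinitely. `FanOut` is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Fans out one task per prompt on a bounded pool and gathers results in input order. */
final class FanOut {

    private FanOut() {}

    static CompletableFuture<List<String>> callAll(List<String> prompts,
                                                   Function<String, String> modelCall,
                                                   ExecutorService pool,
                                                   long timeoutSeconds) {
        List<CompletableFuture<String>> futures = prompts.stream()
            .map(p -> CompletableFuture.supplyAsync(() -> modelCall.apply(p), pool)
                // Fails this call with a TimeoutException if it runs too long
                .orTimeout(timeoutSeconds, TimeUnit.SECONDS))
            .collect(Collectors.toList());
        // allOf completes when every task is done, so join() below does not block
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }
}
```

Results come back in input order even when calls finish out of order, because `join()` is applied to the futures list in its original order.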
WebFlux reactive support
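@RestController
public class AIController {

    private final ChatClient chatClient;

    public AIController(ChatClient.Builder chatClientBuilder) {
        this.chatClient = chatClientBuilder.build();
    }

    @GetMapping(value = "/ai/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamResponse(@RequestParam String prompt) {
        // Stream text as the model generates it (Spring AI 1.x fluent API).
        // Add .delayElements(...) only to deliberately pace the output; it adds latency
        return chatClient.prompt(prompt).stream().content();
    }

    @PostMapping("/ai/batch")
    public Mono<ResponseEntity<BatchResult>> batchProcess(
            @RequestBody BatchRequest request) {
        return Flux.fromIterable(request.getPrompts())
            // Blocking call moved off the event loop; at most 8 calls in flight,
            // and flatMapSequential keeps results in prompt order
            .flatMapSequential(prompt ->
                Mono.fromCallable(() -> chatClient.prompt(prompt).call().content())
                    .subscribeOn(Schedulers.boundedElastic()), 8)
            .collectList()
            .map(results -> ResponseEntity.ok(new BatchResult(results)));
    }
}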

II. Thread pool configuration

Dedicated thread pool for AI calls
@Configuration
@EnableAsync
public class ThreadPoolConfig {

    @Bean("aiTaskExecutor")
    public ThreadPoolTaskExecutor aiTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Core threads = CPU cores * 2 (a starting point; AI calls mostly wait on the network)
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
        // Max threads: size from the AI API's latency and rate limit.
        // Threads beyond the core size are only created once the queue is full
        executor.setMaxPoolSize(50);
        // Bounded queue so a backlog cannot exhaust memory
        executor.setQueueCapacity(100);
        // Idle time before non-core threads are released
        executor.setKeepAliveSeconds(60);
        // Thread name prefix
        executor.setThreadNamePrefix("ai-executor-");
        // Rejection policy: run the task on the caller's thread (natural back-pressure)
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Graceful shutdown: wait for running tasks to finish
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }

    @Bean("ioTaskExecutor")
    public ThreadPoolTaskExecutor ioTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // I/O-bound tasks can use more threads
        executor.setCorePoolSize(20);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(200);
        executor.setThreadNamePrefix("io-executor-");
        executor.initialize();
        return executor;
    }
}
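One detail matters when sizing `aiTaskExecutor`: a `ThreadPoolExecutor` (which `ThreadPoolTaskExecutor` wraps) adds threads beyond the core size only when its queue is full. The small experiment below shows this with plain JDK classes. `PoolGrowthDemo` is a hypothetical name, and the tasks block on a latch so that the counts are deterministic.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Shows that a ThreadPoolExecutor grows past corePoolSize only once its queue is full. */
final class PoolGrowthDemo {

    private PoolGrowthDemo() {}

    /** Returns {poolSize, queuedTasks} after submitting `tasks` blocking tasks. */
    static int[] submitBlocking(int core, int max, int queueCapacity, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            core, max, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(queueCapacity),
            new ThreadPoolExecutor.AbortPolicy());
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the thread so later tasks pile up
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }
}
```

With core size 1, max size 2 and a queue of 1, two blocking tasks leave 1 thread and 1 queued task; only the third task triggers a second thread. Applied to `aiTaskExecutor`, the pool stays at its core size until 100 tasks are waiting.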
Virtual thread support (Java 21+)
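@Configuration
public class VirtualThreadConfig {

    // Spring Framework 6.1+: a new virtual thread per task, named "ai-virtual-..."
    // (Spring Boot 3.2+ can also switch its default executors with spring.threads.virtual.enabled=true)
    @Bean
    public AsyncTaskExecutor virtualThreadExecutor() {
        return new VirtualThreadTaskExecutor("ai-virtual-");
    }

    // Alternative: adapt the JDK's virtual-thread-per-task executor
    @Bean
    public AsyncTaskExecutor virtualThreadTaskExecutor() {
        return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor());
    }
}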

III. Caching

Multi-level cache configuration
@Configuration
@EnableCaching
public class CacheConfig {

    // Local cache (Caffeine). @Primary because two CacheManager beans are defined;
    // use @Cacheable(cacheManager = "redisCacheManager") to target Redis.
    // These are two independent caches; a true L1 -> L2 lookup chain needs a custom Cache
    @Bean
    @Primary
    public CacheManager cacheManager() {
        CaffeineCacheManager cacheManager = new CaffeineCacheManager();
        cacheManager.setCaffeine(Caffeine.newBuilder()
            .maximumSize(1000) // maximum number of entries
            .expireAfterWrite(10, TimeUnit.MINUTES) // expire 10 min after write
            .expireAfterAccess(5, TimeUnit.MINUTES) // or 5 min after last access, whichever comes first
            .recordStats() // record hit/miss statistics
        );
        return cacheManager;
    }

    // Redis distributed cache
    @Bean
    public RedisCacheManager redisCacheManager(RedisConnectionFactory factory) {
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
            .entryTtl(Duration.ofMinutes(30))
            .serializeKeysWith(RedisSerializationContext.SerializationPair
                .fromSerializer(new StringRedisSerializer()))
            .serializeValuesWith(RedisSerializationContext.SerializationPair
                .fromSerializer(new GenericJackson2JsonRedisSerializer()))
            .disableCachingNullValues();

        return RedisCacheManager.builder(factory)
            .cacheDefaults(config)
            // Derived from `config` so this cache keeps the serializers; only the TTL changes
            .withCacheConfiguration("ai_responses", config.entryTtl(Duration.ofHours(1)))
            .transactionAware()
            .build();
    }
}
AI response caching service
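@Service
public class AICacheService {

    private final ChatModel chatModel;

    // Self-reference through the Spring proxy: calling this.getCachedResponse(...)
    // directly would bypass @Cacheable
    @Lazy
    @Autowired
    private AICacheService self;

    public AICacheService(ChatModel chatModel) {
        this.chatModel = chatModel;
    }

    // Key on model + full prompt: hashCode() can collide, and ignoring the model
    // would return one model's answer for another
    @Cacheable(value = "ai_responses",
               key = "#model + ':' + #prompt",
               unless = "#result == null")
    public String getCachedResponse(String prompt, String model) {
        // Actual AI call with the requested model
        return chatModel.call(new Prompt(prompt, ChatOptions.builder().model(model).build()))
            .getResult().getOutput().getText();
    }

    // Cache keyed on every parameter that changes the answer
    @Cacheable(value = "ai_responses",
               key = "T(com.example.util.CacheKeyGenerator)"
                     + ".generate(#prompt, #temperature, #maxTokens)")
    public String getCachedResponseWithParams(String prompt,
                                              double temperature,
                                              int maxTokens) {
        // Pass the parameters to the model so the cached answer matches the key
        ChatOptions options = ChatOptions.builder()
            .temperature(temperature)
            .maxTokens(maxTokens)
            .build();
        return chatModel.call(new Prompt(prompt, options)).getResult().getOutput().getText();
    }

    // Batch lookup: each prompt goes through the per-prompt cache via the proxy,
    // so no separate list-level cache is needed
    public List<String> batchGetCachedResponses(List<String> prompts, String model) {
        return prompts.stream()
            .map(prompt -> self.getCachedResponse(prompt, model))
            .collect(Collectors.toList());
    }
}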
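The original `CacheKeyGenerator.generate(...)` is not shown. Below is a minimal sketch of such a generator, assuming the key should cover the model, the prompt and the sampling parameters. It hashes an unambiguous encoding of the inputs with SHA-256, which keeps Redis keys short and avoids `hashCode()` collisions. `AiCacheKeys` is a hypothetical name; to reference it from SpEL via `T(...)`, it would need to be a public class in its own file.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

/** Collision-resistant cache key over the prompt and every parameter that changes the answer. */
final class AiCacheKeys {

    private AiCacheKeys() {}

    static String key(String model, String prompt, double temperature, int maxTokens) {
        // Length prefixes make the encoding unambiguous: ("a|b", "c") and ("a", "b|c") differ
        String canonical = model.length() + ":" + model + "|"
            + prompt.length() + ":" + prompt + "|"
            + temperature + "|" + maxTokens;
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                .digest(canonical.getBytes(StandardCharsets.UTF_8));
            return "ai:" + HexFormat.of().formatHex(digest); // "ai:" + 64 hex chars
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 must be available on every JVM", e);
        }
    }
}
```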
Vector similarity caching
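@Service
public class VectorCacheService {

    private final EmbeddingModel embeddingModel;

    // Proxy self-reference so internal calls go through @Cacheable
    @Lazy
    @Autowired
    private VectorCacheService self;

    public VectorCacheService(EmbeddingModel embeddingModel) {
        this.embeddingModel = embeddingModel;
    }

    // Key on the text itself: hashCode() collisions would return the wrong vector
    @Cacheable(value = "embeddings", key = "#text")
    public float[] getCachedEmbedding(String text) {
        // Computed only on a cache miss (Spring AI 1.x EmbeddingModel returns float[])
        return embeddingModel.embed(text);
    }

    // Cached similarity score for a (query, document) pair
    @Cacheable(value = "similarity_scores",
               key = "#query + '|' + #document")
    public double getCachedSimilarity(String query, String document) {
        float[] queryVector = self.getCachedEmbedding(query);
        float[] docVector = self.getCachedEmbedding(document);
        return calculateCosineSimilarity(queryVector, docVector);
    }
}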
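`calculateCosineSimilarity` is not shown either. Cosine similarity is the dot product divided by the product of the vector norms. The sketch below computes it for `float[]` embeddings and accumulates in `double` to limit rounding error. `Cosine` is a hypothetical name, and returning 0 for a zero vector is a choice made here, since the similarity is undefined in that case.

```java
/** Cosine similarity between two embedding vectors. */
final class Cosine {

    private Cosine() {}

    static double similarity(float[] a, float[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("dimension mismatch: " + a.length + " vs " + b.length);
        }
        double dot = 0, normA = 0, normB = 0;
        for (int i = 0; i < a.length; i++) {
            dot += (double) a[i] * b[i];
            normA += (double) a[i] * a[i];
            normB += (double) b[i] * b[i];
        }
        if (normA == 0 || normB == 0) {
            return 0.0; // undefined for a zero vector; treated as unrelated
        }
        return dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }
}
```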

IV. Combined strategies

Request merging and batching
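@Component
public class AIRequestBatcher {

    private final BatchProcessor batchProcessor; // application component: one call for many prompts
    private final LinkedBlockingQueue<Request> requestQueue =
        new LinkedBlockingQueue<>(1000);
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    public AIRequestBatcher(BatchProcessor batchProcessor) {
        this.batchProcessor = batchProcessor;
    }

    @PostConstruct
    public void init() {
        // Flush every 100 ms, at most 50 requests each time (caps throughput at ~500 req/s)
        scheduler.scheduleAtFixedRate(this::processBatch, 100, 100, TimeUnit.MILLISECONDS);
    }

    @PreDestroy
    public void shutdown() {
        scheduler.shutdown();
    }

    private void processBatch() {
        List<Request> batch = new ArrayList<>();
        requestQueue.drainTo(batch, 50); // at most 50 per batch
        if (batch.isEmpty()) {
            return;
        }

        List<String> prompts = batch.stream()
            .map(Request::getPrompt)
            .collect(Collectors.toList());
        try {
            List<String> responses = batchProcessor.processBatch(prompts);
            // Hand each response back to its caller (responses are in prompt order)
            for (int i = 0; i < batch.size(); i++) {
                batch.get(i).getFuture().complete(responses.get(i));
            }
        } catch (Exception e) {
            // An exception escaping here would cancel all later scheduled runs,
            // so fail this batch's futures instead
            batch.forEach(r -> r.getFuture().completeExceptionally(e));
        }
    }

    public CompletableFuture<String> submitRequest(String prompt) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (!requestQueue.offer(new Request(prompt, future))) {
            // Queue full: fail fast instead of leaving the future pending forever
            future.completeExceptionally(new RejectedExecutionException("AI request queue is full"));
        }
        return future;
    }

    @Data
    @AllArgsConstructor
    private static class Request {
        private String prompt;
        private CompletableFuture<String> future;
    }
}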
Rate limiting and circuit breaking
@Configuration
public class ResilienceConfig {

    @Bean
    public CircuitBreaker aiCircuitBreaker() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            .failureRateThreshold(50) // open the circuit at >= 50% failed calls
            .waitDurationInOpenState(Duration.ofSeconds(30)) // how long the circuit stays open
            .slidingWindowSize(10) // failure rate measured over the last 10 calls
            .permittedNumberOfCallsInHalfOpenState(5) // trial calls allowed while half-open
            .build();

        return CircuitBreaker.of("aiService", config);
    }

    @Bean
    public RateLimiter aiRateLimiter() {
        RateLimiterConfig config = RateLimiterConfig.custom()
            .limitForPeriod(10) // permits per refresh period (here: 10 per second)
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .timeoutDuration(Duration.ofMillis(500)) // max wait for a permit
            .build();

        return RateLimiter.of("aiService", config);
    }

    @Bean
    public Bulkhead aiBulkhead() {
        BulkheadConfig config = BulkheadConfig.custom()
            .maxConcurrentCalls(20) // maximum concurrent calls
            .maxWaitDuration(Duration.ofMillis(100)) // max wait for a free slot
            .build();

        return Bulkhead.of("aiService", config);
    }
}
Monitoring and metrics
@Component
public class AIMetricsCollector {

    private final ChatModel chatModel;
    private final Timer aiCallTimer;
    private final Counter successCounter;
    private final Counter failureCounter;
    // In-flight calls: one AtomicInteger, registered once and sampled by the gauge
    private final AtomicInteger concurrentCalls = new AtomicInteger();

    public AIMetricsCollector(MeterRegistry meterRegistry, ChatModel chatModel) {
        this.chatModel = chatModel;
        this.aiCallTimer = Timer.builder("ai.api.call.duration")
            .description("AI API call latency")
            .register(meterRegistry);
        this.successCounter = meterRegistry.counter("ai.api.call.success");
        this.failureCounter = meterRegistry.counter("ai.api.call.failure");
        // Registering a new AtomicInteger on every call would not work: the gauge keeps
        // reporting the first object it was given
        meterRegistry.gauge("ai.api.concurrent.calls", concurrentCalls);
    }

    public String callWithMetrics(String prompt) {
        return aiCallTimer.record(() -> {
            concurrentCalls.incrementAndGet();
            try {
                String result = chatModel.call(prompt);
                successCounter.increment(); // record success
                return result;
            } catch (RuntimeException e) {
                failureCounter.increment(); // record failure
                throw e;
            } finally {
                concurrentCalls.decrementAndGet();
            }
        });
    }
}

V. Configuration recommendations

application.yml example
spring:
  ai:
    openai:
      api-key: ${OPENAI_API_KEY}
      chat:
        options:
          model: gpt-4
          temperature: 0.7
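      # Timeouts are not spring.ai.openai.* properties; set them on the HTTP client behind
      # Spring AI's RestClient/WebClient (e.g. spring.http.client.connect-timeout /
      # read-timeout in Spring Boot 3.4+)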
  
  cache:
    type: caffeine
    caffeine:
      spec: maximumSize=1000,expireAfterWrite=10m
    redis:
      time-to-live: 1h
  
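  # spring.task.execution.pool.* sizes Spring Boot's default task executor,
  # not the aiTaskExecutor bean declared in ThreadPoolConfig
  task: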
    execution:
      pool:
        core-size: 8
        max-size: 50
        queue-capacity: 100
        keep-alive: 60s

resilience4j:
  circuitbreaker:
    instances:
      aiService:
        failure-rate-threshold: 50
        wait-duration-in-open-state: 30s
  ratelimiter:
    instances:
      aiService:
        limit-for-period: 10
        limit-refresh-period: 1s

VI. Best practices summary

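  1. Async first: call slow AI APIs asynchronously so they do not hold request threads
  2. Thread pool isolation: give AI calls a dedicated pool so they cannot starve core business logic
  3. Multi-level caching: local cache + Redis distributed cache
  4. Request merging: batch small requests together
  5. Circuit breaking and fallback: prevent cascading failures
  6. Monitoring and alerting: track performance metrics in real time
  7. Timeout control: set sensible timeouts
  8. Retries: automatically retry errors that are safe to retry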

Combine and adjust these strategies according to your workload and the characteristics of your AI provider.

3. Vector retrieval performance: index optimization and parameter tuning

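Optimizing vector retrieval in Spring AI is an end-to-end effort with two main levers: index optimization and parameter tuning. The strategies and concrete practices are laid out below.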

I. Core optimization directions

1. Index optimization

This is the foundation of performance: the index directly determines query speed and accuracy.

Choosing an index type
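// Common index types (general ANN concepts; each vector store names them differently)
public enum IndexType {
    // Exact brute-force search: 100% recall, cost grows linearly with data size
    FLAT,
    // Inverted-file (cluster-based) approximate search: fast; recall tuned via the probe count
    IVF,
    // Graph-based approximate search: low latency and high recall at the cost of memory
    HNSW
}

// Spring AI configuration example. Pinecone is fully managed and does not let you pick
// an index type, so this uses pgvector, whose PgIndexType offers NONE (exact scan),
// IVFFLAT and HNSW
@Configuration
public class VectorStoreConfig {

    @Bean
    public VectorStore vectorStore(JdbcTemplate jdbcTemplate, EmbeddingModel embeddingModel) {
        // Choose the index by data volume
        long dataSize = 1_000_000; // expected number of vectors

        return PgVectorStore.builder(jdbcTemplate, embeddingModel)
            .dimensions(1536)  // OpenAI text-embedding-3-small / ada-002 dimension
            .indexType(dataSize > 500_000
                ? PgVectorStore.PgIndexType.HNSW
                : PgVectorStore.PgIndexType.IVFFLAT)
            .distanceType(PgVectorStore.PgDistanceType.COSINE_DISTANCE) // similarity metric
            .initializeSchema(true) // create the table and index on startup
            .build();
    }
}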
Index build optimization
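# Pinecone pod-based index settings. They are set when the index is created
# (Pinecone console or create-index API), not through Spring AI properties
spec:
  pod:
    pods: 2                # scale with data volume
    replicas: 2            # replicas add throughput (QPS) and availability
    pod_type: "p2.x1"      # pod types s1/p1/p2, sizes x1/x2/x4/x8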
2. Parameter tuning
Query parameter optimization
@Service
public class RetrievalService {

    @Autowired
    private VectorStore vectorStore;

    public List<Document> optimizedSearch(String query) {
        // Key query parameters (Spring AI SearchRequest)
        SearchRequest request = SearchRequest.builder()
            .query(query)
            .topK(20)                     // number of results: balance recall vs. speed
            .similarityThreshold(0.7)     // drop low-similarity results
            .filterExpression("category == 'tech'") // metadata filter narrows the search space
            .build();

        return vectorStore.similaritySearch(request);
    }
}
Batch processing optimization
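@Component
public class BatchOptimizer {

    private static final int BATCH_SIZE = 100;  // tune per vector store
    private final VectorStore vectorStore;

    public BatchOptimizer(VectorStore vectorStore) {
        this.vectorStore = vectorStore;
    }

    public void batchInsert(List<Document> documents) throws InterruptedException {
        // Insert in batches to cut network round trips
        for (int from = 0; from < documents.size(); from += BATCH_SIZE) {
            int to = Math.min(from + BATCH_SIZE, documents.size());
            vectorStore.add(documents.subList(from, to));
            // Short pause between batches to stay under provider rate limits
            Thread.sleep(50);
        }
    }
}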

II. Layered optimization strategy

Base-layer optimizations
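public class BasicOptimizations {

    // 1. Dimension reduction
    public void dimensionOptimization() {
        // Reduce dimensions (e.g. with PCA), typically when dimension > 2048.
        // Apply the same projection to stored and query vectors. OpenAI's
        // text-embedding-3 models can return shorter vectors natively (dimensions parameter)
    }

    // 2. Vector normalization
    public void normalizeVectors() {
        // Scale every vector to unit length before storing it: cosine similarity then
        // equals the dot product, so the cheaper inner-product metric can be used
    }

    // 3. Data sharding
    public void dataSharding() {
        // Shard storage along business lines,
        // e.g. by time period or by category
    }
}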
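The normalization point can be checked numerically. After each vector is scaled to unit length, a plain dot product gives the cosine similarity. In the sketch below (hypothetical `VectorNormalization` class), (3, 4) and (4, 3) normalize to (0.6, 0.8) and (0.8, 0.6), whose dot product 0.96 equals their cosine, 24 / 25.

```java
/** Unit-normalizes vectors so that a plain dot product equals cosine similarity. */
final class VectorNormalization {

    private VectorNormalization() {}

    static float[] normalize(float[] v) {
        double sumSquares = 0;
        for (float x : v) {
            sumSquares += (double) x * x;
        }
        double norm = Math.sqrt(sumSquares);
        if (norm == 0) {
            return v.clone(); // zero vector: nothing to scale
        }
        float[] out = new float[v.length];
        for (int i = 0; i < v.length; i++) {
            out[i] = (float) (v[i] / norm);
        }
        return out;
    }

    static double dot(float[] a, float[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            sum += (double) a[i] * b[i];
        }
        return sum;
    }
}
```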
Query-layer optimizations
public class QueryOptimizations {

    // 1. Multi-stage retrieval
    public List<Document> multiStageSearch(String query) {
        // Stage 1: coarse recall with a generous topK
        List<Document> candidates = vectorStore.similaritySearch(
            SearchRequest.builder()
                .query(query)
                .topK(100)
                .build()
        );

        // Stage 2: fine filtering with business rules, keep the top 10
        return candidates.stream()
            .filter(this::meetsBusinessRules)
            .limit(10)
            .collect(Collectors.toList());
    }

    // 2. Caching (key on the query text itself; hashCode() can collide)
    @Cacheable(value = "vectorCache", key = "#query")
    public List<Document> cachedSearch(String query) {
        return vectorStore.similaritySearch(query);
    }
}

III. Scenario-specific solutions

Scenario 1: large datasets (>1 million records)
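# Configuration (custom application keys, not a Spring AI schema)
optimization:
  large-scale:
    index-type: HNSW
    hnsw-parameters:
      m: 32                 # max neighbors per graph node: higher = better recall, more memory
      ef-construction: 200  # candidate list size while building the graph
      ef-search: 100        # candidate list size per query (should be >= topK)
    partitioning:
      enabled: true
      shard-key: "monthly"  # partition by month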
Scenario 2: low-latency requirement (<50 ms)
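@Configuration
@Profile("low-latency")
public class LowLatencyConfig {

    @Bean
    public VectorStore vectorStore(JdbcTemplate jdbcTemplate, EmbeddingModel embeddingModel) {
        // Sketch with pgvector: Spring AI's PineconeVectorStore exposes no index or ef_search settings.
        // Narrower search than the large-scale profile: run "SET hnsw.ef_search = 64"
        // on the connection (pgvector's default is 40)
        return PgVectorStore.builder(jdbcTemplate, embeddingModel)
            .dimensions(1536)
            .indexType(PgVectorStore.PgIndexType.HNSW)
            // Inner product ranks like cosine only for unit-length vectors (OpenAI
            // embeddings are normalized); it skips the norm computation
            .distanceType(PgVectorStore.PgDistanceType.NEGATIVE_INNER_PRODUCT)
            .build();
    }

    @Bean
    public CacheManager cacheManager() {
        // Local Caffeine cache for repeated queries
        return new CaffeineCacheManager("vectorCache");
    }
}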
Scenario 3: high-accuracy requirement (>95% recall)
public class HighPrecisionStrategy {

    public SearchRequest buildPrecisionRequest(String query) {
        // Spring AI's SearchRequest has no includeMetadata/includeValues options:
        // returned Documents carry their metadata, and raw vectors are not returned
        return SearchRequest.builder()
            .query(query)
            .topK(50)                      // return more candidates
            .similarityThreshold(0.6)      // lower threshold: fewer relevant results cut off
            .build();
    }

    // Hybrid retrieval
    public List<Document> hybridSearch(String query) {
        // Vector search + keyword search
        List<Document> vectorResults = vectorSearch(query);
        List<Document> keywordResults = keywordSearch(query);

        return rerankAndMerge(vectorResults, keywordResults);
    }
}
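`rerankAndMerge` is left unspecified. Reciprocal Rank Fusion (RRF) is one common, model-free way to merge the two ranked lists, and it is swapped in here purely as an illustration. Each document scores the sum of 1 / (k + rank) over the lists it appears in, with k = 60 as the commonly used constant. Plain string IDs stand in for `Document` objects in this hypothetical `ReciprocalRankFusion` sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Reciprocal Rank Fusion: merges ranked lists by summing 1 / (k + rank). */
final class ReciprocalRankFusion {

    private ReciprocalRankFusion() {}

    static List<String> fuse(List<List<String>> rankings, int k) {
        Map<String, Double> scores = new HashMap<>();
        for (List<String> ranking : rankings) {
            for (int rank = 0; rank < ranking.size(); rank++) {
                // rank is 0-based here, so the top hit contributes 1 / (k + 1)
                scores.merge(ranking.get(rank), 1.0 / (k + rank + 1), Double::sum);
            }
        }
        List<String> fused = new ArrayList<>(scores.keySet());
        // Highest fused score first; ties broken by id for a deterministic order
        fused.sort(Comparator.comparingDouble((String id) -> scores.get(id)).reversed()
            .thenComparing(Comparator.naturalOrder()));
        return fused;
    }
}
```

With vector results d1, d2, d3 and keyword results d3, d1, d4, the documents found by both searches rise to the top: the fused order is d1, d3, d2, d4.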

IV. Monitoring and tuning tools

Performance monitoring
@Aspect
@Component
@Slf4j
public class VectorStoreMonitor {

    // VectorStore+ matches every implementation, including stores in sub-packages
    // (pgvector, Milvus, ...), which "vectorstore.*" would miss
    @Around("execution(* org.springframework.ai.vectorstore.VectorStore+.similaritySearch(..))")
    public Object monitorSearch(ProceedingJoinPoint joinPoint) throws Throwable {
        long start = System.nanoTime();
        Object result = joinPoint.proceed();
        long durationMs = (System.nanoTime() - start) / 1_000_000;

        int size = (result instanceof List<?> list) ? list.size() : -1;
        log.info("Vector search took {} ms, results: {}", durationMs, size);

        if (durationMs > 1000) {  // slow-query warning
            log.warn("Slow vector search detected: {} ms", durationMs);
        }

        return result;
    }
}
Tuning parameter table


V. Best practices summary

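  1. Data preprocessing is key:
     1. Clean, deduplicate, and normalize text
     2. Choose a suitable embedding model
  2. Index selection:
     1. Small datasets (<100K): FLAT
     2. Medium datasets (100K-1M): IVF
     3. Large datasets (>1M): HNSW
  3. Query optimization:
     1. Narrow the search space with filters
     2. Add multi-level caching
     3. Consider asynchronous queries
  4. Capacity planning:
     1. Size the vector store by QPS
     2. Keep 20-30% performance headroom
  5. Continuous monitoring:
     1. Track P95/P99 latency
     2. Alert on slow queries
     3. Re-evaluate the indexing strategy regularly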

VI. Troubleshooting checklist

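public class TroubleshootingGuide {

    public void diagnosePerformanceIssues() {
        // 1. Check network latency
        // 2. Verify that vector dimensions match (embedding model vs. index)
        // 3. Check that index building has finished
        // 4. Monitor memory and CPU usage
        // 5. Review the vector store's slow-query log
    }

    public void quickFixes() {
        // Temporary mitigations:
        // 1. Lower topK
        // 2. Raise the similarity threshold
        // 3. Enable query caching
        // 4. Reduce the number of concurrent requests
    }
}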

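Combining these strategies can substantially improve vector retrieval performance; the size of the gain depends on data volume, index type, and workload, so benchmark each change. Start with index optimization, then tune parameters, and only then consider architectural changes.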

4. High-concurrency scenarios: rate limiting, peak shaving, batch processing

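Handling high concurrency in Spring AI means combining the Spring ecosystem with distributed-system design principles. Below are approaches for rate limiting, peak shaving, and batch processing: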

I. Rate limiting

1. API-level rate limiting

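// Rate limiting with Resilience4j
@Configuration
public class RateLimitConfig {

    @Bean
    public RateLimiter aiRateLimiter() {
        return RateLimiter.of("ai-api",
            RateLimiterConfig.custom()
                .limitForPeriod(100)  // 100 requests per second
                .limitRefreshPeriod(Duration.ofSeconds(1))
                .timeoutDuration(Duration.ofMillis(100)) // max wait for a permit
                .build()
        );
    }

    // The limiter must wrap every call: decorating a supplier that merely returns the
    // client object would only rate-limit bean creation. RateLimitedChat is a small
    // application-defined facade for that purpose
    @Bean
    public RateLimitedChat rateLimitedChat(ChatClient.Builder builder, RateLimiter aiRateLimiter) {
        ChatClient chatClient = builder.build();
        return prompt -> RateLimiter.decorateSupplier(aiRateLimiter,
            () -> chatClient.prompt(prompt).call().content()).get();
    }

    @FunctionalInterface
    public interface RateLimitedChat {
        // Throws RequestNotPermitted when no permit becomes available within the timeout
        String call(String prompt);
    }
}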

2. User-level rate limiting

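@Service
public class UserRateLimitService {

    // Distributed fixed-window limit with Redis + Lua. INCR is atomic and returns the
    // new count; the first request in a window (count == 1) sets the expiry, so the
    // counter resets when the window ends
    private static final RedisScript<Long> RATE_LIMIT_SCRIPT = new DefaultRedisScript<>("""
        local current = redis.call('INCR', KEYS[1])
        if current == 1 then
            redis.call('EXPIRE', KEYS[1], ARGV[2])
        end
        if current > tonumber(ARGV[1]) then
            return 0
        end
        return 1
        """, Long.class);

    private final StringRedisTemplate redisTemplate;

    public UserRateLimitService(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // No @Cacheable here: caching the decision would freeze the first answer per user
    public boolean allowRequest(String userId, int limit, int windowSeconds) {
        // Execute the Lua script atomically on the Redis server
        Long allowed = redisTemplate.execute(RATE_LIMIT_SCRIPT,
            List.of("rate:user:" + userId),
            String.valueOf(limit), String.valueOf(windowSeconds));
        return allowed != null && allowed == 1L;
    }
}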
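To check the windowing logic without Redis, here is an in-memory counter that follows the fixed-window rule the Lua script implements: the first request opens a window, every request increments the count, and requests above the limit are refused until the window expires. It is a single-JVM sketch (hypothetical `FixedWindowLimiter`), not a replacement for the distributed version. The current time is passed in so that the behavior is deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory fixed-window counter, same rule as the Redis script (single JVM only). */
final class FixedWindowLimiter {

    private record Window(long start, int count) {}

    private final int limit;
    private final long windowMillis;
    private final Map<String, Window> windows = new ConcurrentHashMap<>();

    FixedWindowLimiter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    /** nowMillis is a parameter so the behavior can be checked deterministically. */
    boolean allow(String userId, long nowMillis) {
        // compute() runs atomically per key, like INCR on the Redis side
        Window w = windows.compute(userId, (id, cur) ->
            cur == null || nowMillis - cur.start() >= windowMillis
                ? new Window(nowMillis, 1)                   // first request opens a window
                : new Window(cur.start(), cur.count() + 1)); // count within the current window
        return w.count() <= limit;
    }
}
```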

3. Global rate limiting with a token bucket

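@Component
public class TokenBucketLimiter {

    // Guava's RateLimiter (a token-bucket-style limiter) refills permits continuously,
    // so no refill thread is needed. Fully qualified to avoid clashing with Resilience4j's RateLimiter
    private final com.google.common.util.concurrent.RateLimiter rateLimiter;

    // ai.rate-limit.permits-per-second is a hypothetical application property
    public TokenBucketLimiter(@Value("${ai.rate-limit.permits-per-second:50}") double permitsPerSecond) {
        // Bursty mode: burst capacity is fixed at one second's worth of unused permits
        this.rateLimiter = com.google.common.util.concurrent.RateLimiter.create(permitsPerSecond);
    }

    public boolean tryAcquire() {
        return rateLimiter.tryAcquire(); // non-blocking: false if no permit is available now
    }
}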
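Guava's `RateLimiter` does not let burst capacity and refill rate be set independently. When both need to be tunable, a token bucket is short to write by hand. The hypothetical `TokenBucket` below refills lazily from elapsed time instead of running a refill thread, and it takes the clock reading as a parameter (pass `System.nanoTime()` in production) so that it can be tested deterministically.

```java
/** Token bucket: holds up to `capacity` tokens and refills at `refillPerSecond`. */
final class TokenBucket {

    private final long capacity;
    private final double refillPerNano;
    private double tokens;
    private long lastRefillNanos;

    TokenBucket(long capacity, double refillPerSecond, long nowNanos) {
        this.capacity = capacity;
        this.refillPerNano = refillPerSecond / 1_000_000_000.0;
        this.tokens = capacity;          // start full: allows an initial burst
        this.lastRefillNanos = nowNanos;
    }

    /** Takes one token if available; the caller supplies the current time. */
    synchronized boolean tryAcquire(long nowNanos) {
        // Lazy refill: add tokens for the elapsed time, never beyond capacity
        long elapsed = Math.max(0, nowNanos - lastRefillNanos);
        tokens = Math.min(capacity, tokens + elapsed * refillPerNano);
        lastRefillNanos = nowNanos;
        if (tokens >= 1) {
            tokens -= 1;
            return true;
        }
        return false;
    }
}
```

With capacity 2 and 1 token per second, two immediate requests pass and a third is refused. At 0.4 s there is still less than one token, and at 1.5 s a request passes again.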

II. Peak shaving (smoothing load spikes)

1. Message queue buffering

@Configuration
public class QueueConfig {
    
    @Bean
    public Queue aiRequestQueue() {
        return QueueBuilder.durable("ai.requests")
            .withArgument("x-max-length", 10000)
            .withArgument("x-overflow", "reject-publish")
            .build();
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
}

2. Asynchronous processing service

@Service
public class AsyncAIService {

    private final AiClient aiClient; // application's AI client (not shown)

    public AsyncAIService(AiClient aiClient) {
        this.aiClient = aiClient;
    }

    // The queue is the buffer: each consumer handles one message at a time, so at most
    // 10-50 AI calls are in flight, and messages stay unacknowledged in RabbitMQ until
    // the call returns. Worker threads come from the listener container, so no @Async
    // pool (and no second "aiTaskExecutor" bean) is needed here
    @RabbitListener(queues = "ai.requests", concurrency = "10-50")
    public AiResponse processRequest(AiRequest request) {
        // Process the AI request. Returning a value sends a reply, so the sender must set
        // reply-to (e.g. RabbitTemplate.convertSendAndReceive) or the method needs @SendTo
        return aiClient.call(request);
    }
}

3. Request merging window

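@Component
@Slf4j
public class RequestBuffer {

    private final LinkedBlockingQueue<BufferedRequest> buffer =
        new LinkedBlockingQueue<>(1000);
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    @PostConstruct
    public void init() {
        scheduler.scheduleAtFixedRate(this::flushBuffer,
            100, 100, TimeUnit.MILLISECONDS); // 100 ms window
    }

    @PreDestroy
    public void shutdown() {
        scheduler.shutdown();
    }

    // Returns false when the buffer is full so the caller can reject or retry
    public boolean addRequest(String userId, String prompt) {
        return buffer.offer(new BufferedRequest(userId, prompt, System.currentTimeMillis()));
    }

    private void flushBuffer() {
        List<BufferedRequest> batch = new ArrayList<>();
        buffer.drainTo(batch, 50); // at most 50 per batch

        if (!batch.isEmpty()) {
            try {
                processBatch(batch); // application-specific batch handler (not shown)
            } catch (RuntimeException e) {
                // Keep the schedule alive: an escaping exception cancels later runs
                log.error("Batch processing failed", e);
            }
        }
    }

    record BufferedRequest(String userId, String prompt, long enqueuedAt) {}
}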

III. Batch processing optimization

1. Batch API calls

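@Service
public class BatchAIService {

    private static final int BATCH_SIZE = 100; // set according to the provider's batch limit

    // BatchChatClient is not a Spring AI type: it stands for any client that accepts a list of prompts
    private final BatchChatClient batchChatClient;
    private final ExecutorService batchExecutor;

    public BatchAIService(BatchChatClient batchChatClient) {
        this.batchChatClient = batchChatClient;
        // AI calls are I/O-bound, so use more threads than CPU cores
        this.batchExecutor = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors() * 2
        );
    }

    public List<AiResponse> batchProcess(List<AiRequest> requests) {
        // Split into batches of at most 100 (Guava Lists.partition) and run them on batchExecutor;
        // a parallel stream would use the shared ForkJoinPool instead
        List<CompletableFuture<List<AiResponse>>> futures = Lists.partition(requests, BATCH_SIZE).stream()
            .map(batch -> CompletableFuture.supplyAsync(() -> processBatch(batch), batchExecutor))
            .collect(Collectors.toList());
        // Joining in submission order keeps responses aligned with requests
        return futures.stream()
            .map(CompletableFuture::join)
            .flatMap(List::stream)
            .collect(Collectors.toList());
    }

    private List<AiResponse> processBatch(List<AiRequest> batch) {
        List<Prompt> prompts = batch.stream()
            .map(req -> new Prompt(req.getContent()))
            .collect(Collectors.toList());

        // Assumes generate() returns one Generation per prompt, in the same order
        return batchChatClient.generate(prompts)
            .stream()
            .map(generation -> generation.getOutput().getText()) // Spring AI 1.0: output is an AssistantMessage
            .map(AiResponse::new)
            .collect(Collectors.toList());
    }

    @PreDestroy
    public void shutdown() {
        batchExecutor.shutdown();
    }
}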
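The partition-then-fan-out pattern can be exercised without Spring, Guava or a real AI provider. The sketch below uses a hypothetical ChunkedBatchRunner class and makes three choices:

- It reimplements the partition step with List.subList.
- It runs one batch call per chunk on a caller-supplied ExecutorService.
- It joins the futures in submission order, so results line up with the inputs.

The batch function is a stand-in for the provider call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;

final class ChunkedBatchRunner {

    /** Splits items into consecutive chunks of at most chunkSize (what Guava's Lists.partition does). */
    static <T> List<List<T>> partition(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += chunkSize) {
            chunks.add(items.subList(from, Math.min(from + chunkSize, items.size())));
        }
        return chunks;
    }

    /** Runs one batch call per chunk on the executor and concatenates results in input order. */
    static <T, R> List<R> runAll(List<T> items, int chunkSize,
                                 Function<List<T>, List<R>> batchCall, ExecutorService executor) {
        List<CompletableFuture<List<R>>> futures = partition(items, chunkSize).stream()
            .map(chunk -> CompletableFuture.supplyAsync(() -> batchCall.apply(chunk), executor))
            .collect(Collectors.toList());
        List<R> results = new ArrayList<>(items.size());
        for (CompletableFuture<List<R>> future : futures) {
            results.addAll(future.join()); // a failed batch surfaces as CompletionException
        }
        return results;
    }
}
```

All chunks still run concurrently, because every future is created before the first join.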

2. Connection pool configuration

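# application.yml
spring:
  ai:
    openai:
      chat:
        options:
          model: gpt-4
      # The "client" keys below are illustrative, not documented Spring AI properties.
      # Spring AI sends requests through Spring's RestClient/WebClient, so timeouts and pool
      # limits are set on the underlying HTTP client (e.g. Apache HttpClient 5, Reactor Netty).
      client:
        connection-timeout: 10s
        read-timeout: 30s
        max-connections: 200
        max-connections-per-route: 50
        keep-alive: 30s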

3. Reactive batch processing

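@RestController
public class ReactiveAIController {

    private final BatchAIService batchAIService;

    public ReactiveAIController(BatchAIService batchAIService) {
        this.batchAIService = batchAIService;
    }

    @PostMapping("/ai/batch")
    public Flux<AiResponse> batchProcess(@RequestBody Flux<AiRequest> requests) {
        return requests
            .bufferTimeout(50, Duration.ofMillis(100)) // emit at 50 items or after 100 ms, whichever comes first
            // The batch call blocks, so run it on boundedElastic; flatMapSequential keeps request order
            // and caps in-flight batches (4 is an example value)
            .flatMapSequential(batch -> Mono.fromCallable(() -> batchAIService.batchProcess(batch))
                .subscribeOn(Schedulers.boundedElastic()), 4)
            .flatMapIterable(list -> list);
    }
}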

IV. Integrated Configuration

1. Consolidated configuration class

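@Configuration
@EnableAsync
@EnableCaching
// @EnableCircuitBreaker is deprecated and not needed: spring-cloud-starter-circuitbreaker-resilience4j
// auto-configures a Resilience4J CircuitBreakerFactory
public class AIConfig {

    // Spring AI 1.0 style: OpenAiChatModel calls the API, and ChatClient is the fluent facade on top.
    // Assumes OpenAiApi and OpenAiChatOptions beans are defined elsewhere.
    @Bean
    public OpenAiChatModel openAiChatModel(OpenAiApi openAiApi,
                                           OpenAiChatOptions options,
                                           RetryTemplate retryTemplate) {
        return OpenAiChatModel.builder()
            .openAiApi(openAiApi)
            .defaultOptions(options)
            .retryTemplate(retryTemplate)
            .build();
    }

    @Bean
    public ChatClient chatClient(OpenAiChatModel chatModel) {
        return ChatClient.builder(chatModel).build();
    }

    @Bean
    public RetryTemplate retryTemplate() {
        return RetryTemplate.builder()
            .maxAttempts(3)                         // 1 call + up to 2 retries
            .exponentialBackoff(1000, 2, 10000)     // wait 1 s, then 2 s; never more than 10 s
            .retryOn(ResourceAccessException.class) // network-level failures only
            .build();
    }
}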
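The back-off schedule is clearer with numbers. `RetryTemplate.builder().exponentialBackoff(1000, 2, 10000)` waits 1 s before the first retry and doubles the wait each time, capped at 10 s. With `maxAttempts(3)` that means at most two waits: 1 s, then 2 s. The JDK-only sketch below reproduces this schedule so it can be tested with an injected sleeper instead of real delays. BackoffRetry is a hypothetical helper, not part of Spring Retry. Like the plain (non-random) exponential back-off policy, it adds no jitter.

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;

final class BackoffRetry {

    /**
     * Calls action up to maxAttempts times. Only exceptions of type retryOn are retried.
     * Waits are initialMillis, initialMillis * multiplier, ..., each capped at maxMillis.
     */
    static <T> T call(Supplier<T> action, int maxAttempts, long initialMillis, double multiplier,
                      long maxMillis, Class<? extends RuntimeException> retryOn, LongConsumer sleeper) {
        long delay = initialMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                if (!retryOn.isInstance(e) || attempt >= maxAttempts) {
                    throw e; // non-retryable, or out of attempts
                }
                sleeper.accept(delay); // e.g. Thread::sleep wrapped, or a recorder in tests
                delay = Math.min((long) (delay * multiplier), maxMillis);
            }
        }
    }
}
```

With six attempts the waits are 1000, 2000, 4000, 8000 and then 10000 ms, because the cap replaces the 16000 ms step.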

2. Monitoring and circuit breaking

@Aspect
@Component
public class AIMonitor {

    private final MeterRegistry meterRegistry;
    private final Counter requestCounter;
    private final Timer responseTimer;

    public AIMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.requestCounter = Counter.builder("ai.requests")
            .description("AI API requests")
            .register(meterRegistry);
        this.responseTimer = Timer.builder("ai.response.time")
            .register(meterRegistry);
    }

    // Intercepts methods annotated with the custom @AiCall annotation (bound via the parameter type)
    @Around("@annotation(aiCall)")
    public Object monitor(ProceedingJoinPoint pjp, AiCall aiCall) throws Throwable {
        requestCounter.increment();
        // Timer.record(Supplier) cannot propagate the Throwable thrown by proceed(), so time it manually
        Timer.Sample sample = Timer.start(meterRegistry);
        try {
            return pjp.proceed();
        } catch (Throwable e) {
            meterRegistry.counter("ai.errors").increment();
            throw e;
        } finally {
            sample.stop(responseTimer);
        }
    }
}
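In this setup, circuit breaking comes from Resilience4j rather than from the metrics aspect. Such a breaker moves between three states: CLOSED, OPEN and HALF_OPEN. The deliberately minimal JDK-only sketch below illustrates that state machine. SimpleCircuitBreaker is hypothetical and simplified in two ways. It counts consecutive failures, whereas Resilience4j by default evaluates a failure rate over a sliding window. Its half-open state also does not limit the number of trial calls.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Opens after N consecutive failures; after a cool-down, lets a trial call through. */
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long openMillis;
    private final LongSupplier clockMillis; // injectable time source, e.g. System::currentTimeMillis
    private State state = State.CLOSED;
    private int consecutiveFailures;
    private long openedAt;

    SimpleCircuitBreaker(int failureThreshold, long openMillis, LongSupplier clockMillis) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clockMillis = clockMillis;
    }

    synchronized State state() {
        return state;
    }

    /** Runs action unless the breaker is open; uses fallback when open or when action throws. */
    <T> T call(Supplier<T> action, Supplier<T> fallback) {
        boolean shortCircuit = false;
        synchronized (this) {
            if (state == State.OPEN) {
                if (clockMillis.getAsLong() - openedAt < openMillis) {
                    shortCircuit = true;        // still cooling down: don't touch the failing service
                } else {
                    state = State.HALF_OPEN;    // cool-down over: allow a trial call
                }
            }
        }
        if (shortCircuit) {
            return fallback.get();
        }
        try {
            T result = action.get();
            synchronized (this) {
                state = State.CLOSED;           // success closes the breaker
                consecutiveFailures = 0;
            }
            return result;
        } catch (RuntimeException e) {
            synchronized (this) {
                consecutiveFailures++;
                if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
                    state = State.OPEN;         // threshold reached, or the half-open trial failed
                    openedAt = clockMillis.getAsLong();
                }
            }
            return fallback.get();
        }
    }
}
```

The open state exists to shed load. While it is active, callers get the fallback immediately instead of queueing behind a service that is already failing.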

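V. Best Practice Recommendations

  1. Tiered rate limiting:
     - Global API rate limit
     - Per-user rate limit
     - Rate limits based on business priority
  2. Choosing a queue:
     - Strict real-time requirements: in-memory queue
     - Messages must be persisted: Kafka/RabbitMQ
     - Very large data volumes: Redis Stream
  3. Batch processing:
     - Size batches according to the AI provider's maximum batch limit
     - Adjust the batch size dynamically
     - Retry failures and handle partial success
  4. Key monitoring metrics:
     - QPS/TPS
     - Average response time
     - Error rate
     - Queue backlog length
     - System resource utilization
  5. Degradation strategy: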
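@Service
public class FallbackService {

    private final ChatClient chatClient;

    public FallbackService(ChatClient chatClient) {
        this.chatClient = chatClient;
    }

    // Resilience4j annotation (io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker),
    // configured under resilience4j.circuitbreaker.instances.aiService
    @CircuitBreaker(name = "aiService", fallbackMethod = "fallback")
    public AiResponse callAI(String prompt) {
        return new AiResponse(chatClient.prompt().user(prompt).call().content());
    }

    // Same parameters as callAI plus a trailing Throwable; invoked on failure or when the breaker is open
    public AiResponse fallback(String prompt, Throwable t) {
        // Return a cached result or a simplified response
        return new AiResponse("System busy, please try again later");
    }
}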
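The tiered rate limiting in point 1 can be sketched with one token bucket per tier. In the JDK-only example below, TokenBucket and TieredRateLimiter are hypothetical names. A request must pass its user's bucket and then the global bucket. If the global tier rejects the call, the token taken from the user bucket is refunded. The business-priority tier is left out; one option is a separate bucket per priority class. A production system would more likely use an existing limiter, such as Resilience4j's RateLimiter or a Redis-backed limiter shared across instances.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** Holds at most `capacity` tokens and refills `refillPerSecond` tokens per second. */
final class TokenBucket {
    private final long capacity;
    private final double refillPerSecond;
    private final LongSupplier nanoClock; // injectable time source, e.g. System::nanoTime
    private double tokens;
    private long lastRefillNanos;

    TokenBucket(long capacity, double refillPerSecond, LongSupplier nanoClock) {
        this.capacity = capacity;
        this.refillPerSecond = refillPerSecond;
        this.nanoClock = nanoClock;
        this.tokens = capacity;                  // start full so short bursts are allowed
        this.lastRefillNanos = nanoClock.getAsLong();
    }

    synchronized boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        tokens = Math.min(capacity, tokens + (now - lastRefillNanos) * refillPerSecond / 1_000_000_000.0);
        lastRefillNanos = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }

    /** Returns a token taken for a request that a later tier rejected. */
    synchronized void refund() {
        tokens = Math.min(capacity, tokens + 1.0);
    }
}

/** A request must pass its user's bucket and then the global bucket. */
final class TieredRateLimiter {
    private final TokenBucket global;
    private final Map<String, TokenBucket> perUser = new ConcurrentHashMap<>();
    private final long userCapacity;
    private final double userRefillPerSecond;
    private final LongSupplier nanoClock;

    TieredRateLimiter(long globalCapacity, double globalRefillPerSecond,
                      long userCapacity, double userRefillPerSecond, LongSupplier nanoClock) {
        this.global = new TokenBucket(globalCapacity, globalRefillPerSecond, nanoClock);
        this.userCapacity = userCapacity;
        this.userRefillPerSecond = userRefillPerSecond;
        this.nanoClock = nanoClock;
    }

    boolean tryAcquire(String userId) {
        TokenBucket user = perUser.computeIfAbsent(userId,
            id -> new TokenBucket(userCapacity, userRefillPerSecond, nanoClock));
        if (!user.tryAcquire()) {
            return false;                        // this user is over their own quota
        }
        if (!global.tryAcquire()) {
            user.refund();                       // don't charge the user for a globally rejected call
            return false;
        }
        return true;
    }
}
```

Checking the per-user bucket first keeps one noisy user from draining the global budget with requests that would be rejected anyway.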

These approaches can be combined to fit actual business needs. Start at a small scale and tune the parameters step by step.
