万字干货!手把手教你掌握RAG核心技术,终结AI“一本正经胡说八道”!
本文深入探讨了RAG(检索增强生成)技术,旨在解决大语言模型“幻觉”问题,使其回答有据可查。内容涵盖幻觉三大根源及七种解决方案,详解四大向量数据库(FAISS、Pinecone、腾讯云VectorDB、Weaviate)的实战配置,深入解析Embedding模型,并详细介绍了文档加载、智能分割、检索器原理、中文分词与关键词提取、重排序Rerank原理与实践、RAG优化策略(多查询融合、问题分解、混合检索、逻辑路由、父文档检索、CRAG)以及完整的RAG系统架构。最后,提供了快速开始模板和性能监控优化方法,助你构建高效、准确的智能问答系统。
万字长文,一次性掌握RAG核心技术
大语言模型“一本正经胡说八道”的毛病,终于有解了!这就是今天要讲的RAG(检索增强生成)技术。它能让AI的回答有据可查,避免幻觉,真正成为你的智能助手。
🤔 第一关:搞懂LLM为什么会“幻觉”
幻觉的三大根源
- 训练数据偏差:模型只在训练数据中出现过的知识上表现可靠
- 参数记忆限制:知识被压缩在权重中,细节容易丢失
- 概率生成本质:每次都是“猜”下一个词,不是“回忆”事实
7大解决方案对比
| 方案 | 原理 | 适用场景 | 复杂度 |
|---|---|---|---|
| 提示工程 | 在Prompt中明确要求“不要编造” | 简单问答 | ★☆☆ |
| Few-shot示例 | 提供正确回答示例 | 格式固定场景 | ★★☆ |
| 知识检索 | 实时查外部知识库 | 动态知识问答 | ★★★ |
| 思维链推理 | 让模型展示推理过程 | 复杂问题 | ★★★ |
| 自我验证 | 让模型检查自己回答 | 高准确性要求 | ★★★ |
| 不确定性量化 | 输出置信度分数 | 风险控制 | ★★★ |
| 集成方法 | 多个模型投票 | 关键决策 | ★★★★ |
# 实战:减少幻觉的Prompt模板
anti_hallucination_prompt = """
请你基于提供的知识回答问题,遵循以下规则:
1. 只使用提供的参考信息
2. 如果信息不足,明确说“根据已有信息无法确定”
3. 不要添加额外信息
4. 如果信息冲突,说明冲突点
参考信息:{context}
问题:{question}
回答:
"""
🗄️ 第二关:四大向量数据库实战配置
- FAISS - Facebook开源,本地首选
# 安装:pip install faiss-cpu
import faiss
import numpy as np
# 创建索引
dimension = 768 # embedding维度
index = faiss.IndexFlatL2(dimension)
# 添加向量
embeddings = np.random.rand(1000, dimension).astype('float32')
index.add(embeddings)
# 搜索
query = np.random.rand(1, dimension).astype('float32')
k = 5
distances, indices = index.search(query, k)
print(f"找到最相似的 {k} 个文档索引:{indices}")
FAISS进阶用法:
# IVF索引加速
nlist = 100 # 聚类中心数
quantizer = faiss.IndexFlatL2(dimension)
index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
index.train(embeddings) # 先训练
index.add(embeddings)
# 保存和加载
faiss.write_index(index, "my_index.faiss")
loaded_index = faiss.read_index("my_index.faiss")
- Pinecone - 云端托管,简单易用
# 安装:pip install pinecone-client
import pinecone
from langchain.vectorstores import Pinecone
from langchain.embeddings import OpenAIEmbeddings
# 初始化
pinecone.init(api_key="your-api-key", environment="us-west1-gcp")
# 创建索引
index_name = "my-knowledge-base"
if index_name not in pinecone.list_indexes():
pinecone.create_index(
name=index_name,
dimension=1536, # text-embedding-ada-002维度
metric="cosine"
)
# 连接索引
index = pinecone.Index(index_name)
# 通过LangChain使用
embeddings = OpenAIEmbeddings()
vectorstore = Pinecone(index, embeddings, "text")
- 腾讯云VectorDB - 国产新星
from tcvectordb.model.enum import FieldType, IndexType
from tcvectordb.model.index import VectorIndex, FilterIndex
from tcvectordb.model.document import Document, SearchParams
from tcvectordb.model.collection import Embedding
# 创建连接
client = VectorDBClient(
url='http://xxxxx.xxxxx.xxxxx.tencentcloudapi.com',
username='root',
key='your-key',
timeout=30
)
# 创建集合
db = client.database('db-test')
coll = db.create_collection(
name='my-collection',
shard=1,
replicas=0,
description='知识库文档',
index=Index(
VectorIndex(
'vector',
768,
IndexType.HNSW,
MetricType.COSINE
),
FilterIndex('id', FieldType.String, IndexType.PRIMARY_KEY)
)
)
- Weaviate - 开源图向量混合
import weaviate
from weaviate.classes.init import AdditionalConfig, Timeout
import weaviate.classes as wvc
# 客户端配置
client = weaviate.WeaviateClient(
additional_config=AdditionalConfig(
timeout=Timeout(init=2, query=45, insert=120) # 超时设置
)
)
# 创建集合
client.collections.create(
name="Document",
vectorizer_config=wvc.config.Configure.Vectorizer.text2vec_openai(),
properties=[
wvc.config.Property(
name="title",
data_type=wvc.config.DataType.TEXT
),
wvc.config.Property(
name="content",
data_type=wvc.config.DataType.TEXT
)
]
)
📊 向量数据库选型指南
| 维度 | FAISS | Pinecone | 腾讯云VectorDB | Weaviate |
|---|---|---|---|---|
| 部署方式 | 本地 | 全托管 | 全托管 | 自托管/云托管 |
| 开源 | ✅ | ❌ | ❌ | ✅ |
| 最大数据量 | 内存限制 | 无限制 | PB级 | 无限制 |
| 多模态 | ❌ | ✅ | ✅ | ✅ |
| 成本 | 免费 | 按用量 | 按用量 | 免费/自建成本 |
| 适合场景 | 中小规模 | 企业级 | 国内企业 | 图向量混合 |
🔤 第三关:Embedding模型深入解析
文本向量化原理
from sentence_transformers import SentenceTransformer
# 加载模型
model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
# 生成向量
sentences = ['我爱编程', 'Python是最好的语言', '机器学习很有趣']
embeddings = model.encode(sentences)
print(f"向量维度:{embeddings.shape}") # (3, 384)
# 计算相似度
from sklearn.metrics.pairwise import cosine_similarity
similarity = cosine_similarity(
[embeddings[0]], # "我爱编程"
[embeddings[1]] # "Python是最好的语言"
)
print(f"语义相似度:{similarity[0][0]:.3f}")
主流Embedding模型对比
embedding_models = {
"OpenAI text-embedding-3": {
"dimension": 1536,
"价格": "$0.13/1M tokens",
"特点": "效果最好,英文优化"
},
"BGE-M3": {
"dimension": 1024,
"价格": "免费开源",
"特点": "多语言,检索SOTA"
},
"Jina Embeddings": {
"dimension": 768,
"价格": "免费开源",
"特点": "中文优化,8K上下文"
},
"阿里通义千问": {
"dimension": 1024,
"价格": "API调用",
"特点": "中文场景优化"
}
}
📄 第四关:文档加载与智能分割
- LangChain文档加载器大全
from langchain.document_loaders import (
TextLoader, # 文本文件
PyPDFLoader, # PDF
UnstructuredWordDocumentLoader, # Word
CSVLoader, # CSV
UnstructuredHTMLLoader, # HTML
JSONLoader, # JSON
SeleniumURLLoader, # 动态网页
YoutubeLoader, # YouTube视频
GitLoader, # Git仓库
NotionDirectoryLoader # Notion导出
)
# 加载PDF
loader = PyPDFLoader("docs/report.pdf")
pages = loader.load()
print(f"加载了 {len(pages)} 页PDF")
# 加载网页
loader = SeleniumURLLoader(["https://example.com"])
docs = loader.load()
# 批量加载目录
from langchain.document_loaders import DirectoryLoader
loader = DirectoryLoader(
'./docs',
glob="**/*.pdf",
loader_cls=PyPDFLoader
)
documents = loader.load()
- 递归字符分割器 - 深入源码
from langchain.text_splitter import RecursiveCharacterTextSplitter
# 核心分割逻辑
def split_text(self, text: str) -> List[str]:
"""递归分割文本的源码逻辑"""
final_chunks = []
# 按分隔符优先级分割
separators = self._separators if self._separators is not None else ["\n\n", "\n", " ", ""]
for separator in separators:
if separator:
splits = text.split(separator)
else:
splits = list(text)
# 合并小片段
good_splits = []
current_chunk = ""
for s in splits:
if len(current_chunk) + len(s) < self._chunk_size:
current_chunk += s
else:
if current_chunk:
good_splits.append(current_chunk)
current_chunk = s
if current_chunk:
good_splits.append(current_chunk)
if len(good_splits) > 1:
final_chunks.extend(good_splits)
break
return final_chunks
- 智能分割策略
# 按语义分割
from langchain.text_splitter import SemanticChunker
from langchain.embeddings import OpenAIEmbeddings
text_splitter = SemanticChunker(
embeddings=OpenAIEmbeddings(),
breakpoint_threshold_type="percentile" # 按分位数分割
)
# 按Markdown结构分割
from langchain.text_splitter import MarkdownTextSplitter
markdown_splitter = MarkdownTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
# 按代码结构分割
from langchain.text_splitter import (
Language,
RecursiveCharacterTextSplitter
)
python_splitter = RecursiveCharacterTextSplitter.from_language(
language=Language.PYTHON,
chunk_size=2000,
chunk_overlap=200
)
🔍 第五关:检索器核心原理
基础检索器实现
from langchain.vectorstores import FAISS
from langchain.embeddings import OpenAIEmbeddings
from langchain.retrievers import BM25Retriever, EnsembleRetriever
class SmartRetriever:
def __init__(self, vectorstore, bm25_retriever):
# 混合检索器
self.ensemble_retriever = EnsembleRetriever(
retrievers=[vectorstore.as_retriever(), bm25_retriever],
weights=[0.7, 0.3] # 向量检索70%,关键词检索30%
)
def retrieve(self, query, k=5, score_threshold=0.7):
"""智能检索文档"""
# 1. 并行检索
vector_results = self.vector_retriever.get_relevant_documents(query)
bm25_results = self.bm25_retriever.get_relevant_documents(query)
# 2. 结果融合
all_results = self._merge_results(vector_results, bm25_results)
# 3. 去重
unique_results = self._deduplicate(all_results)
# 4. 重排序
reranked = self._rerank(query, unique_results)
# 5. 分数过滤
filtered = [doc for doc in reranked
if doc.metadata.get('score', 0) > score_threshold]
return filtered[:k]
def _merge_results(self, vec_results, bm25_results):
"""合并两种检索结果"""
# 使用RRF(倒数排名融合)算法
fused_scores = {}
for i, doc in enumerate(vec_results):
doc_id = doc.metadata.get('id', str(i))
fused_scores[doc_id] = fused_scores.get(doc_id, 0) + 1/(60 + i)
for i, doc in enumerate(bm25_results):
doc_id = doc.metadata.get('id', str(i))
fused_scores[doc_id] = fused_scores.get(doc_id, 0) + 1/(60 + i)
# 按融合分数排序
return sorted(vec_results + bm25_results,
key=lambda x: fused_scores.get(
x.metadata.get('id', '0'), 0),
reverse=True)
✂️ 第六关:中文分词与关键词提取
import jieba
import jieba.analyse
from collections import Counter
class ChineseTextProcessor:
def __init__(self, user_dict_path=None):
if user_dict_path:
jieba.load_userdict(user_dict_path)
# 停用词
self.stopwords = set([
'的', '了', '在', '是', '我', '有', '和', '就',
'不', '人', '都', '一', '一个', '上', '也', '很',
'到', '说', '要', '去', '你', '会', '着', '没有',
'看', '好', '自己', '这'
])
def extract_keywords(self, text, top_k=10, with_weight=True):
"""提取关键词"""
# TF-IDF算法
keywords = jieba.analyse.extract_tags(
text,
topK=top_k,
withWeight=with_weight,
allowPOS=('n', 'vn', 'v', 'ns', 'nr') # 名词、动名词、动词、地名、人名
)
# TextRank算法
textrank_keywords = jieba.analyse.textrank(
text,
topK=top_k,
withWeight=with_weight,
allowPOS=('n', 'vn', 'v')
)
# 融合结果
combined = self._combine_keywords(keywords, textrank_keywords)
return combined
def segment_with_pos(self, text):
"""分词+词性标注"""
words = jieba.posseg.cut(text)
return [(word, flag) for word, flag in words]
def _combine_keywords(self, tfidf_kws, textrank_kws):
"""融合TF-IDF和TextRank结果"""
keyword_scores = {}
# 加权融合
for word, score in tfidf_kws:
keyword_scores[word] = keyword_scores.get(word, 0) + score * 0.6
for word, score in textrank_kws:
keyword_scores[word] = keyword_scores.get(word, 0) + score * 0.4
return sorted(keyword_scores.items(), key=lambda x: x[1], reverse=True)
# 使用示例
processor = ChineseTextProcessor()
text = "人工智能是计算机科学的一个分支,它企图了解智能的实质..."
keywords = processor.extract_keywords(text, top_k=5)
print("提取的关键词:", keywords)
# 输出:[('人工智能', 1.0), ('计算机科学', 0.8), ('智能', 0.6), ...]
🎯 第七关:重排序Rerank原理与实践
- Cross-Encoder重排序
from sentence_transformers import CrossEncoder
class Reranker:
def __init__(self, model_name="BAAI/bge-reranker-large"):
self.model = CrossEncoder(model_name)
def rerank(self, query, documents, top_k=5):
"""对检索结果重排序"""
# 构造query-doc对
pairs = [[query, doc.page_content] for doc in documents]
# 计算相关性分数
scores = self.model.predict(pairs)
# 组合结果
reranked_results = []
for i, (doc, score) in enumerate(zip(documents, scores)):
doc.metadata['rerank_score'] = float(score)
reranked_results.append((score, doc))
# 按分数排序
reranked_results.sort(key=lambda x: x[0], reverse=True)
return [doc for _, doc in reranked_results[:top_k]]
# 使用示例
reranker = Reranker()
initial_results = vectorstore.similarity_search(query, k=20)
final_results = reranker.rerank(query, initial_results, top_k=5)
- 多阶段重排序策略
class MultiStageReranker:
def __init__(self):
# 第一层:快速粗排
self.fast_ranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
# 第二层:精准精排
self.precise_ranker = CrossEncoder("BAAI/bge-reranker-large")
# 第三层:LLM判断
self.llm_ranker = OpenAI(model="gpt-3.5-turbo")
def rerank(self, query, documents):
"""三级重排序"""
# 第一阶段:快速过滤(前50->20)
stage1 = self._fast_rerank(query, documents[:50], keep=20)
# 第二阶段:精确排序(20->10)
stage2 = self._precise_rerank(query, stage1, keep=10)
# 第三阶段:LLM验证(10->5)
stage3 = self._llm_rerank(query, stage2, keep=5)
return stage3
def _fast_rerank(self, query, docs, keep=20):
"""快速粗排"""
pairs = [[query, doc.page_content[:500]] for doc in docs]
scores = self.fast_ranker.predict(pairs, batch_size=32)
return self._select_top_k(docs, scores, keep)
def _llm_rerank(self, query, docs, keep=5):
"""LLM智能判断"""
prompt = f"""请判断以下文档与问题的相关性,给出0-10分:
问题:{query}
请按以下格式输出:
文档1: 分数
文档2: 分数
...
文档列表:
{self._format_docs(docs)}
"""
response = self.llm_ranker.predict(prompt)
scores = self._parse_llm_scores(response)
return self._select_top_k(docs, scores, keep)
🚀 第八关:RAG优化策略大全
策略1:多查询融合
def multi_query_expansion(original_query, llm, n=3):
"""生成多个相关问题"""
prompt = f"""基于以下问题,生成 {n} 个不同角度的相关问题:
原始问题:{original_query}
生成的问题应该:
1. 从不同角度理解原问题
2. 使用同义词替换
3. 分解复杂问题
4. 用不同方式提问
输出格式(每行一个问题):"""
response = llm.predict(prompt)
queries = [original_query] + [q.strip() for q in response.split('\n') if q.strip()]
return queries[:n+1]
# 并行检索多个查询
expanded_queries = multi_query_expansion(original_query, llm, n=3)
all_results = []
for q in expanded_queries:
results = vectorstore.similarity_search(q, k=3)
all_results.extend(results)
策略2:问题分解
def query_decomposition(complex_query, llm):
"""分解复杂问题"""
prompt = f"""将复杂问题分解为多个子问题:
复杂问题:{complex_query}
请分解为可以独立检索和回答的子问题。
输出格式:
1. 子问题1
2. 子问题2
..."""
response = llm.predict(prompt)
sub_queries = [q.strip() for q in response.split('\n') if q.strip()]
return sub_queries
# 分别回答子问题,然后整合
sub_queries = query_decomposition(complex_query, llm)
sub_answers = []
for sub_q in sub_queries:
context = retrieve_context(sub_q)
answer = answer_question(sub_q, context)
sub_answers.append(answer)
final_answer = integrate_answers(sub_answers, complex_query, llm)
策略3:混合检索
class HybridRetriever:
def __init__(self, vector_store, keyword_retriever):
self.vector_store = vector_store
self.keyword_retriever = keyword_retriever
def hybrid_search(self, query, alpha=0.7, k=10):
"""混合检索"""
# 向量检索
vector_results = self.vector_store.similarity_search_with_score(query, k=k*2)
# 关键词检索
keyword_results = self.keyword_retriever.get_relevant_documents(query)
# 归一化分数
vector_scores = self._normalize_scores([score for _, score in vector_results])
keyword_scores = self._normalize_scores(
[doc.metadata.get('bm25_score', 0) for doc in keyword_results]
)
# 融合结果
fused_results = {}
# 融合向量检索结果
for (doc, score), norm_score in zip(vector_results, vector_scores):
doc_id = doc.metadata.get('id', hash(doc.page_content))
fused_results[doc_id] = {
'doc': doc,
'score': alpha * norm_score + (1-alpha) * fused_results.get(doc_id, {}).get('score', 0)
}
# 融合关键词检索结果
for doc, norm_score in zip(keyword_results, keyword_scores):
doc_id = doc.metadata.get('id', hash(doc.page_content))
if doc_id in fused_results:
fused_results[doc_id]['score'] += (1-alpha) * norm_score
else:
fused_results[doc_id] = {
'doc': doc,
'score': (1-alpha) * norm_score
}
# 排序返回
sorted_results = sorted(
fused_results.values(),
key=lambda x: x['score'],
reverse=True
)
return [item['doc'] for item in sorted_results[:k]]
策略4:逻辑路由
class LogicalRouter:
def __init__(self, llm):
self.llm = llm
def route_query(self, query, chat_history):
"""路由查询到不同处理流程"""
prompt = f"""判断问题类型并选择处理策略:
问题:{query}
对话历史:{chat_history}
可选的策略:
1. vector_rag - 需要检索知识库的问题
2. direct_llm - 通用知识或逻辑推理
3. calculator - 数学计算
4. search_web - 需要实时信息
5. clarification - 需要澄清的问题
请只返回策略名称:"""
response = self.llm.predict(prompt).strip()
routing_rules = {
'vector_rag': self.handle_vector_rag,
'direct_llm': self.handle_direct_llm,
'calculator': self.handle_calculator,
'search_web': self.handle_search_web,
'clarification': self.handle_clarification
}
return routing_rules.get(response, self.handle_vector_rag)
策略5:父文档检索
class ParentDocumentRetriever:
def __init__(self, vector_store, parent_docs):
self.vector_store = vector_store
self.parent_docs = parent_docs # 父文档映射
def retrieve_with_parent(self, query, k=5):
"""检索子文档,返回父文档"""
# 检索子文档
child_docs = self.vector_store.similarity_search(query, k=k*3)
# 获取父文档
parent_ids = set()
parent_docs = []
for doc in child_docs:
parent_id = doc.metadata.get('parent_id')
if parent_id and parent_id not in parent_ids:
parent_docs.append(self.parent_docs[parent_id])
parent_ids.add(parent_id)
if len(parent_docs) >= k:
break
return parent_docs
策略6:CRAG(自我纠正RAG)
class CRAGSystem:
def __init__(self, retriever, llm, judge_llm):
self.retriever = retriever
self.llm = llm
self.judge_llm = judge_llm
def answer_with_correction(self, query):
"""带自我纠正的RAG"""
# 1. 初步检索
docs = self.retriever.retrieve(query)
# 2. 生成初步答案
context = "\n\n".join([doc.page_content for doc in docs])
answer = self.llm.predict(f"基于以下信息:\n{context}\n\n回答:{query}")
# 3. 置信度评估
confidence = self._evaluate_confidence(query, answer, docs)
if confidence < 0.7: # 低置信度
# 4. 扩大检索
expanded_docs = self._expand_retrieval(query, docs)
# 5. 重新生成
new_context = "\n\n".join([doc.page_content for doc in expanded_docs])
corrected_answer = self.llm.predict(f"基于以下更全信息:\n{new_context}\n\n重新回答:{query}")
# 6. 验证改进
improvement = self._verify_improvement(
query, answer, corrected_answer, docs, expanded_docs
)
if improvement:
return corrected_answer, expanded_docs
return answer, docs
🎯 完整RAG系统架构
class ProductionRAGSystem:
def __init__(self, config):
# 1. 初始化组件
self.embeddings = self._init_embeddings(config)
self.vector_store = self._init_vector_store(config)
self.llm = self._init_llm(config)
self.reranker = Reranker()
self.query_analyzer = QueryAnalyzer()
# 2. 多检索器
self.retrievers = {
'vector': VectorRetriever(self.vector_store),
'keyword': KeywordRetriever(),
'hybrid': HybridRetriever(self.vector_store, self.keyword_retriever)
}
# 3. 缓存
self.cache = RedisCache()
def answer_question(self, query, session_id, use_cache=True):
"""完整问答流程"""
# 检查缓存
if use_cache:
cached = self.cache.get(query, session_id)
if cached:
return cached
# 1. 查询分析
query_type, decomposed = self.query_analyzer.analyze(query)
# 2. 智能路由
retriever_type = self._route_retriever(query_type)
retriever = self.retrievers[retriever_type]
# 3. 多策略检索
all_docs = []
for sub_query in decomposed:
docs = retriever.retrieve(sub_query, k=10)
all_docs.extend(docs)
# 4. 去重和重排序
unique_docs = self._deduplicate(all_docs)
reranked_docs = self.reranker.rerank(query, unique_docs, top_k=8)
# 5. 上下文构建
context = self._construct_context(reranked_docs, query)
# 6. 生成答案
answer = self._generate_answer(query, context)
# 7. 引用溯源
answer_with_citations = self._add_citations(answer, reranked_docs)
# 8. 缓存结果
self.cache.set(query, session_id, answer_with_citations)
return answer_with_citations
🚀 快速开始模板
# requirements.txt
langchain==0.1.0
langchain-openai==0.0.1
sentence-transformers==2.2.2
faiss-cpu==1.7.4
rank-bm25==0.2.1
jieba==0.42.1
redis==4.5.4
# 快速启动
from rag_system import ProductionRAGSystem
config = {
"embedding_model": "BAAI/bge-large-zh",
"llm_model": "gpt-4",
"vector_store": "faiss",
"cache": "redis"
}
rag = ProductionRAGSystem(config)
# 问答
answer = rag.answer_question(
query="什么是机器学习?",
session_id="user_123",
use_cache=True
)
print(answer)
📈 性能监控与优化
class RAGMonitor:
def __init__(self):
self.metrics = {
'retrieval_time': [],
'generation_time': [],
'cache_hit_rate': 0,
'accuracy_scores': []
}
def log_retrieval(self, query, docs, retrieval_time):
"""记录检索指标"""
self.metrics['retrieval_time'].append(retrieval_time)
# 计算检索相关性
relevance = self._calculate_relevance(query, docs)
self.metrics['accuracy_scores'].append(relevance)
def get_performance_report(self):
"""生成性能报告"""
return {
'avg_retrieval_time': np.mean(self.metrics['retrieval_time']),
'avg_generation_time': np.mean(self.metrics['generation_time']),
'cache_hit_rate': self.metrics['cache_hit_rate'],
'avg_accuracy': np.mean(self.metrics['accuracy_scores']),
'total_queries': len(self.metrics['retrieval_time'])
}
🎯 核心要点总结
- 幻觉问题:通过RAG从根本上解决,让AI回答有据可查
- 向量数据库:根据场景选择,小项目用FAISS,大项目用Pinecone/腾讯云
- 文档处理:合理分割是关键,递归分割器+语义分割结合使用
- 检索优化:混合检索+重排序是效果提升的关键
- 中文优化:使用BGE、Jina等中文优化模型
- RAG进阶:多查询、问题分解、逻辑路由等策略组合使用
2026年AI行业最大的机会,毫无疑问就在应用层!
字节跳动已有7个团队全速布局Agent
大模型岗位暴增69%,年薪破百万!
腾讯、京东、百度开放招聘技术岗,80%与AI相关……
如今,超过60%的企业都在推进AI产品落地,而真正能交付项目的 大模型应用开发工程师 **,**却极度稀缺!
落地AI应用绝对不是写几个prompt,调几个API就能搞定的,企业真正需要的,是能搞定这三项核心能力的人:
✅RAG:融入外部信息,修正模型输出,给模型装靠谱大脑
✅Agent智能体:让AI自主干活,通过工具调用(Tools)环境交互,多步推理完成复杂任务。比如做智能客服等等……
✅微调:针对特定任务优化,让模型适配业务
目前,脉脉上有超过1000家企业发布大模型相关岗位,人工智能岗平均月薪7.8w!实习生日薪高达4000!远超其他行业收入水平!
技术的稀缺性,才是你「值钱」的关键!
具备AI能力的程序员,比传统开发高出不止一截!有的人早就转行AI方向,拿到百万年薪!👇🏻👇🏻

AI浪潮,正在重构程序员的核心竞争力!现在入场,仍是最佳时机!
我把大模型的学习全流程已经整理📚好了!抓住AI时代风口,轻松解锁职业新可能,希望大家都能把握机遇,实现薪资/职业跃迁~
这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费】

⭐️从大模型微调到AI Agent智能体搭建
剖析AI技术的应用场景,用实战经验落地AI技术。从GPT到最火的开源模型,让你从容面对AI技术革新!
大模型微调
-
掌握主流大模型(如DeepSeek、Qwen等)的微调技术,针对特定场景优化模型性能。
-
学习如何利用领域数据(如制造、医药、金融等)进行模型定制,提升任务准确性和效率。
RAG应用开发
- 深入理解检索增强生成(Retrieval-Augmented Generation, RAG)技术,构建高效的知识检索与生成系统。
- 应用于垂类场景(如法律文档分析、医疗诊断辅助、金融报告生成等),实现精准信息提取与内容生成。
AI Agent智能体搭建
- 学习如何设计和开发AI Agent,实现多任务协同、自主决策和复杂问题解决。
- 构建垂类场景下的智能助手(如制造业中的设备故障诊断Agent、金融领域的投资分析Agent等)。

如果你也有以下诉求:
快速链接产品/业务团队,参与前沿项目
构建技术壁垒,从竞争者中脱颖而出
避开35岁裁员危险期,顺利拿下高薪岗
迭代技术水平,延长未来20年的新职业发展!
……
那这节课你一定要来听!
因为,留给普通程序员的时间真的不多了!
立即扫码,即可免费预约
「AI技术原理 + 实战应用 + 职业发展」
「大模型应用开发实战公开课」
👇👇

👍🏻还有靠谱的内推机会+直聘权益!!
完课后赠送:大模型应用案例集、AI商业落地白皮书
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)