前言


上周我们完成了 Python AI 微服务的基础架构搭建,打通了 Java 业务层与 Python AI 层之间的 RabbitMQ 消息通道。截至目前,我重点推进了 RAG(检索增强生成) 和 视觉题目识别 两大核心功能的落地,让错题本从"简单的聊天机器人"进化为真正的"智能学习助手"。

一、RAG 检索增强生成

1.1 为什么需要 RAG?

单纯的对话式 AI 存在两个问题:

  • 知识滞后:大模型的训练数据是固定的,无法了解我们教材的最新内容。
  • 幻觉问题:AI 可能会编造不存在的知识点或解题方法.

RAG 技术的引入完美解决了这些问题:先从本地知识库检索相关知识,再让 AI 基于这些知识回答问题,确保答案的准确性和专业性。

1.2 向量数据库选型与实现

我们选择了 FAISS(Facebook AI Similarity Search)作为向量数据库,配合 sentence-transformers 实现文本向量化。

核心封装类:FAISSStore:

"""
FAISS向量数据库封装
用于存储和检索知识向量
"""
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from typing import List, Dict

class FAISSStore:
    """FAISS向量存储"""

    def __init__(self, embedding_model: str = "paraphrase-multilingual-MiniLM-L12-v2",
                 dimension: int = 384):
        self.dimension = dimension
        self.documents = []  # 存储原始文本
        self.metadatas = []  # 存储元数据

        # 初始化嵌入模型(支持多语言)
        self.embedding_model = SentenceTransformer(embedding_model)

        # 初始化FAISS索引(使用L2距离)
        self.index = faiss.IndexFlatL2(dimension)

    def add_documents(self, texts: List[str], metadatas: List[Dict] = None) -> List[int]:
        """添加文档到向量库"""
        if not texts:
            return []

        # 生成嵌入向量
        embeddings = self.embedding_model.encode(texts, convert_to_numpy=True)
        embeddings = np.array(embeddings).astype('float32')

        # 添加到FAISS索引
        ids = list(range(len(self.documents), len(self.documents) + len(texts)))
        self.index.add(embeddings)

        # 存储文档和元数据
        self.documents.extend(texts)
        if metadatas:
            self.metadatas.extend(metadatas)
        else:
            self.metadatas.extend([{}] * len(texts))

        return ids

    def similarity_search(self, query: str, k: int = 5) -> List[Dict]:
        """相似度搜索"""
        if len(self.documents) == 0:
            return []

        # 生成查询向量
        query_embedding = self.embedding_model.encode([query], convert_to_numpy=True)
        query_embedding = np.array(query_embedding).astype('float32')

        # 搜索最相似的向量
        k = min(k, len(self.documents))
        distances, indices = self.index.search(query_embedding, k)

        # 构建结果
        results = []
        for idx, distance in zip(indices[0], distances[0]):
            if idx != -1:
                results.append({
                    'content': self.documents[idx],
                    'metadata': self.metadatas[idx],
                    'distance': float(distance)
                })

        return results

    def save(self, path: str = "./data/faiss_index"):
        """保存向量库到磁盘"""
        os.makedirs(os.path.dirname(path), exist_ok=True)
        faiss.write_index(self.index, f"{path}.index")
        
        with open(f"{path}.docs.json", 'w', encoding='utf-8') as f:
            json.dump({
                'documents': self.documents,
                'metadatas': self.metadatas
            }, f, ensure_ascii=False, indent=2)

    def load(self, path: str = "./data/faiss_index"):
        """从磁盘加载向量库"""
        self.index = faiss.read_index(f"{path}.index")
        
        with open(f"{path}.docs.json", 'r', encoding='utf-8') as f:
            data = json.load(f)
            self.documents = data['documents']
            self.metadatas = data['metadatas']

关键技术点:

  • 使用 paraphrase-multilingual-MiniLM-L12-v2 模型,支持中英文混合检索。
  • 向量维度为 384,平衡了精度和性能。
  • 支持持久化存储,服务重启后可快速加载。

1.3 RAG 服务层封装

"""
RAG (Retrieval-Augmented Generation) 检索增强生成服务
结合向量数据库和大模型,实现精准的知识问答
"""
from vector_store.faiss_store import FAISSStore
from core.ai_service import ai_service

class RAGService:
    """RAG服务 - 检索增强生成"""

    def __init__(self):
        self.vector_store = FAISSStore()
        self.ai_service = ai_service

        # RAG系统提示词
        self.rag_system_prompt = """你是一个基于知识库的智能学习助手。
你会收到相关的知识片段和用户的问题。
请基于提供的知识片段来回答问题,如果知识片段中没有相关信息,请诚实地告诉用户。
回答时要准确、清晰,并适当引用知识库中的内容。"""

    def rag_query(self, query: str, user_id: str = "system",
                  top_k: int = 3) -> str:
        """
        RAG检索查询
        
        Args:
            query: 用户查询
            user_id: 用户ID
            top_k: 检索最相关的K个知识片段
            
        Returns:
            AI生成的答案
        """
        try:
            # 1. 从向量库检索最相关的知识片段
            docs = self.vector_store.similarity_search(query, k=top_k)

            if not docs:
                # 如果没有检索到知识,直接使用普通对话
                return self.ai_service.chat(user_id, query)

            # 2. 构建上下文
            context_parts = []
            for i, doc in enumerate(docs, 1):
                content = doc.get('content', '')
                source = doc.get('metadata', {}).get('source', '未知来源')
                context_parts.append(f"[知识片段 {i}] (来源: {source})\n{content}\n")

            context = "\n".join(context_parts)

            # 3. 构建增强提示词
            enhanced_prompt = f"""【相关知识库内容】
{context}

【用户问题】
{query}

请基于上述知识库内容回答用户的问题。如果知识库中的信息不足以完整回答问题,请结合你的知识进行补充,但要明确标注哪些是来自知识库,哪些是你的补充。"""

            # 4. 调用AI生成答案
            messages = [
                {"role": "system", "content": self.rag_system_prompt},
                {"role": "user", "content": enhanced_prompt}
            ]

            response = self.ai_service.client.chat.completions.create(
                model=settings.ai.model,
                messages=messages,
                temperature=0.7,
                max_tokens=settings.ai.max_tokens
            )

            answer = response.choices[0].message.content
            return answer

        except Exception as e:
            raise Exception(f"RAG查询异常: {str(e)}")


# 全局RAG服务实例
rag_service = RAGService()

RAG 工作流程:
1.检索阶段:将用户问题向量化,在 FAISS 中检索最相关的 K 个知识片段。
2.增强阶段:将检索到的知识片段作为上下文,构建增强提示词。
3.生成阶段:调用通义千问 API,基于知识库内容生成准确答案。

1.4知识库构建脚本

为了让 RAG 系统有知识可查,我编写了知识库构建脚本,从 MySQL 数据库中提取错题数据并向量化:

"""
从MySQL导入历史错题到向量库(用于错因分析和RAG问答)
"""
from core.rag_service import rag_service
from database.mysql_client import mysql_client

class KnowledgeBaseBuilder:
    """知识库构建器"""

    def __init__(self):
        self.rag_service = rag_service

    def import_from_mysql(self, limit=500):
        """从MySQL导入错题到向量库"""
        sql = """
        SELECT id, stem, analysis, wrong_reason, tags
        FROM topic 
        WHERE deleted = 0 
        AND stem IS NOT NULL 
        AND analysis IS NOT NULL
        LIMIT %s
        """

        topics = mysql_client.execute_query(sql, (limit,))

        texts = []
        metadatas = []

        for topic in topics:
            # 构建知识文本:题目 + 解析 + 错因
            content = f"题目:{topic['stem']}\n解析:{topic['analysis']}"
            if topic.get('wrong_reason'):
                content += f"\n常见错误:{topic['wrong_reason']}"

            metadata = {
                "topic_id": topic['id'],
                "tags": topic.get('tags', '').split(',') if topic.get('tags') else [],
                "source": "mysql",
                "type": "error_note"
            }

            texts.append(content)
            metadatas.append(metadata)

        if texts:
            ids = self.rag_service.add_knowledge(texts, metadatas)

        return len(texts)

    def build(self):
        """构建知识库"""
        total = self.import_from_mysql(limit=1000)
        
        # 保存到 ./data/kb_error_notebook
        project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        save_path = os.path.join(project_root, "data", "kb_error_notebook")
        
        self.rag_service.save_vector_store(save_path)


def main():
    builder = KnowledgeBaseBuilder()
    builder.build()

执行后会在data/文件夹生成:data/kb_error_notebook.index - FAISS 索引文件、data/kb_error_notebook.docs.json - 文档和元数据。

1.5运行结果

在Swagger UI的接口中我们输入问题:如何计算不定积分?此时分别观察java端和python端的日志,得到:

可以看出已经得到了相应的结果,与此同时,在RabbitMQ Management中观察到ai-dialog-queue中的unlocked和total的值均为1:

检索成功后,两者的值重新变为0,这意味着java和python两端成功完成了消息队列的通信。

二、视觉题目识别:拍照即可录入错题

2.1 功能需求

传统的错题录入需要手动输入题干、选项、答案,体验极差。我们引入了视觉识别功能:用户只需拍照上传,AI 自动识别题目内容、提取关键信息。

2.2 通义千问视觉模型接入

阿里云通义千问提供了强大的视觉理解能力(qwen-vl-max),我们通过 OpenAI 兼容模式接入:

def analyze_image(self, image_url: str) -> Dict:
    """
    分析图片中的题目
    
    Args:
        image_url: 图片 URL
        
    Returns:
        分析结果字典,包含:
        - question_stem: 识别出的题干
        - user_answer: 用户手写答案
        - correct_answer: 正确答案
        - analysis: 题目解析
        - tags: 知识点标签
    """
    try:
        import requests
        import base64
        from io import BytesIO
        from PIL import Image
        from urllib.parse import unquote, parse_qs, urlparse

        # 处理百度图片链接:自动提取真实图片 URL
        if 'image.baidu.com' in image_url:
            parsed_url = urlparse(image_url)
            query_params = parse_qs(parsed_url.query)
            if 'objurl' in query_params:
                real_url = unquote(unquote(query_params['objurl'][0]))
                image_url = real_url

        # 下载图片
        response = requests.get(image_url, timeout=15, headers={
            'User-Agent': 'Mozilla/5.0 ...',
            'Referer': 'https://www.baidu.com/'
        })
        response.raise_for_status()

        # 验证图片格式
        img = Image.open(BytesIO(response.content))
        if img.format not in ['JPEG', 'PNG', 'WEBP', 'BMP']:
            raise ValueError(f"不支持的图片格式: {img.format}")

        # 转换为 base64
        buffered = BytesIO()
        img.save(buffered, format=img.format)
        img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')

        # 构建消息内容(多模态输入)
        content = [
            {
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/{img.format.lower()};base64,{img_base64}"
                }
            },
            {
                "type": "text",
                "text": """请识别图片中的数学/物理/化学/英语题目内容,并按以下 JSON 格式返回结果:
{
    "question_stem": "识别出的完整题干内容(公式用 LaTeX 格式)",
    "user_answer": "用户手写的答案(如果有)",
    "correct_answer": "正确答案(如果有)",
    "analysis": "题目解析和解题步骤",
    "tags": ["知识点1", "知识点2", "知识点3"]
}

要求:
1. 公式用 LaTeX 格式
2. 如果图片中没有题目,返回空字符串
"""
            }
        ]

        messages = [
            {"role": "system", "content": "你是一个专业的题目识别和分析助手。"},
            {"role": "user", "content": content}
        ]

        # 调用视觉模型
        response = self.client.chat.completions.create(
            model=settings.ai.vision_model,  # qwen-vl-max
            messages=messages,
            temperature=settings.ai.vision_temperature  # 0.1(低温度保证稳定性)
        )

        result_text = response.choices[0].message.content
        
        # 解析 JSON 结果
        result = json.loads(result_text)
        return result

    except Exception as e:
        raise Exception(f"图片分析异常: {str(e)}")

关键技术点:

  • 图片预处理:自动处理百度图片等特殊链接,提取真实 URL。
  • Base64 编码:将图片转换为 base64 格式,符合 API 要求。
  • 结构化输出:强制 AI 返回 JSON 格式,便于后续程序处理。
  • LaTeX 公式支持:数学公式自动转换为 LaTeX 格式。

2.3 图片分析任务的消息队列集成

视觉识别是耗时操作,我们通过 RabbitMQ 异步处理:

def on_picture_message(self, ch, method, properties, body):
    """
    处理图片分析任务(来自 picture-queue)
    """
    try:
        task = json.loads(body)
        request_id = task.get("requestId")
        user_id = str(task.get("userId"))
        url = task.get("url")

        # 1. 调用视觉模型识别图片
        result = ai_service.analyze_image(url)

        question_stem = result.get("question_stem", "")
        user_answer = result.get("user_answer", "")
        correct_answer = result.get("correct_answer", "")

        # 2. RAG 检索错因分析
        error_cause_context = ""
        if user_answer and correct_answer:
            error_cause_query = f"错误答案 {user_answer} 正确答案 {correct_answer}"
            error_cause_results = rag_service.vector_store.similarity_search(
                error_cause_query, k=3
            )
            
            if error_cause_results:
                error_cause_context = "\n".join([
                    f"- {doc['content'][:200]}..."
                    for doc in error_cause_results
                ])

        # 3. 构建响应
        response = {
            "requestId": request_id,
            "questionStem": question_stem,
            "analyseQuestion": result.get("analysis", ""),
            "analyseWrong": error_cause_context,
            "suggestLabels": result.get("tags", [])
        }

        # 4. 发送结果回 Java 端
        self._send_result(settings.rabbitmq.ai_result_queue,
                          settings.rabbitmq.ai_result_routing_key,
                          response)

        ch.basic_ack(delivery_tag=method.delivery_tag)

    except Exception as e:
        logger.error(f"处理图片任务失败:{str(e)}", exc_info=True)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

处理流程:
1.接收 Java 端发送的图片 URL。
2.调用视觉模型识别题目内容。
3.基于识别结果,RAG 检索错因分析。
4.将完整结果返回给 Java 端。

2.4 运行结果

首先我从百度找了一道高数题的图片,图片的url是:https://img2.baidu.com/it/u=257672646,1143862643&fm=253&fmt=auto&app=138&f=JPEG?w=800&h=1160

同样,在Swagger UI的analyze-image接口中输入该图片的url并execute,观察到两端的页面结果:

RabbitMQ Management中的过程就不再赘述,可以看到java和python两端已经成功完成了消息队列的通信,而Swagger UI也返回了我们需要的该题的答案。不过需要注意的是,由于目前知识库的数据仍然很少,所以ython端并没有检索到类似的题目并推送。

三、服务启动优化

在 main.py 中,我优化了服务启动流程,确保知识库在服务启动时就加载完成:

class AIServiceApp:
    """AI服务应用"""

    def start(self):
        """启动服务"""
        try:
            logger.info("=" * 60)
            logger.info("🚀 Python AI服务启动中...")
            logger.info(f"📡 RabbitMQ Host: {settings.rabbitmq.host}")
            logger.info(f"🤖 AI Model: {settings.ai.model}")
            logger.info(f"👁️ Vision Model: {settings.ai.vision_model}")
            logger.info("=" * 60)

            # 【关键修改】1. 先加载知识库(阻塞等待完成)
            try:
                from core.rag_service import rag_service

                # 加载错题本知识库
                knowledge_path = "./data/kb_error_notebook"
                if os.path.exists(f"{knowledge_path}.index"):
                    logger.info("📚 正在加载错题本知识库...")
                    rag_service.load_vector_store(knowledge_path)
                    stats = rag_service.vector_store.get_stats()
                    logger.info(
                        f"✅ 知识库加载成功 - 文档数: {stats['total_documents']}, "
                        f"索引大小: {stats['index_size']}")
                else:
                    logger.warning("⚠️ 未找到知识库文件,请先运行 scripts/build_knowledge_base.py")
            except Exception as e:
                logger.error(f"❌ 加载知识库失败: {str(e)}")

            # 【关键修改】2. 知识库加载完成后,再创建消费者
            logger.info("✅ 知识库准备就绪,正在初始化消息队列消费者...")
            self.consumer = MQConsumer()
            self.running = True

            logger.info("🎉 服务启动成功,开始处理任务...")

            # 3. 开始消费消息(这会阻塞)
            self.consumer.start_consuming()

        except Exception as e:
            logger.error(f"❌ 服务启动失败: {str(e)}", exc_info=True)
            sys.exit(1)

启动日志示例:

============================================================
🚀 Python AI服务启动中...
📡 RabbitMQ Host: 47.104.236.72
🤖 AI Model: qwen-max
👁️ Vision Model: qwen-vl-max
============================================================
📚 正在加载错题本知识库...
✅ 知识库加载成功 - 文档数: 856, 索引大小: 856
✅ 知识库准备就绪,正在初始化消息队列消费者...
🎉 服务启动成功,开始处理任务...
Python AI 服务已启动,正在监听队列...
正在监听队列:picture-queue
正在监听队列:ai-dialog-queue

四、总结与规划


本周我们完成了 RAG 检索增强生成 和 视觉题目识别 两大核心功能的落地,让 AI 错题本具备了真正的"智能":

1.RAG 检索:让 AI 回答有据可依,避免幻觉。
2.视觉识别:拍照即可录入错题,大幅提升用户体验。
3.异步架构:RabbitMQ 解耦,保证系统高并发和低延迟。

目前的架构已经具备了极强的扩展性,下周我们将重点推进个性化推题功能的实现,例如基于用户薄弱知识点实现精准的题目推荐,结合 RAG 检索的错因知识生成精准的错因分析报告等等。让错题本真正做到"懂你所需,推你所缺"。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐