AI + 数据分析助手:自然语言驱动的智能数据分析平台

1. 项目概述

本项目构建一个智能数据分析助手,允许用户通过自然语言提问,系统自动解析意图、生成SQL查询、执行数据库查询,并以图表或文本形式返回分析结果。核心目标是将复杂的SQL编写工作自动化,使业务人员能快速获取数据洞察。

核心流程:用户问题 → 语义解析(意图+实体识别) → SQL生成 → 数据库执行 → 结果可视化/文本总结 → 返回用户。

技术栈

  • 后端:Python + FastAPI + SQLAlchemy
  • 前端:Streamlit(演示) / React(生产)
  • 自然语言处理:LangChain + OpenAI API(或本地部署的模型如ChatGLM)
  • 数据库:MySQL / PostgreSQL / SQLite(示例)
  • 图表:Plotly / Matplotlib / ECharts

2. UML建模

2.1 用例图

渲染错误: Mermaid 渲染失败: Parse error on line 2: graph TB actor User as 业务用户 sys ------------------^ Expecting 'SEMI', 'NEWLINE', 'EOF', 'AMP', 'START_LINK', 'LINK', 'LINK_ID', got 'NODE_STRING'

2.2 类图

NLQuery

+text: str

+parse() : : ParsedQuery

ParsedQuery

+intent: str

+dimensions: list

+metrics: list

+filters: dict

+time_range: tuple

SchemaManager

+tables: dict

+get_table_info()

+get_relationships()

SQLGenerator

+generate_sql(parsed, schema) : : str

+validate_sql(sql) : : bool

QueryExecutor

+execute(sql) : : DataFrame

Visualizer

+auto_chart(df) : : Figure

+get_chart_type()

InsightGenerator

+summarize(df, query) : : str

+detect_anomalies(df) : : list

2.3 时序图(完整请求流程)

InsightGenerator Visualizer Database SQLGenerator SemanticParser FastAPI User InsightGenerator Visualizer Database SQLGenerator SemanticParser FastAPI User POST /query {"question": "上月销售额前10的产品"} parse(question) ParsedQuery(intent=topN, metrics=["sales"], dimensions=["product"], filters={"time":"last_month"}, limit=10) generate_sql(parsed, schema) "SELECT product, SUM(sales) FROM sales WHERE date >= '2025-02-01' AND date <= '2025-02-28' GROUP BY product ORDER BY SUM(sales) DESC LIMIT 10" execute(sql) DataFrame (product, sales) auto_chart(df) Figure (bar chart) summarize(df, query) "上个月销售额最高的产品是A,达120万..." JSON {chart: base64, insight: text, data: [...]}

3. 项目文件结构

ai_data_assistant/
├── backend/                       # 后端服务(FastAPI)
│   ├── app/
│   │   ├── main.py                # 入口
│   │   ├── models/                # Pydantic模型
│   │   │   └── schemas.py
│   │   ├── services/              # 业务逻辑
│   │   │   ├── parser.py          # 语义解析
│   │   │   ├── sql_generator.py   # SQL生成
│   │   │   ├── executor.py        # 数据库执行
│   │   │   ├── visualizer.py      # 可视化
│   │   │   └── insight.py         # 文本结论
│   │   ├── db/                    # 数据库连接
│   │   │   └── connection.py
│   │   ├── utils/                 # 工具函数
│   │   │   ├── schema_loader.py   # 加载数据库schema
│   │   │   └── config.py
│   │   └── prompts/               # LLM提示词模板
│   │       ├── intent_prompt.txt
│   │       └── sql_prompt.txt
│   ├── requirements.txt
│   └── Dockerfile
├── frontend/                      # 前端界面(Streamlit)
│   ├── app.py
│   └── components/
├── data/                          # 示例数据与数据库
│   ├── sales.db                   # SQLite示例
│   └── schema.yaml                # 表结构描述
├── tests/                         # 单元测试
├── docker-compose.yml
├── README.md
└── .env                           # 环境变量(API Key等)

4. 核心模块实现

4.1 语义解析与意图识别

使用LangChain + OpenAI API(或本地模型)实现。定义意图类型:top_ntrendcomparisonfilteraggregation等。

# backend/app/services/parser.py
import json
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any

class ParsedQuery(BaseModel):
    intent: str = Field(description="意图类型: top_n, trend, comparison, filter, aggregation")
    dimensions: List[str] = Field(description="分析维度,如产品、地区、时间等")
    metrics: List[str] = Field(description="指标,如销售额、数量、利润等")
    filters: Dict[str, Any] = Field(description="筛选条件,如 {'date': 'last_month', 'category': '电子产品'}")
    time_range: Optional[tuple] = Field(description="时间范围,如 ('2025-01-01','2025-01-31')")
    limit: Optional[int] = Field(description="返回条数限制")

class SemanticParser:
    def __init__(self, openai_api_key: str):
        self.llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, api_key=openai_api_key)
        self.parser = PydanticOutputParser(pydantic_object=ParsedQuery)
        self.prompt_template = PromptTemplate(
            template="""你是一个数据分析专家。请根据用户问题解析出查询意图、维度、指标、过滤条件等。
用户问题:{question}
{schema_info}
{format_instructions}
请输出JSON。""",
            input_variables=["question", "schema_info"],
            partial_variables={"format_instructions": self.parser.get_format_instructions()}
        )
    
    def parse(self, question: str, schema_info: str) -> ParsedQuery:
        prompt = self.prompt_template.format(question=question, schema_info=schema_info)
        response = self.llm.predict(prompt)
        try:
            return self.parser.parse(response)
        except:
            # 降级处理:返回默认解析
            return ParsedQuery(intent="unknown", dimensions=[], metrics=[], filters={})

4.2 自动生成SQL

根据解析结果和数据库schema,使用LLM生成SQL。

# backend/app/services/sql_generator.py
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.output_parsers import StrOutputParser

class SQLGenerator:
    def __init__(self, openai_api_key: str, schema_manager):
        self.llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, api_key=openai_api_key)
        self.schema_manager = schema_manager
        self.prompt_template = PromptTemplate(
            template="""你是一个SQL专家。根据用户解析的查询信息,生成对应的SQL语句。
数据库schema:
{schema}
解析后的查询:
{parsed}
请只输出SQL语句,不要有其他内容。""",
            input_variables=["schema", "parsed"]
        )
    
    def generate(self, parsed: ParsedQuery) -> str:
        schema_str = self.schema_manager.get_schema_description()
        parsed_str = parsed.json()
        prompt = self.prompt_template.format(schema=schema_str, parsed=parsed_str)
        response = self.llm.predict(prompt)
        # 可选:用SQLAlchemy验证语法
        return response.strip()

4.3 数据库执行与结果获取

# backend/app/services/executor.py
import pandas as pd
from sqlalchemy import create_engine, text

class QueryExecutor:
    def __init__(self, db_url: str):
        self.engine = create_engine(db_url)
    
    def execute(self, sql: str) -> pd.DataFrame:
        with self.engine.connect() as conn:
            result = conn.execute(text(sql))
            # 转为DataFrame
            df = pd.DataFrame(result.fetchall(), columns=result.keys())
        return df

4.4 自动图表生成

根据DataFrame的形状和数据类型自动选择图表类型。

# backend/app/services/visualizer.py
import plotly.express as px
import pandas as pd

class Visualizer:
    def auto_chart(self, df: pd.DataFrame) -> dict:
        """
        返回图表配置(可转为前端ECharts格式),这里返回Plotly JSON
        """
        if df.empty:
            return None
        # 判断是否有时间列
        time_col = None
        numeric_cols = df.select_dtypes(include='number').columns.tolist()
        category_cols = df.select_dtypes(include='object').columns.tolist()
        
        if len(df) == 1:
            # 单值,使用指标卡
            fig = px.bar(df, x=category_cols[0] if category_cols else df.index, y=numeric_cols[0])
        elif len(numeric_cols) == 1 and len(category_cols) >= 1:
            # 单指标多类别 -> 条形图
            fig = px.bar(df, x=category_cols[0], y=numeric_cols[0])
        elif len(numeric_cols) > 1 and len(category_cols) == 1:
            # 多指标对比 -> 分组条形图
            fig = px.bar(df, x=category_cols[0], y=numeric_cols, barmode='group')
        elif any('date' in col.lower() for col in df.columns):
            # 有时间列 -> 折线图
            time_col = [c for c in df.columns if 'date' in c.lower()][0]
            fig = px.line(df, x=time_col, y=numeric_cols)
        else:
            # 默认散点图
            fig = px.scatter(df, x=category_cols[0] if category_cols else df.index, y=numeric_cols[0])
        
        return fig.to_json()

4.5 智能结论生成

使用LLM总结数据发现。

# backend/app/services/insight.py
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate

class InsightGenerator:
    def __init__(self, openai_api_key: str):
        self.llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7, api_key=openai_api_key)
        self.prompt_template = PromptTemplate(
            template="""根据以下数据分析结果,生成一段简洁的业务洞察:
用户问题:{question}
数据摘要:{data_summary}
请用中文回答,突出关键发现、趋势或异常。""",
            input_variables=["question", "data_summary"]
        )
    
    def summarize(self, df: pd.DataFrame, question: str) -> str:
        # 简单摘要:描述前几行和统计信息
        if df.empty:
            return "未找到相关数据。"
        summary = f"数据共{len(df)}行。\n"
        summary += df.head(3).to_string() + "\n"
        summary += f"统计摘要:\n{df.describe().to_string()}"
        prompt = self.prompt_template.format(question=question, data_summary=summary)
        response = self.llm.predict(prompt)
        return response

4.6 数据库Schema管理

从实际数据库读取表结构,或使用预定义描述。

# backend/app/utils/schema_loader.py
from sqlalchemy import inspect

class SchemaManager:
    def __init__(self, engine):
        self.engine = engine
        self.inspector = inspect(engine)
    
    def get_schema_description(self) -> str:
        """返回人类可读的schema描述"""
        lines = []
        for table_name in self.inspector.get_table_names():
            lines.append(f"表名: {table_name}")
            columns = self.inspector.get_columns(table_name)
            for col in columns:
                lines.append(f"  - {col['name']} ({col['type']})")
            # 可选:添加主外键信息
        return "\n".join(lines)

4.7 FastAPI接口

# backend/app/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pandas as pd
from .services.parser import SemanticParser
from .services.sql_generator import SQLGenerator
from .services.executor import QueryExecutor
from .services.visualizer import Visualizer
from .services.insight import InsightGenerator
from .utils.schema_loader import SchemaManager
from .db.connection import get_engine

app = FastAPI()

# 初始化组件
engine = get_engine()
schema_manager = SchemaManager(engine)
parser = SemanticParser(openai_api_key="your-key")
sql_gen = SQLGenerator(openai_api_key="your-key", schema_manager=schema_manager)
executor = QueryExecutor(engine.url)
visualizer = Visualizer()
insight_gen = InsightGenerator(openai_api_key="your-key")

class QueryRequest(BaseModel):
    question: str

class QueryResponse(BaseModel):
    sql: str
    chart: str  # JSON字符串
    insight: str
    data: list  # 原始数据

@app.post("/query", response_model=QueryResponse)
async def process_query(req: QueryRequest):
    try:
        # 1. 解析
        parsed = parser.parse(req.question, schema_manager.get_schema_description())
        # 2. 生成SQL
        sql = sql_gen.generate(parsed)
        # 3. 执行
        df = executor.execute(sql)
        # 4. 可视化
        chart_json = visualizer.auto_chart(df)
        # 5. 智能洞察
        insight = insight_gen.summarize(df, req.question)
        return QueryResponse(
            sql=sql,
            chart=chart_json,
            insight=insight,
            data=df.to_dict(orient='records')
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

5. 前端界面(Streamlit示例)

# frontend/app.py
import streamlit as st
import requests
import json
import plotly.graph_objects as go
import pandas as pd

st.set_page_config(page_title="AI数据分析助手", layout="wide")
st.title("🔍 AI数据分析助手")

with st.form("query_form"):
    question = st.text_area("请输入您的数据分析问题:", placeholder="例如:上月销售额前10的产品有哪些?")
    submitted = st.form_submit_button("分析")

if submitted and question:
    with st.spinner("正在分析..."):
        response = requests.post("http://localhost:8000/query", json={"question": question})
        if response.status_code == 200:
            result = response.json()
            # 显示SQL
            with st.expander("查看生成的SQL"):
                st.code(result["sql"], language="sql")
            # 显示数据表格
            df = pd.DataFrame(result["data"])
            st.dataframe(df)
            # 显示图表
            if result["chart"]:
                fig = go.Figure(json.loads(result["chart"]))
                st.plotly_chart(fig, use_container_width=True)
            # 显示洞察
            st.info(f"📊 洞察:{result['insight']}")
        else:
            st.error(f"请求失败:{response.text}")

6. 运行示例与效果展示

6.1 环境准备

# 克隆项目
git clone https://github.com/yourrepo/ai_data_assistant.git
cd ai_data_assistant

# 创建虚拟环境
python -m venv venv
source venv/bin/activate

# 安装依赖
pip install -r backend/requirements.txt

# 设置OpenAI API Key(可选,也可以使用本地模型)
export OPENAI_API_KEY="your-key"

# 初始化数据库(示例)
python init_db.py  # 创建示例表sales

# 启动后端
cd backend
uvicorn app.main:app --reload --port 8000

# 新终端启动前端
cd frontend
streamlit run app.py

6.2 测试案例

  • 问题1:“上个月销售额最高的10个产品是哪些?”

    • 解析意图:top_n,维度:产品,指标:销售额,时间:上月,limit=10
    • 生成SQL:SELECT product, SUM(amount) as sales FROM sales WHERE date BETWEEN '2025-02-01' AND '2025-02-28' GROUP BY product ORDER BY sales DESC LIMIT 10
    • 结果:条形图展示Top10产品及销售额,文本指出冠军产品。
  • 问题2:“各地区的销售趋势如何?”

    • 解析意图:trend,维度:地区+时间,指标:销售额
    • 生成SQL:按日期和地区分组,返回时间序列。
    • 结果:折线图展示各地区的销售走势,分析增长趋势。
  • 问题3:“电子产品类别的销售额与去年同期相比增长了多少?”

    • 解析意图:comparison,指标:销售额,过滤:类别=电子产品,时间同比
    • 生成SQL:计算今年和去年同期的销售额对比,返回增长率。
    • 结果:卡片展示增长率,伴随结论。

7. 业务应用场景

  1. 商业智能BI:业务人员无需懂SQL,直接用自然语言提问,快速获取销售、用户、运营等数据洞察。
  2. 运营监控:管理者每日通过自然语言询问核心指标,系统自动生成图表和警报。
  3. 数据探索:分析师利用系统快速验证假设,无需编写复杂查询。
  4. 嵌入式分析:集成到CRM、ERP等系统中,提供对话式数据分析能力。

8. 拓展与优化

  • 多轮对话:支持上下文,用户可追问细化结果。
  • 数据权限:根据用户角色过滤可访问的表/列。
  • 缓存机制:对相同问题结果缓存,提高响应速度。
  • 本地模型:使用ChatGLM、Qwen等开源模型替代OpenAI API,降低依赖。
  • 复杂SQL支持:处理多表关联、子查询、窗口函数等,通过提供更详细的schema信息和few-shot示例。

9. 总结

本项目完整实现了“自然语言 → SQL → 图表”的数据分析链路,通过语义解析、意图识别、SQL生成、可视化、智能结论等模块,为用户提供了便捷的数据探索方式。方案基于现代AI技术(LLM)和成熟的数据工具,具有良好的扩展性和实用性,可广泛应用于各类业务场景。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐