AI + 数据分析助手
·
AI + 数据分析助手:自然语言驱动的智能数据分析平台
1. 项目概述
本项目构建一个智能数据分析助手,允许用户通过自然语言提问,系统自动解析意图、生成SQL查询、执行数据库查询,并以图表或文本形式返回分析结果。核心目标是将复杂的SQL编写工作自动化,使业务人员能快速获取数据洞察。
核心流程:用户问题 → 语义解析(意图+实体识别) → SQL生成 → 数据库执行 → 结果可视化/文本总结 → 返回用户。
技术栈:
- 后端:Python + FastAPI + SQLAlchemy
- 前端:Streamlit(演示) / React(生产)
- 自然语言处理:LangChain + OpenAI API(或本地部署的模型如ChatGLM)
- 数据库:MySQL / PostgreSQL / SQLite(示例)
- 图表:Plotly / Matplotlib / ECharts
2. UML建模
2.1 用例图
渲染错误: Mermaid 渲染失败: Parse error on line 2: graph TB actor User as 业务用户 sys ------------------^ Expecting 'SEMI', 'NEWLINE', 'EOF', 'AMP', 'START_LINK', 'LINK', 'LINK_ID', got 'NODE_STRING'
2.2 类图
2.3 时序图(完整请求流程)
3. 项目文件结构
ai_data_assistant/
├── backend/ # 后端服务(FastAPI)
│ ├── app/
│ │ ├── main.py # 入口
│ │ ├── models/ # Pydantic模型
│ │ │ └── schemas.py
│ │ ├── services/ # 业务逻辑
│ │ │ ├── parser.py # 语义解析
│ │ │ ├── sql_generator.py # SQL生成
│ │ │ ├── executor.py # 数据库执行
│ │ │ ├── visualizer.py # 可视化
│ │ │ └── insight.py # 文本结论
│ │ ├── db/ # 数据库连接
│ │ │ └── connection.py
│ │ ├── utils/ # 工具函数
│ │ │ ├── schema_loader.py # 加载数据库schema
│ │ │ └── config.py
│ │ └── prompts/ # LLM提示词模板
│ │ ├── intent_prompt.txt
│ │ └── sql_prompt.txt
│ ├── requirements.txt
│ └── Dockerfile
├── frontend/ # 前端界面(Streamlit)
│ ├── app.py
│ └── components/
├── data/ # 示例数据与数据库
│ ├── sales.db # SQLite示例
│ └── schema.yaml # 表结构描述
├── tests/ # 单元测试
├── docker-compose.yml
├── README.md
└── .env # 环境变量(API Key等)
4. 核心模块实现
4.1 语义解析与意图识别
使用LangChain + OpenAI API(或本地模型)实现。定义意图类型:top_n、trend、comparison、filter、aggregation等。
# backend/app/services/parser.py
import json
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
class ParsedQuery(BaseModel):
intent: str = Field(description="意图类型: top_n, trend, comparison, filter, aggregation")
dimensions: List[str] = Field(description="分析维度,如产品、地区、时间等")
metrics: List[str] = Field(description="指标,如销售额、数量、利润等")
filters: Dict[str, Any] = Field(description="筛选条件,如 {'date': 'last_month', 'category': '电子产品'}")
time_range: Optional[tuple] = Field(description="时间范围,如 ('2025-01-01','2025-01-31')")
limit: Optional[int] = Field(description="返回条数限制")
class SemanticParser:
def __init__(self, openai_api_key: str):
self.llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, api_key=openai_api_key)
self.parser = PydanticOutputParser(pydantic_object=ParsedQuery)
self.prompt_template = PromptTemplate(
template="""你是一个数据分析专家。请根据用户问题解析出查询意图、维度、指标、过滤条件等。
用户问题:{question}
{schema_info}
{format_instructions}
请输出JSON。""",
input_variables=["question", "schema_info"],
partial_variables={"format_instructions": self.parser.get_format_instructions()}
)
def parse(self, question: str, schema_info: str) -> ParsedQuery:
prompt = self.prompt_template.format(question=question, schema_info=schema_info)
response = self.llm.predict(prompt)
try:
return self.parser.parse(response)
except:
# 降级处理:返回默认解析
return ParsedQuery(intent="unknown", dimensions=[], metrics=[], filters={})
4.2 自动生成SQL
根据解析结果和数据库schema,使用LLM生成SQL。
# backend/app/services/sql_generator.py
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.output_parsers import StrOutputParser
class SQLGenerator:
def __init__(self, openai_api_key: str, schema_manager):
self.llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, api_key=openai_api_key)
self.schema_manager = schema_manager
self.prompt_template = PromptTemplate(
template="""你是一个SQL专家。根据用户解析的查询信息,生成对应的SQL语句。
数据库schema:
{schema}
解析后的查询:
{parsed}
请只输出SQL语句,不要有其他内容。""",
input_variables=["schema", "parsed"]
)
def generate(self, parsed: ParsedQuery) -> str:
schema_str = self.schema_manager.get_schema_description()
parsed_str = parsed.json()
prompt = self.prompt_template.format(schema=schema_str, parsed=parsed_str)
response = self.llm.predict(prompt)
# 可选:用SQLAlchemy验证语法
return response.strip()
4.3 数据库执行与结果获取
# backend/app/services/executor.py
import pandas as pd
from sqlalchemy import create_engine, text
class QueryExecutor:
def __init__(self, db_url: str):
self.engine = create_engine(db_url)
def execute(self, sql: str) -> pd.DataFrame:
with self.engine.connect() as conn:
result = conn.execute(text(sql))
# 转为DataFrame
df = pd.DataFrame(result.fetchall(), columns=result.keys())
return df
4.4 自动图表生成
根据DataFrame的形状和数据类型自动选择图表类型。
# backend/app/services/visualizer.py
import plotly.express as px
import pandas as pd
class Visualizer:
def auto_chart(self, df: pd.DataFrame) -> dict:
"""
返回图表配置(可转为前端ECharts格式),这里返回Plotly JSON
"""
if df.empty:
return None
# 判断是否有时间列
time_col = None
numeric_cols = df.select_dtypes(include='number').columns.tolist()
category_cols = df.select_dtypes(include='object').columns.tolist()
if len(df) == 1:
# 单值,使用指标卡
fig = px.bar(df, x=category_cols[0] if category_cols else df.index, y=numeric_cols[0])
elif len(numeric_cols) == 1 and len(category_cols) >= 1:
# 单指标多类别 -> 条形图
fig = px.bar(df, x=category_cols[0], y=numeric_cols[0])
elif len(numeric_cols) > 1 and len(category_cols) == 1:
# 多指标对比 -> 分组条形图
fig = px.bar(df, x=category_cols[0], y=numeric_cols, barmode='group')
elif any('date' in col.lower() for col in df.columns):
# 有时间列 -> 折线图
time_col = [c for c in df.columns if 'date' in c.lower()][0]
fig = px.line(df, x=time_col, y=numeric_cols)
else:
# 默认散点图
fig = px.scatter(df, x=category_cols[0] if category_cols else df.index, y=numeric_cols[0])
return fig.to_json()
4.5 智能结论生成
使用LLM总结数据发现。
# backend/app/services/insight.py
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
class InsightGenerator:
def __init__(self, openai_api_key: str):
self.llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7, api_key=openai_api_key)
self.prompt_template = PromptTemplate(
template="""根据以下数据分析结果,生成一段简洁的业务洞察:
用户问题:{question}
数据摘要:{data_summary}
请用中文回答,突出关键发现、趋势或异常。""",
input_variables=["question", "data_summary"]
)
def summarize(self, df: pd.DataFrame, question: str) -> str:
# 简单摘要:描述前几行和统计信息
if df.empty:
return "未找到相关数据。"
summary = f"数据共{len(df)}行。\n"
summary += df.head(3).to_string() + "\n"
summary += f"统计摘要:\n{df.describe().to_string()}"
prompt = self.prompt_template.format(question=question, data_summary=summary)
response = self.llm.predict(prompt)
return response
4.6 数据库Schema管理
从实际数据库读取表结构,或使用预定义描述。
# backend/app/utils/schema_loader.py
from sqlalchemy import inspect
class SchemaManager:
def __init__(self, engine):
self.engine = engine
self.inspector = inspect(engine)
def get_schema_description(self) -> str:
"""返回人类可读的schema描述"""
lines = []
for table_name in self.inspector.get_table_names():
lines.append(f"表名: {table_name}")
columns = self.inspector.get_columns(table_name)
for col in columns:
lines.append(f" - {col['name']} ({col['type']})")
# 可选:添加主外键信息
return "\n".join(lines)
4.7 FastAPI接口
# backend/app/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pandas as pd
from .services.parser import SemanticParser
from .services.sql_generator import SQLGenerator
from .services.executor import QueryExecutor
from .services.visualizer import Visualizer
from .services.insight import InsightGenerator
from .utils.schema_loader import SchemaManager
from .db.connection import get_engine
app = FastAPI()
# 初始化组件
engine = get_engine()
schema_manager = SchemaManager(engine)
parser = SemanticParser(openai_api_key="your-key")
sql_gen = SQLGenerator(openai_api_key="your-key", schema_manager=schema_manager)
executor = QueryExecutor(engine.url)
visualizer = Visualizer()
insight_gen = InsightGenerator(openai_api_key="your-key")
class QueryRequest(BaseModel):
question: str
class QueryResponse(BaseModel):
sql: str
chart: str # JSON字符串
insight: str
data: list # 原始数据
@app.post("/query", response_model=QueryResponse)
async def process_query(req: QueryRequest):
try:
# 1. 解析
parsed = parser.parse(req.question, schema_manager.get_schema_description())
# 2. 生成SQL
sql = sql_gen.generate(parsed)
# 3. 执行
df = executor.execute(sql)
# 4. 可视化
chart_json = visualizer.auto_chart(df)
# 5. 智能洞察
insight = insight_gen.summarize(df, req.question)
return QueryResponse(
sql=sql,
chart=chart_json,
insight=insight,
data=df.to_dict(orient='records')
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
5. 前端界面(Streamlit示例)
# frontend/app.py
import streamlit as st
import requests
import json
import plotly.graph_objects as go
import pandas as pd
st.set_page_config(page_title="AI数据分析助手", layout="wide")
st.title("🔍 AI数据分析助手")
with st.form("query_form"):
question = st.text_area("请输入您的数据分析问题:", placeholder="例如:上月销售额前10的产品有哪些?")
submitted = st.form_submit_button("分析")
if submitted and question:
with st.spinner("正在分析..."):
response = requests.post("http://localhost:8000/query", json={"question": question})
if response.status_code == 200:
result = response.json()
# 显示SQL
with st.expander("查看生成的SQL"):
st.code(result["sql"], language="sql")
# 显示数据表格
df = pd.DataFrame(result["data"])
st.dataframe(df)
# 显示图表
if result["chart"]:
fig = go.Figure(json.loads(result["chart"]))
st.plotly_chart(fig, use_container_width=True)
# 显示洞察
st.info(f"📊 洞察:{result['insight']}")
else:
st.error(f"请求失败:{response.text}")
6. 运行示例与效果展示
6.1 环境准备
# 克隆项目
git clone https://github.com/yourrepo/ai_data_assistant.git
cd ai_data_assistant
# 创建虚拟环境
python -m venv venv
source venv/bin/activate
# 安装依赖
pip install -r backend/requirements.txt
# 设置OpenAI API Key(可选,也可以使用本地模型)
export OPENAI_API_KEY="your-key"
# 初始化数据库(示例)
python init_db.py # 创建示例表sales
# 启动后端
cd backend
uvicorn app.main:app --reload --port 8000
# 新终端启动前端
cd frontend
streamlit run app.py
6.2 测试案例
-
问题1:“上个月销售额最高的10个产品是哪些?”
- 解析意图:top_n,维度:产品,指标:销售额,时间:上月,limit=10
- 生成SQL:
SELECT product, SUM(amount) as sales FROM sales WHERE date BETWEEN '2025-02-01' AND '2025-02-28' GROUP BY product ORDER BY sales DESC LIMIT 10 - 结果:条形图展示Top10产品及销售额,文本指出冠军产品。
-
问题2:“各地区的销售趋势如何?”
- 解析意图:trend,维度:地区+时间,指标:销售额
- 生成SQL:按日期和地区分组,返回时间序列。
- 结果:折线图展示各地区的销售走势,分析增长趋势。
-
问题3:“电子产品类别的销售额与去年同期相比增长了多少?”
- 解析意图:comparison,指标:销售额,过滤:类别=电子产品,时间同比
- 生成SQL:计算今年和去年同期的销售额对比,返回增长率。
- 结果:卡片展示增长率,伴随结论。
7. 业务应用场景
- 商业智能BI:业务人员无需懂SQL,直接用自然语言提问,快速获取销售、用户、运营等数据洞察。
- 运营监控:管理者每日通过自然语言询问核心指标,系统自动生成图表和警报。
- 数据探索:分析师利用系统快速验证假设,无需编写复杂查询。
- 嵌入式分析:集成到CRM、ERP等系统中,提供对话式数据分析能力。
8. 拓展与优化
- 多轮对话:支持上下文,用户可追问细化结果。
- 数据权限:根据用户角色过滤可访问的表/列。
- 缓存机制:对相同问题结果缓存,提高响应速度。
- 本地模型:使用ChatGLM、Qwen等开源模型替代OpenAI API,降低依赖。
- 复杂SQL支持:处理多表关联、子查询、窗口函数等,通过提供更详细的schema信息和few-shot示例。
9. 总结
本项目完整实现了“自然语言 → SQL → 图表”的数据分析链路,通过语义解析、意图识别、SQL生成、可视化、智能结论等模块,为用户提供了便捷的数据探索方式。方案基于现代AI技术(LLM)和成熟的数据工具,具有良好的扩展性和实用性,可广泛应用于各类业务场景。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)