AI应用-SQL基础
环境准备(先把数据库跑起来)
bash
# macOS
brew install postgresql@16
brew services start postgresql@16
createdb mylearn
psql mylearn
# 或者用 Docker(更推荐,不污染系统)
docker run -d \
--name pg-learn \
-e POSTGRES_DB=mylearn \
-e POSTGRES_USER=postgres \
-e POSTGRES_PASSWORD=postgres \
-p 5432:5432 \
postgres:16-alpine
psql postgresql://postgres:postgres@localhost:5432/mylearn
GUI 工具推荐 TablePlus(免费够用),比命令行直观,适合入门阶段。
第一层:增删改查(第 1–3 天)
建表先建好练习数据
用这套表练习,贴近 AI 应用场景:
sql
-- 用户表
CREATE TABLE users (
id SERIAL PRIMARY KEY,
email VARCHAR(200) UNIQUE NOT NULL,
name VARCHAR(100) NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- 文章表
CREATE TABLE articles (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
title VARCHAR(300) NOT NULL,
content TEXT,
views INTEGER DEFAULT 0,
published BOOLEAN DEFAULT false,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- 插入练习数据
INSERT INTO users (email, name) VALUES
('alice@example.com', 'Alice'),
('bob@example.com', 'Bob'),
('carol@example.com', 'Carol');
INSERT INTO articles (user_id, title, content, views, published) VALUES
(1, 'Python 入门', 'Python 是一门...', 1200, true),
(1, 'FastAPI 实战', 'FastAPI 是...', 850, true),
(2, 'SQL 基础', 'SQL 是结构化...', 340, true),
(2, '草稿文章', '还没写完...', 0, false),
(3, 'RAG 原理', '检索增强生成...', 2100, true);
SELECT 查询
sql
-- 基础查询
SELECT * FROM users;
SELECT id, name, email FROM users;
-- WHERE 过滤(支持 =、!=、>、<、>=、<=、LIKE、IN、BETWEEN、IS NULL)
SELECT * FROM articles WHERE published = true;
SELECT * FROM articles WHERE views > 500;
SELECT * FROM articles WHERE views BETWEEN 300 AND 1000;
SELECT * FROM articles WHERE title LIKE '%Python%'; -- % 是通配符
SELECT * FROM articles WHERE title ILIKE '%python%'; -- ILIKE 忽略大小写(PostgreSQL 专有)
SELECT * FROM users WHERE email IN ('alice@example.com', 'bob@example.com');
SELECT * FROM articles WHERE content IS NOT NULL;
-- AND / OR 组合条件
SELECT * FROM articles WHERE published = true AND views > 500;
SELECT * FROM articles WHERE views > 1000 OR title LIKE '%入门%';
-- ORDER BY 排序
SELECT * FROM articles ORDER BY views DESC; -- 降序
SELECT * FROM articles ORDER BY created_at ASC; -- 升序(默认)
SELECT * FROM articles ORDER BY published DESC, views DESC; -- 多列排序
-- LIMIT + OFFSET 分页(非常重要,API 分页的基础)
SELECT * FROM articles ORDER BY id LIMIT 10 OFFSET 0; -- 第1页
SELECT * FROM articles ORDER BY id LIMIT 10 OFFSET 10; -- 第2页
SELECT * FROM articles ORDER BY id LIMIT 10 OFFSET 20; -- 第3页
-- DISTINCT 去重
SELECT DISTINCT user_id FROM articles;
-- 列别名(AS)
SELECT
id,
title,
views,
views * 0.001 AS views_k -- 计算列,起别名
FROM articles;
INSERT
sql
-- 插入单行
INSERT INTO users (email, name)
VALUES ('dave@example.com', 'Dave');
-- 插入多行(比多次单行快)
INSERT INTO users (email, name) VALUES
('eve@example.com', 'Eve'),
('frank@example.com', 'Frank');
-- RETURNING:插入后直接返回完整行(非常有用,省去一次 SELECT)
INSERT INTO users (email, name)
VALUES ('grace@example.com', 'Grace')
RETURNING id, email, created_at;
-- ON CONFLICT:处理唯一键冲突(upsert 操作)
INSERT INTO users (email, name)
VALUES ('alice@example.com', 'Alice Updated')
ON CONFLICT (email) DO UPDATE
SET name = EXCLUDED.name; -- EXCLUDED 指要插入的新值
-- ON CONFLICT DO NOTHING:冲突时静默忽略
INSERT INTO users (email, name)
VALUES ('alice@example.com', 'Alice')
ON CONFLICT (email) DO NOTHING;
UPDATE
sql
-- 更新所有行(危险!生产环境一定要加 WHERE)
-- UPDATE articles SET views = 0; ← 不要这样做
-- 更新指定行
UPDATE articles SET views = views + 1 WHERE id = 1;
-- 更新多个字段
UPDATE articles
SET published = true, views = 100
WHERE id = 4;
-- RETURNING:更新后返回最新数据
UPDATE articles
SET views = views + 1
WHERE id = 1
RETURNING id, title, views;
DELETE
sql
-- 删除指定行
DELETE FROM articles WHERE id = 4;
-- RETURNING:删除后返回被删除的数据(用于确认)
DELETE FROM users WHERE email = 'frank@example.com'
RETURNING id, email;
-- 清空整张表(比 DELETE FROM 快,但不能加 WHERE)
TRUNCATE TABLE articles;
数据类型速查
|
类型 |
用途 |
示例 |
|
/ |
自增整数主键 |
|
|
/ |
整数 |
计数、数量 |
|
|
精确小数 |
价格、金额(不能用 FLOAT) |
|
|
有长度限制的字符串 |
邮箱、标题 |
|
|
无限长字符串 |
正文内容、JSON 字符串 |
|
|
布尔值 |
/ |
|
|
带时区的时间戳 |
始终用这个,不用 TIMESTAMP |
|
|
只有日期 |
生日、截止日期 |
|
|
全局唯一 ID |
需要 |
|
|
二进制 JSON |
存元数据、配置 |
第二层:表设计(第 4–6 天)
表设计与约束
sql
-- 完整的表定义示例
CREATE TABLE sessions (
-- 主键:唯一标识每一行
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-- 外键:关联其他表(ON DELETE CASCADE = 用户删了,会话也删)
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
-- NOT NULL:这个字段不能为空
title VARCHAR(200) NOT NULL DEFAULT '新对话',
-- CHECK:值必须满足的条件
status VARCHAR(20) NOT NULL DEFAULT 'active'
CHECK (status IN ('active', 'archived', 'deleted')),
-- DEFAULT:插入时不传这个字段就用默认值
message_count INTEGER NOT NULL DEFAULT 0,
-- 时间字段始终用 TIMESTAMPTZ
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- 多列唯一约束(组合唯一)
CREATE TABLE user_roles (
user_id INTEGER REFERENCES users(id),
role VARCHAR(50) NOT NULL,
PRIMARY KEY (user_id, role) -- 同一用户同一角色只能有一条
);
三个 ON DELETE 选项(外键删除时的行为):
sql
-- CASCADE:父行删了,子行跟着删
user_id INTEGER REFERENCES users(id) ON DELETE CASCADE
-- SET NULL:父行删了,这个字段变成 NULL
user_id INTEGER REFERENCES users(id) ON DELETE SET NULL
-- RESTRICT(默认):父行有子行时,禁止删除父行
user_id INTEGER REFERENCES users(id) ON DELETE RESTRICT
JOIN 关联查询
sql
-- INNER JOIN:只返回两张表都有匹配的行
-- 场景:查文章同时显示作者名
SELECT
a.id,
a.title,
a.views,
u.name AS author_name,
u.email AS author_email
FROM articles a
INNER JOIN users u ON a.user_id = u.id
WHERE a.published = true
ORDER BY a.views DESC;
-- LEFT JOIN:左表全返回,右表没匹配就填 NULL
-- 场景:查所有用户,包括没发过文章的
SELECT
u.name,
COUNT(a.id) AS article_count -- 没文章的用户这里是 0
FROM users u
LEFT JOIN articles a ON a.user_id = u.id AND a.published = true
GROUP BY u.id, u.name
ORDER BY article_count DESC;
-- 三表关联
SELECT
m.content,
m.created_at,
s.title AS session_title,
u.name AS user_name
FROM messages m
JOIN sessions s ON m.session_id = s.id
JOIN users u ON s.user_id = u.id
WHERE u.id = 1
ORDER BY m.created_at;
记住这个口诀:INNER JOIN 取交集,LEFT JOIN 保左表,RIGHT JOIN 保右表(实际很少用,LEFT JOIN 换表顺序等价)。
聚合函数 + GROUP BY
sql
-- 基础聚合
SELECT
COUNT(*) AS total_articles,
COUNT(DISTINCT user_id) AS author_count,
SUM(views) AS total_views,
AVG(views) AS avg_views,
MAX(views) AS max_views,
MIN(views) AS min_views
FROM articles
WHERE published = true;
-- GROUP BY:按某列分组,再聚合
-- 场景:每个用户发了多少篇文章,总阅读量多少
SELECT
u.name,
COUNT(a.id) AS article_count,
SUM(a.views) AS total_views,
ROUND(AVG(a.views)) AS avg_views
FROM users u
LEFT JOIN articles a ON a.user_id = u.id
GROUP BY u.id, u.name -- GROUP BY 里必须包含所有非聚合列
ORDER BY total_views DESC;
-- HAVING:对聚合结果过滤(WHERE 过滤原始行,HAVING 过滤聚合后的结果)
SELECT
user_id,
COUNT(*) AS article_count
FROM articles
WHERE published = true -- 先过滤:只看已发布的
GROUP BY user_id
HAVING COUNT(*) >= 2 -- 再过滤:只看发了2篇以上的
ORDER BY article_count DESC;
-- 执行顺序(很重要):
-- FROM → WHERE → GROUP BY → HAVING → SELECT → ORDER BY → LIMIT
第三层:生产级查询(第 7–10 天)
索引
索引让查询从"全表扫描"变成"直接定位",数据量大时差距是数量级的。
sql
-- 自动建索引的情况:PRIMARY KEY 和 UNIQUE 约束自动建索引
-- 需要手动建索引的情况:经常出现在 WHERE、JOIN ON、ORDER BY 里的字段
-- 普通索引
CREATE INDEX idx_articles_user_id ON articles(user_id);
CREATE INDEX idx_articles_created_at ON articles(created_at DESC);
-- 复合索引(多列一起查时用)
CREATE INDEX idx_articles_user_published ON articles(user_id, published);
-- 条件索引(只索引部分行,减小索引体积)
CREATE INDEX idx_articles_published_views
ON articles(views DESC)
WHERE published = true; -- 只索引已发布的文章
-- 查看某张表的所有索引
SELECT indexname, indexdef FROM pg_indexes WHERE tablename = 'articles';
-- 删除索引
DROP INDEX idx_articles_user_id;
什么时候建索引:
sql
-- 用 EXPLAIN ANALYZE 查看查询执行计划
EXPLAIN ANALYZE
SELECT * FROM articles WHERE user_id = 1 AND published = true;
-- 输出里看这两个关键信息:
-- Seq Scan(全表扫描)→ 说明没有索引,数据量大时很慢
-- Index Scan(索引扫描)→ 用上了索引,很快
-- actual time=0.xxx..0.xxx rows=N → 实际耗时和返回行数
建索引的判断标准:字段经常出现在 WHERE、JOIN ON、ORDER BY 里,且表的数据量超过几千行,就值得建索引。不要给所有字段都建索引——索引会占空间,且让写操作变慢。
事务
事务保证"要么全成功,要么全失败"——转账时扣款和入账必须同时成功,不能只做一半:
sql
-- 手动控制事务
BEGIN;
UPDATE users SET name = '张三' WHERE id = 1;
INSERT INTO articles (user_id, title) VALUES (1, '新文章');
-- 如果到这里一切正常:
COMMIT;
-- 如果中途发现有问题:
-- ROLLBACK; ← 撤销所有修改,回到 BEGIN 之前的状态
在 asyncpg 里用事务:
python
async with db.transaction(): # 自动 BEGIN
await db.execute("UPDATE ...") # 如果这里抛出异常
await db.execute("INSERT ...") # 会自动 ROLLBACK
# 成功则自动 COMMIT
PostgreSQL 特有功能
gen_random_uuid()——生成 UUID 主键(比 SERIAL 更安全,不暴露数量):
sql
CREATE TABLE documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
filename TEXT NOT NULL
);
INSERT INTO documents (filename) VALUES ('manual.pdf')
RETURNING id;
-- 返回:id = '550e8400-e29b-41d4-a716-446655440000'
JSONB——存储半结构化数据(文档元数据、配置信息):
sql
ALTER TABLE documents ADD COLUMN metadata JSONB DEFAULT '{}';
-- 插入 JSON
UPDATE documents
SET metadata = '{"page_count": 42, "language": "zh", "tags": ["技术", "AI"]}'
WHERE id = '...';
-- 查询 JSON 字段(->> 取值为文本,-> 取值为 JSON)
SELECT filename, metadata->>'language' AS lang
FROM documents
WHERE metadata->>'language' = 'zh';
-- 查询 JSON 数组
SELECT filename
FROM documents
WHERE metadata->'tags' ? '技术'; -- ? 检查 key 或数组元素是否存在
窗口函数——在保留所有行的同时做聚合计算(比 GROUP BY 强大):
sql
-- 每篇文章 + 该作者的文章总数(GROUP BY 做不到同时保留每行)
SELECT
title,
views,
user_id,
COUNT(*) OVER (PARTITION BY user_id) AS author_total_articles,
RANK() OVER (PARTITION BY user_id ORDER BY views DESC) AS rank_in_author
FROM articles
WHERE published = true;
-- ROW_NUMBER:给每行编号(分页、去重时很有用)
SELECT
ROW_NUMBER() OVER (ORDER BY views DESC) AS rank,
title,
views
FROM articles
WHERE published = true;
第四层:AI 应用完整表结构
把前面所有知识点整合成一套真实可用的 AI 聊天应用数据库:
sql
-- ── 启用扩展 ──────────────────────────────────────────────────
CREATE EXTENSION IF NOT EXISTS "pgcrypto"; -- gen_random_uuid()
CREATE EXTENSION IF NOT EXISTS "vector"; -- pgvector(RAG 阶段用)
-- ── 用户表 ────────────────────────────────────────────────────
CREATE TABLE users (
id SERIAL PRIMARY KEY,
email VARCHAR(200) UNIQUE NOT NULL,
name VARCHAR(100) NOT NULL,
password_hash VARCHAR(200) NOT NULL,
is_active BOOLEAN NOT NULL DEFAULT true,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_users_email ON users(email);
-- ── 会话表 ────────────────────────────────────────────────────
CREATE TABLE sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
title VARCHAR(200) NOT NULL DEFAULT '新对话',
-- 最近一条消息的快照,避免每次都 JOIN messages 表
last_message TEXT,
message_count INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_sessions_user_id ON sessions(user_id);
CREATE INDEX idx_sessions_updated_at ON sessions(updated_at DESC);
-- ── 消息表 ────────────────────────────────────────────────────
CREATE TABLE messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
role VARCHAR(20) NOT NULL CHECK (role IN ('user', 'assistant', 'system', 'tool')),
content TEXT NOT NULL,
-- 存储引用来源(RAG 场景)
sources JSONB,
-- token 消耗统计
tokens_input INTEGER,
tokens_output INTEGER,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_messages_session_id ON messages(session_id);
CREATE INDEX idx_messages_created_at ON messages(created_at);
-- ── 文档表(RAG 知识库)──────────────────────────────────────
CREATE TABLE documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
filename VARCHAR(500) NOT NULL,
file_size INTEGER,
doc_type VARCHAR(20) NOT NULL CHECK (doc_type IN ('pdf', 'docx', 'markdown', 'txt')),
status VARCHAR(20) NOT NULL DEFAULT 'processing'
CHECK (status IN ('processing', 'ready', 'error')),
chunk_count INTEGER,
-- 存储文件路径、页数等元数据
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_documents_user_id ON documents(user_id);
CREATE INDEX idx_documents_status ON documents(status);
-- ── 文档块表(向量检索)──────────────────────────────────────
CREATE TABLE document_chunks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
document_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
content TEXT NOT NULL,
-- 向量(text-embedding-3-small 是 1536 维)
embedding VECTOR(1536),
-- 页码、章节等元数据
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- 向量相似搜索索引(HNSW 算法)
CREATE INDEX idx_chunks_embedding
ON document_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
CREATE INDEX idx_chunks_document_id ON document_chunks(document_id);
-- ── 常用查询示例 ──────────────────────────────────────────────
-- 分页查询某用户的会话列表(最近的在前)
SELECT
id, title, message_count, last_message, updated_at
FROM sessions
WHERE user_id = 1
ORDER BY updated_at DESC
LIMIT 20 OFFSET 0;
-- 查询某会话的完整消息历史
SELECT
id, role, content, sources, created_at
FROM messages
WHERE session_id = 'xxx'
ORDER BY created_at ASC;
-- 统计某用户的使用情况
SELECT
COUNT(DISTINCT s.id) AS session_count,
COUNT(m.id) AS message_count,
SUM(m.tokens_input) AS total_input_tokens,
SUM(m.tokens_output) AS total_output_tokens,
MIN(s.created_at) AS first_session
FROM sessions s
LEFT JOIN messages m ON m.session_id = s.id
WHERE s.user_id = 1;
-- 向量相似搜索(RAG 检索)
SELECT
dc.id,
dc.content,
dc.metadata,
d.filename,
1 - (dc.embedding <=> '[0.1, 0.2, ...]'::vector) AS similarity
FROM document_chunks dc
JOIN documents d ON dc.document_id = d.id
WHERE d.user_id = 1
ORDER BY dc.embedding <=> '[0.1, 0.2, ...]'::vector
LIMIT 5;
10 天学习安排
|
天 |
内容 |
当天产出 |
|
第 1 天 |
安装 PostgreSQL,学 SELECT + WHERE + ORDER BY |
查出所有已发布文章并排序 |
|
第 2 天 |
INSERT + UPDATE + DELETE + RETURNING |
写 CRUD 完整操作 |
|
第 3 天 |
数据类型 + LIMIT/OFFSET 分页 |
实现分页查询 |
|
第 4 天 |
CREATE TABLE + 约束(PK/FK/NOT NULL/CHECK) |
设计一张完整的表 |
|
第 5 天 |
INNER JOIN + LEFT JOIN |
写出文章+作者联查 |
|
第 6 天 |
GROUP BY + COUNT/SUM/AVG + HAVING |
统计每个用户的数据 |
|
第 7 天 |
索引 + EXPLAIN ANALYZE |
给慢查询加索引 |
|
第 8 天 |
事务 + asyncpg 集成 |
在 FastAPI 里执行 SQL |
|
第 9 天 |
JSONB + UUID + gen_random_uuid |
把 SERIAL 改成 UUID 主键 |
|
第 10 天 |
设计完整 AI 应用数据库 |
建出上面那套完整的表结构 |
最容易踩的坑
用 FLOAT 存金额——浮点数有精度问题,0.1 + 0.2 ≠ 0.3。存价格、金额必须用 NUMERIC(10, 2)。
用 TIMESTAMP 不用 TIMESTAMPTZ——不带时区的时间戳在跨时区部署时会出错。始终用 TIMESTAMPTZ,存的是 UTC,显示时再转本地时区。
WHERE 在 HAVING 之前——WHERE 过滤原始行,HAVING 过滤聚合后的结果,顺序不能颠倒:
sql
-- 错误:COUNT 不能出现在 WHERE 里
SELECT user_id, COUNT(*) FROM articles WHERE COUNT(*) > 2 GROUP BY user_id;
-- 正确
SELECT user_id, COUNT(*) FROM articles GROUP BY user_id HAVING COUNT(*) > 2;
忘加 WHERE 就 UPDATE/DELETE——养成习惯:先写 SELECT * FROM table WHERE 条件 确认影响的行,再改成 UPDATE 或 DELETE。
JOIN 后忘了 GROUP BY 包含所有非聚合列——SELECT u.id, u.name, COUNT(a.id) 里 u.id 和 u.name 都不是聚合函数,必须都写进 GROUP BY。
N+1 查询——循环里对每条记录再查一次数据库,100 条数据 = 101 次查询。用 JOIN 或 IN 一次查完:
python
# 错误:N+1,100个会话 = 101次查询
sessions = await db.fetch("SELECT * FROM sessions WHERE user_id=$1", uid)
for s in sessions:
messages = await db.fetch("SELECT * FROM messages WHERE session_id=$1", s['id'])
# 正确:1次查询
messages = await db.fetch("""
SELECT m.* FROM messages m
JOIN sessions s ON m.session_id = s.id
WHERE s.user_id = $1
""", uid)
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)