山东大学创新实训9——从 SQLite 到 PostgreSQL
一、现存问题

StoryEcho 是一个 AI 驱动的文字冒险游戏后端,最初使用 SQLite 作为数据库。随着请求增长,系统开始频繁出现数据库锁定错误(sqlite3.OperationalError: database is locked)。分析发现,SQLite 在写入时会锁定整个数据库文件,当多个请求同时执行 `update_character_session` 和 `record_item_discovery` 操作时就会产生冲突,而默认等待时间较短,高并发下很容易超时。因此,我们决定将数据库从 SQLite 迁移到 PostgreSQL。
二、技术选型
我和ai进行了交互,看什么数据库更加合适:


选择 PostgreSQL 主要基于三点考虑:PostgreSQL 采用 MVCC(多版本并发控制)机制,读写互不阻塞,能够支持数千并发连接;原生支持 JSONB 类型,可以高效存储游戏中的动态属性数据;同时具备完整的事务隔离级别,确保数据一致性。相比之下,SQLite 的写锁机制在高并发场景下成为明显瓶颈。
三、架构设计
迁移策略采用统一数据库接口层设计。应用层(FastAPI)通过 `get_db_connection()` 函数获取数据库连接,该函数根据环境变量 `USE_POSTGRES` 自动选择使用 PostgreSQL 的 asyncpg 连接池或 SQLite 连接。这样既能在生产环境使用 PostgreSQL 处理高并发,又能保留 SQLite 用于开发测试。
配置管理方面,新增了 `db_config.py` 文件统一管理 PostgreSQL 连接参数,包括主机、端口、数据库名、用户名和密码,所有这些配置都通过环境变量读取,确保敏感信息不入库。
我根据了下面这个博客进行了安装和配置环境:
postgreSQL安装教程及使用方法(保姆级)-CSDN博客
在cmd中创建数据库:
psql -U postgres -h localhost -c "CREATE DATABASE storyecho;"
psql -U postgres -h localhost -c "CREATE USER storyecho_user WITH PASSWORD 'your_password_here';"
psql -U postgres -h localhost -c "GRANT ALL PRIVILEGES ON DATABASE storyecho TO storyecho_user;"
在.env文件中进行配置:
# 数据库配置
USE_POSTGRES=true
DB_HOST=localhost
DB_PORT=5432
DB_NAME=storyecho
DB_USER=storyecho_user
DB_PASSWORD=your_password_here
#使用 SQLite:.env 中设置 USE_POSTGRES=false
#使用 PostgreSQL:.env 中设置 USE_POSTGRES=true
四、核心代码修改
数据库连接池的初始化是关键改造。在 `database.py` 中,我们实现了异步连接池:`init_db_pool()` 函数创建 asyncpg 连接池,设置最小 5 个、最大 20 个连接,并配置了 60 秒命令超时。
async def init_db_pool():
"""初始化数据库连接池"""
global _pool, _pool_initialized
if _pool_initialized and _pool is not None and not _pool._closed:
return _pool
if USE_POSTGRES:
# 关闭旧连接池
if _pool is not None and not _pool._closed:
await _pool.close()
_pool = await asyncpg.create_pool(
host=POSTGRES_CONFIG['host'],
port=POSTGRES_CONFIG['port'],
database=POSTGRES_CONFIG['database'],
user=POSTGRES_CONFIG['user'],
password=POSTGRES_CONFIG['password'],
min_size=5,
max_size=20,
command_timeout=60,
max_queries=50000,
max_inactive_connection_lifetime=300
)
_pool_initialized = True
print(f"[DB] PostgreSQL 连接池已创建 -> {POSTGRES_CONFIG['database']}")
else:
# SQLite 模式(备用)
print(f"[DB] 使用 SQLite 模式")
_pool_initialized = True
return _pool
`get_db_connection()` 返回连接池对象供各处使用,应用关闭时通过 `close_db_pool()` 释放资源。连接池机制有效解决了 SQLite 时代每次请求都打开关闭数据库连接的性能开销。
表结构定义也进行了适配。由于 asyncpg 不支持 `INSERT OR IGNORE` 语法,全部改为标准的 `INSERT ... ON CONFLICT ... DO NOTHING/UPDATE` 模式。JSON 字段从 SQLite 的 TEXT 类型改为 PostgreSQL 原生的 JSONB 类型,支持更高效的查询和索引。我们创建了专门的 `database_pg.py` 来集中管理所有表的 DDL 语句,包括用户表、角色表、会话表、存档表、成就表、拓扑节点表等二十余张表。
# backend/database_pg.py
# -*- coding: utf-8 -*-
import asyncpg
import json
from typing import Dict, List, Optional, Any
from datetime import datetime
from .db_config import POSTGRES_CONFIG, DATABASE_URL
class PostgresDatabase:
def __init__(self):
self.pool = None
async def connect(self):
"""创建连接池"""
self.pool = await asyncpg.create_pool(
host=POSTGRES_CONFIG['host'],
port=POSTGRES_CONFIG['port'],
database=POSTGRES_CONFIG['database'],
user=POSTGRES_CONFIG['user'],
password=POSTGRES_CONFIG['password'],
min_size=5,
max_size=20,
command_timeout=60
)
await self._init_tables()
print(f"[DB] PostgreSQL 连接池已创建 -> {POSTGRES_CONFIG['database']}")
async def _init_tables(self):
"""创建所有表"""
queries = [
# 用户表
"""
CREATE TABLE IF NOT EXISTS users (
user_id TEXT PRIMARY KEY,
email TEXT UNIQUE,
username TEXT UNIQUE,
password TEXT,
verify_code TEXT,
is_verified INTEGER DEFAULT 0,
created_at TIMESTAMP
)
""",
# 故事表
"""
CREATE TABLE IF NOT EXISTS stories (
story_id TEXT PRIMARY KEY,
title TEXT NOT NULL,
icon TEXT,
genre TEXT,
difficulty TEXT,
background TEXT,
description TEXT,
created_at TIMESTAMP
)
""",
# 角色表
"""
CREATE TABLE IF NOT EXISTS characters (
character_id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
story_id TEXT NOT NULL,
name TEXT NOT NULL,
gender TEXT,
personality INTEGER DEFAULT 50,
base_attributes JSONB,
initial_inventory_json JSONB,
selected_talent_id TEXT,
selected_background_id TEXT,
ngp_level INTEGER DEFAULT 0,
created_at TIMESTAMP,
updated_at TIMESTAMP
)
""",
# 角色会话表
"""
CREATE TABLE IF NOT EXISTS character_sessions (
session_id TEXT PRIMARY KEY,
character_id TEXT NOT NULL,
user_id TEXT NOT NULL,
story_id TEXT NOT NULL,
current_attributes JSONB,
inventory_json JSONB,
relationships_json JSONB,
task_progress_json JSONB,
session_start_at TIMESTAMP,
last_updated_at TIMESTAMP,
status TEXT DEFAULT 'ongoing'
)
""",
# 存档表
"""
CREATE TABLE IF NOT EXISTS game_saves (
save_id TEXT PRIMARY KEY,
user_id TEXT,
story_id TEXT,
node_id TEXT,
game_state JSONB,
created_at TIMESTAMP,
playthrough INTEGER,
is_autosave BOOLEAN DEFAULT FALSE,
save_name TEXT,
last_loaded TIMESTAMP
)
""",
# 故事成就表 - 添加 category 字段
"""
CREATE TABLE IF NOT EXISTS story_achievements (
story_achievement_id TEXT PRIMARY KEY,
story_id TEXT NOT NULL,
name TEXT NOT NULL,
description TEXT,
icon TEXT,
category TEXT,
condition_json JSONB,
sort_order INTEGER DEFAULT 0
)
""",
# 用户成就表
"""
CREATE TABLE IF NOT EXISTS user_story_achievements (
user_story_achievement_id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
story_id TEXT NOT NULL,
story_achievement_id TEXT NOT NULL,
character_id TEXT,
unlocked_at TIMESTAMP
)
""",
# 成就表(通用)
"""
CREATE TABLE IF NOT EXISTS achievements (
achievement_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
category TEXT,
unlock_condition TEXT
)
""",
# 用户成就表(通用)
"""
CREATE TABLE IF NOT EXISTS user_achievements (
user_achievement_id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
achievement_id TEXT NOT NULL,
unlocked_at TIMESTAMP
)
""",
# ============ 拓扑相关表 ============
# 拓扑节点表(模板)
"""
CREATE TABLE IF NOT EXISTS topology_nodes (
id TEXT,
story_id TEXT,
name TEXT,
type TEXT,
description TEXT,
x REAL,
y REAL,
conditions JSONB,
connected_to JSONB,
icon TEXT,
PRIMARY KEY (id, story_id)
)
""",
# 用户解锁节点表
"""
CREATE TABLE IF NOT EXISTS user_unlocked_nodes (
id SERIAL PRIMARY KEY,
user_id TEXT NOT NULL,
story_id TEXT NOT NULL,
node_id TEXT NOT NULL,
unlocked_at TIMESTAMP,
playthrough INTEGER,
UNIQUE(user_id, story_id, node_id, playthrough)
)
""",
# 探索路径表
"""
CREATE TABLE IF NOT EXISTS exploration_path (
id SERIAL PRIMARY KEY,
user_id TEXT,
story_id TEXT,
from_node TEXT,
to_node TEXT,
visited_at TIMESTAMP,
playthrough INTEGER,
discovery_type TEXT
)
""",
# 节点发现物表
"""
CREATE TABLE IF NOT EXISTS node_discoveries (
id SERIAL PRIMARY KEY,
user_id TEXT,
story_id TEXT,
node_id TEXT,
playthrough INTEGER,
discovery_type TEXT,
discovery_id TEXT,
discovery_name TEXT,
description TEXT,
icon TEXT,
discovered_at TIMESTAMP
)
""",
# 天赋表
"""
CREATE TABLE IF NOT EXISTS talents (
talent_id TEXT PRIMARY KEY,
story_id TEXT,
name TEXT,
description TEXT,
effects JSONB,
rarity TEXT,
unlock_condition TEXT
)
""",
# 出身表
"""
CREATE TABLE IF NOT EXISTS backgrounds (
background_id TEXT PRIMARY KEY,
story_id TEXT,
name TEXT,
description TEXT,
initial_items JSONB,
attribute_bonus JSONB,
starting_skills JSONB,
unlock_condition TEXT
)
""",
# 角色物品表
"""
CREATE TABLE IF NOT EXISTS character_inventory (
inventory_id TEXT PRIMARY KEY,
character_id TEXT,
story_id TEXT,
item_name TEXT,
quantity INTEGER,
source_event_id TEXT,
reason TEXT,
created_at TIMESTAMP,
updated_at TIMESTAMP
)
""",
# 会话物品表
"""
CREATE TABLE IF NOT EXISTS session_inventory (
session_inventory_id TEXT PRIMARY KEY,
session_id TEXT,
item_id TEXT,
quantity INTEGER,
meta_json JSONB
)
""",
# 故事物品表
"""
CREATE TABLE IF NOT EXISTS story_items (
item_id TEXT PRIMARY KEY,
story_id TEXT,
name TEXT,
category TEXT,
description TEXT,
effects JSONB,
source_type TEXT,
is_allowed INTEGER DEFAULT 1
)
""",
# 商人物品表
"""
CREATE TABLE IF NOT EXISTS vendor_items (
vendor_item_id TEXT PRIMARY KEY,
vendor_id TEXT,
item_id TEXT,
price INTEGER,
stock INTEGER
)
""",
# 角色武器表
"""
CREATE TABLE IF NOT EXISTS character_weapons (
character_weapon_id TEXT PRIMARY KEY,
character_id TEXT,
story_id TEXT,
weapon_id TEXT,
weapon_name TEXT,
acquired_at TIMESTAMP,
is_equipped INTEGER DEFAULT 0
)
""",
# 故事战斗会话表
"""
CREATE TABLE IF NOT EXISTS story_combat_sessions (
story_session_id TEXT PRIMARY KEY,
character_session_id TEXT,
character_id TEXT,
user_id TEXT,
story_id TEXT,
enemy_name TEXT,
combat_difficulty TEXT,
enemy_hp INTEGER,
enemy_hp_max INTEGER,
enemy_attack INTEGER,
combat_triggered INTEGER DEFAULT 0,
combat_outcome TEXT,
encounter_scene TEXT,
metadata_json JSONB,
created_at TIMESTAMP,
updated_at TIMESTAMP
)
""",
# 事件日志表
"""
CREATE TABLE IF NOT EXISTS event_log (
event_id TEXT PRIMARY KEY,
character_id TEXT,
user_id TEXT,
deltas_json JSONB,
reason TEXT,
source TEXT,
created_at TIMESTAMP
)
""",
# NGP记录表
"""
CREATE TABLE IF NOT EXISTS ngp_records (
ngp_id TEXT PRIMARY KEY,
character_id TEXT,
previous_ngp_level INTEGER,
inherited_talents JSONB,
inherited_bonuses JSONB,
created_at TIMESTAMP
)
""",
# 记忆文档表(RAG)
"""
CREATE TABLE IF NOT EXISTS memory_documents (
doc_id TEXT PRIMARY KEY,
collection TEXT,
text TEXT,
metadata_json TEXT,
created_at TEXT
)
""",
# 会话摘要表
"""
CREATE TABLE IF NOT EXISTS session_summaries (
id SERIAL PRIMARY KEY,
session_id TEXT,
turn INTEGER,
summary TEXT,
context_json TEXT,
created_at TEXT
)
""",
# 故事敌人表
"""
CREATE TABLE IF NOT EXISTS story_enemies (
enemy_id TEXT PRIMARY KEY,
story_id TEXT,
name TEXT,
category TEXT,
tier TEXT,
base_hp INTEGER,
base_attack INTEGER,
default_difficulty TEXT,
aliases_json JSONB,
nodes_json JSONB,
min_turn INTEGER,
encounter_text TEXT,
loot_json JSONB,
attr_rewards_json JSONB,
money_min INTEGER,
money_max INTEGER,
is_allowed INTEGER DEFAULT 1
)
""",
]
# 创建索引
index_queries = [
"CREATE INDEX IF NOT EXISTS idx_saves_user_story ON game_saves(user_id, story_id, playthrough)",
"CREATE INDEX IF NOT EXISTS idx_unlocked_nodes ON user_unlocked_nodes(user_id, story_id, playthrough)",
"CREATE INDEX IF NOT EXISTS idx_exploration_path_user ON exploration_path(user_id, story_id, playthrough)",
"CREATE INDEX IF NOT EXISTS idx_node_discoveries_user ON node_discoveries(user_id, story_id, node_id, playthrough)",
"CREATE INDEX IF NOT EXISTS idx_character_inventory_char ON character_inventory(character_id)",
"CREATE INDEX IF NOT EXISTS idx_session_inventory_session ON session_inventory(session_id)",
"CREATE INDEX IF NOT EXISTS idx_character_weapons_char ON character_weapons(character_id, story_id)",
"CREATE INDEX IF NOT EXISTS idx_event_log_user ON event_log(user_id, created_at)",
"CREATE INDEX IF NOT EXISTS idx_memory_documents_collection ON memory_documents(collection)",
"CREATE INDEX IF NOT EXISTS idx_session_summaries_session ON session_summaries(session_id)",
"CREATE INDEX IF NOT EXISTS idx_story_combat_sessions ON story_combat_sessions(story_session_id)",
]
async with self.pool.acquire() as conn:
for query in queries:
try:
await conn.execute(query)
except Exception as e:
print(f"[DB] 创建表失败: {e}")
for idx_query in index_queries:
try:
await conn.execute(idx_query)
except Exception as e:
print(f"[DB] 创建索引失败: {e}")
print("[DB] 所有表创建/检查完成")
async def execute(self, query: str, *args):
"""执行单条语句"""
async with self.pool.acquire() as conn:
return await conn.execute(query, *args)
async def fetch(self, query: str, *args):
"""获取多条记录"""
async with self.pool.acquire() as conn:
return await conn.fetch(query, *args)
async def fetch_one(self, query: str, *args):
"""获取单条记录"""
async with self.pool.acquire() as conn:
return await conn.fetchrow(query, *args)
async def fetch_val(self, query: str, *args):
"""获取单个值"""
async with self.pool.acquire() as conn:
return await conn.fetchval(query, *args)
async def close(self):
if self.pool:
await self.pool.close()
数据库维护模块 `db_maintenance.py` 从同步改为异步版本。所有数据库操作函数都添加了 `async/await` 关键字,使用 `async with conn.acquire()` 获取连接,确保连接正确释放。
五、兼容性处理
为了确保迁移平稳过渡,我们采取了双数据库兼容策略。所有数据库操作函数都通过 `USE_POSTGRES` 标志判断当前模式,在 PostgreSQL 模式下使用 asyncpg 的异步 API,在 SQLite 模式下回退到原有的 sqlite3 同步 API。这样开发环境可以继续使用 SQLite 快速迭代,生产环境使用 PostgreSQL 保障性能。
六、迁移过程
迁移分为三步:首先在 PostgreSQL 中创建数据库和用户,然后运行初始化脚本创建所有表结构,最后通过 `run_database_maintenance()` 函数将现有 SQLite 数据导出并导入到 PostgreSQL。采用了逐表导出导入方式,没有使用复杂的迁移工具。
async def run_database_maintenance():
"""运行数据库维护(PostgreSQL 版本)"""
if not USE_POSTGRES:
return
print("[DB] 开始 PostgreSQL 数据库维护...")
try:
chars = await migrate_character_inventory_names()
if chars:
print(f"[db] 已规范化 {chars} 个角色的 character_inventory")
catalog = await migrate_story_items_quantity_suffix()
if catalog:
print(f"[db] 已修正 {catalog} 条 story_items 目录命名")
await cleanup_story_data()
print("[DB] PostgreSQL 数据库维护完成")
except Exception as e:
print(f"[DB] 数据库维护失败: {e}")
七、效果与总结
迁移完成后,数据库锁定错误彻底消失,并发写入能力大幅提升。asyncpg 连接池的复用机制减少了连接建立开销,JSONB 类型对游戏属性等动态数据的存取更加高效。这次迁移验证了一个重要经验:数据库选型需要与业务并发模型匹配,SQLite 适合单机低并发场景,而需要处理并发写入的网络服务则应该从一开始就考虑使用 PostgreSQL 这样的专业数据库。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)