HoRain云--Lua数据库操作全攻略

🎬 HoRain云小助手:个人主页
🔥 个人专栏: 《Linux 系列教程》《c语言教程》
⛺️生活的理想,就是为了理想的生活!
⛳️ 推荐
前些天发现了一个超棒的服务器购买网站,性价比超高,大内存超划算!忍不住分享一下给大家。点击跳转到网站。
专栏介绍
|
专栏名称 |
专栏介绍 |
|
本专栏主要撰写C干货内容和编程技巧,让大家从底层了解C,把更多的知识由抽象到简单通俗易懂。 |
|
|
本专栏主要是注重从底层来给大家一步步剖析网络协议的奥秘,一起解密网络协议在运行中协议的基本运行机制! |
|
|
全面深入解析 docker 容器,从基础到进阶,涵盖原理、操作、实践案例,助您精通 docker。 |
|
|
本专栏主要撰写Linux干货内容,从基础到进阶,知识由抽象到简单通俗易懂,帮你从新手小白到扫地僧。 |
|
|
本专栏着重撰写Python相关的干货内容与编程技巧,助力大家从底层去认识Python,将更多复杂的知识由抽象转化为简单易懂的内容。 |
|
|
本专栏主要是发布一些考试和练习题库(涵盖软考、HCIE、HRCE、CCNA等) |
目录

📊 Lua 数据库访问完全指南
Lua 本身没有内置的数据库访问功能,但通过第三方库可以轻松连接各种数据库。LuaSQL 是最常用的数据库访问库,支持 MySQL、SQLite、PostgreSQL、Oracle 等多种数据库。
📦 1. LuaSQL 安装
1.1 安装 LuaRocks(包管理器)
# Linux/macOS
curl -R -O https://luarocks.org/releases/luarocks-3.9.2.tar.gz
tar -zxvf luarocks-3.9.2.tar.gz
cd luarocks-3.9.2
./configure && make && sudo make install
# Windows
# 下载:https://luarocks.org/#quick-start
# 或使用 LuaDist:https://luadist.org/
1.2 安装数据库驱动
# MySQL
luarocks install luasql-mysql
# SQLite
luarocks install luasql-sqlite3
# PostgreSQL
luarocks install luasql-postgres
# Oracle
luarocks install luasql-oci8
# ODBC
luarocks install luasql-odbc
# SQL Server(通过 ODBC)
luarocks install luasql-odbc
🔌 2. MySQL 数据库访问
2.1 基本连接和查询
-- 引入 LuaSQL MySQL 模块
local luasql = require "luasql.mysql"
-- 创建环境对象
local env = assert(luasql.mysql())
-- 连接数据库
local conn = assert(env:connect(
"database_name", -- 数据库名
"username", -- 用户名
"password", -- 密码
"localhost", -- 主机
3306 -- 端口
))
print("✅ MySQL 连接成功!")
-- 创建表
local create_table_sql = [[
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
age INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
]]
local success, err = conn:execute(create_table_sql)
if not success then
print("❌ 创建表失败:", err)
else
print("✅ 表创建成功")
end
-- 插入数据
local insert_sql = string.format([[
INSERT INTO users (name, email, age)
VALUES ('%s', '%s', %d)
]], "张三", "zhangsan@example.com", 25)
local rows_affected = conn:execute(insert_sql)
print("📝 插入行数:", rows_affected)
-- 批量插入(使用事务)
conn:execute("START TRANSACTION")
local users = {
{"李四", "lisi@example.com", 30},
{"王五", "wangwu@example.com", 28},
{"赵六", "zhaoliu@example.com", 35}
}
for _, user in ipairs(users) do
local sql = string.format(
"INSERT INTO users (name, email, age) VALUES ('%s', '%s', %d)",
user[1], user[2], user[3]
)
conn:execute(sql)
end
conn:execute("COMMIT")
print("✅ 批量插入完成")
-- 查询数据
local select_sql = "SELECT * FROM users ORDER BY id"
local cursor, err = conn:execute(select_sql)
if not cursor then
print("❌ 查询失败:", err)
else
print("\n📋 用户列表:")
print("ID\t姓名\t邮箱\t\t\t年龄\t创建时间")
print(string.rep("-", 70))
local row = cursor:fetch({}, "a") -- 以关联数组形式获取
while row do
print(string.format("%d\t%s\t%s\t%d\t%s",
row.id, row.name, row.email, row.age, row.created_at))
row = cursor:fetch(row, "a")
end
cursor:close()
end
-- 带参数的查询(防止 SQL 注入)
local function safe_query(conn, name_pattern)
local sql = string.format([[
SELECT * FROM users
WHERE name LIKE '%%%s%%'
ORDER BY age DESC
]], conn:escape(name_pattern))
local cursor = conn:execute(sql)
return cursor
end
-- 使用安全查询
local cursor = safe_query(conn, "张")
if cursor then
print("\n🔍 查询结果(姓张的用户):")
local row = cursor:fetch({}, "a")
while row do
print(string.format("ID:%d 姓名:%s 年龄:%d",
row.id, row.name, row.age))
row = cursor:fetch(row, "a")
end
cursor:close()
end
-- 更新数据
local update_sql = string.format([[
UPDATE users
SET age = age + 1
WHERE name = '%s'
]], "张三")
local affected = conn:execute(update_sql)
print("\n🔄 更新行数:", affected)
-- 删除数据
local delete_sql = "DELETE FROM users WHERE age > 40"
local deleted = conn:execute(delete_sql)
print("🗑️ 删除行数:", deleted)
-- 关闭连接
conn:close()
env:close()
print("🔌 数据库连接已关闭")
2.2 高级功能
-- 连接池管理
local connection_pool = {}
local function get_connection()
for i, conn in ipairs(connection_pool) do
if conn:ping() then
table.remove(connection_pool, i)
return conn
end
end
-- 创建新连接
local env = luasql.mysql()
local conn = env:connect("db_name", "user", "pass", "host", 3306)
conn:set_timeout(5000) -- 设置超时5秒
return conn
end
local function release_connection(conn)
if #connection_pool < 10 then -- 限制连接池大小
table.insert(connection_pool, conn)
else
conn:close()
end
end
-- 事务处理
local function transaction(conn, operations)
local success, err = conn:execute("START TRANSACTION")
if not success then return false, "开始事务失败: " .. err end
for i, op in ipairs(operations) do
local ok, result = pcall(op.func, conn, unpack(op.args or {}))
if not ok then
conn:execute("ROLLBACK")
return false, "操作 " .. i .. " 失败: " .. result
end
end
local ok, err = conn:execute("COMMIT")
if not ok then
conn:execute("ROLLBACK")
return false, "提交事务失败: " .. err
end
return true
end
-- 使用事务
local ops = {
{func = function(conn)
return conn:execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
end},
{func = function(conn)
return conn:execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
end}
}
local success, err = transaction(conn, ops)
if success then
print("✅ 转账成功")
else
print("❌ 转账失败:", err)
end
💾 3. SQLite 数据库访问
3.1 基本操作
local luasql = require "luasql.sqlite3"
-- 创建环境对象
local env = assert(luasql.sqlite3())
-- 连接数据库(文件或内存)
local conn = assert(env:connect("test.db")) -- 文件数据库
-- local conn = assert(env:connect(":memory:")) -- 内存数据库
print("✅ SQLite 连接成功!")
-- 启用外键约束
conn:execute("PRAGMA foreign_keys = ON")
-- 创建表
conn:execute[[
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
price REAL CHECK(price > 0),
stock INTEGER DEFAULT 0,
category TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
]]
-- 创建索引
conn:execute("CREATE INDEX IF NOT EXISTS idx_category ON products(category)")
conn:execute("CREATE INDEX IF NOT EXISTS idx_price ON products(price)")
-- 插入数据
local products = {
{"笔记本电脑", 5999.99, 50, "电子产品"},
{"智能手机", 2999.50, 100, "电子产品"},
{"书籍", 49.99, 200, "图书"},
{"咖啡", 25.00, 300, "食品"}
}
conn:execute("BEGIN TRANSACTION")
for _, product in ipairs(products) do
local sql = string.format([[
INSERT INTO products (name, price, stock, category)
VALUES ('%s', %f, %d, '%s')
]], product[1], product[2], product[3], product[4])
conn:execute(sql)
end
conn:execute("COMMIT")
-- 查询数据
local cursor = conn:execute([[
SELECT
id,
name,
printf('%.2f', price) as price_fmt,
stock,
category,
datetime(created_at, 'localtime') as created
FROM products
WHERE stock > 0
ORDER BY price DESC
]])
print("\n🛒 产品列表:")
print("ID\t产品名称\t\t价格\t库存\t分类\t创建时间")
print(string.rep("-", 80))
local row = cursor:fetch({}, "a")
while row do
print(string.format("%d\t%-12s\t¥%s\t%d\t%s\t%s",
row.id, row.name, row.price_fmt, row.stock, row.category, row.created))
row = cursor:fetch(row, "a")
end
cursor:close()
-- 聚合查询
local cursor = conn:execute([[
SELECT
category,
COUNT(*) as count,
SUM(price * stock) as total_value,
AVG(price) as avg_price,
MIN(price) as min_price,
MAX(price) as max_price
FROM products
GROUP BY category
HAVING count > 0
ORDER BY total_value DESC
]])
print("\n📊 分类统计:")
print("分类\t\t商品数\t总价值\t平均价格\t最低价\t最高价")
print(string.rep("-", 70))
row = cursor:fetch({}, "a")
while row do
print(string.format("%-8s\t%d\t¥%.2f\t¥%.2f\t\t¥%.2f\t¥%.2f",
row.category, row.count, row.total_value,
row.avg_price, row.min_price, row.max_price))
row = cursor:fetch(row, "a")
end
cursor:close()
-- 更新库存
conn:execute("UPDATE products SET stock = stock - 1 WHERE name = '笔记本电脑'")
-- 删除数据
conn:execute("DELETE FROM products WHERE stock <= 0")
-- 备份数据库
local backup_conn = assert(env:connect("backup.db"))
local cursor = conn:execute("SELECT sql FROM sqlite_master WHERE type='table'")
row = cursor:fetch({}, "a")
while row do
backup_conn:execute(row.sql)
row = cursor:fetch(row, "a")
end
cursor:close()
print("✅ 数据库备份完成")
-- 关闭连接
conn:close()
backup_conn:close()
env:close()
3.2 SQLite 高级功能
-- 自定义函数
local function register_functions(conn)
-- 注册自定义聚合函数
conn:execute([[
CREATE AGGREGATE IF NOT EXISTS product
BEGIN
CREATE TEMP TABLE IF NOT EXISTS temp_product (value REAL);
INSERT INTO temp_product VALUES (?1);
END;
]])
-- 注册标量函数
conn:execute([[
CREATE FUNCTION IF NOT EXISTS format_price(price REAL)
RETURNS TEXT
BEGIN
RETURN '¥' || printf('%.2f', price);
END;
]])
end
-- 使用视图
conn:execute([[
CREATE VIEW IF NOT EXISTS product_summary AS
SELECT
category,
COUNT(*) as product_count,
SUM(stock) as total_stock,
AVG(price) as average_price
FROM products
GROUP BY category
]])
-- 使用触发器
conn:execute([[
CREATE TRIGGER IF NOT EXISTS update_timestamp
AFTER UPDATE ON products
BEGIN
UPDATE products
SET updated_at = CURRENT_TIMESTAMP
WHERE id = NEW.id;
END;
]])
-- 事务保存点
conn:execute("SAVEPOINT sp1")
-- 执行一些操作
conn:execute("RELEASE SAVEPOINT sp1") -- 提交保存点
-- 或
conn:execute("ROLLBACK TO SAVEPOINT sp1") -- 回滚到保存点
🐘 4. PostgreSQL 数据库访问
4.1 基本操作
local luasql = require "luasql.postgres"
-- 创建环境对象
local env = assert(luasql.postgres())
-- 连接数据库
local conn = assert(env:connect(
"database_name", -- 数据库名
"username", -- 用户名
"password", -- 密码
"localhost", -- 主机
5432, -- 端口
nil, -- Unix socket
nil, -- 连接超时
"client_encoding=UTF8" -- 连接选项
))
print("✅ PostgreSQL 连接成功!")
-- 创建表(使用 PostgreSQL 特定类型)
conn:execute([[
CREATE TABLE IF NOT EXISTS employees (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
salary DECIMAL(10, 2) CHECK(salary > 0),
department VARCHAR(50),
hire_date DATE DEFAULT CURRENT_DATE,
is_active BOOLEAN DEFAULT TRUE,
skills TEXT[],
metadata JSONB
)
]])
-- 创建索引
conn:execute("CREATE INDEX IF NOT EXISTS idx_department ON employees(department)")
conn:execute("CREATE INDEX IF NOT EXISTS idx_salary ON employees(salary DESC)")
-- 插入数据(使用参数化查询)
local insert_sql = [[
INSERT INTO employees
(name, email, salary, department, skills, metadata)
VALUES ($1, $2, $3, $4, $5, $6)
]]
-- 注意:LuaSQL 原生不支持参数化查询,需要手动转义
local employees = {
{"张三", "zhangsan@example.com", 15000.00, "技术部",
'{"Lua", "PostgreSQL", "Linux"}', '{"level": "高级", "projects": 5}'},
{"李四", "lisi@example.com", 12000.50, "市场部",
'{"营销", "沟通"}', '{"level": "中级", "clients": 20}'},
{"王五", "wangwu@example.com", 18000.75, "技术部",
'{"Python", "Docker", "Kubernetes"}', '{"level": "资深", "certifications": 3}'}
}
conn:execute("BEGIN")
for _, emp in ipairs(employees) do
local sql = string.format([[
INSERT INTO employees
(name, email, salary, department, skills, metadata)
VALUES ('%s', '%s', %f, '%s', '%s', '%s')
]],
conn:escape(emp[1]), conn:escape(emp[2]), emp[3],
conn:escape(emp[4]), conn:escape(emp[5]), conn:escape(emp[6]))
conn:execute(sql)
end
conn:execute("COMMIT")
-- 查询数据
local cursor = conn:execute([[
SELECT
id,
name,
email,
TO_CHAR(salary, 'FM999,999,999.00') as formatted_salary,
department,
TO_CHAR(hire_date, 'YYYY-MM-DD') as hire_date_fmt,
is_active,
skills,
metadata
FROM employees
WHERE is_active = TRUE
ORDER BY salary DESC
]])
print("\n👥 员工列表:")
print("ID\t姓名\t邮箱\t\t\t薪资\t\t部门\t入职日期\t状态\t技能")
print(string.rep("-", 120))
local row = cursor:fetch({}, "a")
while row do
local skills = row.skills and row.skails:gsub("[{}]", "") or ""
print(string.format("%d\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s",
row.id, row.name, row.email, row.formatted_salary,
row.department, row.hire_date_fmt,
row.is_active and "在职" or "离职",
skills))
row = cursor:fetch(row, "a")
end
cursor:close()
-- JSON 查询
local cursor = conn:execute([[
SELECT
name,
metadata->>'level' as level,
(metadata->>'projects')::int as projects
FROM employees
WHERE metadata->>'level' = '高级'
OR metadata->>'level' = '资深'
]])
print("\n🏆 高级员工:")
row = cursor:fetch({}, "a")
while row do
print(string.format("%s - 级别:%s 项目数:%d",
row.name, row.level, row.projects or 0))
row = cursor:fetch(row, "a")
end
cursor:close()
-- 数组查询
local cursor = conn:execute([[
SELECT name, skills
FROM employees
WHERE 'Lua' = ANY(skills)
OR 'Python' = ANY(skills)
]])
print("\n💻 技术员工:")
row = cursor:fetch({}, "a")
while row do
local skills = row.skills:gsub("[{}]", ""):gsub('"', '')
print(string.format("%s - 技能: %s", row.name, skills))
row = cursor:fetch(row, "a")
end
cursor:close()
-- 关闭连接
conn:close()
env:close()
🏢 5. Oracle 数据库访问
5.1 安装和配置
# 安装 Oracle Instant Client
# 下载:https://www.oracle.com/database/technologies/instant-client.html
# Linux 安装示例
sudo apt-get install libaio1 # 依赖
sudo rpm -Uvh oracle-instantclient19.8-basic-19.8.0.0.0-1.x86_64.rpm
sudo rpm -Uvh oracle-instantclient19.8-devel-19.8.0.0.0-1.x86_64.rpm
# 设置环境变量
export ORACLE_HOME=/usr/lib/oracle/19.8/client64
export LD_LIBRARY_PATH=$ORACLE_HOME/lib:$LD_LIBRARY_PATH
export PATH=$ORACLE_HOME/bin:$PATH
# 安装 LuaSQL Oracle 驱动
luarocks install luasql-oci8 OCI8_INCDIR=/usr/include/oracle/19.8/client64/
5.2 连接和操作
local luasql = require "luasql.oci8"
-- 创建环境对象
local env = assert(luasql.oci8())
-- 连接数据库
local conn = assert(env:connect(
"//hostname:port/service_name", -- 连接字符串
"username", -- 用户名
"password" -- 密码
))
print("✅ Oracle 连接成功!")
-- 创建表
conn:execute([[
CREATE TABLE customers (
customer_id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
customer_name VARCHAR2(100) NOT NULL,
email VARCHAR2(100) UNIQUE,
phone VARCHAR2(20),
registration_date DATE DEFAULT SYSDATE,
credit_limit NUMBER(10, 2) DEFAULT 10000.00,
status VARCHAR2(10) DEFAULT 'ACTIVE'
)
]])
-- 创建序列(如果需要)
conn:execute("CREATE SEQUENCE customer_seq START WITH 1 INCREMENT BY 1")
-- 插入数据
local customers = {
{"张三", "zhangsan@example.com", "13800138000", 50000.00},
{"李四", "lisi@example.com", "13900139000", 30000.00},
{"王五", "wangwu@example.com", "13700137000", 80000.00}
}
conn:execute("BEGIN")
for _, cust in ipairs(customers) do
local sql = string.format([[
INSERT INTO customers
(customer_id, customer_name, email, phone, credit_limit)
VALUES (customer_seq.NEXTVAL, '%s', '%s', '%s', %f)
]], conn:escape(cust[1]), conn:escape(cust[2]),
conn:escape(cust[3]), cust[4])
conn:execute(sql)
end
conn:execute("COMMIT")
-- 查询数据
local cursor = conn:execute([[
SELECT
customer_id,
customer_name,
email,
phone,
TO_CHAR(registration_date, 'YYYY-MM-DD HH24:MI:SS') as reg_date,
credit_limit,
status
FROM customers
WHERE status = 'ACTIVE'
ORDER BY credit_limit DESC
]])
print("\n👥 客户列表:")
print("ID\t姓名\t邮箱\t\t\t电话\t\t注册时间\t\t信用额度\t状态")
print(string.rep("-", 130))
local row = cursor:fetch({}, "a")
while row do
print(string.format("%d\t%s\t%s\t%s\t%s\t%.2f\t%s",
row.customer_id, row.customer_name, row.email,
row.phone, row.reg_date, row.credit_limit, row.status))
row = cursor:fetch(row, "a")
end
cursor:close()
-- 存储过程调用
conn:execute([[
CREATE OR REPLACE PROCEDURE update_credit_limit(
p_customer_id IN NUMBER,
p_new_limit IN NUMBER,
p_result OUT VARCHAR2
) AS
BEGIN
UPDATE customers
SET credit_limit = p_new_limit
WHERE customer_id = p_customer_id;
IF SQL%ROWCOUNT = 0 THEN
p_result := '客户不存在';
ELSE
p_result := '更新成功';
END IF;
COMMIT;
END;
]])
-- 调用存储过程(LuaSQL 直接调用存储过程有限制,通常使用匿名块)
local update_sql = string.format([[
DECLARE
v_result VARCHAR2(100);
BEGIN
update_credit_limit(%d, %f, v_result);
DBMS_OUTPUT.PUT_LINE(v_result);
END;
]], 1, 60000.00)
conn:execute(update_sql)
-- 关闭连接
conn:close()
env:close()
🔄 6. 数据库连接池
6.1 简单连接池实现
local luasql = require "luasql.mysql"
local ConnectionPool = {}
ConnectionPool.__index = ConnectionPool
function ConnectionPool.new(config)
local self = setmetatable({}, ConnectionPool)
self.config = config
self.pool = {}
self.active_connections = 0
self.max_connections = config.max_connections or 10
self.timeout = config.timeout or 5000 -- 5秒超时
self.env = luasql.mysql()
return self
end
function ConnectionPool:get_connection()
-- 从池中获取可用连接
for i, conn in ipairs(self.pool) do
if self:_is_connection_valid(conn) then
table.remove(self.pool, i)
self.active_connections = self.active_connections + 1
return conn
end
end
-- 创建新连接
if self.active_connections < self.max_connections then
local conn = self.env:connect(
self.config.database,
self.config.username,
self.config.password,
self.config.host,
self.config.port
)
if conn then
self.active_connections = self.active_connections + 1
return conn
end
end
return nil, "无法获取数据库连接"
end
function ConnectionPool:release_connection(conn)
if self:_is_connection_valid(conn) then
table.insert(self.pool, conn)
else
conn:close()
end
self.active_connections = self.active_connections - 1
end
function ConnectionPool:_is_connection_valid(conn)
-- 简单的心跳检查
local ok, err = pcall(function()
local cursor = conn:execute("SELECT 1")
if cursor then cursor:close() end
return true
end)
return ok
end
function ConnectionPool:close_all()
for _, conn in ipairs(self.pool) do
conn:close()
end
self.pool = {}
self.active_connections = 0
self.env:close()
end
-- 使用连接池
local pool_config = {
host = "localhost",
port = 3306,
database = "test_db",
username = "root",
password = "password",
max_connections = 5
}
local pool = ConnectionPool.new(pool_config)
-- 获取连接执行查询
local conn, err = pool:get_connection()
if conn then
local cursor = conn:execute("SELECT * FROM users LIMIT 10")
-- 处理结果...
cursor:close()
pool:release_connection(conn)
else
print("获取连接失败:", err)
end
-- 使用完成后关闭所有连接
pool:close_all()
🛡️ 7. 安全最佳实践
7.1 防止 SQL 注入
local function safe_execute(conn, sql, ...)
local args = {...}
-- 转义所有参数
for i, arg in ipairs(args) do
if type(arg) == "string" then
args[i] = conn:escape(arg)
end
end
-- 构建安全 SQL
local safe_sql = string.format(sql, unpack(args))
return conn:execute(safe_sql)
end
-- 使用示例
local user_input = "张三'; DROP TABLE users; --"
local sql = "SELECT * FROM users WHERE name = '%s'"
-- 安全执行
local cursor = safe_execute(conn, sql, user_input)
-- 参数化查询(如果数据库驱动支持)
local function parameterized_query(conn, sql, params)
-- 这里需要根据具体数据库驱动实现
-- 例如使用预处理语句
return conn:prepare(sql):execute(params)
end
7.2 连接安全
-- 使用 SSL 连接(如果数据库支持)
local ssl_config = {
ssl = true,
ssl_ca = "/path/to/ca-cert.pem",
ssl_cert = "/path/to/client-cert.pem",
ssl_key = "/path/to/client-key.pem"
}
-- 加密密码存储
local function encrypt_password(password, salt)
-- 使用 bcrypt 或类似算法
-- 这里简化示例
return password -- 实际应用中应该使用加密
end
-- 连接超时和重试
local function connect_with_retry(env, config, max_retries)
local retries = 0
while retries < max_retries do
local conn, err = env:connect(
config.database,
config.username,
config.password,
config.host,
config.port
)
if conn then
return conn
end
retries = retries + 1
print(string.format("连接失败,第 %d 次重试...", retries))
os.execute("sleep " .. (2 ^ retries)) -- 指数退避
end
return nil, "连接失败,达到最大重试次数"
end
📈 8. 性能优化
8.1 批量操作
-- 批量插入优化
local function batch_insert(conn, table_name, data, batch_size)
batch_size = batch_size or 1000
conn:execute("START TRANSACTION")
local values = {}
for i, row in ipairs(data) do
local escaped_values = {}
for _, value in ipairs(row) do
if type(value) == "string" then
table.insert(escaped_values, "'" .. conn:escape(value) .. "'")
else
table.insert(escaped_values, tostring(value))
end
end
table.insert(values, "(" .. table.concat(escaped_values, ",") .. ")")
-- 分批提交
if i % batch_size == 0 then
local sql = string.format(
"INSERT INTO %s VALUES %s",
table_name,
table.concat(values, ",")
)
conn:execute(sql)
values = {}
end
end
-- 插入剩余数据
if #values > 0 then
local sql = string.format(
"INSERT INTO %s VALUES %s",
table_name,
table.concat(values, ",")
)
conn:execute(sql)
end
conn:execute("COMMIT")
end
-- 使用批量插入
local large_data = {}
for i = 1, 10000 do
table.insert(large_data, {
"用户" .. i,
"user" .. i .. "@example.com",
math.random(20, 60),
os.time() + i
})
end
batch_insert(conn, "users", large_data, 1000)
8.2 查询优化
-- 使用索引提示
local function query_with_index(conn, table_name, index_name, conditions)
local sql = string.format([[
SELECT /*+ INDEX(%s %s) */ *
FROM %s
WHERE %s
]], table_name, index_name, table_name, conditions)
return conn:execute(sql)
end
-- 分页查询
local function paginated_query(conn, sql, page, page_size)
local offset = (page - 1) * page_size
local paginated_sql = string.format("%s LIMIT %d OFFSET %d",
sql, page_size, offset)
return conn:execute(paginated_sql)
end
-- 查询缓存
local query_cache = {}
local function cached_query(conn, sql, ttl)
ttl = ttl or 300 -- 默认5分钟
local cache_key = sql
local cached = query_cache[cache_key]
if cached and os.time() - cached.timestamp < ttl then
return cached.result
end
local cursor = conn:execute(sql)
local result = {}
local row = cursor:fetch({}, "a")
while row do
table.insert(result, row)
row = cursor:fetch(row, "a")
end
cursor:close()
query_cache[cache_key] = {
result = result,
timestamp = os.time()
}
return result
end
🧪 9. 测试和调试
9.1 单元测试
local busted = require "busted"
describe("数据库测试", function()
local conn
setup(function()
-- 测试前建立连接
local env = luasql.mysql()
conn = env:connect("test_db", "test_user", "test_pass", "localhost", 3306)
conn:execute("CREATE TABLE IF NOT EXISTS test_users (id INT, name VARCHAR(50))")
end)
teardown(function()
-- 测试后清理
if conn then
conn:execute("DROP TABLE IF EXISTS test_users")
conn:close()
end
end)
it("应该能插入数据", function()
local sql = "INSERT INTO test_users VALUES (1, '测试用户')"
local result = conn:execute(sql)
assert.is_true(result ~= nil)
end)
it("应该能查询数据", function()
local cursor = conn:execute("SELECT * FROM test_users WHERE id = 1")
assert.is_not_nil(cursor)
local row = cursor:fetch({}, "a")
assert.are.equal("测试用户", row.name)
cursor:close()
end)
it("应该能更新数据", function()
local sql = "UPDATE test_users SET name = '更新用户' WHERE id = 1"
local affected = conn:execute(sql)
assert.are.equal(1, affected)
end)
it("应该能删除数据", function()
local sql = "DELETE FROM test_users WHERE id = 1"
local affected = conn:execute(sql)
assert.are.equal(1, affected)
end)
end)
9.2 性能测试
local function benchmark_query(conn, sql, iterations)
local total_time = 0
for i = 1, iterations do
local start_time = os.clock()
local cursor = conn:execute(sql)
if cursor then
cursor:close()
end
local end_time = os.clock()
total_time = total_time + (end_time - start_time)
end
local avg_time = total_time / iterations
local qps = 1 / avg_time
print(string.format("查询: %s", sql))
print(string.format("迭代次数: %d", iterations))
print(string.format("总时间: %.4f 秒", total_time))
print(string.format("平均时间: %.4f 秒", avg_time))
print(string.format("QPS: %.2f", qps))
return avg_time, qps
end
-- 运行性能测试
benchmark_query(conn, "SELECT * FROM users WHERE age > 20", 1000)
benchmark_query(conn, "SELECT * FROM users WHERE name LIKE '%张%'", 100)
🔧 10. 错误处理
10.1 统一错误处理
local Database = {}
Database.__index = Database
function Database.new(config)
local self = setmetatable({}, Database)
self.config = config
self.env = nil
self.conn = nil
return self
end
function Database:connect()
local ok, env_or_err = pcall(function()
if self.config.driver == "mysql" then
return require("luasql.mysql")
elseif self.config.driver == "sqlite3" then
return require("luasql.sqlite3")
elseif self.config.driver == "postgres" then
return require("luasql.postgres")
else
error("不支持的数据库驱动: " .. self.config.driver)
end
end)
if not ok then
return false, "加载数据库驱动失败: " .. env_or_err
end
self.env = env_or_err[self.config.driver]()
if not self.env then
return false, "创建数据库环境失败"
end
local conn, err = self.env:connect(
self.config.database,
self.config.username,
self.config.password,
self.config.host,
self.config.port
)
if not conn then
return false, "连接数据库失败: " .. (err or "未知错误")
end
self.conn = conn
return true
end
function Database:query(sql, params)
if not self.conn then
return false, "数据库未连接"
end
local ok, cursor_or_err = pcall(function()
-- 参数处理
if params then
for i, param in ipairs(params) do
if type(param) == "string" then
params[i] = self.conn:escape(param)
end
end
sql = string.format(sql, unpack(params))
end
return self.conn:execute(sql)
end)
if not ok then
return false, "执行查询失败: " .. cursor_or_err
end
if not cursor_or_err then
return false, "查询返回空结果"
end
-- 将游标转换为结果集
local results = {}
local row = cursor_or_err:fetch({}, "a")
while row do
table.insert(results, row)
row = cursor_or_err:fetch(row, "a")
end
cursor_or_err:close()
return true, results
end
function Database:execute(sql, params)
if not self.conn then
return false, "数据库未连接"
end
local ok, result_or_err = pcall(function()
if params then
for i, param in ipairs(params) do
if type(param) == "string" then
params[i] = self.conn:escape(param)
end
end
sql = string.format(sql, unpack(params))
end
return self.conn:execute(sql)
end)
if not ok then
return false, "执行失败: " .. result_or_err
end
return true, result_or_err
end
function Database:close()
if self.conn then
self.conn:close()
self.conn = nil
end
if self.env then
self.env:close()
self.env = nil
end
return true
end
-- 使用示例
local db = Database.new({
driver = "mysql",
host = "localhost",
port = 3306,
database = "test_db",
username = "root",
password = "password"
})
local ok, err = db:connect()
if not ok then
print("连接失败:", err)
return
end
-- 执行查询
local ok, results = db:query("SELECT * FROM users WHERE age > ?", {18})
if ok then
for _, user in ipairs(results) do
print(user.name, user.email)
end
else
print("查询失败:", results) -- 这里 results 是错误信息
end
-- 执行更新
local ok, affected = db:execute("UPDATE users SET status = ? WHERE id = ?", {"active", 1})
if ok then
print("更新成功,影响行数:", affected)
else
print("更新失败:", affected)
end
db:close()
📊 11. 监控和日志
11.1 查询日志
local QueryLogger = {}
QueryLogger.__index = QueryLogger
function QueryLogger.new()
local self = setmetatable({}, QueryLogger)
self.queries = {}
self.enabled = true
return self
end
function QueryLogger:log_query(sql, params, execution_time, success, error_msg)
if not self.enabled then return end
local log_entry = {
timestamp = os.date("%Y-%m-%d %H:%M:%S"),
sql = sql,
params = params,
execution_time = execution_time,
success = success,
error = error_msg,
backtrace = debug.traceback()
}
table.insert(self.queries, log_entry)
-- 限制日志大小
if #self.queries > 1000 then
table.remove(self.queries, 1)
end
-- 输出到控制台
local status = success and "✅" or "❌"
print(string.format("[%s] %s %s (%.4fs)",
log_entry.timestamp, status, sql, execution_time))
if not success and error_msg then
print("错误:", error_msg)
end
end
function QueryLogger:get_statistics()
local stats = {
total_queries = #self.queries,
successful_queries = 0,
failed_queries = 0,
total_execution_time = 0,
avg_execution_time = 0,
slow_queries = {}
}
for _, query in ipairs(self.queries) do
if query.success then
stats.successful_queries = stats.successful_queries + 1
else
stats.failed_queries = stats.failed_queries + 1
end
stats.total_execution_time = stats.total_execution_time + query.execution_time
-- 记录慢查询(超过100ms)
if query.execution_time > 0.1 then
table.insert(stats.slow_queries, query)
end
end
if stats.total_queries > 0 then
stats.avg_execution_time = stats.total_execution_time / stats.total_queries
end
return stats
end
function QueryLogger:export_logs(filename)
local file = io.open(filename, "w")
if not file then return false end
file:write("时间戳,SQL,参数,执行时间(秒),成功,错误\n")
for _, query in ipairs(self.queries) do
local params_str = query.params and table.concat(query.params, "|") or ""
local error_str = query.error or ""
file:write(string.format('"%s","%s","%s",%.6f,%s,"%s"\n',
query.timestamp,
query.sql:gsub('"', '""'),
params_str:gsub('"', '""'),
query.execution_time,
query.success and "true" or "false",
error_str:gsub('"', '""')
))
end
file:close()
return true
end
-- 使用日志记录器
local logger = QueryLogger.new()
-- 包装数据库操作
local function logged_query(conn, sql, params)
local start_time = os.clock()
local success, result_or_error = pcall(function()
if params then
for i, param in ipairs(params) do
if type(param) == "string" then
params[i] = conn:escape(param)
end
end
sql = string.format(sql, unpack(params))
end
return conn:execute(sql)
end)
local execution_time = os.clock() - start_time
logger:log_query(sql, params, execution_time, success,
success and nil or result_or_error)
if success then
return result_or_error
else
error(result_or_error)
end
end
-- 定期输出统计信息
local function monitor_performance(logger, interval)
while true do
os.execute("sleep " .. interval)
local stats = logger:get_statistics()
print("\n📊 数据库性能统计:")
print("总查询数:", stats.total_queries)
print("成功查询:", stats.successful_queries)
print("失败查询:", stats.failed_queries)
print("平均执行时间:", string.format("%.4f秒", stats.avg_execution_time))
print("慢查询数:", #stats.slow_queries)
if #stats.slow_queries > 0 then
print("\n🐌 慢查询列表:")
❤️❤️❤️本人水平有限,如有纰漏,欢迎各位大佬评论批评指正!😄😄😄
💘💘💘如果觉得这篇文对你有帮助的话,也请给个点赞、收藏下吧,非常感谢!👍 👍 👍
🔥🔥🔥Stay Hungry Stay Foolish 道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)