🎬 HoRain云小助手个人主页

 🔥 个人专栏: 《Linux 系列教程》《c语言教程

⛺️生活的理想,就是为了理想的生活!


⛳️ 推荐

前些天发现了一个超棒的服务器购买网站,性价比超高,大内存超划算!忍不住分享一下给大家。点击跳转到网站。

专栏介绍

专栏名称

专栏介绍

《C语言》

本专栏主要撰写C干货内容和编程技巧,让大家从底层了解C,把更多的知识由抽象到简单通俗易懂。

《网络协议》

本专栏主要是注重从底层来给大家一步步剖析网络协议的奥秘,一起解密网络协议在运行中协议的基本运行机制!

《docker容器精解篇》

全面深入解析 docker 容器,从基础到进阶,涵盖原理、操作、实践案例,助您精通 docker。

《linux系列》

本专栏主要撰写Linux干货内容,从基础到进阶,知识由抽象到简单通俗易懂,帮你从新手小白到扫地僧。

《python 系列》

本专栏着重撰写Python相关的干货内容与编程技巧,助力大家从底层去认识Python,将更多复杂的知识由抽象转化为简单易懂的内容。

《试题库》

本专栏主要是发布一些考试和练习题库(涵盖软考、HCIE、HRCE、CCNA等)

目录

⛳️ 推荐

专栏介绍

📊 Lua 数据库访问完全指南

📦 1. LuaSQL 安装

1.1 安装 LuaRocks(包管理器)

1.2 安装数据库驱动

🔌 2. MySQL 数据库访问

2.1 基本连接和查询

2.2 高级功能

💾 3. SQLite 数据库访问

3.1 基本操作

3.2 SQLite 高级功能

🐘 4. PostgreSQL 数据库访问

4.1 基本操作

🏢 5. Oracle 数据库访问

5.1 安装和配置

5.2 连接和操作

🔄 6. 数据库连接池

6.1 简单连接池实现

🛡️ 7. 安全最佳实践

7.1 防止 SQL 注入

7.2 连接安全

📈 8. 性能优化

8.1 批量操作

8.2 查询优化

🧪 9. 测试和调试

9.1 单元测试

9.2 性能测试

🔧 10. 错误处理

10.1 统一错误处理

📊 11. 监控和日志

11.1 查询日志


img

📊 Lua 数据库访问完全指南

Lua 本身没有内置的数据库访问功能,但通过第三方库可以轻松连接各种数据库。LuaSQL 是最常用的数据库访问库,支持 MySQL、SQLite、PostgreSQL、Oracle 等多种数据库。

📦 1. LuaSQL 安装

1.1 安装 LuaRocks(包管理器)

# Linux/macOS
curl -R -O https://luarocks.org/releases/luarocks-3.9.2.tar.gz
tar -zxvf luarocks-3.9.2.tar.gz
cd luarocks-3.9.2
./configure && make && sudo make install

# Windows
# 下载:https://luarocks.org/#quick-start
# 或使用 LuaDist:https://luadist.org/

1.2 安装数据库驱动

# MySQL
luarocks install luasql-mysql

# SQLite
luarocks install luasql-sqlite3

# PostgreSQL
luarocks install luasql-postgres

# Oracle
luarocks install luasql-oci8

# ODBC
luarocks install luasql-odbc

# SQL Server(通过 ODBC)
luarocks install luasql-odbc

🔌 2. MySQL 数据库访问

2.1 基本连接和查询

-- 引入 LuaSQL MySQL 模块
local luasql = require "luasql.mysql"

-- 创建环境对象
local env = assert(luasql.mysql())

-- 连接数据库
local conn = assert(env:connect(
    "database_name",    -- 数据库名
    "username",         -- 用户名
    "password",         -- 密码
    "localhost",        -- 主机
    3306               -- 端口
))

print("✅ MySQL 连接成功!")

-- 创建表
local create_table_sql = [[
    CREATE TABLE IF NOT EXISTS users (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        email VARCHAR(100) UNIQUE NOT NULL,
        age INT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
]]

local success, err = conn:execute(create_table_sql)
if not success then
    print("❌ 创建表失败:", err)
else
    print("✅ 表创建成功")
end

-- 插入数据
local insert_sql = string.format([[
    INSERT INTO users (name, email, age) 
    VALUES ('%s', '%s', %d)
]], "张三", "zhangsan@example.com", 25)

local rows_affected = conn:execute(insert_sql)
print("📝 插入行数:", rows_affected)

-- 批量插入(使用事务)
conn:execute("START TRANSACTION")

local users = {
    {"李四", "lisi@example.com", 30},
    {"王五", "wangwu@example.com", 28},
    {"赵六", "zhaoliu@example.com", 35}
}

for _, user in ipairs(users) do
    local sql = string.format(
        "INSERT INTO users (name, email, age) VALUES ('%s', '%s', %d)",
        user[1], user[2], user[3]
    )
    conn:execute(sql)
end

conn:execute("COMMIT")
print("✅ 批量插入完成")

-- 查询数据
local select_sql = "SELECT * FROM users ORDER BY id"
local cursor, err = conn:execute(select_sql)

if not cursor then
    print("❌ 查询失败:", err)
else
    print("\n📋 用户列表:")
    print("ID\t姓名\t邮箱\t\t\t年龄\t创建时间")
    print(string.rep("-", 70))
    
    local row = cursor:fetch({}, "a")  -- 以关联数组形式获取
    while row do
        print(string.format("%d\t%s\t%s\t%d\t%s", 
            row.id, row.name, row.email, row.age, row.created_at))
        row = cursor:fetch(row, "a")
    end
    
    cursor:close()
end

-- 带参数的查询(防止 SQL 注入)
local function safe_query(conn, name_pattern)
    local sql = string.format([[
        SELECT * FROM users 
        WHERE name LIKE '%%%s%%' 
        ORDER BY age DESC
    ]], conn:escape(name_pattern))
    
    local cursor = conn:execute(sql)
    return cursor
end

-- 使用安全查询
local cursor = safe_query(conn, "张")
if cursor then
    print("\n🔍 查询结果(姓张的用户):")
    local row = cursor:fetch({}, "a")
    while row do
        print(string.format("ID:%d 姓名:%s 年龄:%d", 
            row.id, row.name, row.age))
        row = cursor:fetch(row, "a")
    end
    cursor:close()
end

-- 更新数据
local update_sql = string.format([[
    UPDATE users 
    SET age = age + 1 
    WHERE name = '%s'
]], "张三")

local affected = conn:execute(update_sql)
print("\n🔄 更新行数:", affected)

-- 删除数据
local delete_sql = "DELETE FROM users WHERE age > 40"
local deleted = conn:execute(delete_sql)
print("🗑️  删除行数:", deleted)

-- 关闭连接
conn:close()
env:close()
print("🔌 数据库连接已关闭")

2.2 高级功能

-- 连接池管理
local connection_pool = {}

local function get_connection()
    for i, conn in ipairs(connection_pool) do
        if conn:ping() then
            table.remove(connection_pool, i)
            return conn
        end
    end
    
    -- 创建新连接
    local env = luasql.mysql()
    local conn = env:connect("db_name", "user", "pass", "host", 3306)
    conn:set_timeout(5000)  -- 设置超时5秒
    return conn
end

local function release_connection(conn)
    if #connection_pool < 10 then  -- 限制连接池大小
        table.insert(connection_pool, conn)
    else
        conn:close()
    end
end

-- 事务处理
local function transaction(conn, operations)
    local success, err = conn:execute("START TRANSACTION")
    if not success then return false, "开始事务失败: " .. err end
    
    for i, op in ipairs(operations) do
        local ok, result = pcall(op.func, conn, unpack(op.args or {}))
        if not ok then
            conn:execute("ROLLBACK")
            return false, "操作 " .. i .. " 失败: " .. result
        end
    end
    
    local ok, err = conn:execute("COMMIT")
    if not ok then
        conn:execute("ROLLBACK")
        return false, "提交事务失败: " .. err
    end
    
    return true
end

-- 使用事务
local ops = {
    {func = function(conn)
        return conn:execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    end},
    {func = function(conn)
        return conn:execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    end}
}

local success, err = transaction(conn, ops)
if success then
    print("✅ 转账成功")
else
    print("❌ 转账失败:", err)
end

💾 3. SQLite 数据库访问

3.1 基本操作

local luasql = require "luasql.sqlite3"

-- 创建环境对象
local env = assert(luasql.sqlite3())

-- 连接数据库(文件或内存)
local conn = assert(env:connect("test.db"))  -- 文件数据库
-- local conn = assert(env:connect(":memory:"))  -- 内存数据库

print("✅ SQLite 连接成功!")

-- 启用外键约束
conn:execute("PRAGMA foreign_keys = ON")

-- 创建表
conn:execute[[
    CREATE TABLE IF NOT EXISTS products (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        price REAL CHECK(price > 0),
        stock INTEGER DEFAULT 0,
        category TEXT,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    )
]]

-- 创建索引
conn:execute("CREATE INDEX IF NOT EXISTS idx_category ON products(category)")
conn:execute("CREATE INDEX IF NOT EXISTS idx_price ON products(price)")

-- 插入数据
local products = {
    {"笔记本电脑", 5999.99, 50, "电子产品"},
    {"智能手机", 2999.50, 100, "电子产品"},
    {"书籍", 49.99, 200, "图书"},
    {"咖啡", 25.00, 300, "食品"}
}

conn:execute("BEGIN TRANSACTION")
for _, product in ipairs(products) do
    local sql = string.format([[
        INSERT INTO products (name, price, stock, category) 
        VALUES ('%s', %f, %d, '%s')
    ]], product[1], product[2], product[3], product[4])
    conn:execute(sql)
end
conn:execute("COMMIT")

-- 查询数据
local cursor = conn:execute([[
    SELECT 
        id, 
        name, 
        printf('%.2f', price) as price_fmt,
        stock,
        category,
        datetime(created_at, 'localtime') as created
    FROM products 
    WHERE stock > 0
    ORDER BY price DESC
]])

print("\n🛒 产品列表:")
print("ID\t产品名称\t\t价格\t库存\t分类\t创建时间")
print(string.rep("-", 80))

local row = cursor:fetch({}, "a")
while row do
    print(string.format("%d\t%-12s\t¥%s\t%d\t%s\t%s",
        row.id, row.name, row.price_fmt, row.stock, row.category, row.created))
    row = cursor:fetch(row, "a")
end
cursor:close()

-- 聚合查询
local cursor = conn:execute([[
    SELECT 
        category,
        COUNT(*) as count,
        SUM(price * stock) as total_value,
        AVG(price) as avg_price,
        MIN(price) as min_price,
        MAX(price) as max_price
    FROM products
    GROUP BY category
    HAVING count > 0
    ORDER BY total_value DESC
]])

print("\n📊 分类统计:")
print("分类\t\t商品数\t总价值\t平均价格\t最低价\t最高价")
print(string.rep("-", 70))

row = cursor:fetch({}, "a")
while row do
    print(string.format("%-8s\t%d\t¥%.2f\t¥%.2f\t\t¥%.2f\t¥%.2f",
        row.category, row.count, row.total_value, 
        row.avg_price, row.min_price, row.max_price))
    row = cursor:fetch(row, "a")
end
cursor:close()

-- 更新库存
conn:execute("UPDATE products SET stock = stock - 1 WHERE name = '笔记本电脑'")

-- 删除数据
conn:execute("DELETE FROM products WHERE stock <= 0")

-- 备份数据库
local backup_conn = assert(env:connect("backup.db"))
local cursor = conn:execute("SELECT sql FROM sqlite_master WHERE type='table'")
row = cursor:fetch({}, "a")
while row do
    backup_conn:execute(row.sql)
    row = cursor:fetch(row, "a")
end
cursor:close()

print("✅ 数据库备份完成")

-- 关闭连接
conn:close()
backup_conn:close()
env:close()

3.2 SQLite 高级功能

-- 自定义函数
local function register_functions(conn)
    -- 注册自定义聚合函数
    conn:execute([[
        CREATE AGGREGATE IF NOT EXISTS product 
        BEGIN
            CREATE TEMP TABLE IF NOT EXISTS temp_product (value REAL);
            INSERT INTO temp_product VALUES (?1);
        END;
    ]])
    
    -- 注册标量函数
    conn:execute([[
        CREATE FUNCTION IF NOT EXISTS format_price(price REAL) 
        RETURNS TEXT 
        BEGIN
            RETURN '¥' || printf('%.2f', price);
        END;
    ]])
end

-- 使用视图
conn:execute([[
    CREATE VIEW IF NOT EXISTS product_summary AS
    SELECT 
        category,
        COUNT(*) as product_count,
        SUM(stock) as total_stock,
        AVG(price) as average_price
    FROM products
    GROUP BY category
]])

-- 使用触发器
conn:execute([[
    CREATE TRIGGER IF NOT EXISTS update_timestamp 
    AFTER UPDATE ON products
    BEGIN
        UPDATE products 
        SET updated_at = CURRENT_TIMESTAMP 
        WHERE id = NEW.id;
    END;
]])

-- 事务保存点
conn:execute("SAVEPOINT sp1")
-- 执行一些操作
conn:execute("RELEASE SAVEPOINT sp1")  -- 提交保存点
-- 或
conn:execute("ROLLBACK TO SAVEPOINT sp1")  -- 回滚到保存点

🐘 4. PostgreSQL 数据库访问

4.1 基本操作

local luasql = require "luasql.postgres"

-- 创建环境对象
local env = assert(luasql.postgres())

-- 连接数据库
local conn = assert(env:connect(
    "database_name",    -- 数据库名
    "username",         -- 用户名
    "password",         -- 密码
    "localhost",        -- 主机
    5432,              -- 端口
    nil,               -- Unix socket
    nil,               -- 连接超时
    "client_encoding=UTF8"  -- 连接选项
))

print("✅ PostgreSQL 连接成功!")

-- 创建表(使用 PostgreSQL 特定类型)
conn:execute([[
    CREATE TABLE IF NOT EXISTS employees (
        id SERIAL PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        email VARCHAR(100) UNIQUE NOT NULL,
        salary DECIMAL(10, 2) CHECK(salary > 0),
        department VARCHAR(50),
        hire_date DATE DEFAULT CURRENT_DATE,
        is_active BOOLEAN DEFAULT TRUE,
        skills TEXT[],
        metadata JSONB
    )
]])

-- 创建索引
conn:execute("CREATE INDEX IF NOT EXISTS idx_department ON employees(department)")
conn:execute("CREATE INDEX IF NOT EXISTS idx_salary ON employees(salary DESC)")

-- 插入数据(使用参数化查询)
local insert_sql = [[
    INSERT INTO employees 
    (name, email, salary, department, skills, metadata) 
    VALUES ($1, $2, $3, $4, $5, $6)
]]

-- 注意:LuaSQL 原生不支持参数化查询,需要手动转义
local employees = {
    {"张三", "zhangsan@example.com", 15000.00, "技术部", 
     '{"Lua", "PostgreSQL", "Linux"}', '{"level": "高级", "projects": 5}'},
    {"李四", "lisi@example.com", 12000.50, "市场部", 
     '{"营销", "沟通"}', '{"level": "中级", "clients": 20}'},
    {"王五", "wangwu@example.com", 18000.75, "技术部", 
     '{"Python", "Docker", "Kubernetes"}', '{"level": "资深", "certifications": 3}'}
}

conn:execute("BEGIN")
for _, emp in ipairs(employees) do
    local sql = string.format([[
        INSERT INTO employees 
        (name, email, salary, department, skills, metadata) 
        VALUES ('%s', '%s', %f, '%s', '%s', '%s')
    ]], 
    conn:escape(emp[1]), conn:escape(emp[2]), emp[3], 
    conn:escape(emp[4]), conn:escape(emp[5]), conn:escape(emp[6]))
    conn:execute(sql)
end
conn:execute("COMMIT")

-- 查询数据
local cursor = conn:execute([[
    SELECT 
        id,
        name,
        email,
        TO_CHAR(salary, 'FM999,999,999.00') as formatted_salary,
        department,
        TO_CHAR(hire_date, 'YYYY-MM-DD') as hire_date_fmt,
        is_active,
        skills,
        metadata
    FROM employees
    WHERE is_active = TRUE
    ORDER BY salary DESC
]])

print("\n👥 员工列表:")
print("ID\t姓名\t邮箱\t\t\t薪资\t\t部门\t入职日期\t状态\t技能")
print(string.rep("-", 120))

local row = cursor:fetch({}, "a")
while row do
    local skills = row.skills and row.skails:gsub("[{}]", "") or ""
    print(string.format("%d\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s",
        row.id, row.name, row.email, row.formatted_salary,
        row.department, row.hire_date_fmt,
        row.is_active and "在职" or "离职",
        skills))
    row = cursor:fetch(row, "a")
end
cursor:close()

-- JSON 查询
local cursor = conn:execute([[
    SELECT 
        name,
        metadata->>'level' as level,
        (metadata->>'projects')::int as projects
    FROM employees
    WHERE metadata->>'level' = '高级'
    OR metadata->>'level' = '资深'
]])

print("\n🏆 高级员工:")
row = cursor:fetch({}, "a")
while row do
    print(string.format("%s - 级别:%s 项目数:%d",
        row.name, row.level, row.projects or 0))
    row = cursor:fetch(row, "a")
end
cursor:close()

-- 数组查询
local cursor = conn:execute([[
    SELECT name, skills
    FROM employees
    WHERE 'Lua' = ANY(skills)
    OR 'Python' = ANY(skills)
]])

print("\n💻 技术员工:")
row = cursor:fetch({}, "a")
while row do
    local skills = row.skills:gsub("[{}]", ""):gsub('"', '')
    print(string.format("%s - 技能: %s", row.name, skills))
    row = cursor:fetch(row, "a")
end
cursor:close()

-- 关闭连接
conn:close()
env:close()

🏢 5. Oracle 数据库访问

5.1 安装和配置

# 安装 Oracle Instant Client
# 下载:https://www.oracle.com/database/technologies/instant-client.html

# Linux 安装示例
sudo apt-get install libaio1  # 依赖
sudo rpm -Uvh oracle-instantclient19.8-basic-19.8.0.0.0-1.x86_64.rpm
sudo rpm -Uvh oracle-instantclient19.8-devel-19.8.0.0.0-1.x86_64.rpm

# 设置环境变量
export ORACLE_HOME=/usr/lib/oracle/19.8/client64
export LD_LIBRARY_PATH=$ORACLE_HOME/lib:$LD_LIBRARY_PATH
export PATH=$ORACLE_HOME/bin:$PATH

# 安装 LuaSQL Oracle 驱动
luarocks install luasql-oci8 OCI8_INCDIR=/usr/include/oracle/19.8/client64/

5.2 连接和操作

local luasql = require "luasql.oci8"

-- 创建环境对象
local env = assert(luasql.oci8())

-- 连接数据库
local conn = assert(env:connect(
    "//hostname:port/service_name",  -- 连接字符串
    "username",                      -- 用户名
    "password"                       -- 密码
))

print("✅ Oracle 连接成功!")

-- 创建表
conn:execute([[
    CREATE TABLE customers (
        customer_id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        customer_name VARCHAR2(100) NOT NULL,
        email VARCHAR2(100) UNIQUE,
        phone VARCHAR2(20),
        registration_date DATE DEFAULT SYSDATE,
        credit_limit NUMBER(10, 2) DEFAULT 10000.00,
        status VARCHAR2(10) DEFAULT 'ACTIVE'
    )
]])

-- 创建序列(如果需要)
conn:execute("CREATE SEQUENCE customer_seq START WITH 1 INCREMENT BY 1")

-- 插入数据
local customers = {
    {"张三", "zhangsan@example.com", "13800138000", 50000.00},
    {"李四", "lisi@example.com", "13900139000", 30000.00},
    {"王五", "wangwu@example.com", "13700137000", 80000.00}
}

conn:execute("BEGIN")
for _, cust in ipairs(customers) do
    local sql = string.format([[
        INSERT INTO customers 
        (customer_id, customer_name, email, phone, credit_limit) 
        VALUES (customer_seq.NEXTVAL, '%s', '%s', '%s', %f)
    ]], conn:escape(cust[1]), conn:escape(cust[2]), 
        conn:escape(cust[3]), cust[4])
    conn:execute(sql)
end
conn:execute("COMMIT")

-- 查询数据
local cursor = conn:execute([[
    SELECT 
        customer_id,
        customer_name,
        email,
        phone,
        TO_CHAR(registration_date, 'YYYY-MM-DD HH24:MI:SS') as reg_date,
        credit_limit,
        status
    FROM customers
    WHERE status = 'ACTIVE'
    ORDER BY credit_limit DESC
]])

print("\n👥 客户列表:")
print("ID\t姓名\t邮箱\t\t\t电话\t\t注册时间\t\t信用额度\t状态")
print(string.rep("-", 130))

local row = cursor:fetch({}, "a")
while row do
    print(string.format("%d\t%s\t%s\t%s\t%s\t%.2f\t%s",
        row.customer_id, row.customer_name, row.email, 
        row.phone, row.reg_date, row.credit_limit, row.status))
    row = cursor:fetch(row, "a")
end
cursor:close()

-- 存储过程调用
conn:execute([[
    CREATE OR REPLACE PROCEDURE update_credit_limit(
        p_customer_id IN NUMBER,
        p_new_limit IN NUMBER,
        p_result OUT VARCHAR2
    ) AS
    BEGIN
        UPDATE customers 
        SET credit_limit = p_new_limit
        WHERE customer_id = p_customer_id;
        
        IF SQL%ROWCOUNT = 0 THEN
            p_result := '客户不存在';
        ELSE
            p_result := '更新成功';
        END IF;
        
        COMMIT;
    END;
]])

-- 调用存储过程(LuaSQL 直接调用存储过程有限制,通常使用匿名块)
local update_sql = string.format([[
    DECLARE
        v_result VARCHAR2(100);
    BEGIN
        update_credit_limit(%d, %f, v_result);
        DBMS_OUTPUT.PUT_LINE(v_result);
    END;
]], 1, 60000.00)

conn:execute(update_sql)

-- 关闭连接
conn:close()
env:close()

🔄 6. 数据库连接池

6.1 简单连接池实现

local luasql = require "luasql.mysql"

local ConnectionPool = {}
ConnectionPool.__index = ConnectionPool

function ConnectionPool.new(config)
    local self = setmetatable({}, ConnectionPool)
    self.config = config
    self.pool = {}
    self.active_connections = 0
    self.max_connections = config.max_connections or 10
    self.timeout = config.timeout or 5000  -- 5秒超时
    self.env = luasql.mysql()
    return self
end

function ConnectionPool:get_connection()
    -- 从池中获取可用连接
    for i, conn in ipairs(self.pool) do
        if self:_is_connection_valid(conn) then
            table.remove(self.pool, i)
            self.active_connections = self.active_connections + 1
            return conn
        end
    end
    
    -- 创建新连接
    if self.active_connections < self.max_connections then
        local conn = self.env:connect(
            self.config.database,
            self.config.username,
            self.config.password,
            self.config.host,
            self.config.port
        )
        
        if conn then
            self.active_connections = self.active_connections + 1
            return conn
        end
    end
    
    return nil, "无法获取数据库连接"
end

function ConnectionPool:release_connection(conn)
    if self:_is_connection_valid(conn) then
        table.insert(self.pool, conn)
    else
        conn:close()
    end
    self.active_connections = self.active_connections - 1
end

function ConnectionPool:_is_connection_valid(conn)
    -- 简单的心跳检查
    local ok, err = pcall(function()
        local cursor = conn:execute("SELECT 1")
        if cursor then cursor:close() end
        return true
    end)
    return ok
end

function ConnectionPool:close_all()
    for _, conn in ipairs(self.pool) do
        conn:close()
    end
    self.pool = {}
    self.active_connections = 0
    self.env:close()
end

-- 使用连接池
local pool_config = {
    host = "localhost",
    port = 3306,
    database = "test_db",
    username = "root",
    password = "password",
    max_connections = 5
}

local pool = ConnectionPool.new(pool_config)

-- 获取连接执行查询
local conn, err = pool:get_connection()
if conn then
    local cursor = conn:execute("SELECT * FROM users LIMIT 10")
    -- 处理结果...
    cursor:close()
    pool:release_connection(conn)
else
    print("获取连接失败:", err)
end

-- 使用完成后关闭所有连接
pool:close_all()

🛡️ 7. 安全最佳实践

7.1 防止 SQL 注入

local function safe_execute(conn, sql, ...)
    local args = {...}
    
    -- 转义所有参数
    for i, arg in ipairs(args) do
        if type(arg) == "string" then
            args[i] = conn:escape(arg)
        end
    end
    
    -- 构建安全 SQL
    local safe_sql = string.format(sql, unpack(args))
    return conn:execute(safe_sql)
end

-- 使用示例
local user_input = "张三'; DROP TABLE users; --"
local sql = "SELECT * FROM users WHERE name = '%s'"

-- 安全执行
local cursor = safe_execute(conn, sql, user_input)

-- 参数化查询(如果数据库驱动支持)
local function parameterized_query(conn, sql, params)
    -- 这里需要根据具体数据库驱动实现
    -- 例如使用预处理语句
    return conn:prepare(sql):execute(params)
end

7.2 连接安全

-- 使用 SSL 连接(如果数据库支持)
local ssl_config = {
    ssl = true,
    ssl_ca = "/path/to/ca-cert.pem",
    ssl_cert = "/path/to/client-cert.pem",
    ssl_key = "/path/to/client-key.pem"
}

-- 加密密码存储
local function encrypt_password(password, salt)
    -- 使用 bcrypt 或类似算法
    -- 这里简化示例
    return password  -- 实际应用中应该使用加密
end

-- 连接超时和重试
local function connect_with_retry(env, config, max_retries)
    local retries = 0
    while retries < max_retries do
        local conn, err = env:connect(
            config.database,
            config.username,
            config.password,
            config.host,
            config.port
        )
        
        if conn then
            return conn
        end
        
        retries = retries + 1
        print(string.format("连接失败,第 %d 次重试...", retries))
        os.execute("sleep " .. (2 ^ retries))  -- 指数退避
    end
    
    return nil, "连接失败,达到最大重试次数"
end

📈 8. 性能优化

8.1 批量操作

-- 批量插入优化
local function batch_insert(conn, table_name, data, batch_size)
    batch_size = batch_size or 1000
    
    conn:execute("START TRANSACTION")
    
    local values = {}
    for i, row in ipairs(data) do
        local escaped_values = {}
        for _, value in ipairs(row) do
            if type(value) == "string" then
                table.insert(escaped_values, "'" .. conn:escape(value) .. "'")
            else
                table.insert(escaped_values, tostring(value))
            end
        end
        
        table.insert(values, "(" .. table.concat(escaped_values, ",") .. ")")
        
        -- 分批提交
        if i % batch_size == 0 then
            local sql = string.format(
                "INSERT INTO %s VALUES %s",
                table_name,
                table.concat(values, ",")
            )
            conn:execute(sql)
            values = {}
        end
    end
    
    -- 插入剩余数据
    if #values > 0 then
        local sql = string.format(
            "INSERT INTO %s VALUES %s",
            table_name,
            table.concat(values, ",")
        )
        conn:execute(sql)
    end
    
    conn:execute("COMMIT")
end

-- 使用批量插入
local large_data = {}
for i = 1, 10000 do
    table.insert(large_data, {
        "用户" .. i,
        "user" .. i .. "@example.com",
        math.random(20, 60),
        os.time() + i
    })
end

batch_insert(conn, "users", large_data, 1000)

8.2 查询优化

-- 使用索引提示
local function query_with_index(conn, table_name, index_name, conditions)
    local sql = string.format([[
        SELECT /*+ INDEX(%s %s) */ * 
        FROM %s 
        WHERE %s
    ]], table_name, index_name, table_name, conditions)
    
    return conn:execute(sql)
end

-- 分页查询
local function paginated_query(conn, sql, page, page_size)
    local offset = (page - 1) * page_size
    local paginated_sql = string.format("%s LIMIT %d OFFSET %d", 
        sql, page_size, offset)
    
    return conn:execute(paginated_sql)
end

-- 查询缓存
local query_cache = {}

local function cached_query(conn, sql, ttl)
    ttl = ttl or 300  -- 默认5分钟
    
    local cache_key = sql
    local cached = query_cache[cache_key]
    
    if cached and os.time() - cached.timestamp < ttl then
        return cached.result
    end
    
    local cursor = conn:execute(sql)
    local result = {}
    local row = cursor:fetch({}, "a")
    while row do
        table.insert(result, row)
        row = cursor:fetch(row, "a")
    end
    cursor:close()
    
    query_cache[cache_key] = {
        result = result,
        timestamp = os.time()
    }
    
    return result
end

🧪 9. 测试和调试

9.1 单元测试

local busted = require "busted"

describe("数据库测试", function()
    local conn
    
    setup(function()
        -- 测试前建立连接
        local env = luasql.mysql()
        conn = env:connect("test_db", "test_user", "test_pass", "localhost", 3306)
        conn:execute("CREATE TABLE IF NOT EXISTS test_users (id INT, name VARCHAR(50))")
    end)
    
    teardown(function()
        -- 测试后清理
        if conn then
            conn:execute("DROP TABLE IF EXISTS test_users")
            conn:close()
        end
    end)
    
    it("应该能插入数据", function()
        local sql = "INSERT INTO test_users VALUES (1, '测试用户')"
        local result = conn:execute(sql)
        assert.is_true(result ~= nil)
    end)
    
    it("应该能查询数据", function()
        local cursor = conn:execute("SELECT * FROM test_users WHERE id = 1")
        assert.is_not_nil(cursor)
        
        local row = cursor:fetch({}, "a")
        assert.are.equal("测试用户", row.name)
        
        cursor:close()
    end)
    
    it("应该能更新数据", function()
        local sql = "UPDATE test_users SET name = '更新用户' WHERE id = 1"
        local affected = conn:execute(sql)
        assert.are.equal(1, affected)
    end)
    
    it("应该能删除数据", function()
        local sql = "DELETE FROM test_users WHERE id = 1"
        local affected = conn:execute(sql)
        assert.are.equal(1, affected)
    end)
end)

9.2 性能测试

local function benchmark_query(conn, sql, iterations)
    local total_time = 0
    
    for i = 1, iterations do
        local start_time = os.clock()
        local cursor = conn:execute(sql)
        
        if cursor then
            cursor:close()
        end
        
        local end_time = os.clock()
        total_time = total_time + (end_time - start_time)
    end
    
    local avg_time = total_time / iterations
    local qps = 1 / avg_time
    
    print(string.format("查询: %s", sql))
    print(string.format("迭代次数: %d", iterations))
    print(string.format("总时间: %.4f 秒", total_time))
    print(string.format("平均时间: %.4f 秒", avg_time))
    print(string.format("QPS: %.2f", qps))
    
    return avg_time, qps
end

-- 运行性能测试
benchmark_query(conn, "SELECT * FROM users WHERE age > 20", 1000)
benchmark_query(conn, "SELECT * FROM users WHERE name LIKE '%张%'", 100)

🔧 10. 错误处理

10.1 统一错误处理

local Database = {}
Database.__index = Database

function Database.new(config)
    local self = setmetatable({}, Database)
    self.config = config
    self.env = nil
    self.conn = nil
    return self
end

function Database:connect()
    local ok, env_or_err = pcall(function()
        if self.config.driver == "mysql" then
            return require("luasql.mysql")
        elseif self.config.driver == "sqlite3" then
            return require("luasql.sqlite3")
        elseif self.config.driver == "postgres" then
            return require("luasql.postgres")
        else
            error("不支持的数据库驱动: " .. self.config.driver)
        end
    end)
    
    if not ok then
        return false, "加载数据库驱动失败: " .. env_or_err
    end
    
    self.env = env_or_err[self.config.driver]()
    if not self.env then
        return false, "创建数据库环境失败"
    end
    
    local conn, err = self.env:connect(
        self.config.database,
        self.config.username,
        self.config.password,
        self.config.host,
        self.config.port
    )
    
    if not conn then
        return false, "连接数据库失败: " .. (err or "未知错误")
    end
    
    self.conn = conn
    return true
end

function Database:query(sql, params)
    if not self.conn then
        return false, "数据库未连接"
    end
    
    local ok, cursor_or_err = pcall(function()
        -- 参数处理
        if params then
            for i, param in ipairs(params) do
                if type(param) == "string" then
                    params[i] = self.conn:escape(param)
                end
            end
            sql = string.format(sql, unpack(params))
        end
        
        return self.conn:execute(sql)
    end)
    
    if not ok then
        return false, "执行查询失败: " .. cursor_or_err
    end
    
    if not cursor_or_err then
        return false, "查询返回空结果"
    end
    
    -- 将游标转换为结果集
    local results = {}
    local row = cursor_or_err:fetch({}, "a")
    while row do
        table.insert(results, row)
        row = cursor_or_err:fetch(row, "a")
    end
    
    cursor_or_err:close()
    return true, results
end

function Database:execute(sql, params)
    if not self.conn then
        return false, "数据库未连接"
    end
    
    local ok, result_or_err = pcall(function()
        if params then
            for i, param in ipairs(params) do
                if type(param) == "string" then
                    params[i] = self.conn:escape(param)
                end
            end
            sql = string.format(sql, unpack(params))
        end
        
        return self.conn:execute(sql)
    end)
    
    if not ok then
        return false, "执行失败: " .. result_or_err
    end
    
    return true, result_or_err
end

function Database:close()
    if self.conn then
        self.conn:close()
        self.conn = nil
    end
    
    if self.env then
        self.env:close()
        self.env = nil
    end
    
    return true
end

-- 使用示例
local db = Database.new({
    driver = "mysql",
    host = "localhost",
    port = 3306,
    database = "test_db",
    username = "root",
    password = "password"
})

local ok, err = db:connect()
if not ok then
    print("连接失败:", err)
    return
end

-- 执行查询
local ok, results = db:query("SELECT * FROM users WHERE age > ?", {18})
if ok then
    for _, user in ipairs(results) do
        print(user.name, user.email)
    end
else
    print("查询失败:", results)  -- 这里 results 是错误信息
end

-- 执行更新
local ok, affected = db:execute("UPDATE users SET status = ? WHERE id = ?", {"active", 1})
if ok then
    print("更新成功,影响行数:", affected)
else
    print("更新失败:", affected)
end

db:close()

📊 11. 监控和日志

11.1 查询日志

local QueryLogger = {}
QueryLogger.__index = QueryLogger

function QueryLogger.new()
    local self = setmetatable({}, QueryLogger)
    self.queries = {}
    self.enabled = true
    return self
end

function QueryLogger:log_query(sql, params, execution_time, success, error_msg)
    if not self.enabled then return end
    
    local log_entry = {
        timestamp = os.date("%Y-%m-%d %H:%M:%S"),
        sql = sql,
        params = params,
        execution_time = execution_time,
        success = success,
        error = error_msg,
        backtrace = debug.traceback()
    }
    
    table.insert(self.queries, log_entry)
    
    -- 限制日志大小
    if #self.queries > 1000 then
        table.remove(self.queries, 1)
    end
    
    -- 输出到控制台
    local status = success and "✅" or "❌"
    print(string.format("[%s] %s %s (%.4fs)", 
        log_entry.timestamp, status, sql, execution_time))
    
    if not success and error_msg then
        print("错误:", error_msg)
    end
end

function QueryLogger:get_statistics()
    local stats = {
        total_queries = #self.queries,
        successful_queries = 0,
        failed_queries = 0,
        total_execution_time = 0,
        avg_execution_time = 0,
        slow_queries = {}
    }
    
    for _, query in ipairs(self.queries) do
        if query.success then
            stats.successful_queries = stats.successful_queries + 1
        else
            stats.failed_queries = stats.failed_queries + 1
        end
        
        stats.total_execution_time = stats.total_execution_time + query.execution_time
        
        -- 记录慢查询(超过100ms)
        if query.execution_time > 0.1 then
            table.insert(stats.slow_queries, query)
        end
    end
    
    if stats.total_queries > 0 then
        stats.avg_execution_time = stats.total_execution_time / stats.total_queries
    end
    
    return stats
end

function QueryLogger:export_logs(filename)
    local file = io.open(filename, "w")
    if not file then return false end
    
    file:write("时间戳,SQL,参数,执行时间(秒),成功,错误\n")
    
    for _, query in ipairs(self.queries) do
        local params_str = query.params and table.concat(query.params, "|") or ""
        local error_str = query.error or ""
        
        file:write(string.format('"%s","%s","%s",%.6f,%s,"%s"\n',
            query.timestamp,
            query.sql:gsub('"', '""'),
            params_str:gsub('"', '""'),
            query.execution_time,
            query.success and "true" or "false",
            error_str:gsub('"', '""')
        ))
    end
    
    file:close()
    return true
end

-- 使用日志记录器
local logger = QueryLogger.new()

-- 包装数据库操作
local function logged_query(conn, sql, params)
    local start_time = os.clock()
    local success, result_or_error = pcall(function()
        if params then
            for i, param in ipairs(params) do
                if type(param) == "string" then
                    params[i] = conn:escape(param)
                end
            end
            sql = string.format(sql, unpack(params))
        end
        return conn:execute(sql)
    end)
    
    local execution_time = os.clock() - start_time
    
    logger:log_query(sql, params, execution_time, success, 
        success and nil or result_or_error)
    
    if success then
        return result_or_error
    else
        error(result_or_error)
    end
end

-- 定期输出统计信息
local function monitor_performance(logger, interval)
    while true do
        os.execute("sleep " .. interval)
        local stats = logger:get_statistics()
        
        print("\n📊 数据库性能统计:")
        print("总查询数:", stats.total_queries)
        print("成功查询:", stats.successful_queries)
        print("失败查询:", stats.failed_queries)
        print("平均执行时间:", string.format("%.4f秒", stats.avg_execution_time))
        print("慢查询数:", #stats.slow_queries)
        
        if #stats.slow_queries > 0 then
            print("\n🐌 慢查询列表:")

❤️❤️❤️本人水平有限,如有纰漏,欢迎各位大佬评论批评指正!😄😄😄

💘💘💘如果觉得这篇文对你有帮助的话,也请给个点赞、收藏下吧,非常感谢!👍 👍 👍

🔥🔥🔥Stay Hungry Stay Foolish 道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐