灵犀 IM - 从零构建 Go IM 系统:WebSocket + AI Agent + gRPC 全栈实践

关键词: Go / Gin / WebSocket / GORM / Redis / gRPC / go-zero / AI Function Calling / Docker


一、项目背景与整体思路

我决定做一个真实可用的 IM(即时通讯)系统,而不是仅仅 CRUD 教程。目标是:

  • 覆盖 WebSocket 长连接、离线消息、群聊等 IM 核心场景

  • 集成 AI 对话能力(@AI 指令 + Function Calling 工具调用)

  • 管理后台通过 gRPC 与业务服务通信,支持 etcd 服务发现

  • 前后端分离,Docker 一键部署

最终项目实现了以下完整功能链路:

Vue3 前端
   ↕ HTTP REST (Gin)    ← JWT 鉴权、限流、日志脱敏
   ↕ WebSocket          ← 实时消息、心跳、ACK、离线推送
   ↕ SSE 流式           ← AI 对话逐 token 输出
   ↕ gRPC (go-zero)     ← 管理后台服务调用

数据层:MySQL(GORM) + Redis(缓存 / 会话)


二、技术栈总览

分层 技术选型 说明
Web 框架 Gin v1.12 路由、中间件、参数绑定
实时通信 gorilla/websocket 长连接管理
ORM GORM v1.31 + MySQL Driver 数据持久化
缓存 go-redis/v9 离线消息辅助、会话缓存
认证 golang-jwt/jwt v5 JWT 无状态鉴权
密码安全 golang.org/x/crypto (bcrypt) 密码哈希
微服务框架 go-zero v1.10 gRPC 服务注册/发现 (etcd)
日志 go.uber.org/zap 结构化日志
限流 golang.org/x/time/rate 令牌桶
AI 调用 自封装 HTTP 客户端 兼容 OpenAI 格式,支持主备切换
容器化 Docker 多阶段构建 + Docker Compose 镜像最小化
配置 spf13/viper yaml 配置热加载

三、项目架构:分层 + 依赖注入

3.1 目录结构

go-zero-im/
├── cmd/server/main.go       # 程序入口,统一创建服务实例
├── config/                  # 配置结构体 + viper 读取
├── internal/
│   ├── handler/             # HTTP Handler(Gin 控制器)
│   ├── service/             # 业务逻辑层(interface + 实现)
│   ├── repository/          # 数据访问层(GORM 封装)
│   ├── model/               # GORM 数据模型
│   ├── middleware/           # JWT、限流、日志、CORS、Recovery
│   ├── router/              # 路由注册(用户侧 8080 + 管理侧 9090)
│   └── ws/                  # WebSocket(Hub + Client + Protocol)
├── rpc/
│   ├── proto/               # Protobuf 定义
│   ├── pb/                  # 生成的 Go 代码
│   └── server/              # gRPC Server 实现(复用 service 层)
└── pkg/
    ├── ai/                  # AI HTTP 客户端(非流式/流式/工具调用)
    ├── prompts/             # 系统提示词(从文件加载)
    ├── news/weather/route/  # AI 工具实现(获取新闻/天气/导航)
    └── utils/               # JWT 工具、响应封装

3.2 依赖注入:单点创建,多处共享

这里踩过一个经典坑:aiSvcrouter.gomain.go 里各创建了一次,导致 WebSocket @AI 和 HTTP /ai/chat/stream 的上下文完全隔离——用 WS 聊的内容,HTTP 接口完全不知道。

解决方案是严格在 main.go 统一创建所有服务实例,router.Register 只接收已创建好的服务:

// main.go:所有 service 只创建一次
userSvc    := service.NewUserService(userRepo)
messageSvc := service.NewMessageService(msgRepo)
groupSvc   := service.NewGroupService(groupRepo)
aiSvc      := service.NewAIService(cfg.AI, aiContextRepo)
adminSvc   := service.NewAdminService(adminRepo, groupRepo, msgRepo)
​
hub := ws.NewHub(messageSvc, groupSvc, aiSvc)
go hub.Run()
​
// 所有 handler 和 hub 共享同一套 service
router.Register(r, log, cfg, hub, userSvc, messageSvc, groupSvc, aiSvc, adminSvc)

四、WebSocket 核心:Hub/Client 模型

4.1 Hub:在线连接注册表

Hub 是整个 WebSocket 层的核心,维护一张 username → *Client 的映射表,通过 channel 驱动事件循环,避免直接对 map 加锁。

type Hub struct {
    mu         sync.RWMutex        // 保护 clients map
    clients    map[string]*Client  // key = 用户名
    register   chan *Client
    unregister chan *Client
    messageSvc service.MessageService
    groupSvc   service.GroupService
    aiSvc      service.AIService
}

注册/注销操作全部通过 channel 发到事件循环里串行处理,查询(SendToUser)使用读锁,做到读写分离:

func (h *Hub) Run() {
    for {
        select {
        case c := <-h.register:
            h.mu.Lock()
            // 同一用户二次登录时,关闭旧连接(踢下线)
            if old, ok := h.clients[c.username]; ok {
                close(old.send)
            }
            h.clients[c.username] = c
            h.mu.Unlock()
​
        case c := <-h.unregister:
            h.mu.Lock()
            // 防止新连接上线后旧连接的 unregister 误删新连接
            if current, ok := h.clients[c.username]; ok && current == c {
                delete(h.clients, c.username)
                close(c.send)
            }
            h.mu.Unlock()
        }
    }
}

4.2 Client:读写分离的双协程模型

每个 WebSocket 连接对应一个 Client,启动两个 goroutine:

readPump goroutine  ← 读客户端发来的消息,处理业务逻辑
writePump goroutine ← 消费 send channel,把消息写回客户端

心跳保活:writePump 定时发 Ping 帧(pingPeriod = pongWait * 9/10),readPump 的 PongHandler 收到后续期读超时,确保长时间无消息的连接不被服务端或中间代理强制断开。

ticker := time.NewTicker(pingPeriod) // 约 54 秒
// ...
case <-ticker.C:
    c.conn.SetWriteDeadline(time.Now().Add(writeWait))
    c.conn.WriteMessage(websocket.PingMessage, nil)

ACK 机制:每条 TypeChat 消息落库后,服务端回 TypeAck,区分 delivered(对方在线已推)和 stored(对方离线已存库),前端可据此渲染消息状态。

4.3 消息协议设计

type Message struct {
    Type      int    `json:"type"`                // 1心跳 2单聊 3ACK 4群聊 5系统通知
    MsgID     string `json:"msg_id"`              // 客户端生成的全局唯一ID(幂等去重)
    From      string `json:"from"`                // 服务端强制写入,防前端伪造
    To        string `json:"to"`                  // 目标用户名(单聊)或群ID字符串(群聊)
    Content   string `json:"content"`
    Timestamp int64  `json:"timestamp"`
    Status    string `json:"status,omitempty"`    // ACK状态
    ChatType  string `json:"chat_type,omitempty"` // "private" / "group"
    ConvPeer  string `json:"conv_peer,omitempty"` // AI 回复时告知前端消息归属哪个会话
}

From 字段由服务端在 readPump 中强制覆盖为已认证的 c.username,客户端无法伪造发送方。


五、消息存储与离线推送

5.1 写入流程(先落库再投递)

客户端发消息
    ↓
readPump 接收 JSON
    ↓
messageSvc.SavePrivateMessage()  // 先写 MySQL
    ↓
hub.SendToUser(to, rawMsg)       // 尝试投递
    ├── 在线 → 推送到对方 send channel → MarkDelivered
    └── 离线 → 消息已存库,status=stored → 等待下次上线拉取
    ↓
给发送方回 ACK(delivered / stored)

5.2 上线时拉取离线消息

连接建立(ws.Serve 函数)时,立即拉取两类离线消息:

单聊离线消息:查询 status = 'stored'to = username 的消息,推送后批量标记为 delivered

群聊离线消息:这里采用了 per-user 读取位点 方案,每个群成员的 group_members 表记录 last_read_at(上次断线时间点):

-- 查询某用户在所有群的离线消息
SELECT m.* FROM messages m
JOIN group_members gm ON m.`to` = gm.group_id
    AND gm.username = ?
    AND m.created_at > COALESCE(gm.last_read_at, gm.joined_at)
WHERE m.`to` IN (?) AND m.chat_type = 'group'
ORDER BY m.created_at ASC

断线时 readPump 的 defer 更新 last_read_at

defer func() {
    myGroups, err := c.hub.groupSvc.GetMyGroups(c.username)
    if err == nil {
        ids := // 提取 groupID 列表
        _ = c.hub.groupSvc.UpdateLastReadAt(c.username, ids)
    }
    c.hub.unregister <- c
    c.conn.Close()
}()

这套方案彻底解决了群消息重复推送问题(之前依赖共享 status 字段,多人群聊时会漏推或重复推)。

5.3 会话列表:统一排序

会话列表同时包含单聊和群聊,两种消息分别查询后 append 到同一切片,最后统一按 last_time 倒序:

// 修复前:单聊有序,但 append 群聊后打乱顺序
// 修复后:
sort.Slice(conversations, func(i, j int) bool {
    return conversations[i].LastTime > conversations[j].LastTime
})

六、AI 集成:从简单对话到 Agentic 工具调用

这是整个项目里技术含量最高的部分,分三个层次递进。

6.1 @AI 指令:WebSocket 内嵌 AI 对话

在聊天窗口发送 @AI 你好 时,readPump 拦截处理:

if len(msg.Content) >= 3 && msg.Content[:3] == "@AI" {
    question := strings.TrimPrefix(msg.Content[3:], " ")
    if question == "" { continue } // 防空问题触发 AI
    // ...
}

群聊 @AI 的完整流程(顺序很关键):

1. 先把用户的 @AI 消息落库(保证历史完整)
2. 提前查群成员列表(后面两次广播复用,减少 DB 查询)
3. 立即把用户消息广播给其他群成员(不等 AI 返回,体验更流畅)
4. 拉取近 30 条群聊记录,拼入 AI 提问上下文
5. 调用 AI(主备切换)
6. 把 AI 回复存库并广播给所有在线成员

AI 回复内容带上提问者信息,群历史里上下文清晰:

[jack 向AI提问] 什么是 goroutine?

goroutine 是 Go 语言的轻量级线程...

6.2 上下文管理:滚动摘要压缩

每个用户独立维护对话上下文,超出阈值时异步压缩为摘要,保证历史不丢失的同时避免 token 无限增长:

type userContext struct {
    summary string           // 历史摘要(已压缩部分)
    recent  []ai.ChatMessage // 最近未压缩的对话
}

构建发送给 AI 的消息列表

messages = append(messages, ai.ChatMessage{
    Role:    "system",
    // 主人设 + 历史摘要合并为一条 system 消息
    // 拆成两条 system 消息某些提供商会返回空回复
    Content: prompts.WithChatSummary(prompts.ChatSystem, uc.summary),
})
messages = append(messages, uc.recent...) // 近期原始对话

异步压缩(不阻塞当前回复):

if needSummarize && reply != "" {
    go func() {
        newSummary, err := s.summarize(oldSummary, toSummarize)
        // 只清除已被总结的消息,保留压缩期间并发进来的新消息
        s.mu.Lock()
        uc.summary = newSummary
        uc.recent = uc.recent[removedCount:]
        s.mu.Unlock()
        s.persistContext(username) // 异步落库,进程重启后可恢复
    }()
}

上下文还支持持久化到 MySQL(ai_context 表存 summary + recent_json),进程重启后首次访问时按需从 DB 恢复,无需重新建立对话。

6.3 主备 AI 切换

封装了一个简单但实用的主备切换:

func (s *aiService) callAI(messages []ai.ChatMessage) (string, error) {
    reply, err := ai.Call(s.primaryCfg, messages, 60)
    if err != nil {
        log.Printf("primary ai failed: %v, switching to backup", err)
        reply, err = ai.Call(s.backupCfg, messages, 60)
    }
    return reply, err
}

配置文件里分别配置主力(如 DeepSeek)和备用(如 Qwen)模型,无缝切换,上下文不丢失。

6.4 Function Calling:Agentic 工具调用

这是项目最有意思的部分。用户可以用自然语言操作 IM 功能:

"帮我建一个叫'Go学习小组'的群,把 alice、bob、carol 都加进去"

系统会自动执行多轮工具调用:

轮次1:AI 决策 → create_group("Go学习小组")
        → 执行,得到 group_id = 42
轮次2:AI → invite_members(group_id=42, ["alice","bob","carol"])
        → 执行邀请,结果追加到消息链
轮次3:AI 不再调用工具 → 退出循环
最终:流式生成自然语言回复,每个 chunk 通过 SSE 推给前端

工具执行器采用闭包注入当前用户名和业务服务:

func (h *AIHandler) buildUserToolExecutor(username string) func(name string, args map[string]interface{}) (string, error) {
    return func(name string, args map[string]interface{}) (string, error) {
        switch name {
        case "create_group":
            name, _ := args["name"].(string)
            group, err := h.groupSvc.CreateGroup(name, username, 200)
            // ...
        case "invite_members":
            // 解析 group_id,支持 group_name 模糊匹配兜底
            groupID, err := resolveGroupID(args)
            // ...
        case "get_weather":
            city, _ := args["city"].(string)
            info, err := weather.FetchWeather(city)
            return weather.Format(info), err
        // ... 更多工具
        }
    }
}

SSE 事件流:前端能实时看到工具执行过程:

// AI 决定调工具
{"type":"tool_call","name":"create_group","args":"{\"name\":\"Go学习小组\"}"}
// 工具执行完成
{"type":"tool_result","name":"create_group","result":"群聊创建成功,ID=42","success":true}
// 最终回复的文字片段
{"type":"chunk","content":"好的,我已经帮你..."}
// 全部完成
{"type":"done","content":""}

工具对话历史同样支持滚动摘要压缩和持久化,type="tool" 的消息在摘要时被格式化为可读文本。

内置工具清单

工具名 功能
search_user 搜索用户(关键词)
get_my_groups 查询我加入的群
search_group 按名称搜索群聊
create_group 创建群聊
join_group 加入群聊
leave_group 退出群聊(支持 group_name 模糊匹配)
invite_member/members 邀请(批量)成员
kick_member/members 踢(批量)成员
disband_group 解散群聊
get_contacts 获取联系人列表
add_contact 添加联系人
delete_contact 删除联系人
get_news 获取最新新闻(分类)
get_weather 查询城市天气
get_route 查询两地路线

七、中间件:安全与可观测性

7.1 JWT 鉴权

func AuthJWT(secret string) gin.HandlerFunc {
    return func(c *gin.Context) {
        auth := c.GetHeader("Authorization")
        // Bearer <token>
        claims, err := utils.ParseToken(secret, parts[1])
        if err != nil {
            response.Fail(c, 401, 401, "invalid token")
            c.Abort()
            return
        }
        c.Set("username", claims.Username)
        c.Set("role", claims.Role) // 后续 handler 可做角色鉴权
        c.Next()
    }
}

WebSocket 升级时也校验 JWT,连接建立后绑定到 client.username,后续收到的消息 From 字段均由服务端填写,无法伪造。

7.2 结构化请求日志 + 敏感字段脱敏

请求进入时记录 method / path / body / IP,请求结束后记录状态码、耗时、响应大小。5xx 用 Error 级别,4xx 用 Warn,2xx/3xx 用 Info。

核心是密码脱敏——正则替换 JSON body 中的 password、secret、api_key、token 字段值:

var sensitivePatterns = []*regexp.Regexp{
    regexp.MustCompile(`("password"\s*:\s*)"[^"]*"`),
    regexp.MustCompile(`("secret"\s*:\s*)"[^"]*"`),
    regexp.MustCompile(`("api_key"\s*:\s*)"[^"]*"`),
    regexp.MustCompile(`("token"\s*:\s*)"[^"]*"`),
}

func maskSensitive(body string) string {
    for _, p := range sensitivePatterns {
        body = p.ReplaceAllString(body, `${1}"***"`)
    }
    return body
}

这个正则在最初被注释掉了,导致用户登录密码明文写入日志——这是真实修复过的安全 bug。

7.3 令牌桶限流

func RateLimit(qps int, burst int) gin.HandlerFunc {
    limiter := rate.NewLimiter(rate.Limit(qps), burst)
    return func(c *gin.Context) {
        if !limiter.Allow() {
            response.Fail(c, 429, 429, "too many requests")
            c.Abort()
            return
        }
        c.Next()
    }
}
// 全局:200 QPS,允许 50 突发
r.Use(middleware.RateLimit(200, 50))

八、群聊的安全设计

群聊部分有几处容易被忽略的安全细节:

1. 发消息前校验群成员身份:readPump 处理 TypeGroupChat 时,查出群成员列表后校验 c.username 是否在其中,不在则返回 ACK failed 并拒绝广播。

2. 查看群历史需校验成员身份:HTTP 接口 GET /api/v1/message/history?group_id=xxx 需要先确认当前用户是该群成员,否则任意用户可查任意群历史。

3. 查看群成员需校验GET /api/v1/group/members?group_id=xxx 同样验证当前用户是否在群里,防止非成员探测群成员列表。


九、gRPC + go-zero:管理后台服务

管理后台(端口 9090)使用 go-zero 框架,通过 gRPC 暴露管理接口(用户管理、群组管理、消息管理),关键是 复用了业务层的 service,而非重新连接 DB:

// GroupRpcServer 复用现有 AdminService,不新建 DB 连接
type GroupRpcServer struct {
    pb.UnimplementedGroupRpcServer
    adminSvc service.AdminService
}

func (s *GroupRpcServer) ListGroups(ctx context.Context, req *pb.ListGroupsReq) (*pb.ListGroupsResp, error) {
    groups, total, err := s.adminSvc.ListGroups(int(req.Page), int(req.PageSize), req.Keyword)
    // ...
}

Protobuf 统一定义通用响应:

message CommonResp {
    int32 code = 1;
    string msg  = 2;
}

配置文件支持两种模式:

  • monolith:单体部署,gRPC Server 和 HTTP Server 同进程,直连调用

  • microservice:独立部署,通过 etcd 服务注册/发现


十、Docker 部署:多阶段构建最小化镜像

# Stage 1:编译(golang:1.26-alpine)
FROM golang:1.26-alpine AS builder
ENV GOPROXY=https://goproxy.cn,direct
# 先复制 go.mod/go.sum 利用构建缓存,依赖不变时不重新下载
COPY go.mod go.sum ./
RUN go mod download
COPY . .
# 关闭 CGO,静态链接,-ldflags="-s -w" 去掉调试信息,缩小体积
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \
    go build -ldflags="-s -w" -o server ./cmd/server/main.go

# Stage 2:最小运行镜像(alpine:3.20)
FROM alpine:3.20
RUN apk add --no-cache tzdata ca-certificates && \
    cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
COPY --from=builder /build/server ./server
COPY --from=builder /build/config ./config
COPY --from=builder /build/pkg/prompts/data ./pkg/prompts/data
EXPOSE 8080 9090
CMD ["./server", "-c", "config/config.yaml"]

docker-compose 结构:

services:
  backend:
    image: im_backend:latest
    extra_hosts:
      - "host.docker.internal:host-gateway"  # Linux 访问宿主机 MySQL/Redis 必须
    expose: ["8080", "9090"]

  frontend:
    image: im_frontend:latest  # Nginx + Vue 静态文件
    ports: ["3001:80"]
    depends_on: [backend]

MySQL 和 Redis 以独立容器运行在宿主机,通过 host.docker.internal 访问,方便本地开发和生产复用同一份数据。


十一、踩坑记录:13 个真实修复的 Bug

这部分记录了项目开发中遇到的真实问题,每一个都代表一类典型错误:

# 文件 问题 根因/修复
1 middleware/logger.go 登录密码明文写入日志 脱敏正则被注释掉,已恢复
2 router.go + main.go @AI 与 HTTP AI 对话上下文隔离 aiSvc 重复创建,改为 main.go 统一创建
3 handler/group.go "joinned" 拼写错误 改为 "joined"
4 handler/group.go ParseUint bit size 传了 60 改为 64
5 ws/client.go 群聊 @AI 回复不入库 单聊有 SaveMessage,群聊遗漏,已补
6 ws/client.go @AI 发送空问题仍调 AI len > 3 不足够,加空内容守卫
7 message_repo.go 单聊会话列表乱序 ORDER BY last_time DESC,已补
8 handler/message.go 群聊历史无成员校验 任意用户可查任意群,已加成员身份验证
9 handler/group.go 查群成员无鉴权 任意用户可探测任意群成员,已修复
10 ws/client.go 群聊 @AI 回复缺少提问者上下文 补齐 [user 向AI提问] 内容 格式
11 message_repo.go 会话列表混合单群聊后乱序 append 后未统一排序,加 sort.Slice
12 handler/user.go /user/me 只返回 username IM 需展示昵称头像,新增 GetUser 返回完整字段
13 群消息离线推送 依赖共享 status 字段导致漏推/重推 引入 per-user last_read_at,JOIN 查询精确过滤

这些 bug 里,#2(服务实例重复创建)#13(群消息离线推送) 是最有价值的——前者揭示了依赖注入不规范的后果,后者展示了多人场景下共享状态的经典问题。


十二、总结与后续规划

已实现

  • 完整的单聊/群聊 WebSocket 实时通信,含心跳保活、ACK、离线消息

  • per-user 群消息读取位点,断线不丢群消息

  • AI @指令(私聊/群聊)+ SSE 流式对话 + 上下文滚动摘要 + 持久化

  • AI Function Calling(多轮 Agentic 工具调用)+ 天气/新闻/导航等工具

  • 多 AI 提供商主备切换

  • JWT 鉴权 + 密码脱敏日志 + 令牌桶限流

  • gRPC 管理后台(复用 service 层),支持 etcd 服务发现

  • Docker 多阶段构建,docker-compose 一键部署

待完善

  • 消息可靠性:当前 ACK 没有客户端重发机制,极端网络下可能丢消息

  • 水平扩展:多实例部署时 Hub 是内存状态,需引入 Redis Pub/Sub 跨实例推送

  • 安全加固:config.yaml 中 DB 密码和 AI Key 尚为明文,生产需迁移到环境变量或 Secrets 管理


整个项目从架构设计到 bug 修复,覆盖了 Go 后端开发中最核心的几类问题:并发安全(Hub 的读写锁)、协议设计(WebSocket 消息类型)、AI 集成(上下文管理与工具调用)、安全实践(鉴权与脱敏)。希望这篇文章对正在学习 Go 的你有所帮助。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐