从零构建 Go IM 系统:WebSocket + AI Agent + gRPC 全栈实践
灵犀 IM - 从零构建 Go IM 系统:WebSocket + AI Agent + gRPC 全栈实践
关键词: Go / Gin / WebSocket / GORM / Redis / gRPC / go-zero / AI Function Calling / Docker
一、项目背景与整体思路
我决定做一个真实可用的 IM(即时通讯)系统,而不是仅仅 CRUD 教程。目标是:
-
覆盖 WebSocket 长连接、离线消息、群聊等 IM 核心场景
-
集成 AI 对话能力(@AI 指令 + Function Calling 工具调用)
-
管理后台通过 gRPC 与业务服务通信,支持 etcd 服务发现
-
前后端分离,Docker 一键部署
最终项目实现了以下完整功能链路:
Vue3 前端 ↕ HTTP REST (Gin) ← JWT 鉴权、限流、日志脱敏 ↕ WebSocket ← 实时消息、心跳、ACK、离线推送 ↕ SSE 流式 ← AI 对话逐 token 输出 ↕ gRPC (go-zero) ← 管理后台服务调用
数据层:MySQL(GORM) + Redis(缓存 / 会话)
二、技术栈总览
| 分层 | 技术选型 | 说明 |
|---|---|---|
| Web 框架 | Gin v1.12 | 路由、中间件、参数绑定 |
| 实时通信 | gorilla/websocket | 长连接管理 |
| ORM | GORM v1.31 + MySQL Driver | 数据持久化 |
| 缓存 | go-redis/v9 | 离线消息辅助、会话缓存 |
| 认证 | golang-jwt/jwt v5 | JWT 无状态鉴权 |
| 密码安全 | golang.org/x/crypto (bcrypt) | 密码哈希 |
| 微服务框架 | go-zero v1.10 | gRPC 服务注册/发现 (etcd) |
| 日志 | go.uber.org/zap | 结构化日志 |
| 限流 | golang.org/x/time/rate | 令牌桶 |
| AI 调用 | 自封装 HTTP 客户端 | 兼容 OpenAI 格式,支持主备切换 |
| 容器化 | Docker 多阶段构建 + Docker Compose | 镜像最小化 |
| 配置 | spf13/viper | yaml 配置热加载 |
三、项目架构:分层 + 依赖注入
3.1 目录结构
go-zero-im/ ├── cmd/server/main.go # 程序入口,统一创建服务实例 ├── config/ # 配置结构体 + viper 读取 ├── internal/ │ ├── handler/ # HTTP Handler(Gin 控制器) │ ├── service/ # 业务逻辑层(interface + 实现) │ ├── repository/ # 数据访问层(GORM 封装) │ ├── model/ # GORM 数据模型 │ ├── middleware/ # JWT、限流、日志、CORS、Recovery │ ├── router/ # 路由注册(用户侧 8080 + 管理侧 9090) │ └── ws/ # WebSocket(Hub + Client + Protocol) ├── rpc/ │ ├── proto/ # Protobuf 定义 │ ├── pb/ # 生成的 Go 代码 │ └── server/ # gRPC Server 实现(复用 service 层) └── pkg/ ├── ai/ # AI HTTP 客户端(非流式/流式/工具调用) ├── prompts/ # 系统提示词(从文件加载) ├── news/weather/route/ # AI 工具实现(获取新闻/天气/导航) └── utils/ # JWT 工具、响应封装
3.2 依赖注入:单点创建,多处共享
这里踩过一个经典坑:aiSvc 在 router.go 和 main.go 里各创建了一次,导致 WebSocket @AI 和 HTTP /ai/chat/stream 的上下文完全隔离——用 WS 聊的内容,HTTP 接口完全不知道。
解决方案是严格在 main.go 统一创建所有服务实例,router.Register 只接收已创建好的服务:
// main.go:所有 service 只创建一次 userSvc := service.NewUserService(userRepo) messageSvc := service.NewMessageService(msgRepo) groupSvc := service.NewGroupService(groupRepo) aiSvc := service.NewAIService(cfg.AI, aiContextRepo) adminSvc := service.NewAdminService(adminRepo, groupRepo, msgRepo) hub := ws.NewHub(messageSvc, groupSvc, aiSvc) go hub.Run() // 所有 handler 和 hub 共享同一套 service router.Register(r, log, cfg, hub, userSvc, messageSvc, groupSvc, aiSvc, adminSvc)
四、WebSocket 核心:Hub/Client 模型
4.1 Hub:在线连接注册表
Hub 是整个 WebSocket 层的核心,维护一张 username → *Client 的映射表,通过 channel 驱动事件循环,避免直接对 map 加锁。
type Hub struct {
mu sync.RWMutex // 保护 clients map
clients map[string]*Client // key = 用户名
register chan *Client
unregister chan *Client
messageSvc service.MessageService
groupSvc service.GroupService
aiSvc service.AIService
}
注册/注销操作全部通过 channel 发到事件循环里串行处理,查询(SendToUser)使用读锁,做到读写分离:
func (h *Hub) Run() {
for {
select {
case c := <-h.register:
h.mu.Lock()
// 同一用户二次登录时,关闭旧连接(踢下线)
if old, ok := h.clients[c.username]; ok {
close(old.send)
}
h.clients[c.username] = c
h.mu.Unlock()
case c := <-h.unregister:
h.mu.Lock()
// 防止新连接上线后旧连接的 unregister 误删新连接
if current, ok := h.clients[c.username]; ok && current == c {
delete(h.clients, c.username)
close(c.send)
}
h.mu.Unlock()
}
}
}
4.2 Client:读写分离的双协程模型
每个 WebSocket 连接对应一个 Client,启动两个 goroutine:
readPump goroutine ← 读客户端发来的消息,处理业务逻辑 writePump goroutine ← 消费 send channel,把消息写回客户端
心跳保活:writePump 定时发 Ping 帧(pingPeriod = pongWait * 9/10),readPump 的 PongHandler 收到后续期读超时,确保长时间无消息的连接不被服务端或中间代理强制断开。
ticker := time.NewTicker(pingPeriod) // 约 54 秒 // ... case <-ticker.C: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) c.conn.WriteMessage(websocket.PingMessage, nil)
ACK 机制:每条 TypeChat 消息落库后,服务端回 TypeAck,区分 delivered(对方在线已推)和 stored(对方离线已存库),前端可据此渲染消息状态。
4.3 消息协议设计
type Message struct {
Type int `json:"type"` // 1心跳 2单聊 3ACK 4群聊 5系统通知
MsgID string `json:"msg_id"` // 客户端生成的全局唯一ID(幂等去重)
From string `json:"from"` // 服务端强制写入,防前端伪造
To string `json:"to"` // 目标用户名(单聊)或群ID字符串(群聊)
Content string `json:"content"`
Timestamp int64 `json:"timestamp"`
Status string `json:"status,omitempty"` // ACK状态
ChatType string `json:"chat_type,omitempty"` // "private" / "group"
ConvPeer string `json:"conv_peer,omitempty"` // AI 回复时告知前端消息归属哪个会话
}
From 字段由服务端在 readPump 中强制覆盖为已认证的 c.username,客户端无法伪造发送方。
五、消息存储与离线推送
5.1 写入流程(先落库再投递)
客户端发消息
↓
readPump 接收 JSON
↓
messageSvc.SavePrivateMessage() // 先写 MySQL
↓
hub.SendToUser(to, rawMsg) // 尝试投递
├── 在线 → 推送到对方 send channel → MarkDelivered
└── 离线 → 消息已存库,status=stored → 等待下次上线拉取
↓
给发送方回 ACK(delivered / stored)
5.2 上线时拉取离线消息
连接建立(ws.Serve 函数)时,立即拉取两类离线消息:
单聊离线消息:查询 status = 'stored' 且 to = username 的消息,推送后批量标记为 delivered。
群聊离线消息:这里采用了 per-user 读取位点 方案,每个群成员的 group_members 表记录 last_read_at(上次断线时间点):
-- 查询某用户在所有群的离线消息
SELECT m.* FROM messages m
JOIN group_members gm ON m.`to` = gm.group_id
AND gm.username = ?
AND m.created_at > COALESCE(gm.last_read_at, gm.joined_at)
WHERE m.`to` IN (?) AND m.chat_type = 'group'
ORDER BY m.created_at ASC
断线时 readPump 的 defer 更新 last_read_at:
defer func() {
myGroups, err := c.hub.groupSvc.GetMyGroups(c.username)
if err == nil {
ids := // 提取 groupID 列表
_ = c.hub.groupSvc.UpdateLastReadAt(c.username, ids)
}
c.hub.unregister <- c
c.conn.Close()
}()
这套方案彻底解决了群消息重复推送问题(之前依赖共享 status 字段,多人群聊时会漏推或重复推)。
5.3 会话列表:统一排序
会话列表同时包含单聊和群聊,两种消息分别查询后 append 到同一切片,最后统一按 last_time 倒序:
// 修复前:单聊有序,但 append 群聊后打乱顺序
// 修复后:
sort.Slice(conversations, func(i, j int) bool {
return conversations[i].LastTime > conversations[j].LastTime
})
六、AI 集成:从简单对话到 Agentic 工具调用
这是整个项目里技术含量最高的部分,分三个层次递进。
6.1 @AI 指令:WebSocket 内嵌 AI 对话
在聊天窗口发送 @AI 你好 时,readPump 拦截处理:
if len(msg.Content) >= 3 && msg.Content[:3] == "@AI" {
question := strings.TrimPrefix(msg.Content[3:], " ")
if question == "" { continue } // 防空问题触发 AI
// ...
}
群聊 @AI 的完整流程(顺序很关键):
1. 先把用户的 @AI 消息落库(保证历史完整) 2. 提前查群成员列表(后面两次广播复用,减少 DB 查询) 3. 立即把用户消息广播给其他群成员(不等 AI 返回,体验更流畅) 4. 拉取近 30 条群聊记录,拼入 AI 提问上下文 5. 调用 AI(主备切换) 6. 把 AI 回复存库并广播给所有在线成员
AI 回复内容带上提问者信息,群历史里上下文清晰:
[jack 向AI提问] 什么是 goroutine? goroutine 是 Go 语言的轻量级线程...
6.2 上下文管理:滚动摘要压缩
每个用户独立维护对话上下文,超出阈值时异步压缩为摘要,保证历史不丢失的同时避免 token 无限增长:
type userContext struct {
summary string // 历史摘要(已压缩部分)
recent []ai.ChatMessage // 最近未压缩的对话
}
构建发送给 AI 的消息列表:
messages = append(messages, ai.ChatMessage{
Role: "system",
// 主人设 + 历史摘要合并为一条 system 消息
// 拆成两条 system 消息某些提供商会返回空回复
Content: prompts.WithChatSummary(prompts.ChatSystem, uc.summary),
})
messages = append(messages, uc.recent...) // 近期原始对话
异步压缩(不阻塞当前回复):
if needSummarize && reply != "" {
go func() {
newSummary, err := s.summarize(oldSummary, toSummarize)
// 只清除已被总结的消息,保留压缩期间并发进来的新消息
s.mu.Lock()
uc.summary = newSummary
uc.recent = uc.recent[removedCount:]
s.mu.Unlock()
s.persistContext(username) // 异步落库,进程重启后可恢复
}()
}
上下文还支持持久化到 MySQL(ai_context 表存 summary + recent_json),进程重启后首次访问时按需从 DB 恢复,无需重新建立对话。
6.3 主备 AI 切换
封装了一个简单但实用的主备切换:
func (s *aiService) callAI(messages []ai.ChatMessage) (string, error) {
reply, err := ai.Call(s.primaryCfg, messages, 60)
if err != nil {
log.Printf("primary ai failed: %v, switching to backup", err)
reply, err = ai.Call(s.backupCfg, messages, 60)
}
return reply, err
}
配置文件里分别配置主力(如 DeepSeek)和备用(如 Qwen)模型,无缝切换,上下文不丢失。
6.4 Function Calling:Agentic 工具调用
这是项目最有意思的部分。用户可以用自然语言操作 IM 功能:
"帮我建一个叫'Go学习小组'的群,把 alice、bob、carol 都加进去"
系统会自动执行多轮工具调用:
轮次1:AI 决策 → create_group("Go学习小组")
→ 执行,得到 group_id = 42
轮次2:AI → invite_members(group_id=42, ["alice","bob","carol"])
→ 执行邀请,结果追加到消息链
轮次3:AI 不再调用工具 → 退出循环
最终:流式生成自然语言回复,每个 chunk 通过 SSE 推给前端
工具执行器采用闭包注入当前用户名和业务服务:
func (h *AIHandler) buildUserToolExecutor(username string) func(name string, args map[string]interface{}) (string, error) {
return func(name string, args map[string]interface{}) (string, error) {
switch name {
case "create_group":
name, _ := args["name"].(string)
group, err := h.groupSvc.CreateGroup(name, username, 200)
// ...
case "invite_members":
// 解析 group_id,支持 group_name 模糊匹配兜底
groupID, err := resolveGroupID(args)
// ...
case "get_weather":
city, _ := args["city"].(string)
info, err := weather.FetchWeather(city)
return weather.Format(info), err
// ... 更多工具
}
}
}
SSE 事件流:前端能实时看到工具执行过程:
// AI 决定调工具
{"type":"tool_call","name":"create_group","args":"{\"name\":\"Go学习小组\"}"}
// 工具执行完成
{"type":"tool_result","name":"create_group","result":"群聊创建成功,ID=42","success":true}
// 最终回复的文字片段
{"type":"chunk","content":"好的,我已经帮你..."}
// 全部完成
{"type":"done","content":""}
工具对话历史同样支持滚动摘要压缩和持久化,type="tool" 的消息在摘要时被格式化为可读文本。
内置工具清单:
| 工具名 | 功能 |
|---|---|
search_user |
搜索用户(关键词) |
get_my_groups |
查询我加入的群 |
search_group |
按名称搜索群聊 |
create_group |
创建群聊 |
join_group |
加入群聊 |
leave_group |
退出群聊(支持 group_name 模糊匹配) |
invite_member/members |
邀请(批量)成员 |
kick_member/members |
踢(批量)成员 |
disband_group |
解散群聊 |
get_contacts |
获取联系人列表 |
add_contact |
添加联系人 |
delete_contact |
删除联系人 |
get_news |
获取最新新闻(分类) |
get_weather |
查询城市天气 |
get_route |
查询两地路线 |
七、中间件:安全与可观测性
7.1 JWT 鉴权
func AuthJWT(secret string) gin.HandlerFunc {
return func(c *gin.Context) {
auth := c.GetHeader("Authorization")
// Bearer <token>
claims, err := utils.ParseToken(secret, parts[1])
if err != nil {
response.Fail(c, 401, 401, "invalid token")
c.Abort()
return
}
c.Set("username", claims.Username)
c.Set("role", claims.Role) // 后续 handler 可做角色鉴权
c.Next()
}
}
WebSocket 升级时也校验 JWT,连接建立后绑定到 client.username,后续收到的消息 From 字段均由服务端填写,无法伪造。
7.2 结构化请求日志 + 敏感字段脱敏
请求进入时记录 method / path / body / IP,请求结束后记录状态码、耗时、响应大小。5xx 用 Error 级别,4xx 用 Warn,2xx/3xx 用 Info。
核心是密码脱敏——正则替换 JSON body 中的 password、secret、api_key、token 字段值:
var sensitivePatterns = []*regexp.Regexp{
regexp.MustCompile(`("password"\s*:\s*)"[^"]*"`),
regexp.MustCompile(`("secret"\s*:\s*)"[^"]*"`),
regexp.MustCompile(`("api_key"\s*:\s*)"[^"]*"`),
regexp.MustCompile(`("token"\s*:\s*)"[^"]*"`),
}
func maskSensitive(body string) string {
for _, p := range sensitivePatterns {
body = p.ReplaceAllString(body, `${1}"***"`)
}
return body
}
这个正则在最初被注释掉了,导致用户登录密码明文写入日志——这是真实修复过的安全 bug。
7.3 令牌桶限流
func RateLimit(qps int, burst int) gin.HandlerFunc {
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return func(c *gin.Context) {
if !limiter.Allow() {
response.Fail(c, 429, 429, "too many requests")
c.Abort()
return
}
c.Next()
}
}
// 全局:200 QPS,允许 50 突发
r.Use(middleware.RateLimit(200, 50))
八、群聊的安全设计
群聊部分有几处容易被忽略的安全细节:
1. 发消息前校验群成员身份:readPump 处理 TypeGroupChat 时,查出群成员列表后校验 c.username 是否在其中,不在则返回 ACK failed 并拒绝广播。
2. 查看群历史需校验成员身份:HTTP 接口 GET /api/v1/message/history?group_id=xxx 需要先确认当前用户是该群成员,否则任意用户可查任意群历史。
3. 查看群成员需校验:GET /api/v1/group/members?group_id=xxx 同样验证当前用户是否在群里,防止非成员探测群成员列表。
九、gRPC + go-zero:管理后台服务
管理后台(端口 9090)使用 go-zero 框架,通过 gRPC 暴露管理接口(用户管理、群组管理、消息管理),关键是 复用了业务层的 service,而非重新连接 DB:
// GroupRpcServer 复用现有 AdminService,不新建 DB 连接
type GroupRpcServer struct {
pb.UnimplementedGroupRpcServer
adminSvc service.AdminService
}
func (s *GroupRpcServer) ListGroups(ctx context.Context, req *pb.ListGroupsReq) (*pb.ListGroupsResp, error) {
groups, total, err := s.adminSvc.ListGroups(int(req.Page), int(req.PageSize), req.Keyword)
// ...
}
Protobuf 统一定义通用响应:
message CommonResp {
int32 code = 1;
string msg = 2;
}
配置文件支持两种模式:
-
monolith:单体部署,gRPC Server 和 HTTP Server 同进程,直连调用 -
microservice:独立部署,通过 etcd 服务注册/发现
十、Docker 部署:多阶段构建最小化镜像
# Stage 1:编译(golang:1.26-alpine)
FROM golang:1.26-alpine AS builder
ENV GOPROXY=https://goproxy.cn,direct
# 先复制 go.mod/go.sum 利用构建缓存,依赖不变时不重新下载
COPY go.mod go.sum ./
RUN go mod download
COPY . .
# 关闭 CGO,静态链接,-ldflags="-s -w" 去掉调试信息,缩小体积
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \
go build -ldflags="-s -w" -o server ./cmd/server/main.go
# Stage 2:最小运行镜像(alpine:3.20)
FROM alpine:3.20
RUN apk add --no-cache tzdata ca-certificates && \
cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
COPY --from=builder /build/server ./server
COPY --from=builder /build/config ./config
COPY --from=builder /build/pkg/prompts/data ./pkg/prompts/data
EXPOSE 8080 9090
CMD ["./server", "-c", "config/config.yaml"]
docker-compose 结构:
services:
backend:
image: im_backend:latest
extra_hosts:
- "host.docker.internal:host-gateway" # Linux 访问宿主机 MySQL/Redis 必须
expose: ["8080", "9090"]
frontend:
image: im_frontend:latest # Nginx + Vue 静态文件
ports: ["3001:80"]
depends_on: [backend]
MySQL 和 Redis 以独立容器运行在宿主机,通过 host.docker.internal 访问,方便本地开发和生产复用同一份数据。
十一、踩坑记录:13 个真实修复的 Bug
这部分记录了项目开发中遇到的真实问题,每一个都代表一类典型错误:
| # | 文件 | 问题 | 根因/修复 |
|---|---|---|---|
| 1 | middleware/logger.go |
登录密码明文写入日志 | 脱敏正则被注释掉,已恢复 |
| 2 | router.go + main.go |
@AI 与 HTTP AI 对话上下文隔离 | aiSvc 重复创建,改为 main.go 统一创建 |
| 3 | handler/group.go |
"joinned" 拼写错误 |
改为 "joined" |
| 4 | handler/group.go |
ParseUint bit size 传了 60 |
改为 64 |
| 5 | ws/client.go |
群聊 @AI 回复不入库 | 单聊有 SaveMessage,群聊遗漏,已补 |
| 6 | ws/client.go |
@AI 发送空问题仍调 AI |
len > 3 不足够,加空内容守卫 |
| 7 | message_repo.go |
单聊会话列表乱序 | 缺 ORDER BY last_time DESC,已补 |
| 8 | handler/message.go |
群聊历史无成员校验 | 任意用户可查任意群,已加成员身份验证 |
| 9 | handler/group.go |
查群成员无鉴权 | 任意用户可探测任意群成员,已修复 |
| 10 | ws/client.go |
群聊 @AI 回复缺少提问者上下文 | 补齐 [user 向AI提问] 内容 格式 |
| 11 | message_repo.go |
会话列表混合单群聊后乱序 | append 后未统一排序,加 sort.Slice |
| 12 | handler/user.go |
/user/me 只返回 username |
IM 需展示昵称头像,新增 GetUser 返回完整字段 |
| 13 | 群消息离线推送 | 依赖共享 status 字段导致漏推/重推 | 引入 per-user last_read_at,JOIN 查询精确过滤 |
这些 bug 里,#2(服务实例重复创建) 和 #13(群消息离线推送) 是最有价值的——前者揭示了依赖注入不规范的后果,后者展示了多人场景下共享状态的经典问题。
十二、总结与后续规划
已实现
-
完整的单聊/群聊 WebSocket 实时通信,含心跳保活、ACK、离线消息
-
per-user 群消息读取位点,断线不丢群消息
-
AI @指令(私聊/群聊)+ SSE 流式对话 + 上下文滚动摘要 + 持久化
-
AI Function Calling(多轮 Agentic 工具调用)+ 天气/新闻/导航等工具
-
多 AI 提供商主备切换
-
JWT 鉴权 + 密码脱敏日志 + 令牌桶限流
-
gRPC 管理后台(复用 service 层),支持 etcd 服务发现
-
Docker 多阶段构建,docker-compose 一键部署
待完善
-
消息可靠性:当前 ACK 没有客户端重发机制,极端网络下可能丢消息
-
水平扩展:多实例部署时 Hub 是内存状态,需引入 Redis Pub/Sub 跨实例推送
-
安全加固:config.yaml 中 DB 密码和 AI Key 尚为明文,生产需迁移到环境变量或 Secrets 管理
整个项目从架构设计到 bug 修复,覆盖了 Go 后端开发中最核心的几类问题:并发安全(Hub 的读写锁)、协议设计(WebSocket 消息类型)、AI 集成(上下文管理与工具调用)、安全实践(鉴权与脱敏)。希望这篇文章对正在学习 Go 的你有所帮助。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)