流式生成服务:单篇与系列博客的并发生成
流式生成服务:单篇与系列博客的并发生成
本文是《从零构建 InkWords:AI 驱动的技术博客生成平台》系列的第 19 章。我们将深入解析后端服务层中负责内容生成的核心逻辑,包括单篇博客的流式生成、系列博客的并发生成调度、并发控制、自动续写机制等关键技术实现。
源码仓库:https://github.com/2692341798/InkWords
1. 引言:为什么需要流式生成?
想象一下,当你让 AI 生成一篇 5000 字的技术博客时,如果必须等待全部内容生成完毕才能看到结果,这个过程会非常煎熬。用户可能会担心:“AI 是不是卡住了?”、“生成失败了吗?”、“我需要等多久?”
流式生成就是为了解决这个问题而生。它就像"边做饭边端菜"——AI 每生成一小段内容,就立即推送给前端展示,让用户实时看到生成进度。这不仅提升了用户体验,还能在生成过程中发现并纠正问题。
在 InkWords 中,我们实现了两种生成模式:
- 单篇生成:针对单个文件或简短内容
- 系列生成:针对大型项目或复杂内容,自动拆分为多个章节并发生成
2. 单篇博客生成:GeneratorService 详解
2.1 服务结构
让我们先看看 GeneratorService 的基本结构:
// GeneratorService handles the blog generation process
type GeneratorService struct {
llmClient *llm.DeepSeekClient
}
// NewGeneratorService creates a new generator service
func NewGeneratorService() *GeneratorService {
apiKey := os.Getenv("DEEPSEEK_API_KEY")
return &GeneratorService{
llmClient: llm.NewDeepSeekClient(apiKey),
}
}
代码解析:
llmClient:封装了与 DeepSeek API 的通信逻辑NewGeneratorService:从环境变量读取 API 密钥并初始化客户端- 使用依赖注入模式,便于测试和替换不同的 LLM 服务
2.2 流式生成核心流程
GenerateBlogStream 方法是单篇生成的核心,让我们逐段分析:
func (s *GeneratorService) GenerateBlogStream(ctx context.Context, userID uuid.UUID,
sourceContent string, sourceType string,
chunkChan chan<- string, errChan chan<- error) {
// 1. 构建提示词
prompt := fmt.Sprintf(`你是一个高级全栈架构师和技术博主。请根据以下提供的源内容,将其转化为一篇"小白友好、图文并茂、可独立复现"的高质量技术博客。
要求:
1. **字数充足,内容详实**:不要只写干瘪的总结。必须深入分析实现原理。
2. **代码级剖析**:对于每个技术点都添加更多的代码样例和图片来解释的更加详细。如果源内容包含代码,请引用核心代码并逐行解释其作用。
3. **可复现的步骤**:如果是实战或教程相关,请给出明确的执行步骤。
4. **小白友好**:在解释抽象的理论概念时,必须提供对应的代码示例或生活化比喻。
5. 所有生成的 Mermaid 图表代码块绝对禁止包含自定义样式关键字(如 style, classDef, linkStyle 等),必须使用基础语法。
源内容:
%s`, sourceContent)
// 2. 构建消息列表
messages := []llm.Message{
{Role: "system", Content: "你是一个高级技术博客作者。"},
{Role: "user", Content: prompt},
}
// 3. 配置模型类型
modelType := "deepseek-chat"
if envModel := os.Getenv("DEEPSEEK_MODEL"); envModel != "" {
modelType = envModel
}
// 4. 创建内部通道用于拦截数据块
internalChunkChan := make(chan string)
internalErrChan := make(chan error)
// 5. 创建可取消的上下文
streamCtx, streamCancel := context.WithCancel(ctx)
// ... 后续的接收器和生成器协程
}
关键点解析:
- 提示词工程:精心设计的提示词是生成高质量博客的关键。我们明确要求 AI 扮演特定角色,并给出具体的写作要求。
- 模型配置:支持通过环境变量动态切换模型,便于测试不同模型的效果。
- 通道设计:使用 Go 的 Channel 实现协程间通信,这是 Go 并发编程的核心。
- 上下文控制:使用
context.WithCancel实现超时控制和资源清理。
2.3 接收器协程:管理流式数据
// Receiver goroutine
go func() {
defer streamCancel()
defer close(chunkChan)
defer close(errChan)
var fullContent string
idleTimeout := 60 * time.Second // 增加到 60 秒
timer := time.NewTimer(idleTimeout)
defer timer.Stop()
for {
select {
case <-ctx.Done():
errChan <- ctx.Err()
return
case <-timer.C:
streamCancel()
errChan <- fmt.Errorf("AI generation idle timeout (no data for %v)", idleTimeout)
return
case err, ok := <-internalErrChan:
if ok && err != nil {
errChan <- err
return
}
if !ok {
internalErrChan = nil
}
case chunk, ok := <-internalChunkChan:
if !ok {
// 流式生成成功完成,保存到数据库
s.saveToDB(ctx, userID, sourceType, fullContent)
return
}
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(idleTimeout)
fullContent += chunk
chunkChan <- chunk
}
}
}()
设计亮点:
- 超时控制:60 秒空闲超时,防止 AI 响应过慢导致连接挂起
- 优雅关闭:使用
defer确保资源正确释放 - 数据聚合:在接收过程中聚合完整内容,最后一次性保存到数据库
- 错误传播:通过错误通道将异常传递给调用方
2.4 自动续写机制
当 AI 生成的内容过长时,可能会被模型截断。我们的系统实现了自动续写:
// Generator loop (handles auto-continuation)
go func() {
defer close(internalChunkChan)
defer close(internalErrChan)
for {
tempChunkChan := make(chan string)
var assistantContent string
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for chunk := range tempChunkChan {
assistantContent += chunk
internalChunkChan <- chunk
}
}()
finishReason, err := s.llmClient.GenerateStream(streamCtx, modelType, messages, tempChunkChan)
wg.Wait() // 确保所有数据块都被收集
if err != nil {
internalErrChan <- err
return
}
// 追加助手刚刚生成的内容
messages = append(messages, llm.Message{
Role: "assistant",
Content: assistantContent,
})
if finishReason != "length" {
return
}
// 如果因为长度限制停止,自动续写
continueMsg := llm.Message{
Role: "user",
Content: "刚才你的回答被截断了,请严格从上文最后一个字符开始无缝续写。绝对不要输出"好的,我们继续"等任何过渡性废话,直接输出后续的Markdown或代码内容。",
}
messages = append(messages, continueMsg)
}
}()
续写逻辑:
- 检测截断:通过
finishReason == "length"判断是否因长度限制被截断 - 无缝续写:添加续写提示,要求 AI 直接继续,不要添加过渡语
- 上下文维护:将已生成的内容追加到消息历史中,保持上下文连贯
3. 系列博客生成:DecompositionService 详解
3.1 服务架构
对于大型项目,我们需要先分析项目结构,然后拆分为多个章节并发生成:
// DecompositionService handles the logic to evaluate project text and generate an outline
type DecompositionService struct {
llmClient *llm.DeepSeekClient
gitFetcher *parser.GitFetcher
}
// Chapter represents a single chapter in the generated outline
type Chapter struct {
Title string `json:"title"`
Summary string `json:"summary"`
Sort int `json:"sort"`
Files []string `json:"files"`
}
// OutlineResult represents the overall generated outline result
type OutlineResult struct {
SeriesTitle string `json:"series_title"`
Chapters []Chapter `json:"chapters"`
}
3.2 Map-Reduce 分析模式
对于大型项目,我们采用 Map-Reduce 模式进行分析:
Map-Reduce 实现:
func (s *DecompositionService) mapReduceAnalyze(ctx context.Context, chunks []parser.FileChunk,
sendProgress func(int, string, interface{})) []string {
var summaries []string
var mu sync.Mutex
// 动态调整并发数,优化性能
numCPU := runtime.NumCPU()
maxWorkers := numCPU
if maxWorkers < 3 {
maxWorkers = 3
}
if maxWorkers > 8 {
maxWorkers = 8
}
// 避免 Worker 数量大于任务数
if len(chunks) < maxWorkers {
maxWorkers = len(chunks)
}
sem := semaphore.NewWeighted(int64(maxWorkers))
var wg sync.WaitGroup
workerPool := make(chan int, maxWorkers)
for i := 0; i < maxWorkers; i++ {
workerPool <- i
}
for i, chunk := range chunks {
wg.Add(1)
go func(idx int, c parser.FileChunk) {
defer wg.Done()
if err := sem.Acquire(ctx, 1); err != nil {
return
}
workerID := <-workerPool
defer func() {
workerPool <- workerID
sem.Release(1)
}()
// 发送分析进度
sendProgress(2, fmt.Sprintf("正在分析分块 %d/%d [%s]...", idx+1, len(chunks), c.Dir),
map[string]interface{}{
"status": "chunk_analyzing",
"dir": c.Dir,
"index": idx + 1,
"total": len(chunks),
"worker_id": workerID,
})
// 生成局部摘要(带重试机制)
summary := s.generateLocalSummaryWithRetry(ctx, c, 3, sendProgress, idx+1, len(chunks), workerID)
if summary != "" {
mu.Lock()
summaries = append(summaries, summary)
mu.Unlock()
sendProgress(2, fmt.Sprintf("分块 %d/%d 分析完成", idx+1, len(chunks)),
map[string]interface{}{
"status": "chunk_done",
"dir": c.Dir,
"index": idx + 1,
"worker_id": workerID,
})
}
}(i, chunk)
}
wg.Wait()
return summaries
}
并发控制策略:
- 动态 Worker 数量:根据 CPU 核心数和任务数动态调整
- 信号量控制:使用
semaphore限制最大并发数 - Worker 池:复用 Worker ID,便于前端展示
- 互斥锁保护:使用
sync.Mutex保护共享数据
3.3 并发生成系列博客
生成大纲后,我们并发生成各个章节:
func (s *DecompositionService) GenerateSeries(ctx context.Context, userID uuid.UUID,
parentID uuid.UUID, seriesTitle string, outline []Chapter,
sourceContent string, sourceType string, gitURL string,
progressChan chan<- string, errChan chan<- error) {
defer close(progressChan)
defer close(errChan)
// 限制并发数为 3
sem := semaphore.NewWeighted(3)
var wg sync.WaitGroup
for i, chapter := range outline {
select {
case <-ctx.Done():
errChan <- ctx.Err()
return
default:
}
if err := sem.Acquire(ctx, 1); err != nil {
errChan <- err
return
}
wg.Add(1)
go func(i int, chapter Chapter) {
defer sem.Release(1)
defer wg.Done()
// 为每个章节构建特定的提示词
extraRequirements := ""
reqIndex := 7
if gitURL != "" {
extraRequirements += fmt.Sprintf("%d. **源码仓库引用**:请在文章开头或合适的位置,引用本项目的 Git 仓库地址:%s\n",
reqIndex, gitURL)
reqIndex++
}
if i+1 < len(outline) {
extraRequirements += fmt.Sprintf("%d. **下期预告**:请在文章结尾处,明确预告下一篇文章的内容:"下期预告:%s"\n",
reqIndex, outline[i+1].Title)
reqIndex++
}
prompt := fmt.Sprintf(`你是一个高级全栈架构师和技术博主。请根据以下提供的源内容,以及本章节的大纲,将其转化为一篇"小白友好、图文并茂、可独立复现"的高质量技术博客章节。
要求:
1. **字数和篇幅适中**:为了保证生成完整性,单篇文章内容不要过于冗长(控制在 1000-1500 字左右)。不要一次性铺陈太多知识点,聚焦于本章节的核心目标即可。
2. **代码级剖析**:引用源内容中的核心代码,并逐行解释其作用。如果源内容因为截断而缺少具体代码,请基于目录结构和你的架构经验进行合理补充推演。
3. **可复现的步骤**:如果是实战或配置相关,请给出明确的执行步骤。
4. **小白友好**:在解释抽象的理论概念时,必须提供对应的代码示例或生活化比喻。
5. 所有生成的 Mermaid 图表代码块绝对禁止包含自定义样式关键字(如 style, classDef, linkStyle 等),必须使用基础语法。
6. **完整性约束**:请务必完整输出,不要遗漏关键知识点。如果内容较长,请合理分配篇幅,确保文章结构完整,包含结尾总结。
%s
源内容:
%s
本章节大纲:
- 标题: %s
- 摘要: %s
- 排序: %d
`, extraRequirements, chapterSourceContent, chapter.Title, chapter.Summary, chapter.Sort)
// ... 生成和保存逻辑
}(i, chapter)
}
wg.Wait()
}
章节生成特点:
- 上下文感知:每个章节都知道自己的位置,可以添加"下期预告"
- 内容限制:限制每章 1000-1500 字,避免过长
- 源码引用:自动添加 Git 仓库引用
- 并发控制:限制同时生成 3 个章节,平衡速度和 API 限制
4. 数据持久化与状态管理
4.1 保存生成结果
func (s *GeneratorService) saveToDB(ctx context.Context, userID uuid.UUID,
sourceType string, content string) {
title := "文件解析生成的博客"
// 计算字数
wordCount := len([]rune(content))
// 使用 LLM 提取技术栈
var techStacks datatypes.JSON
extractPrompt := "请从以下文章内容中提取出涉及的核心技术栈名称(如 React, Go, Docker 等),以 JSON 数组格式返回,不要有任何其他多余字符:\n\n" + content
messages := []llm.Message{
{Role: "user", Content: extractPrompt},
}
extractedJSON, err := s.llmClient.Generate(ctx, "deepseek-chat", messages)
if err == nil && len(extractedJSON) > 0 {
var parsed []string
if json.Unmarshal([]byte(extractedJSON), &parsed) == nil {
techStacks = datatypes.JSON(extractedJSON)
}
}
blog := &model.Blog{
UserID: userID,
Title: title,
Content: content,
SourceType: sourceType,
Status: 1, // 已完成
ChapterSort: 1,
WordCount: wordCount,
TechStacks: techStacks,
}
if err := db.DB.WithContext(ctx).Create(blog).Error; err != nil {
fmt.Printf("Failed to save generated blog to DB: %v\n", err)
} else {
fmt.Printf("Saved generated blog to DB (ID: %s, Length: %d, TechStacks: %s)\n",
blog.ID, len(content), string(techStacks))
// 更新用户使用的 Token 数(粗略估算)
estimatedTokens := len([]rune(content)) * 2
db.DB.Model(&model.User{}).Where("id = ?", userID).
UpdateColumn("tokens_used", gorm.Expr("tokens_used + ?", estimatedTokens))
}
}
数据持久化亮点:
- 自动提取技术栈:使用 LLM 从生成内容中提取技术关键词
- 字数统计:使用
[]rune准确统计中文字数 - Token 估算:粗略估算 Token 使用量,用于配额管理
- 事务安全:使用 GORM 的
WithContext确保数据库操作与上下文关联
4.2 系列博客的父子关系
系列博客需要维护章节间的层次关系:
// 保存父节点
var existingParent model.Blog
if err := db.DB.WithContext(ctx).First(&existingParent, "id = ?", parentID).Error; err != nil {
parentBlog := &model.Blog{
ID: parentID,
UserID: userID,
Title: parentTitle,
Content: "该节点为系列文章的父节点,请点击展开查看具体的章节。",
SourceType: sourceType,
Status: 1, // 已完成
}
if err := db.DB.WithContext(ctx).Create(parentBlog).Error; err != nil {
fmt.Printf("Failed to create parent blog: %v\n", err)
}
}
// 保存子章节
blog := &model.Blog{
UserID: userID,
ParentID: &parentID, // 指向父节点
ChapterSort: chapter.Sort,
Title: chapter.Title,
Content: content,
SourceType: sourceType,
Status: 1,
WordCount: wordCount,
TechStacks: techStacks,
}
数据库设计:
ParentID字段建立父子关系ChapterSort字段维护章节顺序- 父节点作为系列入口,子节点承载具体内容
5. 错误处理与重试机制
5.1 智能重试策略
func (s *DecompositionService) generateLocalSummaryWithRetry(ctx context.Context,
chunk parser.FileChunk, maxRetries int,
sendProgress func(int, string, interface{}), idx int, total int, workerID int) string {
prompt := fmt.Sprintf(`你是一个高级全栈架构师。请分析以下代码块,提取其核心功能、主要接口和数据结构。
你的输出应该是一份精简的局部摘要,不需要过多的寒暄,直接列出关键信息。
目录位置:%s
代码内容:
%s`, chunk.Dir, chunk.Content)
messages := []llm.Message{
{Role: "system", Content: "你是一个高级架构师,专注于代码分析并输出精简摘要。"},
{Role: "user", Content: prompt},
}
for attempt := 1; attempt <= maxRetries; attempt++ {
select {
case <-ctx.Done():
return ""
default:
}
// 设置超时上下文
attemptCtx, cancel := context.WithTimeout(ctx, 3*time.Minute)
summary, err := s.llmClient.Generate(attemptCtx, "deepseek-chat", messages)
cancel()
if err == nil {
return fmt.Sprintf("【目录: %s】\n%s", chunk.Dir, summary)
}
// 发送重试进度
sendProgress(2, fmt.Sprintf("分块 %d/%d 分析失败,正在重试 (%d/%d)...", idx, total, attempt, maxRetries),
map[string]interface{}{
"status": "chunk_failed",
"dir": chunk.Dir,
"index": idx,
"attempt": attempt,
"worker_id": workerID,
})
// 指数退避
time.Sleep(time.Second * time.Duration(attempt*2))
}
sendProgress(2, fmt.Sprintf("分块 %d/%d 分析最终失败,已跳过", idx, total),
map[string]interface{}{
"status": "chunk_failed_final",
"dir": chunk.Dir,
"index": idx,
"worker_id": workerID,
})
return ""
}
重试策略特点:
- 上下文感知:每次重试检查上下文是否已取消
- 超时控制:每次调用设置 3 分钟超时
- 指数退避:重试间隔逐渐增加(2秒、4秒、6秒…)
- 详细日志:记录每次重试的详细信息
5.2 章节生成的重试
var streamErr error
var content string
maxRetries := 3
for attempt := 1; attempt <= maxRetries; attempt++ {
select {
case <-ctx.Done():
errChan <- ctx.Err()
return
default:
}
if attempt > 1 {
// 发送重试通知
retryMsg := map[string]interface{}{
"status": "retrying",
"chapter_sort": chapter.Sort,
"attempt": attempt,
"parent_id": parentID.String(),
}
retryBytes, _ := json.Marshal(retryMsg)
progressChan <- string(retryBytes)
}
// ... 生成逻辑
if streamErr == nil {
content = fullContentBuilder.String()
break // 成功则退出重试循环
}
time.Sleep(time.Second * time.Duration(attempt*2))
}
6. 性能优化与资源管理
6.1 内存优化:内容截断
// 限制章节内容长度,避免 Token 溢出
runes := []rune(chapterSourceContent)
if len(runes) > 100000 {
chapterSourceContent = string(runes[:100000]) + "\n\n... [Content Truncated due to length limits] ..."
}
// 大纲生成时的内容截断
runes := []rune(sourceContent)
if len(runes) > 300000 {
sourceContent = string(runes[:300000]) + "\n\n... [Content Truncated due to length limits] ..."
}
截断策略:
- 使用
[]rune准确处理中文字符 - 保留前 N 个字符,添加截断提示
- 避免超过 API 的 Token 限制
6.2 临时资源清理
// 克隆仓库到临时目录
var tempDir string
if sourceType == "git" && gitURL != "" {
dir, err := os.MkdirTemp("", "inkwords-gen-*")
if err == nil {
tempDir = dir
defer os.RemoveAll(tempDir) // 确保清理
cmd := exec.Command("git", "clone", "--depth", "1", gitURL, tempDir)
cmd.Run() // 忽略错误,如果失败则回退到 sourceContent
}
}
资源管理:
- 临时目录:使用
os.MkdirTemp创建唯一临时目录 - 自动清理:
defer os.RemoveAll确保生成完成后清理 - 浅克隆:
--depth 1只克隆最新提交,减少数据量 - 优雅降级:克隆失败时回退到已有内容
7. 实战示例:生成一篇 Go 并发教程
让我们通过一个具体示例,看看系统如何生成一篇关于 Go 并发的教程:
// 假设源内容是简单的并发示例代码
sourceCode := `
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("All workers completed")
}
`
// 调用生成服务
generator := NewGeneratorService()
chunkChan := make(chan string)
errChan := make(chan error)
go generator.GenerateBlogStream(context.Background(), userID, sourceCode, "code", chunkChan, errChan)
// 前端接收并显示
for chunk := range chunkChan {
fmt.Print(chunk) // 实时显示生成内容
}
生成结果可能包括:
- 标题:Go 并发编程入门:sync.WaitGroup 详解
- 生活化比喻:把 WaitGroup 比作"小组长",负责统计和等待所有组员完成任务
- 代码逐行解析:解释
wg.Add(1)、go worker()、wg.Wait()的作用 - 常见错误:指针传递错误、忘记调用
wg.Done() - 扩展示例:使用 Channel 替代 WaitGroup 的实现
- 性能建议:何时使用 WaitGroup,何时使用其他同步原语
8. 总结
本章我们深入剖析了 InkWords 的流式生成服务,关键要点包括:
- 双模式生成:单篇流式生成 vs 系列并发生成,满足不同场景需求
- 智能并发控制:动态调整 Worker 数量,平衡性能与资源
- 自动续写机制:无缝处理长内容生成,提升用户体验
- 健壮的错误处理:智能重试、超时控制、优雅降级
- 资源高效管理:临时文件清理、内存优化、API 限制处理
- 数据持久化:自动提取技术栈、维护章节关系、Token 使用统计
这些设计确保了 InkWords 能够:
- 高效处理从简单代码片段到大型项目的各种输入
- 实时反馈生成进度,减少用户等待焦虑
- 智能拆解复杂内容,生成结构清晰的系列教程
- 在异常情况下保持系统稳定,提供有意义的错误信息
下期预告:Server-Sent Events (SSE) 接口实现
在下一章中,我们将深入探讨如何实现高效的 SSE 接口,包括连接管理、心跳机制、断线重连、多客户端支持等关键技术,让前端能够稳定可靠地接收实时生成内容。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)