流式生成服务:单篇与系列博客的并发生成

本文是《从零构建 InkWords:AI 驱动的技术博客生成平台》系列的第 19 章。我们将深入解析后端服务层中负责内容生成的核心逻辑,包括单篇博客的流式生成、系列博客的并发生成调度、并发控制、自动续写机制等关键技术实现。

源码仓库:https://github.com/2692341798/InkWords

1. 引言:为什么需要流式生成?

想象一下,当你让 AI 生成一篇 5000 字的技术博客时,如果必须等待全部内容生成完毕才能看到结果,这个过程会非常煎熬。用户可能会担心:“AI 是不是卡住了?”、“生成失败了吗?”、“我需要等多久?”

流式生成就是为了解决这个问题而生。它就像"边做饭边端菜"——AI 每生成一小段内容,就立即推送给前端展示,让用户实时看到生成进度。这不仅提升了用户体验,还能在生成过程中发现并纠正问题。

在 InkWords 中,我们实现了两种生成模式:

  1. 单篇生成:针对单个文件或简短内容
  2. 系列生成:针对大型项目或复杂内容,自动拆分为多个章节并发生成

2. 单篇博客生成:GeneratorService 详解

2.1 服务结构

让我们先看看 GeneratorService 的基本结构:

// GeneratorService handles the blog generation process
type GeneratorService struct {
    llmClient *llm.DeepSeekClient
}

// NewGeneratorService creates a new generator service
func NewGeneratorService() *GeneratorService {
    apiKey := os.Getenv("DEEPSEEK_API_KEY")
    return &GeneratorService{
        llmClient: llm.NewDeepSeekClient(apiKey),
    }
}

代码解析

  • llmClient:封装了与 DeepSeek API 的通信逻辑
  • NewGeneratorService:从环境变量读取 API 密钥并初始化客户端
  • 使用依赖注入模式,便于测试和替换不同的 LLM 服务

2.2 流式生成核心流程

GenerateBlogStream 方法是单篇生成的核心,让我们逐段分析:

func (s *GeneratorService) GenerateBlogStream(ctx context.Context, userID uuid.UUID, 
    sourceContent string, sourceType string, 
    chunkChan chan<- string, errChan chan<- error) {
    
    // 1. 构建提示词
    prompt := fmt.Sprintf(`你是一个高级全栈架构师和技术博主。请根据以下提供的源内容,将其转化为一篇"小白友好、图文并茂、可独立复现"的高质量技术博客。
要求:
1. **字数充足,内容详实**:不要只写干瘪的总结。必须深入分析实现原理。
2. **代码级剖析**:对于每个技术点都添加更多的代码样例和图片来解释的更加详细。如果源内容包含代码,请引用核心代码并逐行解释其作用。
3. **可复现的步骤**:如果是实战或教程相关,请给出明确的执行步骤。
4. **小白友好**:在解释抽象的理论概念时,必须提供对应的代码示例或生活化比喻。
5. 所有生成的 Mermaid 图表代码块绝对禁止包含自定义样式关键字(如 style, classDef, linkStyle 等),必须使用基础语法。

源内容:
%s`, sourceContent)

    // 2. 构建消息列表
    messages := []llm.Message{
        {Role: "system", Content: "你是一个高级技术博客作者。"},
        {Role: "user", Content: prompt},
    }
    
    // 3. 配置模型类型
    modelType := "deepseek-chat"
    if envModel := os.Getenv("DEEPSEEK_MODEL"); envModel != "" {
        modelType = envModel
    }
    
    // 4. 创建内部通道用于拦截数据块
    internalChunkChan := make(chan string)
    internalErrChan := make(chan error)
    
    // 5. 创建可取消的上下文
    streamCtx, streamCancel := context.WithCancel(ctx)
    
    // ... 后续的接收器和生成器协程
}

关键点解析

  1. 提示词工程:精心设计的提示词是生成高质量博客的关键。我们明确要求 AI 扮演特定角色,并给出具体的写作要求。
  2. 模型配置:支持通过环境变量动态切换模型,便于测试不同模型的效果。
  3. 通道设计:使用 Go 的 Channel 实现协程间通信,这是 Go 并发编程的核心。
  4. 上下文控制:使用 context.WithCancel 实现超时控制和资源清理。

2.3 接收器协程:管理流式数据

// Receiver goroutine
go func() {
    defer streamCancel()
    defer close(chunkChan)
    defer close(errChan)

    var fullContent string
    idleTimeout := 60 * time.Second // 增加到 60 秒
    timer := time.NewTimer(idleTimeout)
    defer timer.Stop()

    for {
        select {
        case <-ctx.Done():
            errChan <- ctx.Err()
            return
        case <-timer.C:
            streamCancel()
            errChan <- fmt.Errorf("AI generation idle timeout (no data for %v)", idleTimeout)
            return
        case err, ok := <-internalErrChan:
            if ok && err != nil {
                errChan <- err
                return
            }
            if !ok {
                internalErrChan = nil
            }
        case chunk, ok := <-internalChunkChan:
            if !ok {
                // 流式生成成功完成,保存到数据库
                s.saveToDB(ctx, userID, sourceType, fullContent)
                return
            }
            if !timer.Stop() {
                select {
                case <-timer.C:
                default:
                }
            }
            timer.Reset(idleTimeout)

            fullContent += chunk
            chunkChan <- chunk
        }
    }
}()

设计亮点

  • 超时控制:60 秒空闲超时,防止 AI 响应过慢导致连接挂起
  • 优雅关闭:使用 defer 确保资源正确释放
  • 数据聚合:在接收过程中聚合完整内容,最后一次性保存到数据库
  • 错误传播:通过错误通道将异常传递给调用方

2.4 自动续写机制

当 AI 生成的内容过长时,可能会被模型截断。我们的系统实现了自动续写:

// Generator loop (handles auto-continuation)
go func() {
    defer close(internalChunkChan)
    defer close(internalErrChan)

    for {
        tempChunkChan := make(chan string)
        var assistantContent string
        var wg sync.WaitGroup
        wg.Add(1)

        go func() {
            defer wg.Done()
            for chunk := range tempChunkChan {
                assistantContent += chunk
                internalChunkChan <- chunk
            }
        }()

        finishReason, err := s.llmClient.GenerateStream(streamCtx, modelType, messages, tempChunkChan)
        wg.Wait() // 确保所有数据块都被收集

        if err != nil {
            internalErrChan <- err
            return
        }

        // 追加助手刚刚生成的内容
        messages = append(messages, llm.Message{
            Role:    "assistant",
            Content: assistantContent,
        })

        if finishReason != "length" {
            return
        }

        // 如果因为长度限制停止,自动续写
        continueMsg := llm.Message{
            Role:    "user",
            Content: "刚才你的回答被截断了,请严格从上文最后一个字符开始无缝续写。绝对不要输出"好的,我们继续"等任何过渡性废话,直接输出后续的Markdown或代码内容。",
        }
        messages = append(messages, continueMsg)
    }
}()

续写逻辑

  1. 检测截断:通过 finishReason == "length" 判断是否因长度限制被截断
  2. 无缝续写:添加续写提示,要求 AI 直接继续,不要添加过渡语
  3. 上下文维护:将已生成的内容追加到消息历史中,保持上下文连贯

3. 系列博客生成:DecompositionService 详解

3.1 服务架构

对于大型项目,我们需要先分析项目结构,然后拆分为多个章节并发生成:

// DecompositionService handles the logic to evaluate project text and generate an outline
type DecompositionService struct {
    llmClient  *llm.DeepSeekClient
    gitFetcher *parser.GitFetcher
}

// Chapter represents a single chapter in the generated outline
type Chapter struct {
    Title   string   `json:"title"`
    Summary string   `json:"summary"`
    Sort    int      `json:"sort"`
    Files   []string `json:"files"`
}

// OutlineResult represents the overall generated outline result
type OutlineResult struct {
    SeriesTitle string    `json:"series_title"`
    Chapters    []Chapter `json:"chapters"`
}

3.2 Map-Reduce 分析模式

对于大型项目,我们采用 Map-Reduce 模式进行分析:

小型项目

大型项目

开始分析

克隆 Git 仓库

项目大小判断

直接生成大纲

Map 阶段: 分块分析

Reduce 阶段: 汇总结果

生成系列大纲

结束

Map-Reduce 实现

func (s *DecompositionService) mapReduceAnalyze(ctx context.Context, chunks []parser.FileChunk, 
    sendProgress func(int, string, interface{})) []string {
    
    var summaries []string
    var mu sync.Mutex

    // 动态调整并发数,优化性能
    numCPU := runtime.NumCPU()
    maxWorkers := numCPU
    if maxWorkers < 3 {
        maxWorkers = 3
    }
    if maxWorkers > 8 {
        maxWorkers = 8
    }
    // 避免 Worker 数量大于任务数
    if len(chunks) < maxWorkers {
        maxWorkers = len(chunks)
    }

    sem := semaphore.NewWeighted(int64(maxWorkers))
    var wg sync.WaitGroup

    workerPool := make(chan int, maxWorkers)
    for i := 0; i < maxWorkers; i++ {
        workerPool <- i
    }

    for i, chunk := range chunks {
        wg.Add(1)
        go func(idx int, c parser.FileChunk) {
            defer wg.Done()
            if err := sem.Acquire(ctx, 1); err != nil {
                return
            }
            workerID := <-workerPool
            defer func() {
                workerPool <- workerID
                sem.Release(1)
            }()

            // 发送分析进度
            sendProgress(2, fmt.Sprintf("正在分析分块 %d/%d [%s]...", idx+1, len(chunks), c.Dir), 
                map[string]interface{}{
                    "status":    "chunk_analyzing",
                    "dir":       c.Dir,
                    "index":     idx + 1,
                    "total":     len(chunks),
                    "worker_id": workerID,
                })

            // 生成局部摘要(带重试机制)
            summary := s.generateLocalSummaryWithRetry(ctx, c, 3, sendProgress, idx+1, len(chunks), workerID)

            if summary != "" {
                mu.Lock()
                summaries = append(summaries, summary)
                mu.Unlock()
                sendProgress(2, fmt.Sprintf("分块 %d/%d 分析完成", idx+1, len(chunks)), 
                    map[string]interface{}{
                        "status":    "chunk_done",
                        "dir":       c.Dir,
                        "index":     idx + 1,
                        "worker_id": workerID,
                    })
            }
        }(i, chunk)
    }

    wg.Wait()
    return summaries
}

并发控制策略

  1. 动态 Worker 数量:根据 CPU 核心数和任务数动态调整
  2. 信号量控制:使用 semaphore 限制最大并发数
  3. Worker 池:复用 Worker ID,便于前端展示
  4. 互斥锁保护:使用 sync.Mutex 保护共享数据

3.3 并发生成系列博客

生成大纲后,我们并发生成各个章节:

func (s *DecompositionService) GenerateSeries(ctx context.Context, userID uuid.UUID, 
    parentID uuid.UUID, seriesTitle string, outline []Chapter, 
    sourceContent string, sourceType string, gitURL string, 
    progressChan chan<- string, errChan chan<- error) {
    
    defer close(progressChan)
    defer close(errChan)

    // 限制并发数为 3
    sem := semaphore.NewWeighted(3)
    var wg sync.WaitGroup

    for i, chapter := range outline {
        select {
        case <-ctx.Done():
            errChan <- ctx.Err()
            return
        default:
        }

        if err := sem.Acquire(ctx, 1); err != nil {
            errChan <- err
            return
        }

        wg.Add(1)
        go func(i int, chapter Chapter) {
            defer sem.Release(1)
            defer wg.Done()

            // 为每个章节构建特定的提示词
            extraRequirements := ""
            reqIndex := 7
            if gitURL != "" {
                extraRequirements += fmt.Sprintf("%d. **源码仓库引用**:请在文章开头或合适的位置,引用本项目的 Git 仓库地址:%s\n", 
                    reqIndex, gitURL)
                reqIndex++
            }
            if i+1 < len(outline) {
                extraRequirements += fmt.Sprintf("%d. **下期预告**:请在文章结尾处,明确预告下一篇文章的内容:"下期预告:%s"\n", 
                    reqIndex, outline[i+1].Title)
                reqIndex++
            }

            prompt := fmt.Sprintf(`你是一个高级全栈架构师和技术博主。请根据以下提供的源内容,以及本章节的大纲,将其转化为一篇"小白友好、图文并茂、可独立复现"的高质量技术博客章节。
要求:
1. **字数和篇幅适中**:为了保证生成完整性,单篇文章内容不要过于冗长(控制在 1000-1500 字左右)。不要一次性铺陈太多知识点,聚焦于本章节的核心目标即可。
2. **代码级剖析**:引用源内容中的核心代码,并逐行解释其作用。如果源内容因为截断而缺少具体代码,请基于目录结构和你的架构经验进行合理补充推演。
3. **可复现的步骤**:如果是实战或配置相关,请给出明确的执行步骤。
4. **小白友好**:在解释抽象的理论概念时,必须提供对应的代码示例或生活化比喻。
5. 所有生成的 Mermaid 图表代码块绝对禁止包含自定义样式关键字(如 style, classDef, linkStyle 等),必须使用基础语法。
6. **完整性约束**:请务必完整输出,不要遗漏关键知识点。如果内容较长,请合理分配篇幅,确保文章结构完整,包含结尾总结。
%s
源内容:
%s

本章节大纲:
- 标题: %s
- 摘要: %s
- 排序: %d
`, extraRequirements, chapterSourceContent, chapter.Title, chapter.Summary, chapter.Sort)

            // ... 生成和保存逻辑
        }(i, chapter)
    }

    wg.Wait()
}

章节生成特点

  1. 上下文感知:每个章节都知道自己的位置,可以添加"下期预告"
  2. 内容限制:限制每章 1000-1500 字,避免过长
  3. 源码引用:自动添加 Git 仓库引用
  4. 并发控制:限制同时生成 3 个章节,平衡速度和 API 限制

4. 数据持久化与状态管理

4.1 保存生成结果

func (s *GeneratorService) saveToDB(ctx context.Context, userID uuid.UUID, 
    sourceType string, content string) {
    
    title := "文件解析生成的博客"
    
    // 计算字数
    wordCount := len([]rune(content))
    
    // 使用 LLM 提取技术栈
    var techStacks datatypes.JSON
    extractPrompt := "请从以下文章内容中提取出涉及的核心技术栈名称(如 React, Go, Docker 等),以 JSON 数组格式返回,不要有任何其他多余字符:\n\n" + content
    messages := []llm.Message{
        {Role: "user", Content: extractPrompt},
    }
    
    extractedJSON, err := s.llmClient.Generate(ctx, "deepseek-chat", messages)
    if err == nil && len(extractedJSON) > 0 {
        var parsed []string
        if json.Unmarshal([]byte(extractedJSON), &parsed) == nil {
            techStacks = datatypes.JSON(extractedJSON)
        }
    }
    
    blog := &model.Blog{
        UserID:      userID,
        Title:       title,
        Content:     content,
        SourceType:  sourceType,
        Status:      1, // 已完成
        ChapterSort: 1,
        WordCount:   wordCount,
        TechStacks:  techStacks,
    }
    
    if err := db.DB.WithContext(ctx).Create(blog).Error; err != nil {
        fmt.Printf("Failed to save generated blog to DB: %v\n", err)
    } else {
        fmt.Printf("Saved generated blog to DB (ID: %s, Length: %d, TechStacks: %s)\n", 
            blog.ID, len(content), string(techStacks))
        
        // 更新用户使用的 Token 数(粗略估算)
        estimatedTokens := len([]rune(content)) * 2
        db.DB.Model(&model.User{}).Where("id = ?", userID).
            UpdateColumn("tokens_used", gorm.Expr("tokens_used + ?", estimatedTokens))
    }
}

数据持久化亮点

  1. 自动提取技术栈:使用 LLM 从生成内容中提取技术关键词
  2. 字数统计:使用 []rune 准确统计中文字数
  3. Token 估算:粗略估算 Token 使用量,用于配额管理
  4. 事务安全:使用 GORM 的 WithContext 确保数据库操作与上下文关联

4.2 系列博客的父子关系

系列博客需要维护章节间的层次关系:

// 保存父节点
var existingParent model.Blog
if err := db.DB.WithContext(ctx).First(&existingParent, "id = ?", parentID).Error; err != nil {
    parentBlog := &model.Blog{
        ID:         parentID,
        UserID:     userID,
        Title:      parentTitle,
        Content:    "该节点为系列文章的父节点,请点击展开查看具体的章节。",
        SourceType: sourceType,
        Status:     1, // 已完成
    }
    if err := db.DB.WithContext(ctx).Create(parentBlog).Error; err != nil {
        fmt.Printf("Failed to create parent blog: %v\n", err)
    }
}

// 保存子章节
blog := &model.Blog{
    UserID:      userID,
    ParentID:    &parentID,  // 指向父节点
    ChapterSort: chapter.Sort,
    Title:       chapter.Title,
    Content:     content,
    SourceType:  sourceType,
    Status:      1,
    WordCount:   wordCount,
    TechStacks:  techStacks,
}

数据库设计

  • ParentID 字段建立父子关系
  • ChapterSort 字段维护章节顺序
  • 父节点作为系列入口,子节点承载具体内容

5. 错误处理与重试机制

5.1 智能重试策略

func (s *DecompositionService) generateLocalSummaryWithRetry(ctx context.Context, 
    chunk parser.FileChunk, maxRetries int, 
    sendProgress func(int, string, interface{}), idx int, total int, workerID int) string {
    
    prompt := fmt.Sprintf(`你是一个高级全栈架构师。请分析以下代码块,提取其核心功能、主要接口和数据结构。
你的输出应该是一份精简的局部摘要,不需要过多的寒暄,直接列出关键信息。
目录位置:%s
代码内容:
%s`, chunk.Dir, chunk.Content)

    messages := []llm.Message{
        {Role: "system", Content: "你是一个高级架构师,专注于代码分析并输出精简摘要。"},
        {Role: "user", Content: prompt},
    }

    for attempt := 1; attempt <= maxRetries; attempt++ {
        select {
        case <-ctx.Done():
            return ""
        default:
        }

        // 设置超时上下文
        attemptCtx, cancel := context.WithTimeout(ctx, 3*time.Minute)
        summary, err := s.llmClient.Generate(attemptCtx, "deepseek-chat", messages)
        cancel()

        if err == nil {
            return fmt.Sprintf("【目录: %s】\n%s", chunk.Dir, summary)
        }

        // 发送重试进度
        sendProgress(2, fmt.Sprintf("分块 %d/%d 分析失败,正在重试 (%d/%d)...", idx, total, attempt, maxRetries), 
            map[string]interface{}{
                "status":    "chunk_failed",
                "dir":       chunk.Dir,
                "index":     idx,
                "attempt":   attempt,
                "worker_id": workerID,
            })

        // 指数退避
        time.Sleep(time.Second * time.Duration(attempt*2))
    }

    sendProgress(2, fmt.Sprintf("分块 %d/%d 分析最终失败,已跳过", idx, total), 
        map[string]interface{}{
            "status":    "chunk_failed_final",
            "dir":       chunk.Dir,
            "index":     idx,
            "worker_id": workerID,
        })
    return ""
}

重试策略特点

  1. 上下文感知:每次重试检查上下文是否已取消
  2. 超时控制:每次调用设置 3 分钟超时
  3. 指数退避:重试间隔逐渐增加(2秒、4秒、6秒…)
  4. 详细日志:记录每次重试的详细信息

5.2 章节生成的重试

var streamErr error
var content string
maxRetries := 3

for attempt := 1; attempt <= maxRetries; attempt++ {
    select {
    case <-ctx.Done():
        errChan <- ctx.Err()
        return
    default:
    }

    if attempt > 1 {
        // 发送重试通知
        retryMsg := map[string]interface{}{
            "status":       "retrying",
            "chapter_sort": chapter.Sort,
            "attempt":      attempt,
            "parent_id":    parentID.String(),
        }
        retryBytes, _ := json.Marshal(retryMsg)
        progressChan <- string(retryBytes)
    }
    
    // ... 生成逻辑
    
    if streamErr == nil {
        content = fullContentBuilder.String()
        break  // 成功则退出重试循环
    }
    
    time.Sleep(time.Second * time.Duration(attempt*2))
}

6. 性能优化与资源管理

6.1 内存优化:内容截断

// 限制章节内容长度,避免 Token 溢出
runes := []rune(chapterSourceContent)
if len(runes) > 100000 {
    chapterSourceContent = string(runes[:100000]) + "\n\n... [Content Truncated due to length limits] ..."
}

// 大纲生成时的内容截断
runes := []rune(sourceContent)
if len(runes) > 300000 {
    sourceContent = string(runes[:300000]) + "\n\n... [Content Truncated due to length limits] ..."
}

截断策略

  • 使用 []rune 准确处理中文字符
  • 保留前 N 个字符,添加截断提示
  • 避免超过 API 的 Token 限制

6.2 临时资源清理

// 克隆仓库到临时目录
var tempDir string
if sourceType == "git" && gitURL != "" {
    dir, err := os.MkdirTemp("", "inkwords-gen-*")
    if err == nil {
        tempDir = dir
        defer os.RemoveAll(tempDir)  // 确保清理
        cmd := exec.Command("git", "clone", "--depth", "1", gitURL, tempDir)
        cmd.Run() // 忽略错误,如果失败则回退到 sourceContent
    }
}

资源管理

  1. 临时目录:使用 os.MkdirTemp 创建唯一临时目录
  2. 自动清理defer os.RemoveAll 确保生成完成后清理
  3. 浅克隆--depth 1 只克隆最新提交,减少数据量
  4. 优雅降级:克隆失败时回退到已有内容

7. 实战示例:生成一篇 Go 并发教程

让我们通过一个具体示例,看看系统如何生成一篇关于 Go 并发的教程:

// 假设源内容是简单的并发示例代码
sourceCode := `
package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }
    
    wg.Wait()
    fmt.Println("All workers completed")
}
`

// 调用生成服务
generator := NewGeneratorService()
chunkChan := make(chan string)
errChan := make(chan error)

go generator.GenerateBlogStream(context.Background(), userID, sourceCode, "code", chunkChan, errChan)

// 前端接收并显示
for chunk := range chunkChan {
    fmt.Print(chunk)  // 实时显示生成内容
}

生成结果可能包括

  1. 标题:Go 并发编程入门:sync.WaitGroup 详解
  2. 生活化比喻:把 WaitGroup 比作"小组长",负责统计和等待所有组员完成任务
  3. 代码逐行解析:解释 wg.Add(1)go worker()wg.Wait() 的作用
  4. 常见错误:指针传递错误、忘记调用 wg.Done()
  5. 扩展示例:使用 Channel 替代 WaitGroup 的实现
  6. 性能建议:何时使用 WaitGroup,何时使用其他同步原语

8. 总结

本章我们深入剖析了 InkWords 的流式生成服务,关键要点包括:

  1. 双模式生成:单篇流式生成 vs 系列并发生成,满足不同场景需求
  2. 智能并发控制:动态调整 Worker 数量,平衡性能与资源
  3. 自动续写机制:无缝处理长内容生成,提升用户体验
  4. 健壮的错误处理:智能重试、超时控制、优雅降级
  5. 资源高效管理:临时文件清理、内存优化、API 限制处理
  6. 数据持久化:自动提取技术栈、维护章节关系、Token 使用统计

这些设计确保了 InkWords 能够:

  • 高效处理从简单代码片段到大型项目的各种输入
  • 实时反馈生成进度,减少用户等待焦虑
  • 智能拆解复杂内容,生成结构清晰的系列教程
  • 在异常情况下保持系统稳定,提供有意义的错误信息

下期预告:Server-Sent Events (SSE) 接口实现

在下一章中,我们将深入探讨如何实现高效的 SSE 接口,包括连接管理、心跳机制、断线重连、多客户端支持等关键技术,让前端能够稳定可靠地接收实时生成内容。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐