用SSE服务器发送事件优化AI接口的打字机流式交互效果

信息图

一、打字机效应的用户体验价值

AI 接口的响应延迟是影响用户满意度的核心因素之一。研究表明,当用户等待超过2秒时,焦虑感开始显著增加。但即使总响应时间超过2秒,如果用户能看到 AI 逐字输出的过程,感知等待时间会缩短约40%。

这就是打字机效应(Typewriter Effect)的用户体验价值——将"等待"转化为"观看",将焦虑转化为期待。

二、SSE流式架构设计

用户输入 → 前端SSE客户端 → HTTP POST /api/chat/stream
                                  ↓
                            AI服务(OpenAI等)
                                  ↓
                          SSE流式响应 (text/event-stream)
                                  ↓
                        前端Buffer控制 → DOM逐字渲染
                                  ↓
                           光标闪烁效果

三、服务端SSE实现优化

3.1 流式代理转发

const express = require('express');
const { createParser } = require('eventsource-parser');

const app = express();

class SSEAIProxy {
  constructor(aiProvider) {
    this.aiProvider = aiProvider;
    this.activeStreams = new Map();
  }

  async createStream(req, res) {
    const { message, sessionId, model } = req.body;

    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
      'X-Accel-Buffering': 'no',
      'Access-Control-Allow-Origin': '*'
    });

    const streamId = `${sessionId}-${Date.now()}`;
    const streamState = {
      tokens: 0,
      startTime: Date.now(),
      fullText: ''
    };

    this.activeStreams.set(streamId, streamState);

    req.on('close', () => {
      this.activeStreams.delete(streamId);
      console.log(`流 ${streamId} 被客户端中断`);
    });

    try {
      const aiStream = await this.aiProvider.createChatStream({
        model: model || 'gpt-3.5-turbo',
        messages: [
          { role: 'system', content: '你是一个有帮助的AI助手。请用简洁的语言回答。' },
          { role: 'user', content: message }
        ],
        stream: true,
        temperature: 0.7,
        max_tokens: 2048
      });

      for await (const chunk of aiStream) {
        if (this.activeStreams.has(streamId) === false) {
          break;
        }

        const content = chunk.choices?.[0]?.delta?.content || '';
        if (content) {
          streamState.tokens++;
          streamState.fullText += content;

          this.sendSSEEvent(res, 'token', {
            text: content,
            index: streamState.tokens,
            accumulated: streamState.fullText
          });
        }
      }

      if (this.activeStreams.has(streamId)) {
        const elapsed = Date.now() - streamState.startTime;

        this.sendSSEEvent(res, 'done', {
          fullText: streamState.fullText,
          tokens: streamState.tokens,
          duration: elapsed,
          tokensPerSecond: Math.round((streamState.tokens / elapsed) * 1000 * 10) / 10
        });

        this.activeStreams.delete(streamId);
      }
    } catch (error) {
      this.sendSSEEvent(res, 'error', {
        code: 'AI_STREAM_ERROR',
        message: 'AI服务响应异常',
        retryable: true
      });

      this.activeStreams.delete(streamId);
    }

    res.end();
  }

  sendSSEEvent(res, event, data) {
    res.write(`id: ${Date.now()}\n`);
    res.write(`event: ${event}\n`);
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  }

  getActiveStreams() {
    return Array.from(this.activeStreams.entries()).map(([id, state]) => ({
      id,
      tokens: state.tokens,
      duration: Date.now() - state.startTime,
      preview: state.fullText.slice(0, 50)
    }));
  }
}

const aiProvider = {
  async createChatStream(params) {
    const { OpenAI } = require('openai');
    const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
    return openai.chat.completions.create(params);
  }
};

const sseProxy = new SSEAIProxy(aiProvider);

app.post('/api/chat/stream', (req, res) => sseProxy.createStream(req, res));

3.2 多模型支持

class MultiModelAIProvider {
  constructor() {
    this.providers = {
      openai: this.createOpenAIProvider(),
      anthropic: this.createAnthropicProvider(),
      local: this.createLocalProvider()
    };
  }

  createOpenAIProvider() {
    const { OpenAI } = require('openai');
    const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

    return {
      createStream: (params) => client.chat.completions.create({
        ...params,
        model: params.model || 'gpt-3.5-turbo',
        stream: true
      })
    };
  }

  createAnthropicProvider() {
    return {
      createStream: async (params) => {
        const response = await fetch('https://api.anthropic.com/v1/messages', {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            'x-api-key': process.env.ANTHROPIC_API_KEY,
            'anthropic-version': '2023-06-01'
          },
          body: JSON.stringify({
            model: params.model || 'claude-3-haiku-20240307',
            messages: params.messages,
            max_tokens: params.max_tokens,
            stream: true
          })
        });

        return response.body;
      }
    };
  }

  createLocalProvider() {
    return {
      createStream: async (params) => {
        const response = await fetch('http://localhost:11434/api/chat', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            model: params.model || 'llama3',
            messages: params.messages,
            stream: true
          })
        });

        return response.body;
      }
    };
  }

  getProvider(name) {
    return this.providers[name] || this.providers.openai;
  }
}

四、前端流式渲染优化

4.1 React Hooks 实现

import { useState, useRef, useCallback, useEffect } from 'react';

function useSSEStream(url) {
  const [state, setState] = useState({
    isStreaming: false,
    currentText: '',
    fullText: '',
    error: null
  });

  const abortRef = useRef(null);
  const stateRef = useRef(state);
  stateRef.current = state;

  const processChunk = useCallback((text) => {
    setState(prev => ({
      ...prev,
      currentText: prev.currentText + text,
      fullText: prev.fullText + text
    }));
  }, []);

  const flushBuffer = useCallback(() => {
    setState(prev => ({
      ...prev,
      currentText: ''
    }));
  }, []);

  const startStream = useCallback(async (message, options = {}) => {
    const controller = new AbortController();
    abortRef.current = controller;

    setState({
      isStreaming: true,
      currentText: '',
      fullText: '',
      error: null
    });

    try {
      const response = await fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          message,
          sessionId: options.sessionId,
          model: options.model
        }),
        signal: controller.signal
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const events = parseSSEBuffer(buffer);
        buffer = events.remainder;

        for (const event of events.parsed) {
          const data = JSON.parse(event.data);

          switch (event.event) {
            case 'token':
              processChunk(data.text);
              break;
            case 'done':
              setState(prev => ({
                ...prev,
                isStreaming: false,
                fullText: data.fullText
              }));
              break;
            case 'error':
              throw new Error(data.message);
          }
        }
      }
    } catch (error) {
      if (error.name !== 'AbortError') {
        setState(prev => ({
          ...prev,
          isStreaming: false,
          error: error.message
        }));
      }
    }
  }, [url, processChunk]);

  const abortStream = useCallback(() => {
    if (abortRef.current) {
      abortRef.current.abort();
      abortRef.current = null;
    }
    setState(prev => ({
      ...prev,
      isStreaming: false
    }));
  }, []);

  useEffect(() => {
    return () => {
      abortStream();
    };
  }, [abortStream]);

  return {
    ...state,
    startStream,
    abortStream,
    flushBuffer
  };
}

function parseSSEBuffer(buffer) {
  const parsed = [];
  let currentEvent = null;
  const lines = buffer.split('\n');
  let remainder = '';

  for (const line of lines) {
    if (line.startsWith('event: ')) {
      if (currentEvent) parsed.push(currentEvent);
      currentEvent = { event: line.slice(7).trim(), data: '' };
    } else if (line.startsWith('data: ')) {
      if (!currentEvent) currentEvent = { event: 'message', data: '' };
      currentEvent.data += line.slice(6);
    } else if (line === '' && currentEvent) {
      parsed.push(currentEvent);
      currentEvent = null;
    }
  }

  if (currentEvent) {
    remainder = `event: ${currentEvent.event}\ndata: ${currentEvent.data}\n`;
  }

  return { parsed, remainder };
}

4.2 Typewriter 渲染组件

function TypewriterText({ text, speed = 30, cursor = true }) {
  const [displayedText, setDisplayedText] = useState('');
  const [isComplete, setIsComplete] = useState(false);
  const indexRef = useRef(0);

  useEffect(() => {
    indexRef.current = 0;
    setDisplayedText('');
    setIsComplete(false);

    const interval = setInterval(() => {
      if (indexRef.current < text.length) {
        setDisplayedText(text.slice(0, indexRef.current + 1));
        indexRef.current++;
      } else {
        clearInterval(interval);
        setIsComplete(true);
      }
    }, speed);

    return () => clearInterval(interval);
  }, [text, speed]);

  return (
    <span className="typewriter-text">
      {displayedText}
      {!isComplete && cursor && (
        <span className="cursor">|</span>
      )}
    </span>
  );
}

function StreamingMessage({ role, content, isStreaming }) {
  const contentRef = useRef(null);

  useEffect(() => {
    if (contentRef.current) {
      contentRef.current.scrollIntoView({ behavior: 'smooth' });
    }
  }, [content]);

  return (
    <div className={`message ${role} ${isStreaming ? 'streaming' : ''}`}>
      <div className="avatar">
        {role === 'user' ? '👤' : '🤖'}
      </div>
      <div className="content">
        {isStreaming ? (
          <StreamingContent content={content} />
        ) : (
          <TypewriterText text={content} speed={15} cursor={false} />
        )}
      </div>
    </div>
  );
}

function StreamingContent({ content }) {
  const containerRef = useRef(null);

  useEffect(() => {
    if (containerRef.current) {
      containerRef.current.scrollTop = containerRef.current.scrollHeight;
    }
  }, [content]);

  return (
    <span className="streaming-content" ref={containerRef}>
      {content}
      <span className="cursor">|</span>
    </span>
  );
}

五、渲染性能优化

5.1 缓冲区控制

class RenderBuffer {
  constructor(options = {}) {
    this.options = {
      flushInterval: 16,
      maxBufferSize: 50,
      ...options
    };
    this.buffer = '';
    this.renderTimer = null;
    this.pendingRender = null;
  }

  append(text) {
    this.buffer += text;

    if (this.buffer.length >= this.options.maxBufferSize) {
      this.flush();
    }

    if (!this.renderTimer) {
      this.renderTimer = setTimeout(() => {
        this.flush();
      }, this.options.flushInterval);
    }
  }

  flush() {
    if (this.renderTimer) {
      clearTimeout(this.renderTimer);
      this.renderTimer = null;
    }

    if (this.buffer.length === 0) return;

    const text = this.buffer;
    this.buffer = '';

    if (this.pendingRender) {
      cancelAnimationFrame(this.pendingRender);
    }

    this.pendingRender = requestAnimationFrame(() => {
      this.options.onRender(text);
      this.pendingRender = null;
    });
  }

  clear() {
    if (this.renderTimer) {
      clearTimeout(this.renderTimer);
      this.renderTimer = null;
    }
    if (this.pendingRender) {
      cancelAnimationFrame(this.pendingRender);
      this.pendingRender = null;
    }
    this.buffer = '';
  }
}

5.2 完整聊天组件

function AIChat() {
  const { isStreaming, currentText, fullText, error, startStream, abortStream } = useSSEStream('/api/chat/stream');
  const [messages, setMessages] = useState([]);
  const [input, setInput] = useState('');
  const bufferRef = useRef(new RenderBuffer({
    onRender: () => {}
  }));

  const handleSend = async () => {
    if (!input.trim() || isStreaming) return;

    const userMessage = { role: 'user', content: input };
    setMessages(prev => [...prev, userMessage]);
    setInput('');

    setMessages(prev => [...prev, {
      role: 'assistant',
      content: '',
      isStreaming: true
    }]);

    await startStream(input);

    setMessages(prev => {
      const updated = [...prev];
      const lastMsg = updated[updated.length - 1];
      if (lastMsg.isStreaming) {
        lastMsg.content = fullText;
        lastMsg.isStreaming = false;
      }
      return updated;
    });
  };

  return (
    <div className="chat-container">
      <div className="chat-messages">
        {messages.map((msg, i) => (
          <StreamingMessage
            key={i}
            role={msg.role}
            content={msg.isStreaming ? currentText : msg.content}
            isStreaming={msg.isStreaming}
          />
        ))}
        {error && <div className="error">{error}</div>}
      </div>

      <div className="chat-input">
        <input
          value={input}
          onChange={e => setInput(e.target.value)}
          onKeyDown={e => e.key === 'Enter' && !e.shiftKey && handleSend()}
          disabled={isStreaming}
        />
        {isStreaming ? (
          <button className="stop" onClick={abortStream}>停止</button>
        ) : (
          <button onClick={handleSend}>发送</button>
        )}
      </div>
    </div>
  );
}

总结

优化策略 效果 实现要点
服务端流式代理 消除AI服务冷启动延迟 stream: true + for await
RenderBuffer 控制DOM更新频率 requestAnimationFrame + 缓冲区
事件ID 支持断线重连 SSE id字段
X-Accel-Buffering 禁止Nginx缓冲 响应头设置
光标动画 提升感知速度 CSS animation

SSE 流式交互是优化 AI 接口打字机效果的最佳技术方案。通过服务端流式传输+前端缓冲渲染的架构,可以在保证低延迟的同时实现平滑的逐字显示效果。关键在于:服务端要确保流式传输不被反向代理缓冲,前端要使用 requestAnimationFrame 控制渲染节奏,避免高频DOM操作导致的性能问题。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐