前言

如果你正在阅读这篇文章,很可能你已经听说过 OpenAI 的强大能力,但还不知道如何在自己的代码中调用它。或者你已经开始使用,但想要更系统地掌握最佳实践。

无论你处于哪个阶段,这篇文章都将带你从零开始,逐步掌握 OpenAI SDK 的核心用法、进阶技巧,以及国内兼容厂商的接入方法。最终,你将能够自信地将大模型能力集成到自己的应用中。


第一部分:基础认知

1.1 什么是 OpenAI SDK?

SDK(Software Development Kit,软件开发工具包)是 OpenAI 官方提供的 Python 库,让你能够用几行代码就调用 GPT-4、GPT-3.5 等模型。

SDK vs 直接 HTTP 调用

让我们对比一下,不使用 SDK 和使用 SDK 的区别:

不使用 SDK(原生 HTTP 请求):

import requests
import json

headers = {
    "Authorization": "Bearer sk-xxx",
    "Content-Type": "application/json"
}
data = {
    "model": "gpt-4",
    "messages": [{"role": "user", "content": "你好"}]
}
response = requests.post(
    "https://api.openai.com/v1/chat/completions",
    headers=headers,
    json=data
)
result = response.json()
answer = result['choices'][0]['message']['content']

使用 OpenAI SDK:

from openai import OpenAI

client = OpenAI(api_key="sk-xxx")
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "你好"}]
)
answer = response.choices[0].message.content

SDK 的优势:

  • ✅ 代码更简洁、可读性更强
  • ✅ 自动处理认证和重试
  • ✅ 完整的类型提示(IDE 智能补全)
  • ✅ 统一的错误处理机制

1.2 核心概念速览

在开始编码之前,需要理解几个核心概念:

概念 说明 示例
API Key 认证凭证,标识你的身份和计费账户 sk-proj-xxx...
Model 你使用的具体模型 gpt-4, gpt-3.5-turbo
Message 对话中的一条消息,包含角色和内容 {"role": "user", "content": "你好"}
Token 模型处理文本的最小单位,也是计费单位 “Hello” = 1 token, “你好” = 1 token
Temperature 控制输出随机性,0-1 之间 0 = 确定,1 = 多样

Message 的三种角色:

  • system:设定助手的行为模式(“你是一个送祝福大师”)
  • user:用户的问题或指令
  • assistant:模型的回复

第二部分:快速入门

2.1 环境搭建

安装 OpenAI SDK:

pip install openai

配置 API Key:

推荐使用环境变量(不要硬编码在代码中):

# Linux/Mac
export OPENAI_API_KEY="sk-xxx"

# Windows (CMD)
set OPENAI_API_KEY=sk-xxx

# Windows (PowerShell)
$env:OPENAI_API_KEY="sk-xxx"

或在代码中通过 .env 文件管理:

from dotenv import load_dotenv
import os

load_dotenv()  # 读取 .env 文件
api_key = os.getenv("OPENAI_API_KEY")

2.2 你的第一个 API 调用

from openai import OpenAI

# 初始化客户端
client = OpenAI()

# 调用模型
response = client.chat.completions.create(
    model="gpt-3.5-turbo",  # 或 "gpt-4"
    messages=[
        {"role": "system", "content": "你是一个有帮助的助手"},
        {"role": "user", "content": "解释一下什么是大语言模型"}
    ],
    temperature=0.7,
    max_tokens=500
)

# 获取回复
answer = response.choices[0].message.content
print(answer)

2.3 理解响应对象

print(f"回复内容: {response.choices[0].message.content}")
print(f"使用的模型: {response.model}")
print(f"消耗的 tokens: {response.usage.total_tokens}")
print(f"输入 tokens: {response.usage.prompt_tokens}")
print(f"输出 tokens: {response.usage.completion_tokens}")

输出示例:

回复内容: 大语言模型是一种基于深度学习的人工智能模型...
使用的模型: gpt-3.5-turbo-0125
消耗的 tokens: 85
输入 tokens: 25
输出 tokens: 60

第三部分:核心功能详解

3.1 对话管理(多轮对话)

OpenAI API 是无状态的,每次调用都是独立的。要实现多轮对话,需要手动维护对话历史。

# 初始化对话历史
conversation_history = [
    {"role": "system", "content": "你是一个送祝福大师"}
]

def chat_with_bot(user_input):
    # 添加用户消息
    conversation_history.append({"role": "user", "content": user_input})
    
    # 调用 API
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=conversation_history
    )
    
    # 获取助手回复
    assistant_msg = response.choices[0].message
    conversation_history.append(assistant_msg)
    
    return assistant_msg.content

# 连续对话
print(chat_with_bot("祝姐姐生日快乐"))
print(chat_with_bot("再换一个更有文采的版本"))

对话截断策略: 当对话历史超过模型的最大上下文长度时,需要截断:

def truncate_conversation(history, max_tokens=4000):
    """简单截断:保留最近的对话"""
    total_tokens = count_tokens(history)  # 需要实现 token 计数
    while total_tokens > max_tokens and len(history) > 1:
        # 移除最早的用户-助手对
        history.pop(1)  # 跳过 system prompt
        history.pop(1)
        total_tokens = count_tokens(history)
    return history

3.2 参数调优指南

参数 作用 推荐范围 场景示例
temperature 控制随机性。值越高,输出越多样 0-1 创意写作用 0.8,代码生成用 0.2
top_p 核采样,只从概率 top_p 的词汇中选 0.8-0.95 通常与 temperature 二选一
max_tokens 限制输出最大长度 按需设置 短回答 100,长文章 2000
presence_penalty 惩罚已出现的话题,鼓励新话题 -2.0~2.0 避免重复用 0.5
frequency_penalty 惩罚已出现的词汇,鼓励用词多样 -2.0~2.0 减少冗余用 0.5

参数组合建议:

# 创意任务(写诗、故事)
response = client.chat.completions.create(
    model="gpt-4",
    temperature=0.9,
    top_p=0.95,
    presence_penalty=0.6
)

# 精确任务(代码、翻译)
response = client.chat.completions.create(
    model="gpt-4",
    temperature=0.1,
    top_p=0.9
)

# 平衡模式(客服、对话)
response = client.chat.completions.create(
    model="gpt-3.5-turbo",
    temperature=0.7,
    presence_penalty=0.3
)

3.3 流式输出(Streaming)

流式输出让模型逐字返回结果,大幅提升用户体验。

# 流式调用
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "写一首关于春天的短诗"}],
    stream=True  # 开启流式
)

# 逐字输出
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

前端集成(SSE - Server-Sent Events):

from flask import Flask, Response, stream_with_context

app = Flask(__name__)

@app.route('/chat/stream')
def chat_stream():
    def generate():
        stream = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "你好"}],
            stream=True
        )
        for chunk in stream:
            if chunk.choices[0].delta.content:
                yield f"data: {chunk.choices[0].delta.content}\n\n"
        yield "data: [DONE]\n\n"
    
    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream'
    )

3.4 异步调用(Async)

对于高并发场景,使用异步版本可以大幅提升性能。

import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def async_chat(prompt):
    response = await async_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

# 并发处理多个请求
async def batch_process():
    prompts = [
        "解释什么是机器学习",
        "什么是深度学习",
        "什么是强化学习"
    ]
    tasks = [async_chat(prompt) for prompt in prompts]
    results = await asyncio.gather(*tasks)
    return results

# 运行
results = asyncio.run(batch_process())
for i, result in enumerate(results):
    print(f"结果 {i+1}: {result[:100]}...")

3.5 函数调用(Function Calling)

函数调用让模型能够主动请求调用外部函数,这是构建 AI Agent 的核心能力。

工作原理:

  1. 用户提问,你提供可用函数定义
  2. 模型判断是否需要调用函数
  3. 如果需要,返回函数名和参数
  4. 你执行函数,将结果返回给模型
  5. 模型基于函数结果生成最终回答

完整示例:天气查询

# 1. 定义可用函数
def get_current_weather(location, unit="celsius"):
    """模拟天气查询函数"""
    # 实际应用中这里调用真实天气 API
    weather_data = {
        "北京": "晴天,25度",
        "上海": "多云,22度",
        "广州": "雨天,28度"
    }
    return weather_data.get(location, f"{location}:天气数据暂缺")

# 2. 定义工具(Tools)
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_current_weather",
            "description": "获取指定城市的当前天气",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "城市名称,如:北京、上海"
                    },
                    "unit": {
                        "type": "string",
                        "enum": ["celsius", "fahrenheit"],
                        "description": "温度单位"
                    }
                },
                "required": ["location"]
            }
        }
    }
]

# 3. 用户提问
messages = [
    {"role": "user", "content": "北京今天天气怎么样?"}
]

# 4. 第一次调用,模型返回需要调用函数
response = client.chat.completions.create(
    model="gpt-4",
    messages=messages,
    tools=tools,
    tool_choice="auto"  # 让模型自动决定是否调用
)

# 5. 检查是否需要调用函数
if response.choices[0].message.tool_calls:
    tool_call = response.choices[0].message.tool_calls[0]
    func_name = tool_call.function.name
    func_args = json.loads(tool_call.function.arguments)
    
    # 6. 执行函数
    if func_name == "get_current_weather":
        weather_result = get_current_weather(**func_args)
    
    # 7. 将函数结果返回给模型
    messages.append(response.choices[0].message)  # 添加模型的函数调用请求
    messages.append({
        "role": "tool",
        "tool_call_id": tool_call.id,
        "content": weather_result
    })
    
    # 8. 第二次调用,模型基于函数结果生成最终回答
    final_response = client.chat.completions.create(
        model="gpt-4",
        messages=messages
    )
    
    print(final_response.choices[0].message.content)

3.6 JSON 模式

强制模型输出合法的 JSON 格式,非常适合数据提取场景。

# 需要特定模型版本支持
response = client.chat.completions.create(
    model="gpt-4-1106-preview",  # 或 gpt-3.5-turbo-1106
    messages=[
        {"role": "system", "content": "你是一个信息提取助手,只输出 JSON 格式"},
        {"role": "user", "content": "提取:张三,25岁,软件工程师,北京"}
    ],
    response_format={"type": "json_object"}  # 强制 JSON 输出
)

# 输出示例:
# {"name": "张三", "age": 25, "job": "软件工程师", "city": "北京"}
result = json.loads(response.choices[0].message.content)
print(result["name"])  # 输出: 张三

3.7 视觉能力(GPT-4 Vision)

# 方式一:使用图片 URL
response = client.chat.completions.create(
    model="gpt-4-vision-preview",
    messages=[
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "这张图片里有什么?"},
                {
                    "type": "image_url",
                    "image_url": {"url": "https://example.com/image.jpg"}
                }
            ]
        }
    ]
)

# 方式二:使用 Base64 编码的本地图片
import base64

with open("image.jpg", "rb") as f:
    base64_image = base64.b64encode(f.read()).decode('utf-8')

response = client.chat.completions.create(
    model="gpt-4-vision-preview",
    messages=[
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "描述这张图片"},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}
                }
            ]
        }
    ]
)

第四部分:进阶技巧

4.1 错误处理最佳实践

from openai import OpenAI, APIError, RateLimitError, APITimeoutError, APIConnectionError
import time

def robust_chat_call(messages, max_retries=5):
    """带重试机制的健壮调用"""
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=messages
            )
            return response.choices[0].message.content
        
        except RateLimitError as e:
            # 频率限制:指数退避
            wait_time = 2 ** attempt
            print(f"达到速率限制,{wait_time}秒后重试...")
            time.sleep(wait_time)
        
        except APITimeoutError as e:
            # 超时:快速重试
            print(f"请求超时,重试中...")
            time.sleep(1)
        
        except APIConnectionError as e:
            # 网络问题:稍长等待
            print(f"网络连接错误,3秒后重试...")
            time.sleep(3)
        
        except APIError as e:
            # 其他 API 错误
            print(f"API 错误: {e}")
            return None
    
    print("达到最大重试次数,请求失败")
    return None

4.2 Token 计数与成本控制

使用 tiktoken 库精确计算 token 数量:

import tiktoken

def count_tokens(text, model="gpt-3.5-turbo"):
    """计算文本的 token 数量"""
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

def estimate_cost(prompt, max_output_tokens=500, model="gpt-3.5-turbo"):
    """估算单次调用的成本"""
    # 定价(单位:美元/1K tokens)
    pricing = {
        "gpt-4": {"input": 0.03, "output": 0.06},
        "gpt-3.5-turbo": {"input": 0.001, "output": 0.002}
    }
    
    input_tokens = count_tokens(prompt, model)
    output_tokens = max_output_tokens
    
    cost = (input_tokens * pricing[model]["input"] + 
            output_tokens * pricing[model]["output"]) / 1000
    
    return {
        "input_tokens": input_tokens,
        "estimated_output_tokens": output_tokens,
        "estimated_cost_usd": round(cost, 5)
    }

# 使用示例
prompt = "请写一篇关于人工智能发展的文章"
cost_info = estimate_cost(prompt, max_output_tokens=1000)
print(f"预估成本: ${cost_info['estimated_cost_usd']}")

4.3 批量处理(Batch API)

对于非实时任务(如批量数据分析),Batch API 可以节省 50% 成本

# 1. 准备批量请求文件(JSONL 格式)
import json

requests = [
    {
        "custom_id": "request-1",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": "什么是机器学习?"}],
            "max_tokens": 100
        }
    },
    {
        "custom_id": "request-2",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": "什么是深度学习?"}],
            "max_tokens": 100
        }
    }
]

with open("batch_requests.jsonl", "w") as f:
    for req in requests:
        f.write(json.dumps(req) + "\n")

# 2. 上传文件
batch_input_file = client.files.create(
    file=open("batch_requests.jsonl", "rb"),
    purpose="batch"
)

# 3. 创建批量任务
batch_job = client.batches.create(
    input_file_id=batch_input_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h"  # 24小时内完成
)

# 4. 查询任务状态
batch_status = client.batches.retrieve(batch_job.id)
print(f"状态: {batch_status.status}")

# 5. 任务完成后获取结果
if batch_status.status == "completed":
    result_file = client.files.content(batch_status.output_file_id)
    # 解析结果
    for line in result_file.text.strip().split("\n"):
        result = json.loads(line)
        print(result)

4.4 可重复输出(Seed 参数)

使用相同的 seed 值和参数,可以获得一致的输出,非常适合测试和调试。

# 两次调用使用相同的 seed,输出一致
def reproducible_call(seed=42):
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "讲一个笑话"}],
        seed=seed,
        temperature=0.7
    )
    return response.choices[0].message.content

print(reproducible_call(42))  # 输出1
print(reproducible_call(42))  # 输出2(与输出1相同)
print(reproducible_call(43))  # 输出3(可能不同)

4.5 安全最佳实践

# 1. API Key 管理:使用环境变量,绝不硬编码
import os
from openai import OpenAI

client = OpenAI(
    api_key=os.getenv("OPENAI_API_KEY"),
    organization=os.getenv("OPENAI_ORG_ID")  # 多组织管理
)

# 2. 内容审核(Moderation API)
def moderate_content(text):
    response = client.moderations.create(
        input=text
    )
    result = response.results[0]
    if result.flagged:
        print("内容违规!")
        for category, flagged in result.categories:
            if flagged:
                print(f"违规类别: {category}")
        return False
    return True

# 3. 输入输出过滤
user_input = get_user_input()
if not moderate_content(user_input):
    return "输入内容不符合规范"

第五部分:国内兼容厂商集成指南

OpenAI SDK 已成为事实标准,国内主流大模型厂商均兼容 OpenAI API 格式。你只需修改 base_urlapi_key,代码几乎无需改动。

5.1 智谱 AI(Zhipu AI / GLM-4)

特点:中文优化良好,提供免费额度

from openai import OpenAI

client = OpenAI(
    api_key="your-zhipu-api-key",
    base_url="https://open.bigmodel.cn/api/paas/v4/"
)

response = client.chat.completions.create(
    model="glm-4",  # 可选:glm-4-plus, glm-4-air, glm-4-flash
    messages=[
        {"role": "system", "content": "你是送祝福大师"},
        {"role": "user", "content": "祝姐姐生日快乐"}
    ]
)
print(response.choices[0].message.content)

5.2 DeepSeek(深度求索)

特点:极致性价比(约为 GPT-4 的 1/10),上下文 128K

from openai import OpenAI

client = OpenAI(
    api_key="your-deepseek-api-key",
    base_url="https://api.deepseek.com"
)

response = client.chat.completions.create(
    model="deepseek-chat",  # 或 deepseek-coder
    messages=[{"role": "user", "content": "写一首关于月亮的诗"}]
)

5.3 阿里云通义千问(Qwen)

from openai import OpenAI

client = OpenAI(
    api_key="your-dashscope-api-key",
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)

response = client.chat.completions.create(
    model="qwen-max",  # 可选:qwen-turbo, qwen-plus, qwen-max
    messages=[{"role": "user", "content": "什么是大语言模型"}]
)

5.4 百度文心一言(ERNIE)

from openai import OpenAI

client = OpenAI(
    api_key="your-api-key",
    base_url="https://qianfan.baidubce.com/v2"
)

response = client.chat.completions.create(
    model="ernie-4.0-8k",
    messages=[{"role": "user", "content": "介绍一下北京"}]
)

5.5 腾讯混元(Hunyuan)

from openai import OpenAI

client = OpenAI(
    api_key="your-secret-id:your-secret-key",
    base_url="https://api.hunyuan.cloud.tencent.com/v1"
)

response = client.chat.completions.create(
    model="hunyuan-pro",
    messages=[{"role": "user", "content": "讲个笑话"}]
)

5.6 月之暗面(Moonshot / Kimi)

特点:超长上下文(1M tokens),可处理整本小说

from openai import OpenAI

client = OpenAI(
    api_key="your-moonshot-api-key",
    base_url="https://api.moonshot.cn/v1"
)

response = client.chat.completions.create(
    model="moonshot-v1-128k",  # 可选:8k, 32k, 128k
    messages=[{"role": "user", "content": "总结这段长文本"}]
)

5.7 零一万物(Yi)

from openai import OpenAI

client = OpenAI(
    api_key="your-yi-api-key",
    base_url="https://api.lingyiwanwu.com/v1"
)

response = client.chat.completions.create(
    model="yi-34b-chat",
    messages=[{"role": "user", "content": "你好"}]
)

5.8 MiniMax(海螺AI)

from openai import OpenAI

client = OpenAI(
    api_key="your-minimax-api-key",
    base_url="https://api.minimax.chat/v1"
)

response = client.chat.completions.create(
    model="abab6.5-chat",
    messages=[{"role": "user", "content": "介绍自己"}]
)

5.9 国内厂商快速对比表

厂商 Base URL 推荐模型 特色 价格参考
智谱 AI open.bigmodel.cn/api/paas/v4/ glm-4 中文优化,免费额度 ¥0.1/1K
DeepSeek api.deepseek.com deepseek-chat 极致性价比,128K上下文 ¥0.001/1K
阿里通义 dashscope.aliyuncs.com/compatible-mode/v1 qwen-max 多尺寸选择 ¥0.02/1K
百度文心 qianfan.baidubce.com/v2 ernie-4.0 企业级 ¥0.12/1K
腾讯混元 api.hunyuan.cloud.tencent.com/v1 hunyuan-pro 腾讯生态 ¥0.1/1K
Kimi api.moonshot.cn/v1 moonshot-v1 超长上下文 ¥0.012/1K
零一万物 api.lingyiwanwu.com/v1 yi-34b-chat 开源生态 ¥0.01/1K

多厂商切换封装示例:

class MultiProviderClient:
    def __init__(self, provider="openai"):
        providers = {
            "openai": {"base_url": None, "model": "gpt-3.5-turbo"},
            "zhipu": {"base_url": "https://open.bigmodel.cn/api/paas/v4/", "model": "glm-4"},
            "deepseek": {"base_url": "https://api.deepseek.com", "model": "deepseek-chat"},
            "qwen": {"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1", "model": "qwen-max"}
        }
        
        config = providers[provider]
        self.client = OpenAI(
            api_key=os.getenv(f"{provider.upper()}_API_KEY"),
            base_url=config["base_url"]
        )
        self.model = config["model"]
    
    def chat(self, messages):
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages
        )
        return response.choices[0].message.content

# 使用示例
client = MultiProviderClient("deepseek")
result = client.chat([{"role": "user", "content": "你好"}])

第六部分:实战项目

6.1 项目一:智能客服机器人

需求分析:

  • 支持多轮对话
  • 记住用户历史
  • 自动识别并处理常见问题

完整实现:

from openai import OpenAI
import json
from datetime import datetime

class CustomerServiceBot:
    def __init__(self, api_key=None):
        self.client = OpenAI(api_key=api_key)
        self.conversations = {}  # 存储各用户的对话历史
        self.system_prompt = """你是一个专业的客服助手,具有以下特点:
        1. 礼貌、耐心、专业
        2. 优先解决用户问题
        3. 如果问题超出范围,引导用户联系人工客服
        4. 记录关键信息以便后续跟进
        """
    
    def get_conversation(self, user_id):
        """获取或创建用户对话历史"""
        if user_id not in self.conversations:
            self.conversations[user_id] = [
                {"role": "system", "content": self.system_prompt}
            ]
        return self.conversations[user_id]
    
    def respond(self, user_id, message):
        """处理用户消息并返回回复"""
        conv = self.get_conversation(user_id)
        conv.append({"role": "user", "content": message})
        
        try:
            response = self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=conv,
                temperature=0.7,
                max_tokens=500
            )
            
            reply = response.choices[0].message.content
            conv.append({"role": "assistant", "content": reply})
            
            # 控制历史长度(保留最近20条)
            if len(conv) > 22:  # 1条system + 20条对话 + 边界
                conv = [conv[0]] + conv[-20:]
                self.conversations[user_id] = conv
            
            return reply
        
        except Exception as e:
            return f"系统暂时无法响应,请稍后重试。错误:{str(e)}"
    
    def clear_history(self, user_id):
        """清除用户对话历史"""
        if user_id in self.conversations:
            self.conversations[user_id] = [
                {"role": "system", "content": self.system_prompt}
            ]

# 使用示例
bot = CustomerServiceBot()

# 模拟用户对话
user = "user_001"
print(bot.respond(user, "我的订单什么时候到?"))
print(bot.respond(user, "可以帮我查一下物流吗?"))
print(bot.respond(user, "谢谢,知道了"))

6.2 项目二:批量内容生成器

需求分析:

  • 批量生成 SEO 文章标题
  • 并发请求提升效率
  • 成本统计和结果导出

完整实现:

import asyncio
import json
import csv
from openai import AsyncOpenAI
from typing import List, Dict

class BatchContentGenerator:
    def __init__(self, api_key=None):
        self.client = AsyncOpenAI(api_key=api_key)
        self.results = []
    
    async def generate_single(self, topic: str, style: str) -> Dict:
        """生成单个内容"""
        prompt = f"请以{style}的风格,为以下主题生成一个吸引人的标题:{topic}"
        
        try:
            response = await self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.8,
                max_tokens=100
            )
            return {
                "topic": topic,
                "style": style,
                "title": response.choices[0].message.content,
                "tokens": response.usage.total_tokens,
                "status": "success"
            }
        except Exception as e:
            return {
                "topic": topic,
                "style": style,
                "title": None,
                "error": str(e),
                "status": "failed"
            }
    
    async def batch_generate(self, tasks: List[Dict]) -> List[Dict]:
        """批量生成内容(并发)"""
        async_tasks = [
            self.generate_single(task["topic"], task["style"]) 
            for task in tasks
        ]
        self.results = await asyncio.gather(*async_tasks)
        return self.results
    
    def export_to_csv(self, filename="output.csv"):
        """导出结果到 CSV"""
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=["topic", "style", "title", "status"])
            writer.writeheader()
            for result in self.results:
                writer.writerow({
                    "topic": result["topic"],
                    "style": result["style"],
                    "title": result.get("title", ""),
                    "status": result["status"]
                })
        print(f"已导出到 {filename}")
    
    def print_stats(self):
        """打印统计信息"""
        total = len(self.results)
        success = sum(1 for r in self.results if r["status"] == "success")
        total_tokens = sum(r.get("tokens", 0) for r in self.results)
        
        print(f"总计: {total} 个任务")
        print(f"成功: {success} 个")
        print(f"失败: {total - success} 个")
        print(f"总 Token 消耗: {total_tokens}")

# 使用示例
async def main():
    generator = BatchContentGenerator()
    
    tasks = [
        {"topic": "人工智能发展", "style": "科技"},
        {"topic": "健康饮食", "style": "生活"},
        {"topic": "儿童教育", "style": "温馨"},
        {"topic": "环保出行", "style": "严肃"},
    ]
    
    results = await generator.batch_generate(tasks)
    generator.print_stats()
    generator.export_to_csv("titles.csv")

# 运行
asyncio.run(main())

第七部分:生产环境注意事项

7.1 性能优化

# 1. 连接池管理(使用 httpx)
import httpx
from openai import OpenAI

http_client = httpx.Client(
    timeout=30.0,
    limits=httpx.Limits(max_keepalive_connections=20, max_connections=100)
)
client = OpenAI(http_client=http_client)

# 2. 缓存策略(使用 Redis)
import redis
import hashlib
import json

cache = redis.Redis(host='localhost', port=6379, decode_responses=True)

def cached_chat(messages, ttl=3600):
    # 生成缓存 key
    cache_key = hashlib.md5(
        json.dumps(messages, sort_keys=True).encode()
    ).hexdigest()
    
    # 检查缓存
    cached = cache.get(cache_key)
    if cached:
        return cached
    
    # 调用 API
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages
    )
    result = response.choices[0].message.content
    
    # 存入缓存
    cache.setex(cache_key, ttl, result)
    return result

7.2 监控与可观测性

import logging
from datetime import datetime
from typing import Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class MonitoredClient:
    def __init__(self):
        self.client = OpenAI()
        self.metrics = {
            "total_requests": 0,
            "total_tokens": 0,
            "total_cost": 0.0,
            "errors": 0
        }
    
    def chat(self, messages, model="gpt-3.5-turbo"):
        start_time = datetime.now()
        self.metrics["total_requests"] += 1
        
        try:
            response = self.client.chat.completions.create(
                model=model,
                messages=messages
            )
            
            # 记录指标
            duration = (datetime.now() - start_time).total_seconds()
            tokens = response.usage.total_tokens
            cost = tokens * 0.002 / 1000  # gpt-3.5-turbo 定价
            
            self.metrics["total_tokens"] += tokens
            self.metrics["total_cost"] += cost
            
            # 记录日志
            logger.info(f"请求成功 | 耗时: {duration}s | tokens: {tokens} | 成本: ${cost:.5f}")
            
            return response.choices[0].message.content
        
        except Exception as e:
            self.metrics["errors"] += 1
            logger.error(f"请求失败: {str(e)}")
            raise
    
    def get_metrics(self):
        return self.metrics

7.3 降级与容灾

class ResilientClient:
    """支持多厂商降级的客户端"""
    
    def __init__(self):
        self.providers = [
            {"name": "openai", "client": OpenAI(), "model": "gpt-3.5-turbo"},
            {"name": "deepseek", "client": OpenAI(
                api_key=os.getenv("DEEPSEEK_API_KEY"),
                base_url="https://api.deepseek.com"
            ), "model": "deepseek-chat"},
            {"name": "zhipu", "client": OpenAI(
                api_key=os.getenv("ZHIPU_API_KEY"),
                base_url="https://open.bigmodel.cn/api/paas/v4/"
            ), "model": "glm-4-flash"}
        ]
    
    def chat_with_fallback(self, messages, fallback_to_local=True):
        """依次尝试各厂商,全部失败则使用本地模型"""
        
        for provider in self.providers:
            try:
                response = provider["client"].chat.completions.create(
                    model=provider["model"],
                    messages=messages,
                    timeout=10
                )
                print(f"使用 {provider['name']} 成功")
                return response.choices[0].message.content
            
            except Exception as e:
                print(f"{provider['name']} 失败: {e}")
                continue
        
        # 所有厂商失败,返回降级响应
        if fallback_to_local:
            return "系统繁忙,请稍后重试。"
        else:
            raise Exception("所有 API 提供商均不可用")

第八部分:常见问题与解决方案

问题 原因 解决方案
RateLimitError 请求频率超过限制 实现指数退避重试,控制并发数
上下文过长 对话历史超过模型限制 实现智能截断或摘要压缩
输出格式不对 模型自由发挥,未遵循格式要求 使用 JSON Mode 或 Few-shot 示例
中文效果差 模型对中文支持不佳 切换国产模型(智谱、通义等)
延迟过高 网络问题或模型太大 使用流式输出,切换轻量模型
成本超预期 未控制 token 消耗 设置 max_tokens,使用 Batch API
内容不安全 模型生成敏感内容 接入 Moderation API 过滤

附录

A. 完整代码示例汇总

所有示例代码已整理,可在此获取:[GitHub 仓库链接]

B. 推荐资源

C. 词汇表

术语 解释
Token 模型处理的最小文本单元,约等于 0.75 个英文单词或半个中文字
Embedding 将文本转换为向量,用于相似度计算
Fine-tuning 微调,用自定义数据训练模型
RAG 检索增强生成,结合外部知识库回答问题
Function Calling 函数调用,让模型能调用外部工具
System Prompt 系统提示,设定模型的行为模式

结语

恭喜你完成了这篇指南的学习!你现在已经掌握了:

  • ✅ OpenAI SDK 的基础和核心功能
  • ✅ 流式输出、异步调用、函数调用等高级特性
  • ✅ 错误处理、成本控制、性能优化等生产实践
  • ✅ 国内主流大模型厂商的接入方法
  • ✅ 两个完整的实战项目

大模型的能力正在飞速发展,SDK 也会不断更新。建议你时常查阅官方文档,保持学习的热情。

下一步建议:

  1. 用本文的模板快速搭建你的第一个应用
  2. 尝试接入不同的国内厂商,对比效果
  3. 学习 RAG(检索增强生成)和 Agent 开发

祝你开发愉快!如有问题,欢迎在评论区交流。


Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐