ComfyUI全面掌握-知识点详解——本地 API 与 Cloud API 集成(对接+实操+排错)
本文为系列第 25 篇,属于"高阶进阶章节"的第三个知识点详解。我们将深入讲解 ComfyUI 本地 API 与 Cloud API 的完整集成方案,包含端点说明、代码实操、异常处理与生产级最佳实践。
知识点详解——本地 API 与 Cloud API 集成(对接+实操+排错)
目录
- 一、引言:为什么需要 API 集成?
- 二、本地 API 详解
- 三、Cloud API 详解
- 四、实操案例全解析
- 五、API 调用排错与优化
- 六、本地 API vs Cloud API 全维对比
- 七、总结与下一篇预告
- 官方参考链接
一、引言:为什么需要 API 集成?
1.1 API 集成的核心价值
在 ComfyUI 的实际生产应用中,手动拖拽节点 → 点击运行 → 等待结果 的交互模式远远不够。当我们需要:
- 将 AI 绘图能力集成到自己的 Web 应用或移动 App 中
- 实现定时、自动化的批量图像生成
- 对接 CI/CD 流水线,自动生成封面、海报
- 构建 SaaS 平台,为用户提供 AI 绘图服务
这时候就需要 API(应用程序编程接口) 来打通 ComfyUI 与外部系统之间的桥梁。
ComfyUI 提供了两套 API 方案:
| API 方案 | 定位 | 适用场景 |
|---|---|---|
| 本地 API | ComfyUI 自带的 REST API,本地启动后自动可用 | 内部系统集成、本地自动化、开发测试 |
| Cloud API | Comfy Cloud 平台提供的托管 API | 生产环境、对外服务、高可用需求 |
两套 API 的核心能力一致——提交工作流、查询状态、获取结果——但在部署方式、鉴权机制、性能限制等方面存在显著差异。
1.2 本地 API vs Cloud API:选型总览
| 对比维度 | 本地 API | Cloud API |
|---|---|---|
| 部署位置 | 本地服务器(127.0.0.1:8188) |
Comfy Cloud 云端 |
| 启动方式 | 启动 ComfyUI 时默认开启 | 需在 Cloud 后台创建 API Key |
| 鉴权方式 | 无(默认)/ --enable-auth |
API Key(Bearer Token) |
| 调用限制 | 无硬限制(受本地硬件制约) | 有速率限制和并发限制 |
| 可用性 | 依赖本地服务器运行 | 99.9% 托管可用性 |
| 数据隐私 | 数据完全本地 | 数据经云端处理 |
| 成本 | 免费(仅电费) | 按积分消耗 + 订阅费 |
| 适用场景 | 开发调试、内部自动化 | 生产部署、对外服务 |
选择建议:如果你的系统运行在同一个内网中,本地 API 是最简单、免费的选择;如果需要对外提供服务、需要高可用性保证,或想利用云端 GPU 算力,Cloud API 是更好的选择。
二、本地 API 详解
2.1 启动本地 API 服务
ComfyUI 默认启动时已包含 HTTP API 服务,访问 http://127.0.0.1:8188 即可使用。但要从外部程序调用 API,需要以下配置:
基础启动(本机访问):
python main.py
开启远程访问(局域网/公网):
python main.py --listen 0.0.0.0
--listen 0.0.0.0 监听所有网络接口,允许局域网内其他设备访问。默认端口为 8188,可通过 --port 参数修改:
python main.py --listen 0.0.0.0 --port 8288
指定 GPU 设备(多卡场景):
python main.py --gpu-only --gpu-device 0
全部常用启动参数速查:
| 参数 | 作用 | 示例 |
|---|---|---|
--listen [IP] |
开启远程监听 | --listen 0.0.0.0 |
--port N |
指定端口 | --port 8288 |
--gpu-only |
仅使用 GPU | 无参数 |
--cpu |
仅使用 CPU(极慢) | 无参数 |
--lowvram |
低显存模式 | 无参数 |
--novram |
不将模型加载到显存 | 无参数 |
--enable-auth |
开启 API 鉴权 | 无参数 |
--output-directory DIR |
指定输出目录 | --output /data/outputs |
--auto-launch |
启动后自动打开浏览器 | 无参数 |
2.2 核心端点说明
ComfyUI 本地 API 基于 WebSocket + HTTP 双通道设计。HTTP 负责提交任务和查询,WebSocket 负责实时推送状态更新。
HTTP 端点(核心):
| 方法 | 端点 | 功能 | 请求体 / 参数 |
|---|---|---|---|
POST |
/prompt |
提交工作流任务 | JSON:{ "prompt": {...}, "client_id": "..." } |
GET |
/history/{prompt_id} |
查询单个任务结果 | prompt_id 路径参数 |
GET |
/history |
查询全部任务历史 | 无 |
GET |
/queue |
查看当前队列状态 | 无 |
POST |
/queue |
取消队列中的任务 | JSON:{ "prompt_id": "..." , "delete": true } |
GET |
/object_info/{node_class} |
查询节点信息 | node_class 可选 |
GET |
/embeddings |
获取可用 embeddings 列表 | 无 |
GET |
/extensions |
获取已安装扩展列表 | 无 |
GET |
/providers |
查看可用的 providers | 无 |
GET |
/system_stats |
系统状态统计 | 无 |
WebSocket 端点:
| 端点 | 功能 |
|---|---|
ws://127.0.0.1:8188/ws?client_id={client_id} |
实时接收任务状态推送 |
工作流提交格式(POST /prompt 的请求体结构):
{
"prompt": {
"3": {
"class_type": "KSampler",
"inputs": {
"seed": 156680208700286,
"steps": 25,
"cfg": 7,
"sampler_name": "dpmpp_2m",
"scheduler": "karras",
"denoise": 1,
"model": ["4", 0],
"positive": ["6", 0],
"negative": ["7", 0],
"latent_image": ["5", 0]
}
},
"4": {
"class_type": "CheckpointLoaderSimple",
"inputs": {
"ckpt_name": "dreamshaper_8.safetensors"
}
},
"5": {
"class_type": "EmptyLatentImage",
"inputs": {
"width": 512,
"height": 512,
"batch_size": 1
}
},
"6": {
"class_type": "CLIPTextEncode",
"inputs": {
"text": "beautiful landscape, sunshine, mountains",
"clip": ["4", 1]
}
},
"7": {
"class_type": "CLIPTextEncode",
"inputs": {
"text": "blurry, low quality",
"clip": ["4", 1]
}
},
"8": {
"class_type": "VAEDecode",
"inputs": {
"samples": ["3", 0],
"vae": ["4", 2]
}
},
"9": {
"class_type": "SaveImage",
"inputs": {
"filename_prefix": "ComfyUI",
"images": ["8", 0]
}
}
},
"client_id": "my-app-client-001"
}
关键理解:
prompt字段内的 JSON 结构就是 ComfyUI 工作流的序列化表示。每个节点以数字 ID 为键,包含class_type(节点类型)和inputs(输入参数,其中的["node_id", output_index]格式表示连接关系)。这实际上就是 ComfyUI 工作流 JSON 文件(workflow_api格式)的完整结构。
如何获取工作流的 API JSON 格式?
在 ComfyUI 界面中:
- 完成工作流搭建后,点击右上角 Workflows(工作流)→ Save(保存)(API 格式)
- 或者点击 Save (API Format) 按钮直接保存为 API 调用可用的 JSON 文件
- 也可以用 "Save Image" 节点生成的图片(包含全量 metadata),通过脚本提取
推荐做法:先在 ComfyUI 界面中搭建并调试好工作流,确认能正确生成后,再导出为 API 格式用于代码调用。永远不要在 API 调用中第一次运行一个未经测试的工作流。
2.3 API 鉴权与安全配置
默认情况下,本地 API 没有任何鉴权——只要网络可达,任何人都可以提交任务。在生产环境中这显然不安全。
开启基本鉴权:
python main.py --listen 0.0.0.0 --enable-auth
启动后,需要在浏览器首次访问时设置用户名和密码。之后每次 API 调用都需要携带 Authorization 头。
使用 Nginx 反向代理 + HTTPS(推荐的生产方案):
server {
listen 443 ssl;
server_name comfyui.example.com;
ssl_certificate /path/to/cert.pem;
ssl_certificate_key /path/to/key.pem;
# 基本认证
auth_basic "ComfyUI API";
auth_basic_user_file /etc/nginx/.htpasswd;
location / {
proxy_pass http://127.0.0.1:8188;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_read_timeout 300s;
}
}
安全建议总结:
| 场景 | 推荐安全措施 |
|---|---|
| 仅本机使用 | 默认无鉴权即可(127.0.0.1 限制) |
| 局域网使用(可信环境) | 防火墙限制端口 + --listen 指定内网 IP |
| 公网访问 | 必须 HTTPS + 鉴权(推荐 Nginx 反向代理) |
| 生产环境对外服务 | 使用 Cloud API(托管安全) |
三、Cloud API 详解
3.1 Cloud API 的差异化能力
Cloud API 与本地 API 相比,除了部署位置不同,还有以下差异化能力:
| 能力 | 说明 |
|---|---|
| 托管 GPU 算力 | 无需自备 GPU,使用 RTX 6000 Pro(96GB 显存) |
| 高可用性 | 99.9% 服务可用性,自动故障恢复 |
| 并发执行 | 支持最多 5 个任务同时运行(视订阅计划) |
| 全球 CDN | 生成结果通过 CDN 加速分发 |
| 自动缩放 | 队列积压时自动增加计算资源 |
| 托管存储 | 工作流、模型、结果均存储在云端 |
订阅计划与 API 配额:
| 计划 | 月费 | API 并发上限 | 积分/月 | 单次最长运行 |
|---|---|---|---|---|
| 免费版 | $0 | ❌ 无 API 访问 | 400 | 10 分钟 |
| 标准版 | $20 | ❌ 无 API 访问 | 4,200 | 30 分钟 |
| 创作者版 | $35 | ✅ 3 个并发 | 7,400 | 30 分钟 |
| 专业版 | $100 | ✅ 5 个并发 | 21,100 | 60 分钟 |
注意:免费版和标准版不提供 API 访问。如果需要 API 集成,至少需要创作者版($35/月)。
3.2 API Key 管理与认证
获取 API Key:
- 登录 Comfy Cloud
- 点击右上角用户头像 → Account(账户)
- 选择 API Keys 选项卡
- 点击 Create API Key
- 复制生成的密钥并安全保存(关闭弹窗后将无法再次查看完整密钥)
认证方式:
所有 Cloud API 请求都需要在 HTTP 头中携带 API Key:
Authorization: Bearer cmy_xxxxxxxxxxxxxxxxxxxxx
API Key 最佳实践:
- ✅ 不同应用使用不同的 API Key(方便独立管理)
- ✅ API Key 存储在环境变量或密钥管理服务中
- ✅ 定期轮换 API Key
- ❌ 不要将 API Key 硬编码在代码中
- ❌ 不要将 API Key 提交到 Git 仓库
3.3 Cloud 专属端点说明
Cloud API 的基路径为 https://api.comfy.org,其核心端点如下:
| 方法 | 端点 | 功能 |
|---|---|---|
POST |
/workflows |
创建工作流 |
GET |
/workflows/{id} |
获取工作流详情 |
POST |
/workflows/{id}/run |
运行工作流 |
GET |
/runs/{id} |
查询运行状态 |
GET |
/runs/{id}/output |
获取运行结果 |
POST |
/runs/batch |
批量运行工作流 |
GET |
/queue |
查看队列状态 |
GET |
/credits |
查询积分余额 |
与本地 API 的关键区别:
| 特性 | 本地 API | Cloud API |
|---|---|---|
| 工作流提交 | 直接 JSON body | 分两步:先创建 → 再运行 |
| 结果获取 | 从 /history 获取 |
从 /runs/{id}/output 获取 |
| 文件传输 | Base64 编码图片 | 返回文件 URL(CDN) |
| 任务 ID 格式 | 数字 prompt_id | UUID 格式的 run_id |
3.4 限制与配额
使用 Cloud API 时需注意以下限制:
| 限制类型 | 具体说明 |
|---|---|
| 速率限制 | 每分钟最多 60 次 API 请求 |
| 并发限制 | Creator: 3 个并发, Pro: 5 个并发 |
| 单次运行时长 | Creator: 最长 30 分钟, Pro: 最长 60 分钟 |
| 结果存储时长 | 生成结果保留 30 天 |
| 工作流数量 | 最多 100 个保存的工作流 |
| 积分消耗 | 根据运行时长和模型复杂度按积分计费 |
超出限制时:API 会返回
429 Too Many Requests(速率超限)或402 Payment Required(积分不足)状态码。建议在代码中处理这些错误码并实施重试策略。
四、实操案例全解析
4.1 案例一:使用 cURL 调用本地 API 提交文生图任务
适用场景:快速测试 API 连通性、排查 API 调用问题。
Step 1:准备工作流 JSON
先导出工作流的 API 格式 JSON。假设文件名为 t2i_workflow.json。
Step 2:使用 cURL 提交任务
# 提交工作流到本地 API
curl -X POST http://127.0.0.1:8188/prompt \
-H "Content-Type: application/json" \
-d @t2i_workflow.json
成功响应示例:
{
"prompt_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"number": 5,
"node_errors": {}
}
prompt_id:任务的唯一 ID,用于后续查询状态和结果number:队列中的位置编号node_errors:节点错误信息(为空表示无错误)
Step 3:查询任务状态
# 查询任务历史(需要替换 prompt_id)
curl http://127.0.0.1:8188/history/a1b2c3d4-e5f6-7890-abcd-ef1234567890
Step 4:获取生成结果
当任务完成后,/history 返回的数据中包含 outputs 字段:
{
"a1b2c3d4-e5f6-7890-abcd-ef1234567890": {
"status": {
"status_str": "success",
"completed": true,
"messages": [...]
},
"outputs": {
"9": {
"images": [
{
"filename": "ComfyUI_00001_.png",
"subfolder": "",
"type": "output"
}
]
}
}
}
}
获取图片文件:
curl http://127.0.0.1:8188/view?filename=ComfyUI_00001_.png&type=output \
-o output.png
完整的一站式 cURL 示例脚本:
#!/bin/bash
# 一键提交并等待结果的简单脚本
SERVER="http://127.0.0.1:8188"
WORKFLOW_FILE="t2i_workflow.json"
# 1. 提交任务
echo "提交工作流..."
RESPONSE=$(curl -s -X POST "$SERVER/prompt" \
-H "Content-Type: application/json" \
-d @$WORKFLOW_FILE)
PROMPT_ID=$(echo $RESPONSE | python3 -c "import sys,json; print(json.load(sys.stdin)['prompt_id'])")
echo "任务 ID: $PROMPT_ID"
# 2. 轮询等待完成
echo "等待生成完成..."
while true; do
STATUS=$(curl -s "$SERVER/history/$PROMPT_ID" | python3 -c "
import sys, json
data = json.load(sys.stdin)
prompt = data.get('$PROMPT_ID', {})
completed = prompt.get('status', {}).get('completed', False)
print('completed' if completed else 'running')
")
if [ "$STATUS" = "completed" ]; then
echo "生成完成!"
break
fi
sleep 2
done
# 3. 获取图片
echo "下载生成结果..."
FILENAME=$(curl -s "$SERVER/history/$PROMPT_ID" | python3 -c "
import sys, json
data = json.load(sys.stdin)
outputs = data['$PROMPT_ID']['outputs']
# 取第一个图片输出节点
for node_id in outputs:
images = outputs[node_id].get('images', [])
if images:
print(images[0]['filename'])
break
")
curl -s "$SERVER/view?filename=$FILENAME&type=output" -o "output_$PROMPT_ID.png"
echo "图片已保存为: output_$PROMPT_ID.png"
4.2 案例二:Python 封装 —— 完整的本地 API 客户端
适用场景:在 Python 应用中集成 ComfyUI 生成能力。
"""
ComfyUI 本地 API 客户端封装
支持功能:提交任务、查询状态、等待完成、获取结果
"""
import json
import time
import requests
import base64
from io import BytesIO
from typing import Optional, Dict, Any
from pathlib import Path
class ComfyLocalClient:
"""ComfyUI 本地 API 客户端"""
def __init__(self, base_url: str = "http://127.0.0.1:8188"):
self.base_url = base_url.rstrip("/")
self.client_id = f"py-client-{int(time.time())}"
def submit_prompt(self, workflow_json: Dict) -> str:
"""
提交工作流任务
Args:
workflow_json: 工作流的 API 格式 JSON
Returns:
prompt_id: 任务 ID
"""
payload = {
"prompt": workflow_json,
"client_id": self.client_id
}
resp = requests.post(
f"{self.base_url}/prompt",
json=payload
)
resp.raise_for_status()
data = resp.json()
if data.get("node_errors"):
print(f"⚠️ 节点警告: {data['node_errors']}")
return data["prompt_id"]
def get_history(self, prompt_id: str) -> Optional[Dict]:
"""查询任务结果"""
resp = requests.get(f"{self.base_url}/history/{prompt_id}")
if resp.status_code == 200:
data = resp.json()
return data.get(prompt_id)
return None
def wait_for_completion(
self, prompt_id: str, poll_interval: float = 2.0, timeout: float = 300.0
) -> Dict:
"""
轮询等待任务完成
Args:
prompt_id: 任务 ID
poll_interval: 轮询间隔(秒)
timeout: 超时时间(秒)
Returns:
任务结果数据
Raises:
TimeoutError: 超时未完成
"""
start_time = time.time()
while True:
if time.time() - start_time > timeout:
raise TimeoutError(f"任务 {prompt_id} 超时")
history = self.get_history(prompt_id)
if history and history.get("status", {}).get("completed"):
return history
# 打印状态信息
status = history.get("status", {}) if history else {}
messages = status.get("messages", [])
if messages:
last_msg = messages[-1]
if len(last_msg) > 1:
print(f" [{last_msg[0]}] {last_msg[1].get('message', '')}")
time.sleep(poll_interval)
def get_output_images(
self, prompt_id: str, output_dir: Optional[str] = None
) -> list[Path]:
"""
获取并保存生成结果图片
Args:
prompt_id: 任务 ID
output_dir: 保存目录(默认当前目录)
Returns:
保存的文件路径列表
"""
history = self.get_history(prompt_id)
if not history:
return []
outputs = history.get("outputs", {})
saved_files = []
save_dir = Path(output_dir) if output_dir else Path.cwd()
save_dir.mkdir(parents=True, exist_ok=True)
for node_id, node_output in outputs.items():
for image_info in node_output.get("images", []):
filename = image_info["filename"]
subfolder = image_info.get("subfolder", "")
img_type = image_info.get("type", "output")
# 下载图片
resp = requests.get(
f"{self.base_url}/view",
params={
"filename": filename,
"subfolder": subfolder,
"type": img_type
}
)
if resp.status_code == 200:
filepath = save_dir / filename
filepath.write_bytes(resp.content)
saved_files.append(filepath)
print(f" ✅ 已保存: {filepath}")
return saved_files
# ========== 使用示例 ==========
if __name__ == "__main__":
# 1. 加载工作流 JSON(API 格式)
with open("t2i_workflow.json", "r") as f:
workflow = json.load(f)
# 2. 创建客户端
client = ComfyLocalClient()
# 3. 提交任务
print("🚀 提交工作流任务...")
prompt_id = client.submit_prompt(workflow)
print(f" 任务 ID: {prompt_id}")
# 4. 等待完成
print("⏳ 等待生成完成...")
result = client.wait_for_completion(prompt_id)
print(f" ✅ 生成完成!状态: {result['status']['status_str']}")
# 5. 获取结果图片
print("📥 下载生成结果...")
files = client.get_output_images(prompt_id, output_dir="./outputs")
print(f" 📁 共保存 {len(files)} 张图片")
4.3 案例三:使用 Cloud API 实现批量图像生成
适用场景:需要大规模生成图像,利用云端算力和并发能力。
"""
Comfy Cloud API 批量生成示例
需创作者版及以上订阅计划
"""
import json
import time
import requests
from typing import List, Dict
class ComfyCloudClient:
"""Comfy Cloud API 客户端"""
def __init__(self, api_key: str):
self.base_url = "https://api.comfy.org"
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def create_workflow(self, workflow_json: Dict) -> str:
"""在工作流管理器中创建/注册工作流"""
resp = requests.post(
f"{self.base_url}/workflows",
headers=self.headers,
json={"workflow": workflow_json}
)
resp.raise_for_status()
return resp.json()["id"]
def run_workflow(self, workflow_id: str, overrides: Dict = None) -> str:
"""
运行工作流
Args:
workflow_id: 工作流 ID
overrides: 输入参数覆盖(如替换提示词)
Returns:
run_id: 运行任务 ID
"""
payload = {}
if overrides:
payload["overrides"] = overrides
resp = requests.post(
f"{self.base_url}/workflows/{workflow_id}/run",
headers=self.headers,
json=payload
)
resp.raise_for_status()
return resp.json()["id"]
def batch_run(
self, workflow_id: str, prompt_variations: List[str]
) -> List[str]:
"""
批量运行:对同一工作流使用不同提示词
Args:
workflow_id: 工作流 ID
prompt_variations: 提示词列表
Returns:
run_ids: 任务 ID 列表
"""
run_ids = []
for i, prompt in enumerate(prompt_variations):
print(f" 提交 [{i+1}/{len(prompt_variations)}]: {prompt[:50]}...")
run_id = self.run_workflow(workflow_id, {
"positive_prompt": prompt # 根据工作流结构调整
})
run_ids.append(run_id)
time.sleep(0.5) # 避免速率限制
return run_ids
def get_run_status(self, run_id: str) -> Dict:
"""查询运行状态"""
resp = requests.get(
f"{self.base_url}/runs/{run_id}",
headers=self.headers
)
resp.raise_for_status()
return resp.json()
def wait_for_runs(self, run_ids: List[str], poll_interval: int = 5):
"""等待所有批量任务完成"""
pending = set(run_ids)
completed = {}
while pending:
print(f"\n⏳ 等待中: {len(pending)}/{len(run_ids)} 个任务未完成")
for run_id in list(pending):
status = self.get_run_status(run_id)
state = status.get("state", "")
if state == "completed":
completed[run_id] = status
pending.remove(run_id)
print(f" ✅ {run_id}: 完成")
elif state == "failed":
print(f" ❌ {run_id}: 失败 - {status.get('error', '')}")
pending.remove(run_id)
# 其他状态(queued/running)继续等待
if pending:
time.sleep(poll_interval)
return completed
def download_results(self, run_statuses: Dict, output_dir: str = "./cloud_outputs"):
"""批量下载生成结果"""
import os
os.makedirs(output_dir, exist_ok=True)
for run_id, status in run_statuses.items():
resp = requests.get(
f"{self.base_url}/runs/{run_id}/output",
headers=self.headers
)
if resp.status_code == 200:
data = resp.json()
# Cloud API 返回图片 URL(CDN)
for output in data.get("outputs", []):
image_url = output.get("url")
if image_url:
img_resp = requests.get(image_url)
ext = image_url.split(".")[-1].split("?")[0]
filepath = os.path.join(
output_dir, f"{run_id[:8]}.{ext}"
)
with open(filepath, "wb") as f:
f.write(img_resp.content)
print(f" 📁 已保存: {filepath}")
# ========== 使用示例 ==========
if __name__ == "__main__":
API_KEY = os.environ.get("COMFY_CLOUD_API_KEY")
if not API_KEY:
raise ValueError("请设置 COMFY_CLOUD_API_KEY 环境变量")
client = ComfyCloudClient(API_KEY)
# 1. 创建工作流
with open("t2i_workflow.json") as f:
workflow = json.load(f)
wf_id = client.create_workflow(workflow)
print(f"✅ 工作流已创建: {wf_id}")
# 2. 批量提交多个提示词
prompts = [
"beautiful mountain landscape, sunset, 4k",
"cyberpunk city street at night, neon lights",
"cute cat sleeping on a windowsill, sunlight",
"ancient castle on a hill, foggy morning",
"underwater coral reef, colorful fish, sun rays",
]
print(f"🚀 批量提交 {len(prompts)} 个任务...")
run_ids = client.batch_run(wf_id, prompts)
# 3. 等待所有任务完成
completed = client.wait_for_runs(run_ids)
# 4. 下载结果
print("\n📥 下载生成结果...")
client.download_results(completed)
print(f"\n🎉 全部完成!共生成 {len(completed)} 张图片")
4.4 案例四:API 工作流的异常处理与重试机制
在生产环境中,API 调用不可能永远一帆风顺。网络抖动、服务器过载、积分不足等都可能导致失败。下面是一个健壮的异常处理与重试机制示例:
import time
import json
import requests
from functools import wraps
from typing import Callable, Any
class ComfyAPIError(Exception):
"""ComfyUI API 基础异常"""
pass
class RateLimitError(ComfyAPIError):
"""速率限制异常"""
pass
class InsufficientCreditsError(ComfyAPIError):
"""积分不足异常"""
pass
class TaskTimeoutError(ComfyAPIError):
"""任务超时异常"""
pass
def retry_on_failure(
max_retries: int = 3,
base_delay: float = 1.0,
backoff_factor: float = 2.0,
retryable_exceptions: tuple = (ConnectionError, TimeoutError, RateLimitError)
) -> Callable:
"""
重试装饰器:指数退避策略
Args:
max_retries: 最大重试次数
base_delay: 基础等待时间(秒)
backoff_factor: 退避因子
retryable_exceptions: 可重试的异常类型
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except retryable_exceptions as e:
last_exception = e
if attempt < max_retries:
delay = base_delay * (backoff_factor ** attempt)
print(f"⚠️ 第 {attempt + 1} 次失败 ({e}), "
f"{delay:.1f} 秒后重试...")
time.sleep(delay)
else:
print(f"❌ 已重试 {max_retries} 次,放弃")
raise last_exception
return wrapper
return decorator
class RobustComfyClient:
"""健壮的 ComfyUI API 客户端(含重试机制)"""
def __init__(self, base_url: str, api_key: str = None, is_cloud: bool = False):
self.base_url = base_url.rstrip("/")
self.is_cloud = is_cloud
self.headers = {"Content-Type": "application/json"}
if api_key:
self.headers["Authorization"] = f"Bearer {api_key}"
@retry_on_failure(max_retries=3)
def safe_submit(self, workflow_json: dict) -> dict:
"""
带重试的安全提交
错误处理策略:
- 429 (Rate Limit) → 等待后重试
- 402 (Insufficient Credits) → 直接报错,不重试
- 5xx (Server Error) → 重试
- ConnectionError → 重试
"""
if self.is_cloud:
# Cloud API 需要先创建 workflow
resp = requests.post(
f"{self.base_url}/workflows",
headers=self.headers,
json={"workflow": workflow_json},
timeout=30
)
# 处理 Cloud 特有的错误
if resp.status_code == 429:
retry_after = int(resp.headers.get("Retry-After", 5))
print(f"⏳ 速率受限,等待 {retry_after} 秒...")
time.sleep(retry_after)
raise RateLimitError("Rate limit exceeded")
elif resp.status_code == 402:
raise InsufficientCreditsError("积分不足")
resp.raise_for_status()
wf_id = resp.json()["id"]
# 运行工作流
resp = requests.post(
f"{self.base_url}/workflows/{wf_id}/run",
headers=self.headers,
timeout=30
)
return resp.json()
else:
# 本地 API
resp = requests.post(
f"{self.base_url}/prompt",
headers=self.headers,
json={"prompt": workflow_json, "client_id": "robust-client"},
timeout=60
)
if resp.status_code == 429:
raise RateLimitError("Rate limit exceeded")
resp.raise_for_status()
return resp.json()
# 使用示例
def main():
client = RobustComfyClient(
base_url="http://127.0.0.1:8188",
is_cloud=False
)
with open("t2i_workflow.json") as f:
workflow = json.load(f)
try:
result = client.safe_submit(workflow)
print(f"✅ 提交成功!Prompt ID: {result.get('prompt_id')}")
except InsufficientCreditsError as e:
print(f"❌ 积分不足: {e}")
print(" 请前往 Comfy Cloud 后台充值积分")
except TaskTimeoutError as e:
print(f"❌ 任务超时: {e}")
print(" 请检查工作流是否过于复杂,或适当增加超时时间")
except ComfyAPIError as e:
print(f"❌ API 调用失败: {e}")
except Exception as e:
print(f"❌ 未知错误: {e}")
if __name__ == "__main__":
main()
五、API 调用排错与优化
5.1 常见错误码及解决方案
| HTTP 状态码 | 含义 | 常见原因 | 解决方法 |
|---|---|---|---|
| 400 Bad Request | 请求格式错误 | JSON 格式不正确、节点 ID 引用错误 | 验证 JSON 合法性;检查节点连接引用 ["node_id", index] |
| 401 Unauthorized | 认证失败 | API Key 无效或过期 | 检查 API Key 是否正确;重新生成 Key |
| 402 Payment Required | 需要付费 | Cloud 积分不足 | 前往 Cloud 后台查看积分余额并充值 |
| 404 Not Found | 资源不存在 | 工作流 ID 或任务 ID 错误 | 检查 ID 是否正确;确认资源未被删除 |
| 422 Unprocessable | 请求语义错误 | 节点类型不存在、参数类型不匹配 | 检查 class_type 拼写;确认参数名称正确 |
| 429 Too Many Requests | 请求频率过高 | 超过了速率限制 | 降低请求频率;实现指数退避重试 |
| 500 Internal Server Error | 服务器内部错误 | 模型加载失败、GPU 显存不足 | 检查服务器日志;确认模型路径正确 |
| 503 Service Unavailable | 服务不可用 | Cloud 资源暂时不足 | 等待后重试;联系 Comfy 支持 |
节点错误排查(最常遇到的问题):
当你提交的 JSON 中存在节点配置错误时,API 会返回 node_errors 字段:
{
"prompt_id": "abc123",
"number": 3,
"node_errors": {
"3": {
"class_type": "KSampler",
"errors": ["缺少必要输入: model"]
}
}
}
常见节点错误及修复:
| 错误信息 | 原因 | 修复方法 |
|---|---|---|
缺少必要输入: model |
KSampler 未连接 MODEL | 检查 "model": ["node_id", 0] 的引用是否正确 |
缺少必要输入: clip |
CLIP Text Encode 未连接 CLIP | 确认 CLIP 输出连接到对应的节点 |
节点类型不存在: xxx |
自定义节点未安装 | 安装对应的自定义节点;使用内置节点替代 |
参数类型不匹配 |
连接了错误类型的输出 | 检查上下游节点的输入输出类型 |
ckpt_name 不存在 |
模型文件未找到 | 确认 models/checkpoints 目录下有该文件 |
5.2 网络与连接问题排查
| 问题现象 | 可能原因 | 诊断方法 | 解决方案 |
|---|---|---|---|
Connection Refused |
ComfyUI 未启动 | curl http://127.0.0.1:8188 |
启动 ComfyUI |
Connection Timeout |
网络不通/防火墙拦截 | ping 目标IP |
检查防火墙规则;确认 IP 和端口 |
SSL Error |
HTTPS 证书问题 | curl -k 测试 |
配置正确证书;或临时使用 verify=False |
WebSocket 断开 |
长连接超时 | 检查代理设置 | 配置 WebSocket 心跳;调整代理超时设置 |
DNS 解析失败 |
域名无法解析 | nslookup api.comfy.org |
检查 DNS 配置;使用 IP 直连 |
诊断脚本:快速检测 API 连通性
#!/bin/bash
# ComfyUI API 连通性检测脚本
SERVER="http://127.0.0.1:8188"
echo "🔍 ComfyUI API 连通性检测"
echo "=========================="
# 1. 检测服务器是否可达
echo -n "1️⃣ 服务器可达性: "
if curl -s --connect-timeout 5 "$SERVER" > /dev/null 2>&1; then
echo "✅ 可达"
else
echo "❌ 不可达(请确认 ComfyUI 已启动)"
exit 1
fi
# 2. 检测 API 端点
echo -n "2️⃣ API 响应: "
API_RESP=$(curl -s --connect-timeout 5 "$SERVER/prompt" -X POST \
-H "Content-Type: application/json" \
-d '{"prompt": {}}' 2>&1)
if echo "$API_RESP" | grep -q "prompt_id\|error"; then
echo "✅ 响应正常"
else
echo "⚠️ 异常响应: $API_RESP"
fi
# 3. 检测队列状态
echo -n "3️⃣ 队列状态: "
QUEUE_RESP=$(curl -s "$SERVER/queue")
echo "✅ $(echo $QUEUE_RESP | python3 -c "
import sys, json
data = json.load(sys.stdin)
queue_running = len(data.get('queue_running', []))
queue_pending = len(data.get('queue_pending', [])
print(f'运行中: {queue_running}, 排队中: {queue_pending}')
)")
# 4. 检测系统状态
echo -n "4️⃣ 系统状态: "
curl -s "$SERVER/system_stats" | python3 -c "
import sys, json
data = json.load(sys.stdin)
print(f"系统正常")
" 2>/dev/null || echo "⚠️ 无法获取系统状态"
echo "=========================="
echo "✅ 检测完成"
5.3 API 调用效率优化策略
1. 连接池复用(Python Requests):
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# 创建带连接池的会话
session = requests.Session()
# 设置连接池大小
adapter = HTTPAdapter(
pool_connections=10, # 连接池大小
pool_maxsize=20, # 最大连接数
max_retries=Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[429, 500, 502, 503]
)
)
session.mount("http://", adapter)
session.mount("https://", adapter)
# 使用 session 替代 requests 进行调用
resp = session.post(
"http://127.0.0.1:8188/prompt",
json={"prompt": workflow, "client_id": "optimized-client"}
)
2. 异步并发提交(asyncio + aiohttp):
import asyncio
import aiohttp
async def submit_prompt(session, workflow, client_id):
"""异步提交单个任务"""
async with session.post(
"http://127.0.0.1:8188/prompt",
json={"prompt": workflow, "client_id": client_id}
) as resp:
return await resp.json()
async def batch_submit(workflows: list, max_concurrent: int = 3):
"""批量并发提交(控制最大并发数)"""
connector = aiohttp.TCPConnector(limit=max_concurrent)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = []
for i, wf in enumerate(workflows):
task = submit_prompt(session, wf, f"async-{i}")
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
# 使用
workflows = [load_workflow_with_prompt(p) for p in prompt_list]
results = asyncio.run(batch_submit(workflows, max_concurrent=3))
3. 缓存工作流 JSON(避免重复构造):
import hashlib
import json
from functools import lru_cache
class WorkflowCache:
"""工作流模板缓存"""
def __init__(self):
self._cache = {}
def get_or_build(self, template_path: str, overrides: dict) -> dict:
"""获取缓存的工作流,仅替换变动部分"""
cache_key = f"{template_path}:{hashlib.md5(json.dumps(overrides, sort_keys=True).encode()).hexdigest()}"
if cache_key not in self._cache:
with open(template_path) as f:
template = json.load(f)
# 应用参数覆盖
self._apply_overrides(template, overrides)
self._cache[cache_key] = template
return self._cache[cache_key]
def _apply_overrides(self, workflow: dict, overrides: dict):
"""递归覆盖工作流参数"""
for node_id, node in workflow.items():
if isinstance(node, dict) and "inputs" in node:
for key, value in overrides.items():
if key in node["inputs"]:
node["inputs"][key] = value
4. 优化建议总结:
| 优化方向 | 具体措施 | 预期效果 |
|---|---|---|
| 连接优化 | 使用连接池(Session) | 减少 TCP 握手开销,提升 30-50% 吞吐 |
| 并发控制 | asyncio + 信号量控制并发 | 充分利用带宽,避免过度负载 |
| 结果缓存 | 缓存工作流 JSON 模板 | 减少重复 JSON 构建,速度提升 80%+ |
| 超时设置 | 合理设置 connect/read timeout | 避免无效等待,提升系统健壮性 |
| 批量提交 | 减少 HTTP 请求次数 | 大幅提升吞吐量 |
| WebSocket | 使用 WS 替代轮询 | 实时感知状态,减少无效查询 |
六、本地 API vs Cloud API 全维对比
| 对比维度 | 🔧 本地 API | ☁️ Cloud API |
|---|---|---|
| 部署方式 | 启动 ComfyUI 即可 | 需单独注册订阅 |
| GPU 算力 | 自有 GPU | RTX 6000 Pro (96GB) |
| 请求延迟 | 低(局域网 < 1ms) | 中等(公网 20-100ms) |
| 并发上限 | 受本地 GPU 显存限制 | Creator: 3, Pro: 5 |
| 速率限制 | 无硬限制 | 60 次/分钟 |
| 鉴权方式 | --enable-auth |
API Key (Bearer) |
| HTTPS 支持 | 需自行配置 Nginx | 默认支持 |
| 可用性保证 | 自行维护 | 99.9% |
| 结果存储 | 本地文件系统 | 云端 30 天 |
| CDN 加速 | 无 | ✅ |
| 积分消耗 | 无 | 按运行时长计费 |
| 成本 | 免费 | 35−35−100/月 |
| 适用场景 | 开发调试、内部自动化 | 生产部署、对外服务 |
选型流程图:
是否需要对外提供服务?
├── 是 → 是否有预算订阅 Cloud?
│ ├── 是 → ☁️ Cloud API(推荐生产方案)
│ └── 否 → 🔧 本地 API + Nginx 反向代理 + HTTPS
└── 否 → 是否内网使用?
├── 是 → 🔧 本地 API(最简单方案)
└── 否 → 是否需要 96GB 大显存?
├── 是 → ☁️ Cloud API
└── 否 → 🔧 本地 API
七、总结与下一篇预告
本文核心要点回顾
通过本文的学习,你已经掌握了:
- ✅ 本地 API 的核心端点(
/prompt、/history、/queue)与启动配置 - ✅ Cloud API 的认证方式、专属端点、限制与配额
- ✅ 4 个实操案例——cURL 调用、Python 客户端封装、Cloud 批量生成、异常处理与重试
- ✅ API 调用排错——状态码解读、节点错误排查、网络问题诊断
- ✅ 性能优化策略——连接池、异步并发、工作流缓存
- ✅ 本地 vs Cloud API 的选型对比——根据场景选择最优方案
关键代码速查
# 本地 API:一键提交工作流
curl -X POST http://127.0.0.1:8188/prompt \
-H "Content-Type: application/json" \
-d @workflow.json
# Cloud API:带认证的运行工作流
curl -X POST https://api.comfy.org/workflows/{id}/run \
-H "Authorization: Bearer cmy_xxxx" \
-H "Content-Type: application/json"
# Python 客户端 —— 最少代码实现
import requests
resp = requests.post("http://127.0.0.1:8188/prompt",
json={"prompt": workflow, "client_id": "myapp"})
pid = resp.json()["prompt_id"]
下一篇预告
下一篇(博客 26)将聚焦 Cloud 专属功能详解——我们将深入探索 Cloud 平台独有的高级节点、批量生成与任务管理、数据存储与同步等核心能力,帮助你充分利用 Cloud 平台的优势,构建高效的 AI 图像生产管线。
如果你已经掌握了 API 集成的能力,那么下一篇将教你如何将这种能力与 Cloud 专属功能结合,发挥出更大的威力。敬请期待!
官方参考链接
- ComfyUI 本地 API 文档 — 官方本地 API 参考
- Comfy Cloud API 概览 — Cloud API 入门
- Comfy Cloud API 参考 — API 详细端点说明
- ComfyUI 启动参数文档 — 所有命令行启动参数
- Comfy Cloud 定价页面 — 订阅计划与 API 配额
- Comfy Cloud MCP 服务器 — AI 智能体连接 Cloud
- ComfyUI 获取帮助与故障排除 — 官方支持渠道与故障排查
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)