本文为系列第 25 篇,属于"高阶进阶章节"的第三个知识点详解。我们将深入讲解 ComfyUI 本地 API 与 Cloud API 的完整集成方案,包含端点说明、代码实操、异常处理与生产级最佳实践。


知识点详解——本地 API 与 Cloud API 集成(对接+实操+排错)


目录


一、引言:为什么需要 API 集成?

1.1 API 集成的核心价值

在 ComfyUI 的实际生产应用中,手动拖拽节点 → 点击运行 → 等待结果 的交互模式远远不够。当我们需要:

  • 将 AI 绘图能力集成到自己的 Web 应用或移动 App 中
  • 实现定时、自动化的批量图像生成
  • 对接 CI/CD 流水线,自动生成封面、海报
  • 构建 SaaS 平台,为用户提供 AI 绘图服务

这时候就需要 API(应用程序编程接口) 来打通 ComfyUI 与外部系统之间的桥梁。

ComfyUI 提供了两套 API 方案:

API 方案 定位 适用场景
本地 API ComfyUI 自带的 REST API,本地启动后自动可用 内部系统集成、本地自动化、开发测试
Cloud API Comfy Cloud 平台提供的托管 API 生产环境、对外服务、高可用需求

两套 API 的核心能力一致——提交工作流、查询状态、获取结果——但在部署方式、鉴权机制、性能限制等方面存在显著差异。

1.2 本地 API vs Cloud API:选型总览

对比维度 本地 API Cloud API
部署位置 本地服务器(127.0.0.1:8188 Comfy Cloud 云端
启动方式 启动 ComfyUI 时默认开启 需在 Cloud 后台创建 API Key
鉴权方式 无(默认)/ --enable-auth API Key(Bearer Token)
调用限制 无硬限制(受本地硬件制约) 有速率限制和并发限制
可用性 依赖本地服务器运行 99.9% 托管可用性
数据隐私 数据完全本地 数据经云端处理
成本 免费(仅电费) 按积分消耗 + 订阅费
适用场景 开发调试、内部自动化 生产部署、对外服务

选择建议:如果你的系统运行在同一个内网中,本地 API 是最简单、免费的选择;如果需要对外提供服务、需要高可用性保证,或想利用云端 GPU 算力,Cloud API 是更好的选择。


二、本地 API 详解

2.1 启动本地 API 服务

ComfyUI 默认启动时已包含 HTTP API 服务,访问 http://127.0.0.1:8188 即可使用。但要从外部程序调用 API,需要以下配置:

基础启动(本机访问):

python main.py

开启远程访问(局域网/公网):

python main.py --listen 0.0.0.0

--listen 0.0.0.0 监听所有网络接口,允许局域网内其他设备访问。默认端口为 8188,可通过 --port 参数修改:

python main.py --listen 0.0.0.0 --port 8288

指定 GPU 设备(多卡场景):

python main.py --gpu-only --gpu-device 0

全部常用启动参数速查:

参数 作用 示例
--listen [IP] 开启远程监听 --listen 0.0.0.0
--port N 指定端口 --port 8288
--gpu-only 仅使用 GPU 无参数
--cpu 仅使用 CPU(极慢) 无参数
--lowvram 低显存模式 无参数
--novram 不将模型加载到显存 无参数
--enable-auth 开启 API 鉴权 无参数
--output-directory DIR 指定输出目录 --output /data/outputs
--auto-launch 启动后自动打开浏览器 无参数

2.2 核心端点说明

ComfyUI 本地 API 基于 WebSocket + HTTP 双通道设计。HTTP 负责提交任务和查询,WebSocket 负责实时推送状态更新。

HTTP 端点(核心):

方法 端点 功能 请求体 / 参数
POST /prompt 提交工作流任务 JSON:{ "prompt": {...}, "client_id": "..." }
GET /history/{prompt_id} 查询单个任务结果 prompt_id 路径参数
GET /history 查询全部任务历史
GET /queue 查看当前队列状态
POST /queue 取消队列中的任务 JSON:{ "prompt_id": "..." , "delete": true }
GET /object_info/{node_class} 查询节点信息 node_class 可选
GET /embeddings 获取可用 embeddings 列表
GET /extensions 获取已安装扩展列表
GET /providers 查看可用的 providers
GET /system_stats 系统状态统计

WebSocket 端点:

端点 功能
ws://127.0.0.1:8188/ws?client_id={client_id} 实时接收任务状态推送

工作流提交格式(POST /prompt 的请求体结构):

{
  "prompt": {
    "3": {
      "class_type": "KSampler",
      "inputs": {
        "seed": 156680208700286,
        "steps": 25,
        "cfg": 7,
        "sampler_name": "dpmpp_2m",
        "scheduler": "karras",
        "denoise": 1,
        "model": ["4", 0],
        "positive": ["6", 0],
        "negative": ["7", 0],
        "latent_image": ["5", 0]
      }
    },
    "4": {
      "class_type": "CheckpointLoaderSimple",
      "inputs": {
        "ckpt_name": "dreamshaper_8.safetensors"
      }
    },
    "5": {
      "class_type": "EmptyLatentImage",
      "inputs": {
        "width": 512,
        "height": 512,
        "batch_size": 1
      }
    },
    "6": {
      "class_type": "CLIPTextEncode",
      "inputs": {
        "text": "beautiful landscape, sunshine, mountains",
        "clip": ["4", 1]
      }
    },
    "7": {
      "class_type": "CLIPTextEncode",
      "inputs": {
        "text": "blurry, low quality",
        "clip": ["4", 1]
      }
    },
    "8": {
      "class_type": "VAEDecode",
      "inputs": {
        "samples": ["3", 0],
        "vae": ["4", 2]
      }
    },
    "9": {
      "class_type": "SaveImage",
      "inputs": {
        "filename_prefix": "ComfyUI",
        "images": ["8", 0]
      }
    }
  },
  "client_id": "my-app-client-001"
}

关键理解prompt 字段内的 JSON 结构就是 ComfyUI 工作流的序列化表示。每个节点以数字 ID 为键,包含 class_type(节点类型)和 inputs(输入参数,其中的 ["node_id", output_index] 格式表示连接关系)。这实际上就是 ComfyUI 工作流 JSON 文件(workflow_api 格式)的完整结构。

如何获取工作流的 API JSON 格式?

在 ComfyUI 界面中:

  1. 完成工作流搭建后,点击右上角 Workflows(工作流)→ Save(保存)(API 格式)
  2. 或者点击 Save (API Format) 按钮直接保存为 API 调用可用的 JSON 文件
  3. 也可以用 "Save Image" 节点生成的图片(包含全量 metadata),通过脚本提取

推荐做法:先在 ComfyUI 界面中搭建并调试好工作流,确认能正确生成后,再导出为 API 格式用于代码调用。永远不要在 API 调用中第一次运行一个未经测试的工作流。

2.3 API 鉴权与安全配置

默认情况下,本地 API 没有任何鉴权——只要网络可达,任何人都可以提交任务。在生产环境中这显然不安全。

开启基本鉴权:

python main.py --listen 0.0.0.0 --enable-auth

启动后,需要在浏览器首次访问时设置用户名和密码。之后每次 API 调用都需要携带 Authorization 头。

使用 Nginx 反向代理 + HTTPS(推荐的生产方案):

server {
    listen 443 ssl;
    server_name comfyui.example.com;

    ssl_certificate /path/to/cert.pem;
    ssl_certificate_key /path/to/key.pem;

    # 基本认证
    auth_basic "ComfyUI API";
    auth_basic_user_file /etc/nginx/.htpasswd;

    location / {
        proxy_pass http://127.0.0.1:8188;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_read_timeout 300s;
    }
}

安全建议总结:

场景 推荐安全措施
仅本机使用 默认无鉴权即可(127.0.0.1 限制)
局域网使用(可信环境) 防火墙限制端口 + --listen 指定内网 IP
公网访问 必须 HTTPS + 鉴权(推荐 Nginx 反向代理)
生产环境对外服务 使用 Cloud API(托管安全)

三、Cloud API 详解

3.1 Cloud API 的差异化能力

Cloud API 与本地 API 相比,除了部署位置不同,还有以下差异化能力

能力 说明
托管 GPU 算力 无需自备 GPU,使用 RTX 6000 Pro(96GB 显存)
高可用性 99.9% 服务可用性,自动故障恢复
并发执行 支持最多 5 个任务同时运行(视订阅计划)
全球 CDN 生成结果通过 CDN 加速分发
自动缩放 队列积压时自动增加计算资源
托管存储 工作流、模型、结果均存储在云端

订阅计划与 API 配额:

计划 月费 API 并发上限 积分/月 单次最长运行
免费版 $0 ❌ 无 API 访问 400 10 分钟
标准版 $20 ❌ 无 API 访问 4,200 30 分钟
创作者版 $35 ✅ 3 个并发 7,400 30 分钟
专业版 $100 ✅ 5 个并发 21,100 60 分钟

注意:免费版和标准版不提供 API 访问。如果需要 API 集成,至少需要创作者版($35/月)。

3.2 API Key 管理与认证

获取 API Key:

  1. 登录 Comfy Cloud
  2. 点击右上角用户头像 → Account(账户)
  3. 选择 API Keys 选项卡
  4. 点击 Create API Key
  5. 复制生成的密钥并安全保存(关闭弹窗后将无法再次查看完整密钥

认证方式:

所有 Cloud API 请求都需要在 HTTP 头中携带 API Key:

Authorization: Bearer cmy_xxxxxxxxxxxxxxxxxxxxx

API Key 最佳实践:

  • ✅ 不同应用使用不同的 API Key(方便独立管理)
  • ✅ API Key 存储在环境变量或密钥管理服务中
  • ✅ 定期轮换 API Key
  • ❌ 不要将 API Key 硬编码在代码中
  • ❌ 不要将 API Key 提交到 Git 仓库

3.3 Cloud 专属端点说明

Cloud API 的基路径为 https://api.comfy.org,其核心端点如下:

方法 端点 功能
POST /workflows 创建工作流
GET /workflows/{id} 获取工作流详情
POST /workflows/{id}/run 运行工作流
GET /runs/{id} 查询运行状态
GET /runs/{id}/output 获取运行结果
POST /runs/batch 批量运行工作流
GET /queue 查看队列状态
GET /credits 查询积分余额

与本地 API 的关键区别:

特性 本地 API Cloud API
工作流提交 直接 JSON body 分两步:先创建 → 再运行
结果获取 从 /history 获取 从 /runs/{id}/output 获取
文件传输 Base64 编码图片 返回文件 URL(CDN)
任务 ID 格式 数字 prompt_id UUID 格式的 run_id

3.4 限制与配额

使用 Cloud API 时需注意以下限制:

限制类型 具体说明
速率限制 每分钟最多 60 次 API 请求
并发限制 Creator: 3 个并发, Pro: 5 个并发
单次运行时长 Creator: 最长 30 分钟, Pro: 最长 60 分钟
结果存储时长 生成结果保留 30 天
工作流数量 最多 100 个保存的工作流
积分消耗 根据运行时长和模型复杂度按积分计费

超出限制时:API 会返回 429 Too Many Requests(速率超限)或 402 Payment Required(积分不足)状态码。建议在代码中处理这些错误码并实施重试策略。


四、实操案例全解析

4.1 案例一:使用 cURL 调用本地 API 提交文生图任务

适用场景:快速测试 API 连通性、排查 API 调用问题。

Step 1:准备工作流 JSON

先导出工作流的 API 格式 JSON。假设文件名为 t2i_workflow.json

Step 2:使用 cURL 提交任务

# 提交工作流到本地 API
curl -X POST http://127.0.0.1:8188/prompt \
  -H "Content-Type: application/json" \
  -d @t2i_workflow.json

成功响应示例:

{
  "prompt_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "number": 5,
  "node_errors": {}
}
  • prompt_id:任务的唯一 ID,用于后续查询状态和结果
  • number:队列中的位置编号
  • node_errors:节点错误信息(为空表示无错误)

Step 3:查询任务状态

# 查询任务历史(需要替换 prompt_id)
curl http://127.0.0.1:8188/history/a1b2c3d4-e5f6-7890-abcd-ef1234567890

Step 4:获取生成结果

当任务完成后,/history 返回的数据中包含 outputs 字段:

{
  "a1b2c3d4-e5f6-7890-abcd-ef1234567890": {
    "status": {
      "status_str": "success",
      "completed": true,
      "messages": [...]
    },
    "outputs": {
      "9": {
        "images": [
          {
            "filename": "ComfyUI_00001_.png",
            "subfolder": "",
            "type": "output"
          }
        ]
      }
    }
  }
}

获取图片文件:

curl http://127.0.0.1:8188/view?filename=ComfyUI_00001_.png&type=output \
  -o output.png

完整的一站式 cURL 示例脚本:

#!/bin/bash
# 一键提交并等待结果的简单脚本

SERVER="http://127.0.0.1:8188"
WORKFLOW_FILE="t2i_workflow.json"

# 1. 提交任务
echo "提交工作流..."
RESPONSE=$(curl -s -X POST "$SERVER/prompt" \
  -H "Content-Type: application/json" \
  -d @$WORKFLOW_FILE)

PROMPT_ID=$(echo $RESPONSE | python3 -c "import sys,json; print(json.load(sys.stdin)['prompt_id'])")
echo "任务 ID: $PROMPT_ID"

# 2. 轮询等待完成
echo "等待生成完成..."
while true; do
  STATUS=$(curl -s "$SERVER/history/$PROMPT_ID" | python3 -c "
import sys, json
data = json.load(sys.stdin)
prompt = data.get('$PROMPT_ID', {})
completed = prompt.get('status', {}).get('completed', False)
print('completed' if completed else 'running')
")
  
  if [ "$STATUS" = "completed" ]; then
    echo "生成完成!"
    break
  fi
  sleep 2
done

# 3. 获取图片
echo "下载生成结果..."
FILENAME=$(curl -s "$SERVER/history/$PROMPT_ID" | python3 -c "
import sys, json
data = json.load(sys.stdin)
outputs = data['$PROMPT_ID']['outputs']
# 取第一个图片输出节点
for node_id in outputs:
    images = outputs[node_id].get('images', [])
    if images:
        print(images[0]['filename'])
        break
")

curl -s "$SERVER/view?filename=$FILENAME&type=output" -o "output_$PROMPT_ID.png"
echo "图片已保存为: output_$PROMPT_ID.png"

4.2 案例二:Python 封装 —— 完整的本地 API 客户端

适用场景:在 Python 应用中集成 ComfyUI 生成能力。

"""
ComfyUI 本地 API 客户端封装
支持功能:提交任务、查询状态、等待完成、获取结果
"""
import json
import time
import requests
import base64
from io import BytesIO
from typing import Optional, Dict, Any
from pathlib import Path


class ComfyLocalClient:
    """ComfyUI 本地 API 客户端"""

    def __init__(self, base_url: str = "http://127.0.0.1:8188"):
        self.base_url = base_url.rstrip("/")
        self.client_id = f"py-client-{int(time.time())}"

    def submit_prompt(self, workflow_json: Dict) -> str:
        """
        提交工作流任务

        Args:
            workflow_json: 工作流的 API 格式 JSON

        Returns:
            prompt_id: 任务 ID
        """
        payload = {
            "prompt": workflow_json,
            "client_id": self.client_id
        }
        resp = requests.post(
            f"{self.base_url}/prompt",
            json=payload
        )
        resp.raise_for_status()
        data = resp.json()
        
        if data.get("node_errors"):
            print(f"⚠️ 节点警告: {data['node_errors']}")
        
        return data["prompt_id"]

    def get_history(self, prompt_id: str) -> Optional[Dict]:
        """查询任务结果"""
        resp = requests.get(f"{self.base_url}/history/{prompt_id}")
        if resp.status_code == 200:
            data = resp.json()
            return data.get(prompt_id)
        return None

    def wait_for_completion(
        self, prompt_id: str, poll_interval: float = 2.0, timeout: float = 300.0
    ) -> Dict:
        """
        轮询等待任务完成

        Args:
            prompt_id: 任务 ID
            poll_interval: 轮询间隔(秒)
            timeout: 超时时间(秒)

        Returns:
            任务结果数据

        Raises:
            TimeoutError: 超时未完成
        """
        start_time = time.time()
        while True:
            if time.time() - start_time > timeout:
                raise TimeoutError(f"任务 {prompt_id} 超时")

            history = self.get_history(prompt_id)
            if history and history.get("status", {}).get("completed"):
                return history
            
            # 打印状态信息
            status = history.get("status", {}) if history else {}
            messages = status.get("messages", [])
            if messages:
                last_msg = messages[-1]
                if len(last_msg) > 1:
                    print(f"  [{last_msg[0]}] {last_msg[1].get('message', '')}")

            time.sleep(poll_interval)

    def get_output_images(
        self, prompt_id: str, output_dir: Optional[str] = None
    ) -> list[Path]:
        """
        获取并保存生成结果图片

        Args:
            prompt_id: 任务 ID
            output_dir: 保存目录(默认当前目录)

        Returns:
            保存的文件路径列表
        """
        history = self.get_history(prompt_id)
        if not history:
            return []

        outputs = history.get("outputs", {})
        saved_files = []
        save_dir = Path(output_dir) if output_dir else Path.cwd()
        save_dir.mkdir(parents=True, exist_ok=True)

        for node_id, node_output in outputs.items():
            for image_info in node_output.get("images", []):
                filename = image_info["filename"]
                subfolder = image_info.get("subfolder", "")
                img_type = image_info.get("type", "output")

                # 下载图片
                resp = requests.get(
                    f"{self.base_url}/view",
                    params={
                        "filename": filename,
                        "subfolder": subfolder,
                        "type": img_type
                    }
                )
                if resp.status_code == 200:
                    filepath = save_dir / filename
                    filepath.write_bytes(resp.content)
                    saved_files.append(filepath)
                    print(f"  ✅ 已保存: {filepath}")

        return saved_files


# ========== 使用示例 ==========

if __name__ == "__main__":
    # 1. 加载工作流 JSON(API 格式)
    with open("t2i_workflow.json", "r") as f:
        workflow = json.load(f)

    # 2. 创建客户端
    client = ComfyLocalClient()

    # 3. 提交任务
    print("🚀 提交工作流任务...")
    prompt_id = client.submit_prompt(workflow)
    print(f"   任务 ID: {prompt_id}")

    # 4. 等待完成
    print("⏳ 等待生成完成...")
    result = client.wait_for_completion(prompt_id)
    print(f"   ✅ 生成完成!状态: {result['status']['status_str']}")

    # 5. 获取结果图片
    print("📥 下载生成结果...")
    files = client.get_output_images(prompt_id, output_dir="./outputs")
    print(f"   📁 共保存 {len(files)} 张图片")

4.3 案例三:使用 Cloud API 实现批量图像生成

适用场景:需要大规模生成图像,利用云端算力和并发能力。

"""
Comfy Cloud API 批量生成示例
需创作者版及以上订阅计划
"""
import json
import time
import requests
from typing import List, Dict


class ComfyCloudClient:
    """Comfy Cloud API 客户端"""

    def __init__(self, api_key: str):
        self.base_url = "https://api.comfy.org"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def create_workflow(self, workflow_json: Dict) -> str:
        """在工作流管理器中创建/注册工作流"""
        resp = requests.post(
            f"{self.base_url}/workflows",
            headers=self.headers,
            json={"workflow": workflow_json}
        )
        resp.raise_for_status()
        return resp.json()["id"]

    def run_workflow(self, workflow_id: str, overrides: Dict = None) -> str:
        """
        运行工作流

        Args:
            workflow_id: 工作流 ID
            overrides: 输入参数覆盖(如替换提示词)

        Returns:
            run_id: 运行任务 ID
        """
        payload = {}
        if overrides:
            payload["overrides"] = overrides
        
        resp = requests.post(
            f"{self.base_url}/workflows/{workflow_id}/run",
            headers=self.headers,
            json=payload
        )
        resp.raise_for_status()
        return resp.json()["id"]

    def batch_run(
        self, workflow_id: str, prompt_variations: List[str]
    ) -> List[str]:
        """
        批量运行:对同一工作流使用不同提示词

        Args:
            workflow_id: 工作流 ID
            prompt_variations: 提示词列表

        Returns:
            run_ids: 任务 ID 列表
        """
        run_ids = []
        for i, prompt in enumerate(prompt_variations):
            print(f"  提交 [{i+1}/{len(prompt_variations)}]: {prompt[:50]}...")
            run_id = self.run_workflow(workflow_id, {
                "positive_prompt": prompt  # 根据工作流结构调整
            })
            run_ids.append(run_id)
            time.sleep(0.5)  # 避免速率限制
        return run_ids

    def get_run_status(self, run_id: str) -> Dict:
        """查询运行状态"""
        resp = requests.get(
            f"{self.base_url}/runs/{run_id}",
            headers=self.headers
        )
        resp.raise_for_status()
        return resp.json()

    def wait_for_runs(self, run_ids: List[str], poll_interval: int = 5):
        """等待所有批量任务完成"""
        pending = set(run_ids)
        completed = {}

        while pending:
            print(f"\n⏳ 等待中: {len(pending)}/{len(run_ids)} 个任务未完成")
            for run_id in list(pending):
                status = self.get_run_status(run_id)
                state = status.get("state", "")
                
                if state == "completed":
                    completed[run_id] = status
                    pending.remove(run_id)
                    print(f"  ✅ {run_id}: 完成")
                elif state == "failed":
                    print(f"  ❌ {run_id}: 失败 - {status.get('error', '')}")
                    pending.remove(run_id)
                # 其他状态(queued/running)继续等待
            
            if pending:
                time.sleep(poll_interval)

        return completed

    def download_results(self, run_statuses: Dict, output_dir: str = "./cloud_outputs"):
        """批量下载生成结果"""
        import os
        os.makedirs(output_dir, exist_ok=True)
        
        for run_id, status in run_statuses.items():
            resp = requests.get(
                f"{self.base_url}/runs/{run_id}/output",
                headers=self.headers
            )
            if resp.status_code == 200:
                data = resp.json()
                # Cloud API 返回图片 URL(CDN)
                for output in data.get("outputs", []):
                    image_url = output.get("url")
                    if image_url:
                        img_resp = requests.get(image_url)
                        ext = image_url.split(".")[-1].split("?")[0]
                        filepath = os.path.join(
                            output_dir, f"{run_id[:8]}.{ext}"
                        )
                        with open(filepath, "wb") as f:
                            f.write(img_resp.content)
                        print(f"  📁 已保存: {filepath}")


# ========== 使用示例 ==========

if __name__ == "__main__":
    API_KEY = os.environ.get("COMFY_CLOUD_API_KEY")
    if not API_KEY:
        raise ValueError("请设置 COMFY_CLOUD_API_KEY 环境变量")

    client = ComfyCloudClient(API_KEY)

    # 1. 创建工作流
    with open("t2i_workflow.json") as f:
        workflow = json.load(f)
    wf_id = client.create_workflow(workflow)
    print(f"✅ 工作流已创建: {wf_id}")

    # 2. 批量提交多个提示词
    prompts = [
        "beautiful mountain landscape, sunset, 4k",
        "cyberpunk city street at night, neon lights",
        "cute cat sleeping on a windowsill, sunlight",
        "ancient castle on a hill, foggy morning",
        "underwater coral reef, colorful fish, sun rays",
    ]

    print(f"🚀 批量提交 {len(prompts)} 个任务...")
    run_ids = client.batch_run(wf_id, prompts)

    # 3. 等待所有任务完成
    completed = client.wait_for_runs(run_ids)

    # 4. 下载结果
    print("\n📥 下载生成结果...")
    client.download_results(completed)
    print(f"\n🎉 全部完成!共生成 {len(completed)} 张图片")

4.4 案例四:API 工作流的异常处理与重试机制

在生产环境中,API 调用不可能永远一帆风顺。网络抖动、服务器过载、积分不足等都可能导致失败。下面是一个健壮的异常处理与重试机制示例:

import time
import json
import requests
from functools import wraps
from typing import Callable, Any


class ComfyAPIError(Exception):
    """ComfyUI API 基础异常"""
    pass

class RateLimitError(ComfyAPIError):
    """速率限制异常"""
    pass

class InsufficientCreditsError(ComfyAPIError):
    """积分不足异常"""
    pass

class TaskTimeoutError(ComfyAPIError):
    """任务超时异常"""
    pass


def retry_on_failure(
    max_retries: int = 3,
    base_delay: float = 1.0,
    backoff_factor: float = 2.0,
    retryable_exceptions: tuple = (ConnectionError, TimeoutError, RateLimitError)
) -> Callable:
    """
    重试装饰器:指数退避策略
    
    Args:
        max_retries: 最大重试次数
        base_delay: 基础等待时间(秒)
        backoff_factor: 退避因子
        retryable_exceptions: 可重试的异常类型
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except retryable_exceptions as e:
                    last_exception = e
                    if attempt < max_retries:
                        delay = base_delay * (backoff_factor ** attempt)
                        print(f"⚠️ 第 {attempt + 1} 次失败 ({e}), "
                              f"{delay:.1f} 秒后重试...")
                        time.sleep(delay)
                    else:
                        print(f"❌ 已重试 {max_retries} 次,放弃")
            raise last_exception
        return wrapper
    return decorator


class RobustComfyClient:
    """健壮的 ComfyUI API 客户端(含重试机制)"""

    def __init__(self, base_url: str, api_key: str = None, is_cloud: bool = False):
        self.base_url = base_url.rstrip("/")
        self.is_cloud = is_cloud
        self.headers = {"Content-Type": "application/json"}
        if api_key:
            self.headers["Authorization"] = f"Bearer {api_key}"

    @retry_on_failure(max_retries=3)
    def safe_submit(self, workflow_json: dict) -> dict:
        """
        带重试的安全提交
        
        错误处理策略:
        - 429 (Rate Limit) → 等待后重试
        - 402 (Insufficient Credits) → 直接报错,不重试
        - 5xx (Server Error) → 重试
        - ConnectionError → 重试
        """
        if self.is_cloud:
            # Cloud API 需要先创建 workflow
            resp = requests.post(
                f"{self.base_url}/workflows",
                headers=self.headers,
                json={"workflow": workflow_json},
                timeout=30
            )
            # 处理 Cloud 特有的错误
            if resp.status_code == 429:
                retry_after = int(resp.headers.get("Retry-After", 5))
                print(f"⏳ 速率受限,等待 {retry_after} 秒...")
                time.sleep(retry_after)
                raise RateLimitError("Rate limit exceeded")
            elif resp.status_code == 402:
                raise InsufficientCreditsError("积分不足")
            resp.raise_for_status()
            wf_id = resp.json()["id"]
            
            # 运行工作流
            resp = requests.post(
                f"{self.base_url}/workflows/{wf_id}/run",
                headers=self.headers,
                timeout=30
            )
            return resp.json()
        else:
            # 本地 API
            resp = requests.post(
                f"{self.base_url}/prompt",
                headers=self.headers,
                json={"prompt": workflow_json, "client_id": "robust-client"},
                timeout=60
            )
            if resp.status_code == 429:
                raise RateLimitError("Rate limit exceeded")
            resp.raise_for_status()
            return resp.json()


# 使用示例
def main():
    client = RobustComfyClient(
        base_url="http://127.0.0.1:8188",
        is_cloud=False
    )
    
    with open("t2i_workflow.json") as f:
        workflow = json.load(f)
    
    try:
        result = client.safe_submit(workflow)
        print(f"✅ 提交成功!Prompt ID: {result.get('prompt_id')}")
    except InsufficientCreditsError as e:
        print(f"❌ 积分不足: {e}")
        print("  请前往 Comfy Cloud 后台充值积分")
    except TaskTimeoutError as e:
        print(f"❌ 任务超时: {e}")
        print("  请检查工作流是否过于复杂,或适当增加超时时间")
    except ComfyAPIError as e:
        print(f"❌ API 调用失败: {e}")
    except Exception as e:
        print(f"❌ 未知错误: {e}")


if __name__ == "__main__":
    main()

五、API 调用排错与优化

5.1 常见错误码及解决方案

HTTP 状态码 含义 常见原因 解决方法
400 Bad Request 请求格式错误 JSON 格式不正确、节点 ID 引用错误 验证 JSON 合法性;检查节点连接引用 ["node_id", index]
401 Unauthorized 认证失败 API Key 无效或过期 检查 API Key 是否正确;重新生成 Key
402 Payment Required 需要付费 Cloud 积分不足 前往 Cloud 后台查看积分余额并充值
404 Not Found 资源不存在 工作流 ID 或任务 ID 错误 检查 ID 是否正确;确认资源未被删除
422 Unprocessable 请求语义错误 节点类型不存在、参数类型不匹配 检查 class_type 拼写;确认参数名称正确
429 Too Many Requests 请求频率过高 超过了速率限制 降低请求频率;实现指数退避重试
500 Internal Server Error 服务器内部错误 模型加载失败、GPU 显存不足 检查服务器日志;确认模型路径正确
503 Service Unavailable 服务不可用 Cloud 资源暂时不足 等待后重试;联系 Comfy 支持

节点错误排查(最常遇到的问题):

当你提交的 JSON 中存在节点配置错误时,API 会返回 node_errors 字段:

{
  "prompt_id": "abc123",
  "number": 3,
  "node_errors": {
    "3": {
      "class_type": "KSampler",
      "errors": ["缺少必要输入: model"]
    }
  }
}

常见节点错误及修复:

错误信息 原因 修复方法
缺少必要输入: model KSampler 未连接 MODEL 检查 "model": ["node_id", 0] 的引用是否正确
缺少必要输入: clip CLIP Text Encode 未连接 CLIP 确认 CLIP 输出连接到对应的节点
节点类型不存在: xxx 自定义节点未安装 安装对应的自定义节点;使用内置节点替代
参数类型不匹配 连接了错误类型的输出 检查上下游节点的输入输出类型
ckpt_name 不存在 模型文件未找到 确认 models/checkpoints 目录下有该文件

5.2 网络与连接问题排查

问题现象 可能原因 诊断方法 解决方案
Connection Refused ComfyUI 未启动 curl http://127.0.0.1:8188 启动 ComfyUI
Connection Timeout 网络不通/防火墙拦截 ping 目标IP 检查防火墙规则;确认 IP 和端口
SSL Error HTTPS 证书问题 curl -k 测试 配置正确证书;或临时使用 verify=False
WebSocket 断开 长连接超时 检查代理设置 配置 WebSocket 心跳;调整代理超时设置
DNS 解析失败 域名无法解析 nslookup api.comfy.org 检查 DNS 配置;使用 IP 直连

诊断脚本:快速检测 API 连通性

#!/bin/bash
# ComfyUI API 连通性检测脚本

SERVER="http://127.0.0.1:8188"

echo "🔍 ComfyUI API 连通性检测"
echo "=========================="

# 1. 检测服务器是否可达
echo -n "1️⃣  服务器可达性: "
if curl -s --connect-timeout 5 "$SERVER" > /dev/null 2>&1; then
    echo "✅ 可达"
else
    echo "❌ 不可达(请确认 ComfyUI 已启动)"
    exit 1
fi

# 2. 检测 API 端点
echo -n "2️⃣  API 响应: "
API_RESP=$(curl -s --connect-timeout 5 "$SERVER/prompt" -X POST \
  -H "Content-Type: application/json" \
  -d '{"prompt": {}}' 2>&1)
if echo "$API_RESP" | grep -q "prompt_id\|error"; then
    echo "✅ 响应正常"
else
    echo "⚠️  异常响应: $API_RESP"
fi

# 3. 检测队列状态
echo -n "3️⃣  队列状态: "
QUEUE_RESP=$(curl -s "$SERVER/queue")
echo "✅ $(echo $QUEUE_RESP | python3 -c "
import sys, json
data = json.load(sys.stdin)
queue_running = len(data.get('queue_running', []))
queue_pending = len(data.get('queue_pending', [])
print(f'运行中: {queue_running}, 排队中: {queue_pending}')
)")

# 4. 检测系统状态
echo -n "4️⃣  系统状态: "
curl -s "$SERVER/system_stats" | python3 -c "
import sys, json
data = json.load(sys.stdin)
print(f"系统正常")
" 2>/dev/null || echo "⚠️ 无法获取系统状态"

echo "=========================="
echo "✅ 检测完成"

5.3 API 调用效率优化策略

1. 连接池复用(Python Requests):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# 创建带连接池的会话
session = requests.Session()

# 设置连接池大小
adapter = HTTPAdapter(
    pool_connections=10,   # 连接池大小
    pool_maxsize=20,      # 最大连接数
    max_retries=Retry(
        total=3,
        backoff_factor=0.5,
        status_forcelist=[429, 500, 502, 503]
    )
)
session.mount("http://", adapter)
session.mount("https://", adapter)

# 使用 session 替代 requests 进行调用
resp = session.post(
    "http://127.0.0.1:8188/prompt",
    json={"prompt": workflow, "client_id": "optimized-client"}
)

2. 异步并发提交(asyncio + aiohttp):

import asyncio
import aiohttp

async def submit_prompt(session, workflow, client_id):
    """异步提交单个任务"""
    async with session.post(
        "http://127.0.0.1:8188/prompt",
        json={"prompt": workflow, "client_id": client_id}
    ) as resp:
        return await resp.json()

async def batch_submit(workflows: list, max_concurrent: int = 3):
    """批量并发提交(控制最大并发数)"""
    connector = aiohttp.TCPConnector(limit=max_concurrent)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = []
        for i, wf in enumerate(workflows):
            task = submit_prompt(session, wf, f"async-{i}")
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

# 使用
workflows = [load_workflow_with_prompt(p) for p in prompt_list]
results = asyncio.run(batch_submit(workflows, max_concurrent=3))

3. 缓存工作流 JSON(避免重复构造):

import hashlib
import json
from functools import lru_cache

class WorkflowCache:
    """工作流模板缓存"""
    
    def __init__(self):
        self._cache = {}
    
    def get_or_build(self, template_path: str, overrides: dict) -> dict:
        """获取缓存的工作流,仅替换变动部分"""
        cache_key = f"{template_path}:{hashlib.md5(json.dumps(overrides, sort_keys=True).encode()).hexdigest()}"
        
        if cache_key not in self._cache:
            with open(template_path) as f:
                template = json.load(f)
            # 应用参数覆盖
            self._apply_overrides(template, overrides)
            self._cache[cache_key] = template
        
        return self._cache[cache_key]
    
    def _apply_overrides(self, workflow: dict, overrides: dict):
        """递归覆盖工作流参数"""
        for node_id, node in workflow.items():
            if isinstance(node, dict) and "inputs" in node:
                for key, value in overrides.items():
                    if key in node["inputs"]:
                        node["inputs"][key] = value

4. 优化建议总结:

优化方向 具体措施 预期效果
连接优化 使用连接池(Session) 减少 TCP 握手开销,提升 30-50% 吞吐
并发控制 asyncio + 信号量控制并发 充分利用带宽,避免过度负载
结果缓存 缓存工作流 JSON 模板 减少重复 JSON 构建,速度提升 80%+
超时设置 合理设置 connect/read timeout 避免无效等待,提升系统健壮性
批量提交 减少 HTTP 请求次数 大幅提升吞吐量
WebSocket 使用 WS 替代轮询 实时感知状态,减少无效查询

六、本地 API vs Cloud API 全维对比

对比维度 🔧 本地 API ☁️ Cloud API
部署方式 启动 ComfyUI 即可 需单独注册订阅
GPU 算力 自有 GPU RTX 6000 Pro (96GB)
请求延迟 低(局域网 < 1ms) 中等(公网 20-100ms)
并发上限 受本地 GPU 显存限制 Creator: 3, Pro: 5
速率限制 无硬限制 60 次/分钟
鉴权方式 --enable-auth API Key (Bearer)
HTTPS 支持 需自行配置 Nginx 默认支持
可用性保证 自行维护 99.9%
结果存储 本地文件系统 云端 30 天
CDN 加速
积分消耗 按运行时长计费
成本 免费 35−35−100/月
适用场景 开发调试、内部自动化 生产部署、对外服务

选型流程图:

是否需要对外提供服务?
├── 是 → 是否有预算订阅 Cloud?
│   ├── 是 → ☁️ Cloud API(推荐生产方案)
│   └── 否 → 🔧 本地 API + Nginx 反向代理 + HTTPS
└── 否 → 是否内网使用?
    ├── 是 → 🔧 本地 API(最简单方案)
    └── 否 → 是否需要 96GB 大显存?
        ├── 是 → ☁️ Cloud API
        └── 否 → 🔧 本地 API

七、总结与下一篇预告

本文核心要点回顾

通过本文的学习,你已经掌握了:

  1. ✅ 本地 API 的核心端点(/prompt/history/queue)与启动配置
  2. ✅ Cloud API 的认证方式、专属端点、限制与配额
  3. ✅ 4 个实操案例——cURL 调用、Python 客户端封装、Cloud 批量生成、异常处理与重试
  4. ✅ API 调用排错——状态码解读、节点错误排查、网络问题诊断
  5. ✅ 性能优化策略——连接池、异步并发、工作流缓存
  6. ✅ 本地 vs Cloud API 的选型对比——根据场景选择最优方案

关键代码速查

# 本地 API:一键提交工作流
curl -X POST http://127.0.0.1:8188/prompt \
  -H "Content-Type: application/json" \
  -d @workflow.json

# Cloud API:带认证的运行工作流
curl -X POST https://api.comfy.org/workflows/{id}/run \
  -H "Authorization: Bearer cmy_xxxx" \
  -H "Content-Type: application/json"
# Python 客户端 —— 最少代码实现
import requests
resp = requests.post("http://127.0.0.1:8188/prompt", 
    json={"prompt": workflow, "client_id": "myapp"})
pid = resp.json()["prompt_id"]

下一篇预告

下一篇(博客 26)将聚焦 Cloud 专属功能详解——我们将深入探索 Cloud 平台独有的高级节点、批量生成与任务管理、数据存储与同步等核心能力,帮助你充分利用 Cloud 平台的优势,构建高效的 AI 图像生产管线。

如果你已经掌握了 API 集成的能力,那么下一篇将教你如何将这种能力与 Cloud 专属功能结合,发挥出更大的威力。敬请期待!


官方参考链接

  1. ComfyUI 本地 API 文档 — 官方本地 API 参考
  2. Comfy Cloud API 概览 — Cloud API 入门
  3. Comfy Cloud API 参考 — API 详细端点说明
  4. ComfyUI 启动参数文档 — 所有命令行启动参数
  5. Comfy Cloud 定价页面 — 订阅计划与 API 配额
  6. Comfy Cloud MCP 服务器 — AI 智能体连接 Cloud
  7. ComfyUI 获取帮助与故障排除 — 官方支持渠道与故障排查
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐