Grasshopper是rhino上非常强大的一款插件,可以用它设计非常复杂的参数化晶格体建模等。
我期望能用AI驱动此插件的应用。

详细分析几种用OpenCode与Grasshopper建立通信和电池编程的方法。

一、技术方案总览

方案 通信方向 实时性 复杂度 适用场景

  1. Grasshopper Python电池 GH → OpenCode ★★★★☆ ★★☆☆☆ 内部脚本执行

  2. RhinoCommon API OpenCode → Rhino ★★★★☆ ★★★☆☆ 外部控制GH

  3. Grasshopper SDK OpenCode → GH电池 ★★★★☆ ★★★★★ 专业插件开发

  4. GH_CPython电池 双向通信 ★★★★★ ★★★★☆ 实时交互

  5. JSON序列化 文件交换 ★★☆☆☆ ★☆☆☆☆ 批处理/离线

  6. 自定义二进制协议 双向通信 ★★★★★ ★★★★★ 高性能需求

二、详细实施方案

方案1:通过GH Python电池(最简单)

1. 创建可执行外部代码的Python电池

gh_python_bridge.ghpython

“”“在Grasshopper中创建此Python电池”“”

import sys
import json
import subprocess
import tempfile
import clr
clr.AddReference(“Grasshopper”)
import Grasshopper as gh
import Rhino.Geometry as rg

接收外部代码

external_code = x # 从Grasshopper输入端获取

def execute_external_code(code_string):
“”“执行外部Python代码并返回结果”“”
# 创建临时文件
with tempfile.NamedTemporaryFile(mode=‘w’, suffix=‘.py’, delete=False) as f:
f.write(code_string)
temp_file = f.name

try:
    # 执行代码
    result = subprocess.run(
        [sys.executable, temp_file],
        capture_output=True,
        text=True,
        timeout=10
    )
    
    # 解析输出
    if result.returncode == 0:
        output = result.stdout.strip()
        # 可以解析为Grasshopper几何体
        return parse_output_to_geometry(output)
    else:
        return f"Error: {result.stderr}"
        
finally:
    import os
    os.unlink(temp_file)

def parse_output_to_geometry(output_str):
“”“将输出字符串转换为Grasshopper几何体”“”
try:
data = json.loads(output_str)
# 根据数据类型创建几何体
if data[‘type’] == ‘point’:
return rg.Point3d(*data[‘coordinates’])
elif data[‘type’] == ‘curve’:
points = [rg.Point3d(*p) for p in data[‘points’]]
return rg.Curve.CreateControlPointCurve(points, 3)
# … 其他类型
except:
return output_str # 返回原始字符串

执行外部代码

if external_code:
a = execute_external_code(str(external_code))

方案2:通过RhinoCommon API创建电池

// 使用RhinoCommon在OpenCode中创建电池
// 需要在Visual Studio中创建类库项目
using System;
using System.Collections.Generic;
using Rhino;
using Rhino.Geometry;
using Grasshopper;
using Grasshopper.Kernel;
using Grasshopper.Kernel.Types;
using Grasshopper.Kernel.Parameters;

namespace OpenCodeToGH
{
public class BatteryCreator
{
public void CreateAndConnectBatteries()
{
// 获取当前Grasshopper文档
var ghDoc = Instances.ActiveCanvas.Document;

        // 1. 创建点电池
        var pointComponent = CreateComponent(ghDoc, "Point", new Point3d(0, 0, 0));
        
        // 2. 创建圆电池
        var circleComponent = CreateComponent(ghDoc, "Circle", 10.0);
        
        // 3. 连接电池
        ConnectComponents(
            pointComponent, 0,  // 点电池的输出端口0
            circleComponent, 0   // 圆电池的输入端口0
        );
        
        // 4. 设置参数
        SetComponentNickname(circleComponent, "My Circle");
        
        // 5. 触发重新计算
        ghDoc.NewSolution(true);
    }
    
    private IGH_DocumentObject CreateComponent(
        GH_Document doc, 
        string componentName, 
        object defaultValue = null)
    {
        // 创建组件实例
        var obj = Grasshopper.Instances.ComponentServer.EmitObject(componentName);
        
        if (obj != null)
        {
            // 添加到文档
            doc.AddObject(obj, true);
            
            // 设置位置
            obj.Attributes.Pivot = new PointF(
                100 + (doc.ObjectCount * 50),
                100
            );
            
            // 设置默认值
            if (defaultValue != null)
            {
                SetComponentValue(obj, defaultValue);
            }
        }
        
        return obj;
    }
    
    private void ConnectComponents(
        IGH_DocumentObject source, int sourcePort,
        IGH_DocumentObject target, int targetPort)
    {
        var sourceParam = source.Params.Output[sourcePort];
        var targetParam = target.Params.Input[targetPort];
        
        // 创建连接
        var connection = new GH_Connection();
        connection.Add(sourceParam, targetParam);
        
        // 添加到文档
        Instances.ActiveCanvas.Document.Connections.Add(connection);
    }
}

}

方案3:通过GH_CPython实现实时通信

使用GH_CPython插件实现双向实时通信

https://www.food4rhino.com/en/app/ghcpython

“”"
安装步骤:

  1. 下载GH_CPython插件
  2. 安装到Grasshopper
  3. 配置ZeroMQ或WebSocket
    “”"

server.py - OpenCode端

import zmq
import json
from datetime import datetime

class GrasshopperBridge:
def init(self, port=5555):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REP)
self.socket.bind(f"tcp://*:{port}")
self.gh_components = {}

def listen(self):
    """监听Grasshopper请求"""
    print("等待Grasshopper连接...")
    
    while True:
        # 接收消息
        message = self.socket.recv_json()
        print(f"收到: {message}")
        
        # 处理指令
        response = self.process_command(message)
        
        # 发送响应
        self.socket.send_json(response)

def process_command(self, command):
    """处理来自Grasshopper的指令"""
    cmd_type = command.get("type")
    
    if cmd_type == "create_component":
        return self.create_component(command)
    elif cmd_type == "set_parameter":
        return self.set_parameter(command)
    elif cmd_type == "run_component":
        return self.run_component(command)
    elif cmd_type == "get_value":
        return self.get_value(command)
    else:
        return {"error": f"未知指令: {cmd_type}"}

def create_component(self, cmd):
    """创建新电池的指令"""
    component_id = f"comp_{datetime.now().timestamp()}"
    component_info = {
        "id": component_id,
        "name": cmd.get("name", "未命名"),
        "type": cmd.get("component_type"),
        "position": cmd.get("position", [0, 0]),
        "parameters": cmd.get("parameters", {})
    }
    
    self.gh_components[component_id] = component_info
    
    return {
        "status": "created",
        "component_id": component_id,
        "component_info": component_info
    }

def generate_gh_python_code(self, component_info):
    """生成对应GH Python电池的代码"""
    code_template = """

import Rhino.Geometry as rg
import Grasshopper as gh

自动生成的电池代码

{component_code}

输出结果

a = result
“”"

    # 根据组件类型生成不同代码
    comp_type = component_info["type"]
    
    if comp_type == "Point":
        x = component_info["parameters"].get("x", 0)
        y = component_info["parameters"].get("y", 0)
        z = component_info["parameters"].get("z", 0)
        
        code = f"result = rg.Point3d({x}, {y}, {z})"
    
    elif comp_type == "Circle":
        radius = component_info["parameters"].get("radius", 10)
        plane = component_info["parameters"].get("plane", "WorldXY")
        
        code = f"""

plane = rg.Plane.{plane}
result = rg.Circle(plane, {radius})
“”"

    # 更多组件类型...
    
    return code_template.format(component_code=code)

client.py - Grasshopper端 (GH_CPython电池)

“”"
在Grasshopper中创建GH_CPython电池,填入以下代码:
“”"

import zmq
import json

class GHClient:
def init(self, server_address=“tcp://localhost:5555”):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.connect(server_address)
self.timeout = 5000 # 5秒超时

def send_command(self, command):
    """发送命令到OpenCode服务器"""
    try:
        self.socket.send_json(command)
        
        # 设置超时
        if self.socket.poll(self.timeout):
            response = self.socket.recv_json()
            return response
        else:
            return {"error": "请求超时"}
    except Exception as e:
        return {"error": str(e)}

def create_component(self, comp_type, name=None, **kwargs):
    """请求创建新组件"""
    command = {
        "type": "create_component",
        "component_type": comp_type,
        "name": name or comp_type,
        "parameters": kwargs
    }
    return self.send_command(command)

def execute_component(self, component_id, inputs=None):
    """执行组件计算"""
    command = {
        "type": "run_component",
        "component_id": component_id,
        "inputs": inputs or {}
    }
    return self.send_command(command)

在Grasshopper中使用

if name == “main”:
# 创建客户端
client = GHClient()

# 创建点电池
point_response = client.create_component(
    "Point", 
    name="起点",
    x=0, y=0, z=0
)

# 创建圆电池
circle_response = client.create_component(
    "Circle",
    name="基础圆",
    radius=10,
    plane="WorldXY"
)

# 输出结果
if "error" not in point_response:
    point_data = point_response["component_info"]
    # 在Grasshopper中创建对应的几何体
    a = "点创建成功"

if "error" not in circle_response:
    circle_data = circle_response["component_info"]
    b = "圆创建成功"

方案4:通过GHX/JSON文件格式交换

创建和解析GHX格式(Grasshopper文件格式)

ghx_builder.py

import json
import xml.etree.ElementTree as ET
from datetime import datetime
from typing import List, Dict, Any

class GHXBuilder:
“”“构建Grasshopper .ghx文件”“”

def __init__(self):
    self.root = ET.Element("grasshopper")
    self.root.set("version", "1.0.0.0")
    self.root.set("timestamp", datetime.now().isoformat())
    
    # 创建文档结构
    self.doc = ET.SubElement(self.root, "document")
    self.objects = ET.SubElement(self.doc, "objects")
    self.connections = ET.SubElement(self.doc, "connections")
    
def add_component(self, 
                 component_id: str,
                 component_name: str,
                 nickname: str = None,
                 position: tuple = (0, 0),
                 inputs: List[Dict] = None,
                 outputs: List[Dict] = None):
    """添加组件到GHX"""
    
    comp_elem = ET.SubElement(self.objects, "object")
    comp_elem.set("guid", component_id)
    comp_elem.set("name", component_name)
    comp_elem.set("nickname", nickname or component_name)
    
    # 位置
    location = ET.SubElement(comp_elem, "location")
    location.set("x", str(position[0]))
    location.set("y", str(position[1]))
    
    # 输入参数
    if inputs:
        params_in = ET.SubElement(comp_elem, "params_in")
        for i, input_def in enumerate(inputs):
            param = ET.SubElement(params_in, "param")
            param.set("index", str(i))
            param.set("name", input_def.get("name", f"Input{i}"))
            
            # 设置值
            if "value" in input_def:
                val_elem = ET.SubElement(param, "value")
                val_elem.text = str(input_def["value"])
    
    # 输出参数
    if outputs:
        params_out = ET.SubElement(comp_elem, "params_out")
        for i, output_def in enumerate(outputs):
            param = ET.SubElement(params_out, "param")
            param.set("index", str(i))
            param.set("name", output_def.get("name", f"Output{i}"))

def add_connection(self, 
                  source_id: str, 
                  source_port: int,
                  target_id: str, 
                  target_port: int):
    """添加连接线"""
    
    conn_elem = ET.SubElement(self.connections, "connection")
    
    source = ET.SubElement(conn_elem, "source")
    source.set("guid", source_id)
    source.set("param", str(source_port))
    
    target = ET.SubElement(conn_elem, "target")
    target.set("guid", target_id)
    target.set("param", str(target_port))

def save(self, filepath: str):
    """保存为.ghx文件"""
    tree = ET.ElementTree(self.root)
    
    # 美化XML
    self._indent(self.root)
    
    tree.write(filepath, 
              encoding="utf-8", 
              xml_declaration=True)

def _indent(self, elem, level=0):
    """美化XML输出"""
    indent = "\n" + level * "  "
    if len(elem):
        if not elem.text or not elem.text.strip():
            elem.text = indent + "  "
        if not elem.tail or not elem.tail.strip():
            elem.tail = indent
        for child in elem:
            self._indent(child, level + 1)
        if not child.tail or not child.tail.strip():
            child.tail = indent
    else:
        if level and (not elem.tail or not elem.tail.strip()):
            elem.tail = indent

使用示例

builder = GHXBuilder()

创建点电池

builder.add_component(
component_id=“{11111111-2222-3333-4444-555555555555}”,
component_name=“Point”,
nickname=“起点”,
position=(100, 100),
inputs=[
{“name”: “X”, “value”: 0},
{“name”: “Y”, “value”: 0},
{“name”: “Z”, “value”: 0}
],
outputs=[{“name”: “Point”}]
)

创建圆电池

builder.add_component(
component_id=“{22222222-3333-4444-5555-666666666666}”,
component_name=“Circle”,
nickname=“基础圆”,
position=(300, 100),
inputs=[
{“name”: “Plane”, “value”: “WorldXY”},
{“name”: “Radius”, “value”: 10}
],
outputs=[{“name”: “Circle”}]
)

连接它们

builder.add_connection(
source_id=“{11111111-2222-3333-4444-555555555555}”,
source_port=0, # 点的输出
target_id=“{22222222-3333-4444-5555-666666666666}”,
target_port=0 # 圆的平面输入
)

保存文件

builder.save(“my_circuit.ghx”)

方案5:完整的端到端示例

complete_workflow.py

“”"
完整的OpenCode到Grasshopper工作流示例
包含:设计、生成、导入、执行
“”"

import json
import socket
import threading
import time
from typing import Dict, List, Any
from dataclasses import dataclass, asdict
from enum import Enum

@dataclass
class GHComponent:
“”“Grasshopper组件定义”“”
id: str
name: str
type: str
position: tuple
inputs: List[Dict]
outputs: List[Dict]
nickname: str = “”
description: str = “”

@dataclass
class GHConnection:
“”“Grasshopper连接定义”“”
source_id: str
source_port: int
target_id: str
target_port: int

@dataclass
class GHDefinition:
“”“完整的Grasshopper定义”“”
name: str
components: List[GHComponent]
connections: List[GHConnection]
author: str = “OpenCode”
version: str = “1.0”

class GHRenderer:
“”“Grasshopper定义渲染器”“”

def __init__(self):
    self.components = []
    self.connections = []
    
def add_point(self, x=0, y=0, z=0, nickname="Point"):
    """添加点组件"""
    comp = GHComponent(
        id=f"point_{len(self.components)}",
        name="Point",
        type="Geometry.Point",
        position=(len(self.components) * 150, 100),
        inputs=[
            {"name": "X", "value": x, "type": "number"},
            {"name": "Y", "value": y, "type": "number"},
            {"name": "Z", "value": z, "type": "number"}
        ],
        outputs=[{"name": "Point", "type": "geometry"}],
        nickname=nickname
    )
    self.components.append(comp)
    return comp

def add_circle(self, radius=10, plane="XY", nickname="Circle"):
    """添加圆组件"""
    comp = GHComponent(
        id=f"circle_{len(self.components)}",
        name="Circle",
        type="Geometry.Circle",
        position=(len(self.components) * 150, 200),
        inputs=[
            {"name": "Plane", "value": plane, "type": "plane"},
            {"name": "Radius", "value": radius, "type": "number"}
        ],
        outputs=[{"name": "Circle", "type": "geometry"}],
        nickname=nickname
    )
    self.components.append(comp)
    return comp

def connect(self, source, source_port, target, target_port):
    """连接两个组件"""
    conn = GHConnection(
        source_id=source.id,
        source_port=source_port,
        target_id=target.id,
        target_port=target_port
    )
    self.connections.append(conn)
    return conn

def to_dict(self):
    """转换为字典格式"""
    return {
        "components": [asdict(c) for c in self.components],
        "connections": [asdict(c) for c in self.connections]
    }

def to_ghpython(self):
    """转换为GH Python代码"""
    code_lines = [
        "# 自动生成的Grasshopper Python代码",
        "import Rhino.Geometry as rg",
        "import math",
        "",
        "# 组件定义"
    ]
    
    # 为每个组件生成代码
    for comp in self.components:
        code_lines.append(f"# {comp.nickname} ({comp.type})")
        
        if comp.type == "Geometry.Point":
            x = comp.inputs[0]["value"]
            y = comp.inputs[1]["value"]
            z = comp.inputs[2]["value"]
            code_lines.append(f"{comp.id} = rg.Point3d({x}, {y}, {z})")
        
        elif comp.type == "Geometry.Circle":
            radius = comp.inputs[1]["value"]
            code_lines.append(f"{comp.id} = rg.Circle(rg.Plane.WorldXY, {radius})")
        
        code_lines.append("")
    
    # 根据连接关系组织输出
    code_lines.append("# 输出")
    for i, comp in enumerate(self.components):
        code_lines.append(f"a{i} = {comp.id}")
    
    return "\n".join(code_lines)

def export_ghx(self, filepath: str):
    """导出为.ghx文件"""
    builder = GHXBuilder()
    
    for comp in self.components:
        builder.add_component(
            component_id=comp.id,
            component_name=comp.name,
            nickname=comp.nickname,
            position=comp.position,
            inputs=comp.inputs,
            outputs=comp.outputs
        )
    
    for conn in self.connections:
        builder.add_connection(
            source_id=conn.source_id,
            source_port=conn.source_port,
            target_id=conn.target_id,
            target_port=conn.target_port
        )
    
    builder.save(filepath)

def send_to_gh_live(self, host="localhost", port=12345):
    """实时发送到运行的Grasshopper"""
    data = self.to_dict()
    
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((host, port))
        s.sendall(json.dumps(data).encode('utf-8'))
        
        # 接收响应
        response = s.recv(1024)
        return json.loads(response.decode('utf-8'))

使用示例

if name == “main”:
# 1. 创建渲染器
renderer = GHRenderer()

# 2. 设计电池网络
point1 = renderer.add_point(0, 0, 0, "起点")
point2 = renderer.add_point(10, 0, 0, "偏移点")
circle1 = renderer.add_circle(5, "XY", "小圆")
circle2 = renderer.add_circle(8, "XY", "大圆")

# 3. 连接电池
renderer.connect(point1, 0, circle1, 0)  # 点 -> 圆的圆心
renderer.connect(point2, 0, circle2, 0)

# 4. 导出
# 方式1: 生成Python代码
python_code = renderer.to_ghpython()
with open("generated_batteries.py", "w") as f:
    f.write(python_code)

# 方式2: 导出GHX文件
renderer.export_ghx("my_design.ghx")

# 方式3: 实时发送
# response = renderer.send_to_gh_live()
# print(f"Grasshopper响应: {response}")

print("电池设计完成!")
print("1. 在Grasshopper中运行 generated_batteries.py")
print("2. 或打开 my_design.ghx 文件")

三、最佳实践建议

  1. 开发环境配置

requirements.txt

OpenCode端依赖

zmq>=24.0.0
websockets>=12.0
fastapi>=0.104.0
uvicorn>=0.24.0
pydantic>=2.4.0
pythonnet>=3.0.0
rhino3dm>=8.0.0

  1. 项目结构

opencode-gh-bridge/
├── src/
│ ├── core/ # 核心逻辑
│ │ ├── gh_parser.py
│ │ ├── gh_builder.py
│ │ └── gh_exporter.py
│ ├── protocols/ # 通信协议
│ │ ├── websocket.py
│ │ ├── zmq_bridge.py
│ │ └── http_api.py
│ ├── components/ # 组件库
│ │ ├── geometry.py
│ │ ├── math.py
│ │ └── analysis.py
│ └── ui/ # 用户界面
│ └── web_app.py
├── examples/ # 示例
├── tests/ # 测试
└── docs/ # 文档

  1. 通信协议选择

protocol_selector.py

class ProtocolSelector:
@staticmethod
def select_protocol(requirements):
“”"
根据需求选择通信协议

    需求示例:
    {
        "realtime": True,      # 需要实时
        "bidirectional": True, # 双向通信
        "throughput": "high",  # 吞吐量
        "latency": "low",      # 延迟
        "reliability": "high"  # 可靠性
    }
    """
    
    if requirements["realtime"] and requirements["bidirectional"]:
        if requirements["latency"] == "low":
            return "WebSocket"  # 最推荐
        else:
            return "ZeroMQ"
    
    elif requirements["realtime"] and not requirements["bidirectional"]:
        return "UDP Multicast"
    
    elif not requirements["realtime"]:
        return "REST API + File Export"

四、推荐方案组合

初学者方案

  1. 使用 GH Python电池 + 外部脚本
  2. 通过文件交换 (JSON/GHX)
  3. 逐步学习 Grasshopper SDK

中级开发者方案

  1. WebSocket实时双向通信
  2. GH_CPython插件集成
  3. 自定义GHX导出

高级/生产环境方案

  1. ZeroMQ高性能通信
  2. RhinoCommon SDK深度集成
  3. 分布式计算架构
  4. 容器化部署

五、关键挑战与解决方案

挑战1:Grasshopper组件状态管理

解决方案:状态同步机制

class StateManager:
def init(self):
self.components = {}
self.connections = {}
self.history = []

def sync_with_gh(self, gh_state):
    """与Grasshopper状态同步"""
    # 比较差异
    diffs = self.compare_states(self.components, gh_state)
    
    # 应用变更
    for diff in diffs:
        self.apply_change(diff)
        
    # 记录历史
    self.history.append({
        "timestamp": time.time(),
        "state": self.components.copy()
    })

挑战2:错误处理与恢复

解决方案:容错机制

class FaultTolerantBridge:
def execute_with_retry(self, operation, max_retries=3):
“”“带重试的执行”“”
for attempt in range(max_retries):
try:
return operation()
except ConnectionError as e:
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt) # 指数退避
self.reconnect()
except Exception as e:
self.log_error(e)
self.rollback_to_last_good_state()
raise

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐