【无标题】
Grasshopper是rhino上非常强大的一款插件,可以用它设计非常复杂的参数化晶格体建模等。
我期望能用AI驱动此插件的应用。
详细分析几种用OpenCode与Grasshopper建立通信和电池编程的方法。
一、技术方案总览
方案 通信方向 实时性 复杂度 适用场景
-
Grasshopper Python电池 GH → OpenCode ★★★★☆ ★★☆☆☆ 内部脚本执行
-
RhinoCommon API OpenCode → Rhino ★★★★☆ ★★★☆☆ 外部控制GH
-
Grasshopper SDK OpenCode → GH电池 ★★★★☆ ★★★★★ 专业插件开发
-
GH_CPython电池 双向通信 ★★★★★ ★★★★☆ 实时交互
-
JSON序列化 文件交换 ★★☆☆☆ ★☆☆☆☆ 批处理/离线
-
自定义二进制协议 双向通信 ★★★★★ ★★★★★ 高性能需求
二、详细实施方案
方案1:通过GH Python电池(最简单)
1. 创建可执行外部代码的Python电池
gh_python_bridge.ghpython
“”“在Grasshopper中创建此Python电池”“”
import sys
import json
import subprocess
import tempfile
import clr
clr.AddReference(“Grasshopper”)
import Grasshopper as gh
import Rhino.Geometry as rg
接收外部代码
external_code = x # 从Grasshopper输入端获取
def execute_external_code(code_string):
“”“执行外部Python代码并返回结果”“”
# 创建临时文件
with tempfile.NamedTemporaryFile(mode=‘w’, suffix=‘.py’, delete=False) as f:
f.write(code_string)
temp_file = f.name
try:
# 执行代码
result = subprocess.run(
[sys.executable, temp_file],
capture_output=True,
text=True,
timeout=10
)
# 解析输出
if result.returncode == 0:
output = result.stdout.strip()
# 可以解析为Grasshopper几何体
return parse_output_to_geometry(output)
else:
return f"Error: {result.stderr}"
finally:
import os
os.unlink(temp_file)
def parse_output_to_geometry(output_str):
“”“将输出字符串转换为Grasshopper几何体”“”
try:
data = json.loads(output_str)
# 根据数据类型创建几何体
if data[‘type’] == ‘point’:
return rg.Point3d(*data[‘coordinates’])
elif data[‘type’] == ‘curve’:
points = [rg.Point3d(*p) for p in data[‘points’]]
return rg.Curve.CreateControlPointCurve(points, 3)
# … 其他类型
except:
return output_str # 返回原始字符串
执行外部代码
if external_code:
a = execute_external_code(str(external_code))
方案2:通过RhinoCommon API创建电池
// 使用RhinoCommon在OpenCode中创建电池
// 需要在Visual Studio中创建类库项目
using System;
using System.Collections.Generic;
using Rhino;
using Rhino.Geometry;
using Grasshopper;
using Grasshopper.Kernel;
using Grasshopper.Kernel.Types;
using Grasshopper.Kernel.Parameters;
namespace OpenCodeToGH
{
public class BatteryCreator
{
public void CreateAndConnectBatteries()
{
// 获取当前Grasshopper文档
var ghDoc = Instances.ActiveCanvas.Document;
// 1. 创建点电池
var pointComponent = CreateComponent(ghDoc, "Point", new Point3d(0, 0, 0));
// 2. 创建圆电池
var circleComponent = CreateComponent(ghDoc, "Circle", 10.0);
// 3. 连接电池
ConnectComponents(
pointComponent, 0, // 点电池的输出端口0
circleComponent, 0 // 圆电池的输入端口0
);
// 4. 设置参数
SetComponentNickname(circleComponent, "My Circle");
// 5. 触发重新计算
ghDoc.NewSolution(true);
}
private IGH_DocumentObject CreateComponent(
GH_Document doc,
string componentName,
object defaultValue = null)
{
// 创建组件实例
var obj = Grasshopper.Instances.ComponentServer.EmitObject(componentName);
if (obj != null)
{
// 添加到文档
doc.AddObject(obj, true);
// 设置位置
obj.Attributes.Pivot = new PointF(
100 + (doc.ObjectCount * 50),
100
);
// 设置默认值
if (defaultValue != null)
{
SetComponentValue(obj, defaultValue);
}
}
return obj;
}
private void ConnectComponents(
IGH_DocumentObject source, int sourcePort,
IGH_DocumentObject target, int targetPort)
{
var sourceParam = source.Params.Output[sourcePort];
var targetParam = target.Params.Input[targetPort];
// 创建连接
var connection = new GH_Connection();
connection.Add(sourceParam, targetParam);
// 添加到文档
Instances.ActiveCanvas.Document.Connections.Add(connection);
}
}
}
方案3:通过GH_CPython实现实时通信
使用GH_CPython插件实现双向实时通信
https://www.food4rhino.com/en/app/ghcpython
“”"
安装步骤:
- 下载GH_CPython插件
- 安装到Grasshopper
- 配置ZeroMQ或WebSocket
“”"
server.py - OpenCode端
import zmq
import json
from datetime import datetime
class GrasshopperBridge:
def init(self, port=5555):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REP)
self.socket.bind(f"tcp://*:{port}")
self.gh_components = {}
def listen(self):
"""监听Grasshopper请求"""
print("等待Grasshopper连接...")
while True:
# 接收消息
message = self.socket.recv_json()
print(f"收到: {message}")
# 处理指令
response = self.process_command(message)
# 发送响应
self.socket.send_json(response)
def process_command(self, command):
"""处理来自Grasshopper的指令"""
cmd_type = command.get("type")
if cmd_type == "create_component":
return self.create_component(command)
elif cmd_type == "set_parameter":
return self.set_parameter(command)
elif cmd_type == "run_component":
return self.run_component(command)
elif cmd_type == "get_value":
return self.get_value(command)
else:
return {"error": f"未知指令: {cmd_type}"}
def create_component(self, cmd):
"""创建新电池的指令"""
component_id = f"comp_{datetime.now().timestamp()}"
component_info = {
"id": component_id,
"name": cmd.get("name", "未命名"),
"type": cmd.get("component_type"),
"position": cmd.get("position", [0, 0]),
"parameters": cmd.get("parameters", {})
}
self.gh_components[component_id] = component_info
return {
"status": "created",
"component_id": component_id,
"component_info": component_info
}
def generate_gh_python_code(self, component_info):
"""生成对应GH Python电池的代码"""
code_template = """
import Rhino.Geometry as rg
import Grasshopper as gh
自动生成的电池代码
{component_code}
输出结果
a = result
“”"
# 根据组件类型生成不同代码
comp_type = component_info["type"]
if comp_type == "Point":
x = component_info["parameters"].get("x", 0)
y = component_info["parameters"].get("y", 0)
z = component_info["parameters"].get("z", 0)
code = f"result = rg.Point3d({x}, {y}, {z})"
elif comp_type == "Circle":
radius = component_info["parameters"].get("radius", 10)
plane = component_info["parameters"].get("plane", "WorldXY")
code = f"""
plane = rg.Plane.{plane}
result = rg.Circle(plane, {radius})
“”"
# 更多组件类型...
return code_template.format(component_code=code)
client.py - Grasshopper端 (GH_CPython电池)
“”"
在Grasshopper中创建GH_CPython电池,填入以下代码:
“”"
import zmq
import json
class GHClient:
def init(self, server_address=“tcp://localhost:5555”):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.connect(server_address)
self.timeout = 5000 # 5秒超时
def send_command(self, command):
"""发送命令到OpenCode服务器"""
try:
self.socket.send_json(command)
# 设置超时
if self.socket.poll(self.timeout):
response = self.socket.recv_json()
return response
else:
return {"error": "请求超时"}
except Exception as e:
return {"error": str(e)}
def create_component(self, comp_type, name=None, **kwargs):
"""请求创建新组件"""
command = {
"type": "create_component",
"component_type": comp_type,
"name": name or comp_type,
"parameters": kwargs
}
return self.send_command(command)
def execute_component(self, component_id, inputs=None):
"""执行组件计算"""
command = {
"type": "run_component",
"component_id": component_id,
"inputs": inputs or {}
}
return self.send_command(command)
在Grasshopper中使用
if name == “main”:
# 创建客户端
client = GHClient()
# 创建点电池
point_response = client.create_component(
"Point",
name="起点",
x=0, y=0, z=0
)
# 创建圆电池
circle_response = client.create_component(
"Circle",
name="基础圆",
radius=10,
plane="WorldXY"
)
# 输出结果
if "error" not in point_response:
point_data = point_response["component_info"]
# 在Grasshopper中创建对应的几何体
a = "点创建成功"
if "error" not in circle_response:
circle_data = circle_response["component_info"]
b = "圆创建成功"
方案4:通过GHX/JSON文件格式交换
创建和解析GHX格式(Grasshopper文件格式)
ghx_builder.py
import json
import xml.etree.ElementTree as ET
from datetime import datetime
from typing import List, Dict, Any
class GHXBuilder:
“”“构建Grasshopper .ghx文件”“”
def __init__(self):
self.root = ET.Element("grasshopper")
self.root.set("version", "1.0.0.0")
self.root.set("timestamp", datetime.now().isoformat())
# 创建文档结构
self.doc = ET.SubElement(self.root, "document")
self.objects = ET.SubElement(self.doc, "objects")
self.connections = ET.SubElement(self.doc, "connections")
def add_component(self,
component_id: str,
component_name: str,
nickname: str = None,
position: tuple = (0, 0),
inputs: List[Dict] = None,
outputs: List[Dict] = None):
"""添加组件到GHX"""
comp_elem = ET.SubElement(self.objects, "object")
comp_elem.set("guid", component_id)
comp_elem.set("name", component_name)
comp_elem.set("nickname", nickname or component_name)
# 位置
location = ET.SubElement(comp_elem, "location")
location.set("x", str(position[0]))
location.set("y", str(position[1]))
# 输入参数
if inputs:
params_in = ET.SubElement(comp_elem, "params_in")
for i, input_def in enumerate(inputs):
param = ET.SubElement(params_in, "param")
param.set("index", str(i))
param.set("name", input_def.get("name", f"Input{i}"))
# 设置值
if "value" in input_def:
val_elem = ET.SubElement(param, "value")
val_elem.text = str(input_def["value"])
# 输出参数
if outputs:
params_out = ET.SubElement(comp_elem, "params_out")
for i, output_def in enumerate(outputs):
param = ET.SubElement(params_out, "param")
param.set("index", str(i))
param.set("name", output_def.get("name", f"Output{i}"))
def add_connection(self,
source_id: str,
source_port: int,
target_id: str,
target_port: int):
"""添加连接线"""
conn_elem = ET.SubElement(self.connections, "connection")
source = ET.SubElement(conn_elem, "source")
source.set("guid", source_id)
source.set("param", str(source_port))
target = ET.SubElement(conn_elem, "target")
target.set("guid", target_id)
target.set("param", str(target_port))
def save(self, filepath: str):
"""保存为.ghx文件"""
tree = ET.ElementTree(self.root)
# 美化XML
self._indent(self.root)
tree.write(filepath,
encoding="utf-8",
xml_declaration=True)
def _indent(self, elem, level=0):
"""美化XML输出"""
indent = "\n" + level * " "
if len(elem):
if not elem.text or not elem.text.strip():
elem.text = indent + " "
if not elem.tail or not elem.tail.strip():
elem.tail = indent
for child in elem:
self._indent(child, level + 1)
if not child.tail or not child.tail.strip():
child.tail = indent
else:
if level and (not elem.tail or not elem.tail.strip()):
elem.tail = indent
使用示例
builder = GHXBuilder()
创建点电池
builder.add_component(
component_id=“{11111111-2222-3333-4444-555555555555}”,
component_name=“Point”,
nickname=“起点”,
position=(100, 100),
inputs=[
{“name”: “X”, “value”: 0},
{“name”: “Y”, “value”: 0},
{“name”: “Z”, “value”: 0}
],
outputs=[{“name”: “Point”}]
)
创建圆电池
builder.add_component(
component_id=“{22222222-3333-4444-5555-666666666666}”,
component_name=“Circle”,
nickname=“基础圆”,
position=(300, 100),
inputs=[
{“name”: “Plane”, “value”: “WorldXY”},
{“name”: “Radius”, “value”: 10}
],
outputs=[{“name”: “Circle”}]
)
连接它们
builder.add_connection(
source_id=“{11111111-2222-3333-4444-555555555555}”,
source_port=0, # 点的输出
target_id=“{22222222-3333-4444-5555-666666666666}”,
target_port=0 # 圆的平面输入
)
保存文件
builder.save(“my_circuit.ghx”)
方案5:完整的端到端示例
complete_workflow.py
“”"
完整的OpenCode到Grasshopper工作流示例
包含:设计、生成、导入、执行
“”"
import json
import socket
import threading
import time
from typing import Dict, List, Any
from dataclasses import dataclass, asdict
from enum import Enum
@dataclass
class GHComponent:
“”“Grasshopper组件定义”“”
id: str
name: str
type: str
position: tuple
inputs: List[Dict]
outputs: List[Dict]
nickname: str = “”
description: str = “”
@dataclass
class GHConnection:
“”“Grasshopper连接定义”“”
source_id: str
source_port: int
target_id: str
target_port: int
@dataclass
class GHDefinition:
“”“完整的Grasshopper定义”“”
name: str
components: List[GHComponent]
connections: List[GHConnection]
author: str = “OpenCode”
version: str = “1.0”
class GHRenderer:
“”“Grasshopper定义渲染器”“”
def __init__(self):
self.components = []
self.connections = []
def add_point(self, x=0, y=0, z=0, nickname="Point"):
"""添加点组件"""
comp = GHComponent(
id=f"point_{len(self.components)}",
name="Point",
type="Geometry.Point",
position=(len(self.components) * 150, 100),
inputs=[
{"name": "X", "value": x, "type": "number"},
{"name": "Y", "value": y, "type": "number"},
{"name": "Z", "value": z, "type": "number"}
],
outputs=[{"name": "Point", "type": "geometry"}],
nickname=nickname
)
self.components.append(comp)
return comp
def add_circle(self, radius=10, plane="XY", nickname="Circle"):
"""添加圆组件"""
comp = GHComponent(
id=f"circle_{len(self.components)}",
name="Circle",
type="Geometry.Circle",
position=(len(self.components) * 150, 200),
inputs=[
{"name": "Plane", "value": plane, "type": "plane"},
{"name": "Radius", "value": radius, "type": "number"}
],
outputs=[{"name": "Circle", "type": "geometry"}],
nickname=nickname
)
self.components.append(comp)
return comp
def connect(self, source, source_port, target, target_port):
"""连接两个组件"""
conn = GHConnection(
source_id=source.id,
source_port=source_port,
target_id=target.id,
target_port=target_port
)
self.connections.append(conn)
return conn
def to_dict(self):
"""转换为字典格式"""
return {
"components": [asdict(c) for c in self.components],
"connections": [asdict(c) for c in self.connections]
}
def to_ghpython(self):
"""转换为GH Python代码"""
code_lines = [
"# 自动生成的Grasshopper Python代码",
"import Rhino.Geometry as rg",
"import math",
"",
"# 组件定义"
]
# 为每个组件生成代码
for comp in self.components:
code_lines.append(f"# {comp.nickname} ({comp.type})")
if comp.type == "Geometry.Point":
x = comp.inputs[0]["value"]
y = comp.inputs[1]["value"]
z = comp.inputs[2]["value"]
code_lines.append(f"{comp.id} = rg.Point3d({x}, {y}, {z})")
elif comp.type == "Geometry.Circle":
radius = comp.inputs[1]["value"]
code_lines.append(f"{comp.id} = rg.Circle(rg.Plane.WorldXY, {radius})")
code_lines.append("")
# 根据连接关系组织输出
code_lines.append("# 输出")
for i, comp in enumerate(self.components):
code_lines.append(f"a{i} = {comp.id}")
return "\n".join(code_lines)
def export_ghx(self, filepath: str):
"""导出为.ghx文件"""
builder = GHXBuilder()
for comp in self.components:
builder.add_component(
component_id=comp.id,
component_name=comp.name,
nickname=comp.nickname,
position=comp.position,
inputs=comp.inputs,
outputs=comp.outputs
)
for conn in self.connections:
builder.add_connection(
source_id=conn.source_id,
source_port=conn.source_port,
target_id=conn.target_id,
target_port=conn.target_port
)
builder.save(filepath)
def send_to_gh_live(self, host="localhost", port=12345):
"""实时发送到运行的Grasshopper"""
data = self.to_dict()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((host, port))
s.sendall(json.dumps(data).encode('utf-8'))
# 接收响应
response = s.recv(1024)
return json.loads(response.decode('utf-8'))
使用示例
if name == “main”:
# 1. 创建渲染器
renderer = GHRenderer()
# 2. 设计电池网络
point1 = renderer.add_point(0, 0, 0, "起点")
point2 = renderer.add_point(10, 0, 0, "偏移点")
circle1 = renderer.add_circle(5, "XY", "小圆")
circle2 = renderer.add_circle(8, "XY", "大圆")
# 3. 连接电池
renderer.connect(point1, 0, circle1, 0) # 点 -> 圆的圆心
renderer.connect(point2, 0, circle2, 0)
# 4. 导出
# 方式1: 生成Python代码
python_code = renderer.to_ghpython()
with open("generated_batteries.py", "w") as f:
f.write(python_code)
# 方式2: 导出GHX文件
renderer.export_ghx("my_design.ghx")
# 方式3: 实时发送
# response = renderer.send_to_gh_live()
# print(f"Grasshopper响应: {response}")
print("电池设计完成!")
print("1. 在Grasshopper中运行 generated_batteries.py")
print("2. 或打开 my_design.ghx 文件")
三、最佳实践建议
- 开发环境配置
requirements.txt
OpenCode端依赖
zmq>=24.0.0
websockets>=12.0
fastapi>=0.104.0
uvicorn>=0.24.0
pydantic>=2.4.0
pythonnet>=3.0.0
rhino3dm>=8.0.0
- 项目结构
opencode-gh-bridge/
├── src/
│ ├── core/ # 核心逻辑
│ │ ├── gh_parser.py
│ │ ├── gh_builder.py
│ │ └── gh_exporter.py
│ ├── protocols/ # 通信协议
│ │ ├── websocket.py
│ │ ├── zmq_bridge.py
│ │ └── http_api.py
│ ├── components/ # 组件库
│ │ ├── geometry.py
│ │ ├── math.py
│ │ └── analysis.py
│ └── ui/ # 用户界面
│ └── web_app.py
├── examples/ # 示例
├── tests/ # 测试
└── docs/ # 文档
- 通信协议选择
protocol_selector.py
class ProtocolSelector:
@staticmethod
def select_protocol(requirements):
“”"
根据需求选择通信协议
需求示例:
{
"realtime": True, # 需要实时
"bidirectional": True, # 双向通信
"throughput": "high", # 吞吐量
"latency": "low", # 延迟
"reliability": "high" # 可靠性
}
"""
if requirements["realtime"] and requirements["bidirectional"]:
if requirements["latency"] == "low":
return "WebSocket" # 最推荐
else:
return "ZeroMQ"
elif requirements["realtime"] and not requirements["bidirectional"]:
return "UDP Multicast"
elif not requirements["realtime"]:
return "REST API + File Export"
四、推荐方案组合
初学者方案
- 使用 GH Python电池 + 外部脚本
- 通过文件交换 (JSON/GHX)
- 逐步学习 Grasshopper SDK
中级开发者方案
- WebSocket实时双向通信
- GH_CPython插件集成
- 自定义GHX导出
高级/生产环境方案
- ZeroMQ高性能通信
- RhinoCommon SDK深度集成
- 分布式计算架构
- 容器化部署
五、关键挑战与解决方案
挑战1:Grasshopper组件状态管理
解决方案:状态同步机制
class StateManager:
def init(self):
self.components = {}
self.connections = {}
self.history = []
def sync_with_gh(self, gh_state):
"""与Grasshopper状态同步"""
# 比较差异
diffs = self.compare_states(self.components, gh_state)
# 应用变更
for diff in diffs:
self.apply_change(diff)
# 记录历史
self.history.append({
"timestamp": time.time(),
"state": self.components.copy()
})
挑战2:错误处理与恢复
解决方案:容错机制
class FaultTolerantBridge:
def execute_with_retry(self, operation, max_retries=3):
“”“带重试的执行”“”
for attempt in range(max_retries):
try:
return operation()
except ConnectionError as e:
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt) # 指数退避
self.reconnect()
except Exception as e:
self.log_error(e)
self.rollback_to_last_good_state()
raise
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)