最近花时间读了港大 HKUDS 实验室开源的 nanobot 项目的核心源码,把 Agent 的运行原理从入口到执行链路完整捋了一遍。这篇文章不是官方文档的翻译,是我自己读代码时的理解和记录,尽量把关键设计讲清楚。

项目背景

nanobot 是一个用 Python 写的超轻量个人 AI 助手框架,GitHub 上 35k+ star。它不是一个模型,而是一个 Agent 框架——通过 LLM + 工具调用 + 消息总线,让大模型可以执行真实操作:跑 shell 命令、读写文件、上网搜索、定时任务、多平台收发消息等。

跟 LangChain 那种大而全的框架不同,nanobot 的核心代码量刻意控制得很小,每个模块就一两个文件,很适合用来理解"一个能用的 Agent 到底需要哪些东西"。

技术栈:Python 3.11+,全 asyncio 异步,Typer CLI,Pydantic 配置,httpx 网络请求。

从入口开始:它是怎么启动的

pyproject.toml 里定义了命令行入口:

[project.scripts]
nanobot = "nanobot.cli.commands:app"

是一个基于 Typer 的 CLI 应用,日常有两种启动方式:

  • nanobot chat:命令行交互模式,在终端里直接对话
  • nanobot run:网关模式,启动后台服务,同时接入多个平台(Telegram、飞书、钉钉等)

不管哪种,本质上都是创建一个 MessageBus 和一个 AgentLoop,然后跑起来。差异在于 chat 模式下"Channel"就是你的终端,run 模式下会通过 ChannelManager 初始化配置文件里 enabled 的所有平台 Channel。

整体架构:一条消息是怎么流转的

先看全局,后面再逐个拆:

用户消息(Telegram/飞书/终端/...)
        │
        ▼
┌─────────────────────────────────────────────────┐
│  Channel 层(Telegram / Discord / 飞书 / CLI ...)│
│  每个平台实现 BaseChannel 接口                    │
└───────────────────┬─────────────────────────────┘
                    │  封装成 InboundMessage
                    ▼
            ┌──────────────┐
            │  MessageBus   │  ← 两个 asyncio.Queue
            │  inbound      │     生产者-消费者模型
            │  outbound     │
            └──────┬───────┘
                   │
                   ▼
┌──────────────────────────────────────────────────┐
│              AgentLoop (核心引擎)                  │
│                                                  │
│  1. 从 bus 取 InboundMessage                      │
│  2. SessionManager 找到/创建对话 session           │
│  3. ContextBuilder 组装 prompt                    │
│     (system prompt + 记忆 + 技能 + 历史 + 当前消息)│
│  4. 调 LLM Provider                              │
│  5. LLM 返回 tool_calls → ToolRegistry 执行      │
│     → 结果放回 messages → 回到第 4 步              │
│  6. LLM 返回文本 → 发 OutboundMessage             │
│  7. 持久化 session,检查是否需要记忆整理            │
└──────────────────────────────────────────────────┘
                   │
                   ▼  OutboundMessage
            ┌──────────────┐
            │  MessageBus   │
            └──────┬───────┘
                   │
                   ▼
         Channel.send() → 发回用户

下面按模块来看源码。

MessageBus:两个 Queue 撑起的消息总线

文件:nanobot/bus/queue.py

class MessageBus:
    def __init__(self):
        self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
        self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()

    async def publish_inbound(self, msg: InboundMessage) -> None:
        await self.inbound.put(msg)

    async def consume_inbound(self) -> InboundMessage:
        return await self.inbound.get()

    async def publish_outbound(self, msg: OutboundMessage) -> None:
        await self.outbound.put(msg)

    async def consume_outbound(self) -> OutboundMessage:
        return await self.outbound.get()

整个文件就这么点东西。两个 asyncio.Queue,一进一出。

但这个设计是整个系统的骨架——它把 Channel 层和 Agent 核心完全解耦了。Channel 只管往 inbound 队列扔消息,从 outbound 队列取回复,完全不需要知道消息是怎么被处理的。反过来,Agent 也不关心消息是从 Telegram 来的还是终端来的。

消息的数据结构也很清晰:

@dataclass
class InboundMessage:
    channel: str        # telegram / discord / cli ...
    sender_id: str      # 用户标识
    chat_id: str        # 对话标识
    content: str        # 消息文本
    media: list[str]    # 附件 URL
    metadata: dict      # 平台特有数据

    @property
    def session_key(self) -> str:
        return f"{self.channel}:{self.chat_id}"

session_key 这个属性用 channel:chat_id 做 key,后面 Session 管理就靠这个来区分不同的对话。

AgentLoop:核心的 ReAct 循环

文件:nanobot/agent/loop.py

这是整个项目的心脏。run() 方法是一个死循环,不停地从 Bus 取消息处理:

async def run(self):
    self._running = True
    while self._running:
        try:
            msg = await asyncio.wait_for(
                self.bus.consume_inbound(), timeout=1.0
            )
        except asyncio.TimeoutError:
            continue

        session = self.sessions.get_or_create(msg.session_key)
        await self.memory_consolidator.maybe_consolidate_by_tokens(session)

        history = session.get_history(max_messages=0)
        messages = self.context.build_messages(
            history=history,
            current_message=msg.content,
            channel=msg.channel,
            chat_id=msg.chat_id,
        )

        final_content, _, all_msgs = await self._run_agent_loop(messages)

每条消息的处理步骤:

  1. 根据 session_key 拿到或创建会话
  2. 检查是否需要做记忆整理(后面详细说)
  3. ContextBuilder 组装完整的 prompt
  4. 进入 Agent 循环

_run_agent_loop() 是核心中的核心,实现了经典的 ReAct(Reasoning + Acting) 模式:

async def _run_agent_loop(self, initial_messages, ...):
    messages = initial_messages
    iteration = 0
    final_content = None

    while iteration < max_iterations:
        iteration += 1
        response = await self.provider.chat_with_retry(
            messages=messages,
            tools=self.tools.get_definitions(),
            model=self.model,
        )

        if response.has_tool_calls:
            # LLM 选择调用工具
            for tool_call in response.tool_calls:
                result = await self.tools.execute(
                    tool_call.name, tool_call.arguments
                )
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": result,
                })
            continue  # 把工具结果给 LLM 继续思考

        else:
            # LLM 直接输出了文本,结束循环
            final_content = response.content
            break

    return final_content, tools_used, messages

逻辑很直白:LLM 要么选择调工具(搜索、执行命令、读写文件),拿到结果后继续思考;要么直接给出最终回复。有 max_iterations 限制防止死循环。

一些值得注意的细节:

  • 流式输出:支持 streaming,LLM 边生成边推送 delta 给 Channel。如果生成到一半要调工具,会通知前端 on_stream_end(resuming=True),意思是"先别关进度条,我去干个活马上回来"
  • think 标签过滤:有些模型(DeepSeek-R1 等)输出里会夹带 <think>...</think> 内容,_strip_think() 在流式输出过程中实时过滤掉
  • tool hint:执行工具时会生成简洁的提示(如 web_search("北京天气"))通过 progress 机制告知用户当前在干什么

ContextBuilder:分层组装的 Prompt 工程

文件:nanobot/agent/context.py

负责把发给 LLM 的完整 prompt 组装好。system prompt 的构建是分层叠加的:

def build_system_prompt(self, skill_names=None) -> str:
    parts = [self._get_identity()]        # 1. 身份 + 平台信息 + 工具使用规范

    bootstrap = self._load_bootstrap_files()  # 2. 读 AGENTS.md / SOUL.md / USER.md / TOOLS.md
    if bootstrap:
        parts.append(bootstrap)

    memory = self.memory.get_memory_context()  # 3. 长期记忆 (MEMORY.md)
    if memory:
        parts.append(f"# Memory\n\n{memory}")

    always_skills = self.skills.get_always_skills()  # 4. always=true 的技能完整内容
    if always_skills:
        content = self.skills.load_skills_for_context(always_skills)
        parts.append(f"# Active Skills\n\n{content}")

    skills_summary = self.skills.build_skills_summary()  # 5. 所有技能的摘要
    if skills_summary:
        parts.append(f"# Skills\n{skills_summary}")

    return "\n\n---\n\n".join(parts)

build_messages() 最终组装:

def build_messages(self, history, current_message, channel, chat_id, ...):
    runtime_ctx = self._build_runtime_context(channel, chat_id)
    # 运行时上下文(时间、channel 信息)和用户消息合并到同一个 user message
    # 因为有些 LLM provider 拒绝连续的同 role 消息
    merged = f"{runtime_ctx}\n\n{current_message}"

    return [
        {"role": "system", "content": system_prompt},
        *history,
        {"role": "user", "content": merged},
    ]

这里几个设计选择值得说:

  • Bootstrap Files 是用户自定义的。你可以在工作目录放一个 SOUL.md 定义 Agent 人设,放一个 USER.md 告诉它你的偏好。Agent 每次对话都会读这些文件
  • Skills 渐进加载:prompt 里只放技能的摘要列表(名称 + 描述 + 文件路径),Agent 需要某个技能时自己用 read_file 工具去读完整内容。目的是省 token
  • Runtime Context 合并:当前时间、Channel 信息等元数据跟用户消息合并成一条 user message,而不是单独一条。注释解释了原因:部分 provider 不接受连续的同 role 消息

Memory 系统:用 LLM 自己给自己做笔记

文件:nanobot/agent/memory.py

这部分我觉得是整个项目里设计得比较巧的地方。

两层存储

  • MEMORY.md:长期记忆,存浓缩后的关键事实。每次 LLM 调用时带在 system prompt 里
  • HISTORY.md:完整的交互日志,可以 grep 搜索

记忆整理机制

当 session 的消息 token 数超过阈值时,MemoryConsolidator 会触发整理。整理的方式是——再调一次 LLM

# 构造一个专门用于记忆整理的 prompt
prompt = f"""Current Memory:
{existing_memory}

Recent Conversation:
{self._format_messages(messages)}"""

chat_messages = [
    {"role": "system", "content": "You are a memory consolidation agent."},
    {"role": "user", "content": prompt},
]

# 强制 LLM 必须调用 save_memory 工具
forced = {"type": "function", "function": {"name": "save_memory"}}
response = await provider.chat_with_retry(
    messages=chat_messages,
    tools=_SAVE_MEMORY_TOOL,
    tool_choice=forced,
)

save_memory 工具的定义:

_SAVE_MEMORY_TOOL = [{
    "type": "function",
    "function": {
        "name": "save_memory",
        "parameters": {
            "type": "object",
            "properties": {
                "history_entry": {
                    "type": "string",
                    "description": "对话的摘要记录,追加到 HISTORY.md"
                },
                "memory_update": {
                    "type": "string",
                    "description": "更新后的长期记忆,覆盖写入 MEMORY.md"
                },
            },
            "required": ["history_entry", "memory_update"]
        }
    }
}]

通过 tool_choice=forced 强制 LLM 用固定的工具格式返回,确保输出结构可解析。

失败兜底

不是所有模型都支持 forced tool_choice(比如一些小众 provider),所以代码里有两层降级:

  1. 先用 forced,如果报错包含 tool_choice 关键字,自动降级到 tool_choice="auto" 重试
  2. 如果连续失败 3 次(_MAX_FAILURES_BEFORE_RAW_ARCHIVE = 3),直接把原始消息格式化存进 HISTORY.md
def _fail_or_raw_archive(self, messages):
    self._consecutive_failures += 1
    if self._consecutive_failures >= self._MAX_FAILURES_BEFORE_RAW_ARCHIVE:
        # 直接存原始消息,保证数据不丢
        ...

这个兜底设计挺实际的,不会因为记忆整理失败就丢数据。

工具系统:标准的模板方法模式

文件:nanobot/agent/tools/base.pynanobot/agent/tools/registry.py

Tool 基类

class Tool(ABC):
    @property
    @abstractmethod
    def name(self) -> str: ...

    @property
    @abstractmethod
    def description(self) -> str: ...

    @property
    @abstractmethod
    def parameters(self) -> dict[str, Any]: ...

    @abstractmethod
    async def execute(self, **kwargs) -> Any: ...

    def to_schema(self) -> dict:
        """自动生成 OpenAI function calling 格式"""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.parameters,
            },
        }

每个具体工具实现这四个东西就行。内置工具包括:

工具 功能
exec 执行 shell 命令
read_file / write_file / edit_file 文件读写编辑
list_dir 列出目录
web_search 网络搜索
web_fetch 抓取网页内容
message 向指定 Channel 发消息
spawn 创建后台子 Agent
cron 系列 定时任务管理

ToolRegistry

class ToolRegistry:
    def __init__(self):
        self._tools: dict[str, Tool] = {}

    def register(self, tool: Tool) -> None:
        self._tools[tool.name] = tool

    def get_definitions(self) -> list[dict]:
        """生成所有工具的 OpenAI function calling 定义"""
        return [tool.to_schema() for tool in self._tools.values()]

    async def execute(self, name: str, params: dict) -> Any:
        tool = self._tools.get(name)
        params = tool.cast_params(params)     # 类型转换
        errors = tool.validate_params(params)  # 参数校验
        if errors:
            return f"Error: ..." + _HINT
        result = await tool.execute(**params)
        return result

两个细节值得注意:

  1. cast_params():LLM 经常把数字传成字符串(比如 "5" 而不是 5),这个方法根据 JSON Schema 做自动类型转换,提高了 Agent 的鲁棒性
  2. 错误提示追加 _HINT:执行出错时在末尾加一句 [Analyze the error above and try a different approach.],引导 LLM 换个方式重试而不是用同样的参数反复调用

SubagentManager:子 Agent 异步执行

文件:nanobot/agent/subagent.py

主 Agent 可以通过 spawn 工具把耗时任务派给子 Agent 在后台执行:

class SpawnTool(Tool):
    @property
    def name(self) -> str:
        return "spawn"

    async def execute(self, task: str, label: str | None = None):
        return await self._manager.spawn(task=task, ...)

子 Agent 的运行机制:

  1. 在独立的 asyncio.Task 里运行,有自己的工具集(文件操作、Shell、网络搜索——但不能再 spawn,防止套娃)
  2. 也是一个 ReAct 循环,迭代上限 15 次(比主 Agent 低)
  3. 有独立的 system prompt,专注于被分配的任务

完成后的结果传递方式很有意思:

# 子 Agent 完成后,把结果包装成 InboundMessage 发回 Bus
msg = InboundMessage(
    channel="system",
    sender_id="subagent",
    chat_id=f"{origin['channel']}:{origin['chat_id']}",
    content=announce_content,  # 包含任务描述和结果
)
await self.bus.publish_inbound(msg)

主 Agent 收到这条 sender_id="subagent" 的系统消息后,会用 LLM 把技术结果翻译成自然语言再发给用户。这样用户看到的回复始终是统一的语气风格。

Session 管理:Append-Only 的 JSONL 文件

文件:nanobot/session/manager.py

@dataclass
class Session:
    """Messages are append-only for LLM cache efficiency."""
    key: str                                    # channel:chat_id
    messages: list[dict] = field(default_factory=list)
    last_consolidated: int = 0                  # 上次整理到第几条
    created_at: datetime = field(default_factory=datetime.now)

持久化格式是 JSONL(一行一条消息,第一行是 metadata),用 channel:chat_id 做文件名。

这里有一个刻意的设计:消息列表只追加、不修改。记忆整理只写 MEMORY.md 和 HISTORY.md,不动 session 里的消息。源码注释解释了原因——如果改了历史消息,LLM provider 侧的 KV cache 就失效了,要重新计算整个上下文,浪费钱也浪费时间。

SessionManager 维护了一个内存缓存(dict),命中缓存直接返回,否则从 JSONL 文件加载。还兼容了 legacy 路径迁移(老版本的 session 存在全局目录,新版存在 workspace 下)。

Channel 层:插件化的平台接入

文件:nanobot/channels/base.pynanobot/channels/manager.py

BaseChannel 抽象基类

class BaseChannel(ABC):
    name: str = "base"

    def __init__(self, config, bus: MessageBus):
        self.config = config
        self.bus = bus

    @abstractmethod
    async def start(self) -> None: ...    # 监听平台消息

    @abstractmethod
    async def stop(self) -> None: ...     # 停止

    @abstractmethod
    async def send(self, msg: OutboundMessage) -> None: ...  # 发消息

    async def _handle_message(self, sender_id, chat_id, content, ...):
        """消息进来后的统一处理"""
        if not self.is_allowed(sender_id):  # 白名单鉴权
            return
        msg = InboundMessage(channel=self.name, ...)
        await self.bus.publish_inbound(msg)

每个平台一个文件,目前支持:Telegram、Discord、Slack、飞书、钉钉、企业微信、QQ、WhatsApp、Email、Matrix 等十几个。

ChannelManager

class ChannelManager:
    def _init_channels(self):
        # 通过 registry 发现所有 Channel 类(包括第三方插件)
        for name, cls in discover_all().items():
            section = getattr(self.config.channels, name, None)
            if section and section.enabled:
                channel = cls(section, self.bus)
                self.channels[name] = channel

启动时通过 discover_all() 做服务发现,支持两种注册方式:内置的 Channel 通过 pkgutil 扫描发现,第三方 Channel 通过 Python entry_points 插件机制注册。

出方向有一个专门的 _dispatch_outbound() 任务,死循环从 outbound queue 取消息,按 msg.channel 字段路由到对应的 Channel 实例发出去。

白名单鉴权

def is_allowed(self, sender_id: str) -> bool:
    allow_list = getattr(self.config, "allow_from", [])
    if not allow_list:
        return False          # 空列表 = 拒绝所有
    if "*" in allow_list:
        return True           # 通配符 = 允许所有
    return str(sender_id) in allow_list

简单但实用。启动时还会校验:如果某个 Channel 的 allow_from 是空列表,直接 SystemExit 报错退出,避免上线后所有人都被拒。

Cron 定时任务

文件:nanobot/cron/service.py

一个自包含的定时调度服务,支持三种时间规则:

  • at:一次性执行(指定时间戳)
  • every:固定间隔(毫秒级精度)
  • cron:标准 cron 表达式(用 croniter 库解析,支持时区)
class CronService:
    def add_job(self, name, schedule, message, ...):
        job = CronJob(
            id=str(uuid.uuid4())[:8],
            name=name,
            schedule=schedule,
            payload=CronPayload(kind="agent_turn", message=message),
            ...
        )
        store.jobs.append(job)
        self._save_store()
        self._arm_timer()  # 重新计算下次唤醒时间

任务持久化到 JSON 文件。有个防并发修改的机制:每次操作前检查文件 mtime,如果被外部改过就重新加载。到时间了通过回调触发 Agent 执行。

Agent 自己也可以通过 CronTool 创建定时任务,比如用户说"每天早上 8 点提醒我看新闻",Agent 就调 cron 工具设定一个 job。

LLM Provider 抽象层

文件:nanobot/providers/base.py

class LLMProvider(ABC):
    _CHAT_RETRY_DELAYS = (1, 2, 4)  # 退避间隔

    @abstractmethod
    async def chat(self, messages, tools, model, ...) -> LLMResponse: ...

    async def chat_with_retry(self, **kw) -> LLMResponse:
        for attempt, delay in enumerate(self._CHAT_RETRY_DELAYS, 1):
            response = await self._safe_chat(**kw)
            if response.finish_reason != "error" or not self._is_transient_error(response.content):
                return response
            await asyncio.sleep(delay)  # 1s → 2s → 4s
        return await self._safe_chat(**kw)  # 最后一次

对 429(限流)、5xx 这类瞬态错误自动重试,退避策略 1→2→4 秒。

统一返回格式:

@dataclass
class LLMResponse:
    content: str | None
    tool_calls: list[ToolCallRequest]
    finish_reason: str = "stop"
    usage: dict[str, int]
    reasoning_content: str | None = None      # DeepSeek-R1 等推理模型
    thinking_blocks: list[dict] | None = None  # Anthropic extended thinking

配置文件支持十几个 provider(OpenAI、Anthropic、DeepSeek、Groq、火山引擎、硅基流动、OpenRouter 等),还有个通用的 custom 选项可以接任意 OpenAI 兼容接口。

有个降级处理:如果模型不支持多模态,_strip_image_content() 自动把消息里的图片替换成文本占位符,不至于直接报错。

Skills 系统:渐进式技能加载

文件:nanobot/agent/skills.py

每个技能是一个目录,里面放一个 SKILL.md 文件(Markdown 格式的 prompt 指令)。

class SkillsLoader:
    def __init__(self, workspace):
        self.workspace_skills = workspace / "skills"     # 用户自定义技能
        self.builtin_skills = BUILTIN_SKILLS_DIR         # 内置技能

    def load_skill(self, name) -> str | None:
        # 优先找用户自定义的,再找内置的
        workspace_skill = self.workspace_skills / name / "SKILL.md"
        if workspace_skill.exists():
            return workspace_skill.read_text()
        builtin_skill = self.builtin_skills / name / "SKILL.md"
        if builtin_skill.exists():
            return builtin_skill.read_text()
        return None

加载策略是"渐进式"的:

  1. system prompt 里只放所有技能的摘要(名称 + 一句话描述 + 文件路径)
  2. 标记了 always=true 的核心技能才会把完整内容塞进 prompt
  3. Agent 需要用某个技能时,自己调 read_file 工具去读 SKILL.md

这个设计是为了在技能多的时候控制 token 消耗。

一次完整请求的流转示例

串起来看一次完整的请求:

用户在 Telegram 里发:"帮我看看北京明天的天气"
  │
  ▼
TelegramChannel._handle_message()
  → is_allowed("user_123") ✓
  → InboundMessage(channel="telegram", chat_id="456", content="帮我看看北京明天的天气")
  → bus.publish_inbound(msg)
  │
  ▼
AgentLoop.run() 从 inbound queue 取到消息
  → sessions.get_or_create("telegram:456")
  → memory_consolidator.maybe_consolidate_by_tokens(session) → 还没超阈值,跳过
  → context.build_messages()
     → system prompt: 身份 + MEMORY.md 记忆 + 技能摘要
     → history: 之前的对话记录
     → user: "[Runtime Context: 2026-03-23 ...]\n\n帮我看看北京明天的天气"
  │
  ▼
_run_agent_loop() 开始:
  │
  ├─ 第1轮:调 LLM
  │   → LLM 返回 tool_call: web_search(query="北京明天天气")
  │   → ToolRegistry.execute("web_search", {"query": "北京明天天气"})
  │   → 搜索结果 append 到 messages
  │
  ├─ 第2轮:LLM 看到搜索结果
  │   → 直接返回文本:"北京明天晴,最高温度 22°C..."
  │   → 循环结束
  │
  ▼
session.messages.append(用户消息 + 所有中间消息 + 最终回复)
sessions.save(session)  → 写入 JSONL 文件
bus.publish_outbound(OutboundMessage(channel="telegram", chat_id="456", content="北京明天晴..."))
  │
  ▼
ChannelManager._dispatch_outbound()
  → channels["telegram"].send(msg)
  → 用户在 Telegram 收到回复

设计上的几个取舍

读完之后总结几个我觉得值得聊的设计取舍:

1. 单线程异步 vs 多线程/多进程

整个系统跑在一个 asyncio event loop 里。AgentLoop 是串行处理消息的——一条处理完了才处理下一条。好处是不用考虑并发安全,代码简单。代价是如果一条消息的处理卡住了(比如 LLM 调用慢),后面的消息就得排队。对于个人助手这个场景,这个取舍是合理的。

2. Session Append-Only

记忆整理后不删除 session 里的历史消息,只是写 MEMORY.md。这意味着 session 文件会越来越大。但好处是不破坏 LLM provider 侧的 KV cache——如果你改了历史消息,provider 的缓存就全废了。

3. 记忆整理用 LLM 而不是用规则

用 LLM 自己做记忆浓缩,而不是简单地截断或者用关键词提取。效果肯定更好,但每次整理要额外花一次 LLM 调用的钱。对个人用户来说这个成本可以接受。

4. 渐进式 Skill 加载

prompt 里不放技能全文,只放摘要。Agent 需要时自己 read_file 读取。节省了 token,但多了一轮工具调用的延迟。在技能多的场景下这个收益比较大。

总结

nanobot 的代码量不大,但把一个 Agent 系统需要的东西都覆盖到了。核心就三个:

  1. MessageBus 做解耦(bus/queue.py,不到 30 行)
  2. AgentLoop 跑 ReAct 循环(agent/loop.py
  3. MemoryConsolidator 做记忆管理(agent/memory.py

其余的 Channel、Provider、Tool、Skill、Cron 都是往这个骨架上挂的扩展模块。

如果你想搞清楚 AI Agent 到底是怎么工作的,我觉得读这个项目比读 LangChain 或者 AutoGen 的源码性价比高很多。代码规模小,设计意图清晰,没有过度抽象。


以上内容基于 nanobot v0.1.4 源码(2026 年 3 月),项目地址:https://github.com/HKUDS/nanobot

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐