义的多轮对话要求智能体能够理解上下文。就像人类一样,智能体需要记住对话历史:已经说过和做过什么,以保持连贯性并避免重复。

以下是OpenHands Applications的示例图,本篇就来看看会话和交互如何进行。

openhands-sdk

因为本系列借鉴的文章过多,可能在参考文献中有遗漏的文章,如果有,还请大家指出。

0x01 背景

本节基于 Google ADK 来进行背景介绍。

1.1 会话的意义

就像你不会每次发短信都从头开始一样,智能体也需要了解当前交互的上下文。常见的Agent系统会通过 SessionState 和 Memory 提供了结构化的上下文管理方式。

  1. Session:当前对话线程(可以将你与智能体的不同对话实例视为独立的对话线程,它们可能会利用长期知识
    • 表示用户与你的智能体系统之间单次、持续的交互
    • 包含该特定交互期间,智能体采取的消息和动作(称为 Events)的时间顺序序列。
    • 一个 Session 还可以保存仅在本次对话期间相关的临时数据(State)。
  2. State:当前对话中的数据
    • 存储在特定 Session 内的数据。
    • 用于管理当前(单次)、活跃对话线程相关的信息(例如,本次对话中的购物车商品,本 Session 中提到的用户偏好)。
    • 关注如何高效地读取、写入和管理 session 专属数据。
  3. Memory:可检索的跨 Session 信息
    • 表示可能跨越多个过去 Session或包含外部数据源的信息存储。
    • 它作为一个知识库,智能体可以检索以回忆超出当前对话的信息或上下文。

因此,Agent系统一般有如下两套组件或者服务:

  • SessionService:管理不同的对话线程(Session 对象)负责生命周期管理:创建、检索、更新(追加 Events、修改 State)和删除单个 Session
  • MemoryService:管理长期知识存储(Memory),负责将信息(通常来自已完成的 Session)导入长期存储。提供基于查询检索已存储知识的方法。

本篇介绍对话服务,后续另外介绍内存服务。

1.2 会话系统的常见功能

用户通常不会直接创建或管理 Session 对象,而是通过 SessionService。该服务作为会话生命周期的中央管理者。其核心职责包括:

  • 开启新对话: 当用户开始交互时,创建新的 Session 对象。
  • 恢复已有对话: 通过 ID 检索特定 Session,让智能体可以从上次中断处继续。
  • 保存进度: 将新的交互(Event 对象)追加到 session 历史。这也是 session state 更新的机制(详见 State 章节)。
  • 列出对话: 查找特定用户和应用的活跃会话线程。
  • 清理: 当对话结束或不再需要时,删除 Session 及其相关数据。

选择合适的 SessionService 是决定智能体对话历史和临时数据如何存储与持久化的关键。

1.3 Session 常见内容

一般来说,当用户开始与智能体交互时,SessionService 会创建一个 Session 对象。该对象作为单个对话线程相关所有内容的容器。其主要属性如下:

  1. 标识信息(idappNameuserId):

用于唯一标记对话的核心字段,具体说明如下:

  • id:当前对话线程的唯一标识符,是后续检索该对话的关键依据。一个 SessionService 对象可管理多个 Session(会话)实例,此字段用于明确当前操作对应的具体会话对象。示例值:"test_id_modification"。
  • app_name:标识当前对话所属的智能体应用。示例值:"id_modifier_workflow"。
  • userId:将对话与特定用户关联的关联字段,用于用户维度的对话管理与权限控制。
  1. 对话历史(events):

按时间顺序排列的交互序列,包含当前对话线程中发生的所有交互行为(以 Event 对象形式存储),涵盖用户消息、智能体响应、工具调用动作等全量交互记录。

  1. 会话状态(state):

用于存储仅与当前活跃对话相关的临时数据,相当于智能体在交互过程中的 “临时草稿本”。下一节将详细介绍 state 的具体使用与管理方式。

  1. 活动追踪(lastUpdateTime):

时间戳字段,记录当前对话线程中最后一次交互事件的发生时间,用于会话活跃度判断与过期管理。

1.4 会话生命周期

Session lifecycle

以下是 Session 与 SessionService 在一次对话轮次中协作的简化流程:

  1. 开始或恢复: 你的应用程序需要使用 SessionService 来要么 create_session(用于新聊天),要么使用现有的 session id。
  2. 提供上下文: Runner 从适当的服务方法获取相应的 Session 对象,为智能体提供对相应 Session 的 state 和 events 的访问权限。
  3. 智能体处理: 用户用查询提示智能体。智能体分析查询以及可能的 session state 和 events 历史来确定响应。
  4. 响应和状态更新: 智能体生成响应(并可能标记要在 state 中更新的数据)。Runner 将其打包为 Event
  5. 保存交互: Runner 调用 sessionService.append_event(session, event),将 session 和新的 event 作为参数。服务将 Event 添加到历史记录中,并根据事件中的信息更新存储中的 session state。session 的 last_update_time 也会得到更新。
  6. 准备下一次: 智能体的响应发送给用户。更新后的 Session 现在由 SessionService 存储,准备进行下一轮(这通常会在当前会话中继续对话,从步骤 1 重新开始循环)。
  7. 结束对话: 当对话结束时,你的应用程序调用 sessionService.delete_session(...) 来清理存储的会话数据(如果不再需要的话)。

此循环突出显示了 SessionService 如何通过管理每个 Session 对象的历史和状态,确保对话的连续性。

1.5 前文回顾

我们首先回顾前文介绍的,OpenHands项目关于对话的服务器端组件。

  • session.py 文件定义了Session类,它代表与客户端的WebSocket会话。
  • agent_session.py 文件包含AgentSession类,它管理会话内Agent的生命周期。
  • conversation_manager.py 文件定义了ConversationManager类,它负责管理多个客户端会话。
  • listen.py 文件是主服务器文件,它设置FastAPI应用程序并定义各种API端点。此处关键一步为与会话管理器 ConversationManager 建立连接。

以上几步展示了服务器组件如何构建会话,因此我们由此而入。

0x02 OpenHands 会话系统

会话是专门设计用于跟踪和管理单独对话线程的对象。会话就可以理解为AI代理的临时工作空间,就像你为特定项目准备的办公桌。它包含当前对话的所有必要工具、笔记和参考资料,一切都是即时可访问的,但也具有临时性和任务特定性。

具体而言,在OpenHands中:

  • WebSession 是一个 Web 服务器绑定的会话包装器,负责管理单个 Web 客户端连接并协调 AgentSession 生命周期。是 OpenHands 系统中连接前端用户界面和后端Agent执行的核心桥梁,负责协调整个交互流程。
  • AgentSession 是 OpenHands 框架中 Agent运行的 “上下文容器”,核心作用是封装Agent执行所需的所有组件(Agent、控制器、运行时、内存、事件流),统一管理它们的生命周期(初始化、启动、通信、关闭),并提供会话级的配置隔离、数据持久化和状态管理,是Agent能够独立、稳定执行任务的基础。

2.1 对话管理接口 ConversationManager

ConversationManager 类定义了对话管理的接口,适用于单机模式和集群模式。它负责处理对话的全生命周期,
包括创建、附加、分离和清理。这是OpenHands的一个扩展点,基于它构建的应用可通过服务器配置修改行为,无需改动其核心代码。应用程序可通过以下方式提供自定义实现:

  1. 创建一个继承自ConversationManager的类
  2. 实现所有必需的抽象方法
  3. 将server_config.conversation_manager_class设置为该实现类的全限定名

ConversationManager 定义如下。

class ConversationManager(ABC):
    """OpenHands中对话管理的抽象基类。
    应用程序可能需要在以下场景中自定义实现:
    - 具有分布式对话状态的集群部署
    - 自定义持久化或缓存策略
    - 与外部对话管理系统集成
    - 增强的监控或日志能力
    实现类通过openhands.server.shared.py中的get_impl()方法实例化。
    """

    sio: socketio.AsyncServer  # Socket.IO异步服务器实例,用于实时通信
    config: OpenHandsConfig    # OpenHands配置对象,存储系统参数
    file_store: FileStore      # 文件存储实例,用于管理对话相关文件
    conversation_store: ConversationStore  # 对话存储实例,用于持久化对话数据
2.1.1 StandaloneConversationManager

StandaloneConversationManager 是ConversationManager的子类。是默认实现,适用于单服务器部署场景。

@dataclass
class StandaloneConversationManager(ConversationManager):
    """Default implementation of ConversationManager for single-server deployments.

    See ConversationManager for extensibility details.
    """

    sio: socketio.AsyncServer
    config: OpenHandsConfig
    file_store: FileStore
    server_config: ServerConfig
    # Defaulting monitoring_listener for temp backward compatibility.
    monitoring_listener: MonitoringListener = MonitoringListener()
    _local_agent_loops_by_sid: dict[str, Session] = field(default_factory=dict)
    _local_connection_id_to_session_id: dict[str, str] = field(default_factory=dict)
    _active_conversations: dict[str, tuple[ServerConversation, int]] = field(
        default_factory=dict
    )
    _detached_conversations: dict[str, tuple[ServerConversation, float]] = field(
        default_factory=dict
    )
    _conversations_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    _cleanup_task: asyncio.Task | None = None
    _conversation_store_class: type[ConversationStore] | None = None
    _loop: asyncio.AbstractEventLoop | None = None

2.1.2 Session初始化

Session初始化的流程图如下。

openhands-4.1-1

StandaloneConversationManager 的 join_conversation 函数如下,其调用了 maybe_start_agent_loop 初始化Agent。需要注意,此处 Session 是 WebSession。

    from openhands.server.session.session import WebSession as Session

    async def join_conversation(
        self,
        sid: str,
        connection_id: str,
        settings: Settings,
        user_id: str | None,
    ) -> AgentLoopInfo:
        await self.sio.enter_room(connection_id, ROOM_KEY.format(sid=sid))
        self._local_connection_id_to_session_id[connection_id] = sid
        # 此处调用 maybe_start_agent_loop
        agent_loop_info = await self.maybe_start_agent_loop(sid, settings, user_id)
        return agent_loop_info

maybe_start_agent_loop 调用了 _start_agent_loop 初始化 Session。

class ConversationManager:
    def __init__(self, config: OpenHandsConfig, sio: Any, file_store: Any):
        self.config = config  # 框架全局配置
        self.sio = sio  # SocketIO实例(用于客户端通信)
        self.file_store = file_store  # 文件存储实例(用于会话数据持久化)
        self._local_agent_loops_by_sid: Dict[str, Session] = {}  # 会话ID到Session实例的映射(缓存活跃会话)
        self._loop = asyncio.get_event_loop()  # 事件循环实例

    async def maybe_start_agent_loop(
        self,
        sid: str,  # 会话ID(唯一标识一个对话)
        settings: Settings,  # 用户/会话设置(含Agent类型、LLM配置等)
        user_id: Optional[str] = None,  # 用户ID(可选,用于用户级并发控制)
        initial_user_msg: Optional[MessageAction] = None,  # 初始用户消息(可选,会话启动时的第一条消息)
        replay_json: Optional[str] = None,  # 回放JSON字符串(可选,用于会话回放场景)
    ) -> AgentLoopInfo:
        """
        尝试启动Agent循环:优先复用已存在的会话,不存在则新建。
        
        核心逻辑:
        - 检查会话ID对应的会话是否已存在(缓存于_local_agent_loops_by_sid)
        - 存在则直接返回会话信息,不存在则调用_start_agent_loop新建会话
        - 返回标准化的Agent循环信息(供外部调用者使用)
        """     
        # 从缓存中获取会话(复用已有会话,避免重复初始化)
        session = self._local_agent_loops_by_sid.get(sid)
        if not session:
            # 会话不存在,新建Agent循环
            session = await self._start_agent_loop(
                sid, settings, user_id, initial_user_msg, replay_json
            )
        
        # 将Session实例转换为标准化的AgentLoopInfo返回
        return self._agent_loop_info_from_session(session)

_start_agent_loop 该代码是 OpenHands 框架中 Agent循环(Agent Loop)的启动与管理核心,负责会话的创建、复用、并发控制和初始化,是连接用户请求与Agent执行的关键枢纽。其核心使命是:在遵守并发限制的前提下,为用户会话提供完整的组件初始化(LLM 注册表、统计、事件订阅),确保Agent能够顺畅启动并运行。

_start_agent_loop 的核心特色如下:

  1. 会话复用机制:通过 _local_agent_loops_by_sid 缓存活跃会话,避免重复初始化,提升响应速度和资源利用率(例如用户重新连接同一会话时直接复用)。
  2. 智能并发控制:基于用户 ID 限制最大并发会话数,超限后自动关闭最早的会话,同时向客户端发送友好通知,平衡资源占用与用户体验。
  3. 组件化初始化:整合 create_registry_and_conversation_stats 函数,一键完成 LLM 注册表、对话统计、配置适配三大核心组件的初始化,架构清晰且解耦。
  4. 异步非阻塞设计:Agent初始化(session.initialize_agent)通过 asyncio.create_task 异步执行,不阻塞会话创建流程,提升系统吞吐量。
  5. 事件驱动扩展:自动订阅会话事件流,通过回调函数响应会话更新,支持后续扩展监控、统计等功能,具备良好的可扩展性。
  6. 容错与兼容性:处理重复订阅异常,避免报错;支持会话回放(replay_json)和初始消息(initial_user_msg),适配正常对话、回放等多种场景。

_start_agent_loop 代码如下。

    async def _start_agent_loop(
        self,
        sid: str,
        settings: Settings,
        user_id: Optional[str] = None,
        initial_user_msg: Optional[MessageAction] = None,
        replay_json: Optional[str] = None,
    ) -> Session:
        """
        内部方法:实际创建并启动Agent循环,包含并发控制、会话初始化、事件订阅等核心流程。
        """
        # 1. 并发会话数量控制:检查用户当前活跃会话数是否超过上限
        # 获取用户当前运行中的所有会话ID
        running_session_ids = await self.get_running_agent_loops(user_id)
        # 若超过最大并发数,关闭最早的会话以释放资源
        if len(running_session_ids) >= self.config.max_concurrent_conversations:
           
            # 获取用户的会话存储实例,读取所有活跃会话的元数据
            conversation_store = await self._get_conversation_store(user_id)
            conversations = await conversation_store.get_all_metadata(running_session_ids)
            # 按最后更新时间排序(最新的在前, oldest的在后)
            conversations.sort(key=_last_updated_at_key, reverse=True)

            # 循环关闭最早的会话,直到并发数符合限制
            while len(conversations) >= self.config.max_concurrent_conversations:
                oldest_conversation = conversations.pop()  # 取出最早的会话
                oldest_sid = oldest_conversation.conversation_id
               
                # 向客户端发送错误通知(告知会话已关闭)
                status_update = {
                    'status_update': True,
                    'type': 'error',
                    'id': 'AGENT_ERROR$TOO_MANY_CONVERSATIONS',
                    'message': '同时开启的会话数已达上限。若仍需使用该会话,可发送消息重新激活Agent',
                }
                # 在事件循环中发送SocketIO事件(定向到该会话的房间)
                await run_in_loop(
                    self.sio.emit(
                        'oh_event',
                        status_update,
                        to=ROOM_KEY.format(sid=oldest_sid),  # 按会话ID定向发送
                    ),
                    self._loop,
                )
                
                # 关闭最早的会话(释放资源)
                await self.close_session(oldest_sid)

        # 2. 初始化核心组件:LLM注册表、对话统计、最终配置
        llm_registry, conversation_stats, final_config = (
            create_registry_and_conversation_stats(self.config, sid, user_id, settings)
        )
        
        # 3. 创建Session实例(封装会话的所有状态和组件)
        session = Session(
            sid=sid,
            file_store=self.file_store,  # 绑定文件存储
            config=final_config,  # 绑定最终配置
            llm_registry=llm_registry,  # 绑定LLM注册表
            conversation_stats=conversation_stats,  # 绑定对话统计
            sio=self.sio,  # 绑定SocketIO实例
            user_id=user_id,  # 绑定用户ID
        )
        
        # 4. 将新会话缓存到本地(供后续复用)
        self._local_agent_loops_by_sid[sid] = session
        
        # 5. 异步初始化Agent(不阻塞当前流程):加载Agent、处理初始消息、回放会话(若有)
        asyncio.create_task(
            session.initialize_agent(settings, initial_user_msg, replay_json)
        )
        
        # 6. 订阅会话事件流:监听会话更新事件(仅新建会话时订阅,复用会话跳过)
        try:
            session.agent_session.event_stream.subscribe(
                subscriber=EventStreamSubscriber.SERVER,  # 订阅者类型:服务器
                callback=self._create_conversation_update_callback(
                    user_id, sid, settings, llm_registry  # 会话更新回调函数
                ),
                callback_id=UPDATED_AT_CALLBACK_ID,  # 回调ID(用于后续取消订阅)
            )
        except ValueError:
            # 若已存在相同ID的订阅,忽略该操作(避免重复订阅)

        # 返回创建好的Session实例
        return session

2.2 session.py(WebSession)

session.py 定义了 WebSession 类,它是 OpenHands 系统中管理 Web 客户端会话的核心组件。

2.2.1 WebSession 类概述

WebSession 是一个 Web 服务器绑定的会话包装器,负责管理单个 Web 客户端连接并协调 AgentSession 生命周期。WebSession 的关键设计模式为:

  • 异步队列模式:使用 asyncio.Queue 管理事件发布,确保非阻塞操作
  • 事件驱动架构:通过事件订阅/发布机制实现组件解耦
  • 状态管理模式:跟踪会话状态和连接状态
  • 错误处理机制:全面的异常捕获和错误报告

WebSession 是 OpenHands 系统中连接前端用户界面和后端Agent执行的核心桥梁,负责协调整个交互流程。

2.2.2 WebSession 核心属性

WebSession 核心属性为:

  • sid:稳定的会话 ID,跨传输保持一致
  • sio:Socket.IO 服务器,用于向 Web 客户端发送事件
  • agent_session:核心Agent会话,协调运行时和 LLM
  • config:有效的 OpenHands 配置
  • llm_registry:负责 LLM 访问和重试钩子的注册表
  • file_store:会话的文件存储接口
  • user_id:可选的多租户用户标识符

WebSession会订阅 EventStreamSubscriber.SERVER。

class WebSession:
    """Web server-bound session wrapper.

    This was previously named `Session`. We keep `Session` as a compatibility alias
    (see openhands.server.session.__init__) so downstream imports/tests continue to
    work. The class manages a single web client connection and orchestrates the
    AgentSession lifecycle for that conversation.
    """

    sid: str
    sio: socketio.AsyncServer | None
    last_active_ts: int = 0
    is_alive: bool = True
    agent_session: AgentSession
    loop: asyncio.AbstractEventLoop
    config: OpenHandsConfig
    llm_registry: LLMRegistry
    file_store: FileStore
    user_id: str | None
    logger: LoggerAdapter
2.2.3 主要功能模块

WebSession 的主要功能模块为:

  • 初始化和配置管理
    • init 方法设置会话的基本配置和组件
    • 订阅 EventStream 的 SERVER 事件
    • 初始化异步事件发布队列
    • Agent初始化(initialize_agent)
    • 配置Agent、LLM 和运行时环境
    • 处理 MCP(Model Context Protocol)配置
    • 设置 condenser(压缩器)配置
    • 启动 AgentSession
    • 错误处理和验证
  • 事件处理(on_event 和 _on_event)
    • 处理来自Agent的事件
      • 过滤 NullAction 和 NullObservation
      • 根据事件源决定如何处理和转发事件
      • 将环境反馈作为Agent事件发送给 UI
  • 消息分发(dispatch)
    • 处理来自用户的事件
    • 验证图像支持
    • 将事件添加到事件流中
  • 异步消息发送(send, _monitor_publish_queue, _send)
    • 使用队列机制异步发送消息
    • 确保 WebSocket 连接稳定后再发送
    • 处理连接状态和错误
  • 状态管理
    • close 方法清理会话资源
    • queue_status_message 和 _send_status_message 处理状态更新消息
2.2.4 与系统其他组件的关系

WebSession 与系统其他组件的关系如下:

  • 与 EventStream 集成

    • 作为 SERVER 订阅者接收事件
    • 处理来自Agent和用户的事件流
  • 与 AgentSession 协作:

    • 管理 AgentSession 生命周期
    • 转发用户事件到Agent
    • 将Agent响应发送给客户端
  • 与 Socket.IO 集成:

    • 使用 Socket.IO 向客户端发送实时事件
    • 管理 WebSocket 连接状态
2.2.5 初始化 AgentSession

WebSession 的初始化会中完成对AgentSession的初始化,AgentSession的初始化中又做了EventStream的初始化,所以整个会话的EventStream也就是在这里创建的,这里在事件流中订阅了EventStreamSubscriber.SERVER,事件回调函数中将需要向前端广播的事件通过socket进行发送。

class WebSession:
    def __init__(
        self,
        sid: str,
        config: OpenHandsConfig,
        llm_registry: LLMRegistry,
        conversation_stats: ConversationStats,
        file_store: FileStore,
        sio: socketio.AsyncServer | None,
        user_id: str | None = None,
    ):
        self.sid = sid
        self.sio = sio
        self.last_active_ts = int(time.time())
        self.file_store = file_store
        self.logger = OpenHandsLoggerAdapter(extra={'session_id': sid})
        self.llm_registry = llm_registry
        self.conversation_stats = conversation_stats
        self.agent_session = AgentSession(
            sid,
            file_store,
            llm_registry=self.llm_registry,
            conversation_stats=conversation_stats,
            status_callback=self.queue_status_message,
            user_id=user_id,
        )
        self.agent_session.event_stream.subscribe(
            EventStreamSubscriber.SERVER, self.on_event, self.sid
        )
        self.config = config

        # Lazy import to avoid ircular dependency
        from openhands.experiments.experiment_manager import ExperimentManagerImpl

        self.config = ExperimentManagerImpl.run_config_variant_test(
            user_id, sid, self.config
        )
        self.loop = asyncio.get_event_loop()
        self.user_id = user_id

        self._publish_queue: asyncio.Queue = asyncio.Queue()
        self._monitor_publish_queue_task: asyncio.Task = self.loop.create_task(
            self._monitor_publish_queue()
        )
        self._wait_websocket_initial_complete: bool = True

agent_session.start()中完成了security_analyzer、runtime、memory、controller的初始化。

2.3 agent_session

2.3.1 AgentSession

AgentSession 是 OpenHands 框架中 Agent运行的 “上下文容器”,核心作用是封装Agent执行所需的所有组件(Agent、控制器、运行时、内存、事件流),统一管理它们的生命周期(初始化、启动、通信、关闭),并提供会话级的配置隔离、数据持久化和状态管理,是Agent能够独立、稳定执行任务的基础。

AgentSession 核心特色如下:

  1. 全组件生命周期管理:集中初始化并关联Agent、控制器、运行时、内存、事件流等核心组件,确保组件间通信顺畅,生命周期一致(启动 / 关闭同步)。
  2. 灵活的环境配置:支持 Git 仓库集成、自定义密钥注入、MCP 工具扩展等,适配代码开发、第三方服务调用等复杂场景,满足多样化任务需求。
  3. 会话状态安全管控:严格校验会话状态(避免重复启动、已关闭会话启动失败),通过状态标记(_starting/_closed)确保流程安全性,减少异常。
  4. 支持会话回放与状态恢复:提供 _run_replay 接口支持从 JSON 数据恢复历史会话,便于调试、任务续跑和场景复现。
  5. 精细化日志与监控:集成带会话上下文的日志器,记录启动耗时、成功状态、状态恢复等元数据,便于问题排查和系统监控。
  6. 安全与隔离设计:通过自定义密钥处理器(UserSecrets)安全管理第三方密钥,运行时环境隔离执行代码,避免敏感信息泄露和系统风险。
  7. 状态驱动的事件机制:启动时根据是否有初始消息自动设置Agent状态(运行中 / 等待用户输入),并通过事件流同步状态,确保组件间状态一致性。

AgentSession 定义如下:

class AgentSession:
    """
        Agent会话类:封装Agent运行的完整上下文,管理Agent、控制器、运行时、内存等核心组件的生命周期。
        属性说明:
        controller: Agent控制器实例(负责调度Agent执行流程)
        sid: 会话唯一标识
        user_id: 用户ID(可选)
        event_stream: 事件流(组件间通信核心)
        llm_registry: LLM注册表(管理LLM实例)
        file_store: 文件存储(持久化会话数据)
        runtime: 运行时环境(如沙盒,执行代码/命令)
        memory: Agent内存(存储会话历史、上下文等)
        _starting: 会话启动中标记
        _started_at: 会话启动时间戳
        _closed: 会话关闭标记
        loop: 异步事件循环
        logger: 带会话上下文的日志器
    """

    sid: str
    user_id: Optional[str]
    event_stream: EventStream
    llm_registry: LLMRegistry
    file_store: FileStore
    controller: Optional[AgentController] = None
    runtime: Optional[Runtime] = None

    memory: Optional[Memory] = None
    _starting: bool = False
    _started_at: float = 0
    _closed: bool = False
    loop: Optional[asyncio.AbstractEventLoop] = None
    logger: LoggerAdapter

    def __init__(
        self,
        sid: str,
        file_store: FileStore,
        llm_registry: LLMRegistry,
        conversation_stats: ConversationStats,
        status_callback: Optional[Callable] = None,
        user_id: Optional[str] = None,
    ) -> None:
        """
        初始化AgentSession实例。

        参数:
            sid: 会话ID(唯一标识)
            file_store: 文件存储实例(用于事件流、内存数据持久化)
            llm_registry: LLM注册表实例(提供LLM资源)
            conversation_stats: 对话统计实例(记录会话相关统计数据)
            status_callback: 状态回调函数(可选,会话状态变更时触发)
            user_id: 用户ID(可选,用于用户级数据隔离)
        """
        self.sid = sid
        # 初始化事件流(会话内组件通信的核心枢纽)
        self.event_stream = EventStream(sid, file_store, user_id)
        self.file_store = file_store
        self._status_callback = status_callback  # 状态变更回调(如通知客户端)
        self.user_id = user_id
        # 初始化带会话上下文的日志器(便于追踪会话级日志)
        self.logger = OpenHandsLoggerAdapter(
            extra={'session_id': sid, 'user_id': user_id}
        )
        self.llm_registry = llm_registry  # 绑定LLM注册表
        self.conversation_stats = conversation_stats  # 绑定对话统计实例

AgentSession 初始化完成后向事件流中添加ChangeAgentStateAction事件。

openhands-4.1-2

具体代码如下:

    async def start(
        self,
        runtime_name: str,  # 运行时名称(如"sandbox",指定运行环境类型)
        config: OpenHandsConfig,  # 框架全局配置
        agent: Agent,  # 已初始化的Agent实例
        max_iterations: int,  # Agent执行的最大迭代次数(防止无限循环)
        git_provider_tokens: Optional[PROVIDER_TOKEN_TYPE] = None,  # Git提供商令牌(如GitHub令牌)
        custom_secrets: Optional[CUSTOM_SECRETS_TYPE] = None,  # 自定义密钥(第三方服务访问用)
        max_budget_per_task: Optional[float] = None,  # 单任务最大预算(可选)
        agent_to_llm_config: Optional[Dict[str, LLMConfig]] = None,  # Agent-LLM配置映射
        agent_configs: Optional[Dict[str, AgentConfig]] = None,  # 所有Agent配置字典
        selected_repository: Optional[str] = None,  # 选中的Git仓库地址(可选)
        selected_branch: Optional[str] = None,  # 选中的仓库分支(可选)
        initial_message: Optional[MessageAction] = None,  # 初始用户消息(可选)
        conversation_instructions: Optional[str] = None,  # 会话指令(自定义Agent行为)
        replay_json: Optional[str] = None,  # 会话回放JSON数据(可选)
    ) -> None:
        """
        启动Agent会话:初始化运行时、内存、控制器,触发Agent开始执行。
        
        核心流程:
        1. 校验会话状态(避免重复启动)
        2. 创建运行时环境(如沙盒)
        3. 配置Git令牌、自定义密钥
        4. 创建Agent内存(存储上下文、会话指令等)
        5. (可选)添加MCP工具到Agent
        6. (可选)执行会话回放
        7. 创建Agent控制器(调度Agent执行)
        8. 发送初始事件(启动状态/等待用户输入)
        """
        # 校验会话状态:已存在控制器或运行时 → 抛出异常(避免重复启动)
        if self.controller or self.runtime:
            raise RuntimeError(
                'Session already started. You need to close this session and start a new one.'
            )

        # 会话已关闭 → 日志警告并返回
        if self._closed:
            self.logger.warning('Session closed before starting')
            return
        
        self._starting = True  # 标记会话启动中
        started_at = time.time()
        self._started_at = started_at
        finished = False  # 执行完成标记(用于监控)
        runtime_connected = False  # 运行时连接成功标记
        restored_state = False  # 状态恢复标记(会话回放/恢复场景)
        
        # 初始化自定义密钥处理器(管理第三方服务密钥)
        custom_secrets_handler = UserSecrets(
            custom_secrets=custom_secrets if custom_secrets else {}
        )

        try:
            # 1. 创建运行时环境(如Docker沙盒)并连接
            runtime_connected = await self._create_runtime(
                runtime_name=runtime_name,
                config=config,
                agent=agent,
                git_provider_tokens=git_provider_tokens,
                custom_secrets=custom_secrets,
                selected_repository=selected_repository,
                selected_branch=selected_branch,
            )

            # 提取仓库目录名(若指定了Git仓库)
            repo_directory = None
            if self.runtime and runtime_connected and selected_repository:
                repo_directory = selected_repository.split('/')[-1]  # 从仓库地址提取目录名(如"openhands")

            # 2. 配置Git提供商令牌(若有)
            if git_provider_tokens:
                provider_handler = ProviderHandler(provider_tokens=git_provider_tokens)
                await provider_handler.set_event_stream_secrets(self.event_stream)  # 注入令牌到事件流

            # 3. 配置自定义密钥(若有)
            if custom_secrets:
                custom_secrets_handler.set_event_stream_secrets(self.event_stream)  # 注入自定义密钥到事件流

            # 4. 创建Agent内存(存储会话上下文、仓库信息、会话指令等)
            self.memory = await self._create_memory(
                selected_repository=selected_repository,
                repo_directory=repo_directory,
                selected_branch=selected_branch,
                conversation_instructions=conversation_instructions,
                custom_secrets_descriptions=custom_secrets_handler.get_custom_secrets_descriptions(),  # 密钥描述(供Agent参考)
                working_dir=config.workspace_mount_path_in_sandbox,  # 沙盒中的工作目录路径
            )

            # 5. (可选)添加MCP工具到Agent(需运行时已连接且Agent启用MCP)
            if self.runtime and runtime_connected and agent.config.enable_mcp:
                await add_mcp_tools_to_agent(agent, self.runtime, self.memory)

            # 6. (可选)执行会话回放(从replay_json恢复历史会话)
            if replay_json:
                initial_message = self._run_replay(
                    initial_message,
                    replay_json,
                    agent,
                    config,
                    max_iterations,
                    max_budget_per_task,
                    agent_to_llm_config,
                    agent_configs,
                )
            # 7. (正常场景)创建Agent控制器(调度Agent执行流程)
            else:
                self.controller, restored_state = self._create_controller(
                    agent,
                    config.security.confirmation_mode,  # 安全确认模式(如自动确认/手动确认)
                    max_iterations,
                    max_budget_per_task=max_budget_per_task,
                    agent_to_llm_config=agent_to_llm_config,
                    agent_configs=agent_configs,
                )

            # 8. 发送初始事件(根据是否有初始消息设置Agent状态)
            if not self._closed:
                if initial_message:
                    # 有初始消息 → 向事件流添加用户消息,设置Agent状态为"运行中"
                    self.event_stream.add_event(initial_message, EventSource.USER)
                    self.event_stream.add_event(
                        ChangeAgentStateAction(AgentState.RUNNING),
                        EventSource.ENVIRONMENT,
                    )
                else:
                    # 无初始消息 → 设置Agent状态为"等待用户输入"
                    self.event_stream.add_event(
                        ChangeAgentStateAction(AgentState.AWAITING_USER_INPUT),
                        EventSource.ENVIRONMENT,
                    )
            finished = True  # 标记执行完成

        finally:
            self._starting = False  # 取消启动中标记
            # 计算启动结果(是否成功:执行完成且运行时连接成功)
            success = finished and runtime_connected
            duration = time.time() - started_at  # 计算启动耗时

            # 日志元数据(用于监控和分析)
            # 记录启动结果日志

2.4.2 初始化Agent

_start_agent_loop 会调用 initialize_agent 初始化 Agent。session初始化完成后调用initialize_agent进一步完成agent的其余模块初始化工作,首先创建llm和agent,然后调用agent_session.start()

核心作用

initialize_agent 是 OpenHands 框架中 Agent实例的初始化核心方法,负责将用户设置、默认配置、第三方服务配置融合为最终运行配置,创建Agent实例并启动Agent会话,是连接配置与Agent运行的关键桥梁,确保Agent具备完成任务所需的所有能力(工具访问、LLM 支持、安全控制等)。

核心特色
  1. 配置融合机制:用户设置优先于默认配置,支持安全、沙盒、Git、第三方服务等多维度配置的灵活覆盖,兼顾通用性与个性化需求。
  2. 模块化压缩器设计:默认启用三阶段上下文压缩器流水线,按 “对话窗口→浏览器输出→LLM 总结” 顺序优化上下文,平衡上下文相关性与模型输入长度限制。
  3. 完整的服务配置:自动配置 MCP 服务器(Agent与工具的通信枢纽),支持自定义 MCP 配置扩展,适配不同部署环境的工具通信需求。
  4. 安全与隐私保护:敏感信息(如沙盒 API 密钥)通过 get_secret_value() 安全提取,未知错误仅返回错误类型,避免泄露敏感配置。
  5. 丰富的扩展参数:支持 Git 仓库访问、自定义密钥、会话指令等扩展参数,适配代码开发、第三方服务集成等复杂场景。
  6. 精细化错误处理:区分不同类型错误(微Agent验证错误、值错误、未知错误),返回针对性错误信息,便于问题排查。
  7. 状态可视化:初始化开始时发送 AgentState.LOADING 状态事件,让客户端实时感知Agent启动进度,提升用户体验。
初始化流程图

openhands-4.1-3

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐