从零构建企业生产级Agent服务
从零构建生产级Agent服务
1、系统功能
-
使用FastAPI框架实现对外提供Agent智能体API后端接口服务
-
使用LangGraph中预置的ReAct架构的Agent
-
支持Short-term(短期记忆)并使用PostgreSQL进行持久化存储
-
支持Function Calling,包含自定义工具和MCP Server提供的工具
-
支持Human in the loop(HITL 人工审查)对工具调用提供人工审查功能,支持四种审查类型
-
支持多厂家大模型接口调用,OpenAI、阿里通义千问、本地开源大模型(Ollama)等
-
支持Redis存储用户会话状态,支持客户端的故障恢复和服务端的故障恢复
-
使用功能强大的rich库实现前端demo应用,与后端API接口服务联调
-
支持动态调整会话的过期时间
-
支持用户登录到系统后自动打开最近一次使用的会话,若无则新建会话
-
支持历史会话管理和历史会话恢复
-
支持修剪短期记忆中的聊天历史以满足上下文对token数量或消息数量的限制
-
支持读取和写入长期记忆(如用户偏好设置等)
-
支持异步模式调用Agent运行,支持并行(Celery是一个强大的异步任务队列/作业队列库),接口立即返回task_id
-
支持客户端随时通过task_id来查询服务端任务的状态与响应内容
2、核心业务流程
2.1 后端业务核心流程
-
docs/01_后端业务核心流程.pdf
-
docs/02_API接口和数据模型描述.pdf
2.2 前端业务核心流程
-
docs/03_前端业务核心流程.pdf
3、前期准备工作
3.1 集成开发环境搭建
-
anaconda提供python虚拟环境(Conda)
-
pycharm提供集成开发环境
3.2 大模型LLM服务接口调用方案
-
OpenAI等国外大模型使用方案
-
国内无法直接访问,可以使用代理的方式,具体代理方案自己选择
-
-
国内大模型采用厂商原生接口
-
本地开源大模型方案(Ollama方式)
4、项目初始化
4.1 安装项目依赖
# 创建项目虚拟环境 conda create -n ReActAgents python=3.11
# 安装项目依赖 pip install langgraph==0.4.5 pip install langchain==0.3.25 pip install langchain-openai==0.3.17 pip install langgraph-checkpoint-postgres==2.0.21 pip install rich==14.0.0 pip install fastapi==0.115.12 pip install redis==6.2.0 pip install concurrent-log-handler==0.9.28 pip install celery==5.5.3
[!CAUTION]
建议先使用要求的对应版本进行本项目测试,避免因版本升级造成的代码不兼容。测试通过后,可进行升级测试。
4.2 构建项目
使用PyCharm构建一个项目,为项目配置虚拟python环境 项目名称:ReActAgents
5、功能测试
5.1 使用Docker方式运行PostgreSQL数据库和Redis数据库
-
进入官网 https://www.docker.com/ 下载安装Docker Desktop软件并安装,安装完成后打开软件
-
打开命令行终端,进入到docker/postgresql下执行 docker-compose up -d 运行PostgreSQL服务
-
进入到docker/redis下执行 docker-compose up -d 运行Redis服务
-
运行成功后可在Docker Desktop软件中进行管理操作或使用命令行操作或使用指令
-
使用数据库客户端软件远程登陆进行可视化操作,这里使用Navicat客户端软件和Redis-Insight客户端软件
5.2 测试 HITL 对工具请求进行人类反馈
-
进入 04_ReActAgentHITLApi 文件夹下运行脚本进行测试
-
运行后端服务
python 01_backendServer.pyimport asyncio import sys if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) import logging from concurrent_log_handler import ConcurrentRotatingFileHandler from pydantic import BaseModel, Field import time from fastapi import FastAPI, HTTPException from typing import Dict, Any, Optional, List import uuid from langgraph.types import interrupt, Command from langgraph.prebuilt import create_react_agent from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver import uvicorn from contextlib import asynccontextmanager import redis.asyncio as redis import json from datetime import timedelta from psycopg_pool import AsyncConnectionPool from utils.config import Config from utils.llms import get_llm from utils.tools import get_tools # 设置日志基本配置,级别为DEBUG或INFO logger = logging.getLogger(__name__) # 设置日志器级别为DEBUG logger.setLevel(logging.DEBUG) # logger.setLevel(logging.INFO) logger.handlers = [] # 清空默认处理器 # 使用ConcurrentRotatingFileHandler handler = ConcurrentRotatingFileHandler( # 日志文件 Config.LOG_FILE, # 日志文件最大允许大小为5MB,达到上限后触发轮转 maxBytes=Config.MAX_BYTES, # 在轮转时,最多保留3个历史日志文件 backupCount=Config.BACKUP_COUNT ) # 设置处理器级别为DEBUG handler.setLevel(logging.DEBUG) handler.setFormatter(logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" )) logger.addHandler(handler) # 定义数据模型 客户端发起的智能体请求 class AgentRequest(BaseModel): # 用户唯一标识 user_id: str # 用户的问题 query: str # 系统提示词 system_message: Optional[str] = "你会使用工具来帮助用户。如果工具使用被拒绝,请提示用户。" # 定义数据模型 智能体给予的响应 class AgentResponse(BaseModel): # 会话唯一标识 session_id: str # 三个状态:interrupted, completed, error status: str # 时间戳 timestamp: float = Field(default_factory=lambda: time.time()) # error时的提示消息 message: Optional[str] = None # completed时的结果消息 result: Optional[Dict[str, Any]] = None # interrupted时的中断消息 interrupt_data: Optional[Dict[str, Any]] = None # 定义数据模型 客户端给予的反馈响应 class InterruptResponse(BaseModel): # 用户唯一标识 user_id: str # 会话唯一标识 session_id: str # 响应类型:accept(允许调用), edit(调整工具参数,此时args中携带修改后的调用参数), response(直接反馈信息,此时args中携带修改后的调用参数),reject(不允许调用) response_type: str # 如果是edit, response类型,可能需要额外的参数 args: Optional[Dict[str, Any]] = None # 定义数据模型 系统信息响应 class SystemInfoResponse(BaseModel): # 当前系统内会话总数 sessions_count: int # 当前活跃的用户 active_users: List[str] # 定义数据模型 会话状态信息响应 class SessionStatusResponse(BaseModel): # 用户唯一标识符 user_id: str # 会话唯一标识符 session_id: Optional[str] = None # 状态:not_found, idle, running, interrupted, completed, error status: str # error时的提示消息 message: Optional[str] = None # 上次查询 last_query: Optional[str] = None # 上次更新时间 last_updated: Optional[float] = None # 上次响应 last_response: Optional[AgentResponse] = None # 实现redis相关方法 class RedisSessionManager: # 初始化异步 Redis 连接和会话配置 def __init__(self, redis_host, redis_port, redis_db, session_timeout): self.redis_client = redis.Redis( host=redis_host, port=redis_port, db=redis_db, decode_responses=True ) self.session_timeout = session_timeout # 会话过期时间(秒) # 关闭 Redis 连接 async def close(self): await self.redis_client.close() # 创建新会话,匹配指定数据结构 # 会话存储 - 保存每个用户的智能体实例和状态 # 暂时只支持一个用户一个会话,不能一个用户多个会话 # 结构: {user_id: { # "session_id": session_id, # 会话ID # "status": "idle|running|interrupted|completed|error", # 会话状态 # "last_response": AgentResponse, # 上次响应 # "last_query": str, # 上次查询 # "last_updated": timestamp # 上次更新时间 # }} async def create_session(self, user_id: str, session_id: Optional[str] = None, status: str = "active", last_query: Optional[str] = None, last_response: Optional['AgentResponse'] = None, last_updated: Optional[float] = None) -> str: if session_id is None: session_id = str(uuid.uuid4()) if last_updated is None: last_updated = str(timedelta(seconds=0)) session_data = { user_id: { "session_id": session_id, "status": status, "last_response": last_response.model_dump() if isinstance(last_response, BaseModel) else last_response, "last_query": last_query, "last_updated": last_updated } } await self.redis_client.set( f"session:{user_id}", json.dumps(session_data, default=lambda o: o.__dict__ if not hasattr(o, 'model_dump') else o.model_dump()), ex=self.session_timeout ) return session_id # 获取会话数据 async def get_session(self, user_id: str) -> Optional[dict]: session_data = await self.redis_client.get(f"session:{user_id}") if not session_data: return None session = json.loads(session_data).get(user_id) if session and "last_response" in session: if session["last_response"] is not None: try: session["last_response"] = AgentResponse(**session["last_response"]) except Exception as e: logger.error(f"转换 last_response 失败: {e}") session["last_response"] = None return session # 更新会话数据 async def update_session(self, user_id: str, status: Optional[str] = None, last_query: Optional[str] = None, last_response: Optional['AgentResponse'] = None, last_updated: Optional[float] = None) -> bool: if await self.redis_client.exists(f"session:{user_id}"): current_data = await self.get_session(user_id) if not current_data: return False # 更新提供的字段 if status is not None: current_data["status"] = status if last_response is not None: if isinstance(last_response, BaseModel): current_data["last_response"] = last_response.model_dump() else: current_data["last_response"] = last_response if last_query is not None: current_data["last_query"] = last_query if last_updated is not None: current_data["last_updated"] = last_updated # 保持数据结构 session_data = {user_id: current_data} # 重新存储并刷新过期时间 await self.redis_client.set( f"session:{user_id}", json.dumps(session_data, default=lambda o: o.__dict__ if not hasattr(o, 'model_dump') else o.model_dump()), ex=self.session_timeout ) return True return False # 删除会话 async def delete_session(self, user_id: str) -> bool: return (await self.redis_client.delete(f"session:{user_id}")) > 0 # 获取所有会话数量 async def get_session_count(self) -> int: count = 0 async for _ in self.redis_client.scan_iter("session:*"): count += 1 return count # 获取所有 user_id async def get_all_user_ids(self) -> List[str]: user_ids = [] async for key in self.redis_client.scan_iter("session:*"): user_id = key.split(":", 1)[1] user_ids.append(user_id) return user_ids # 检查 user_id 是否在 Redis 中 async def user_id_exists(self, user_id: str) -> bool: return (await self.redis_client.exists(f"session:{user_id}")) > 0 # 解析state消息列表进行格式化展示,生产环境中注释 async def parse_messages(messages: List[Any]) -> None: """ 解析消息列表,打印 HumanMessage、AIMessage 和 ToolMessage 的详细信息 Args: messages: 包含消息的列表,每个消息是一个对象 """ print("=== 消息解析结果 ===") for idx, msg in enumerate(messages, 1): print(f"\n消息 {idx}:") # 获取消息类型 msg_type = msg.__class__.__name__ print(f"类型: {msg_type}") # 提取消息内容 content = getattr(msg, 'content', '') print(f"内容: {content if content else '<空>'}") # 处理附加信息 additional_kwargs = getattr(msg, 'additional_kwargs', {}) if additional_kwargs: print("附加信息:") for key, value in additional_kwargs.items(): if key == 'tool_calls' and value: print(" 工具调用:") for tool_call in value: print(f" - ID: {tool_call['id']}") print(f" 函数: {tool_call['function']['name']}") print(f" 参数: {tool_call['function']['arguments']}") else: print(f" {key}: {value}") # 处理 ToolMessage 特有字段 if msg_type == 'ToolMessage': tool_name = getattr(msg, 'name', '') tool_call_id = getattr(msg, 'tool_call_id', '') print(f"工具名称: {tool_name}") print(f"工具调用 ID: {tool_call_id}") # 处理 AIMessage 的工具调用和元数据 if msg_type == 'AIMessage': tool_calls = getattr(msg, 'tool_calls', []) if tool_calls: print("工具调用:") for tool_call in tool_calls: print(f" - 名称: {tool_call['name']}") print(f" 参数: {tool_call['args']}") print(f" ID: {tool_call['id']}") # 提取元数据 metadata = getattr(msg, 'response_metadata', {}) if metadata: print("元数据:") token_usage = metadata.get('token_usage', {}) print(f" 令牌使用: {token_usage}") print(f" 模型名称: {metadata.get('model_name', '未知')}") print(f" 完成原因: {metadata.get('finish_reason', '未知')}") # 打印消息 ID msg_id = getattr(msg, 'id', '未知') print(f"消息 ID: {msg_id}") print("-" * 50) # 处理智能体返回结果 可能是中断,也可能是最终结果 async def process_agent_result( session_id: str, result: Dict[str, Any], user_id: Optional[str] = None ) -> AgentResponse: """ 处理智能体执行结果,统一处理中断和结果 Args: session_id: 会话ID result: 智能体执行结果 user_id: 用户ID,如果提供,将更新会话状态 Returns: AgentResponse: 标准化的响应对象 """ response = None try: # 检查是否有中断 if "__interrupt__" in result: interrupt_data = result["__interrupt__"][0].value # 确保中断数据有类型信息 if "interrupt_type" not in interrupt_data: interrupt_data["interrupt_type"] = "unknown" # 返回中断信息 response = AgentResponse( session_id=session_id, status="interrupted", interrupt_data=interrupt_data ) logger.info(f"当前触发工具调用中断:{response}") # 如果没有中断,返回最终结果 else: response = AgentResponse( session_id=session_id, status="completed", result=result ) logger.info(f"最终智能体回复结果:{response}") except Exception as e: response = AgentResponse( session_id=session_id, status="error", message=f"处理智能体结果时出错: {str(e)}" ) logger.error(f"处理智能体结果时出错:{response}") # 如果提供了用户ID,更新会话状态 exists = await app.state.session_manager.user_id_exists(user_id) if user_id and exists: status = response.status last_query = None last_response = response last_updated = time.time() await app.state.session_manager.update_session(user_id, status, last_query, last_response, last_updated) return response # 生命周期函数 app应用初始化函数 @asynccontextmanager async def lifespan(app: FastAPI): # 在这里也设置事件循环策略 if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) try: # 实例化异步Redis会话管理器 并存储为单实例 app.state.session_manager = RedisSessionManager( Config.REDIS_HOST, Config.REDIS_PORT, Config.REDIS_DB, Config.SESSION_TIMEOUT ) logger.info("Redis初始化成功") # 创建Chat模型 llm_chat, llm_embedding = get_llm(Config.LLM_TYPE) logger.info("Chat模型初始化成功") # 创建数据库连接池 动态连接池根据负载调整连接池大小 async with AsyncConnectionPool( conninfo=Config.DB_URI, min_size=Config.MIN_SIZE, max_size=Config.MAX_SIZE, kwargs={"autocommit": True, "prepare_threshold": 0} ) as pool: # 短期记忆 初始化checkpointer,并初始化表结构 checkpointer = AsyncPostgresSaver(pool) await checkpointer.setup() logger.info("Checkpointer初始化成功") # 获取工具列表 tools = await get_tools() # 创建ReAct Agent 并存储为单实例 app.state.agent = create_react_agent( model=llm_chat, tools=tools, checkpointer=checkpointer ) logger.info("Agent初始化成功") logger.info("服务完成初始化并启动服务") yield except Exception as e: logger.error(f"初始化失败: {str(e)}") raise RuntimeError(f"初始化检查点保存器失败: {str(e)}") # 清理资源 finally: # 关闭Redis连接 await app.state.session_manager.close() # 关闭PostgreSQL连接池 await pool.close() logger.info("关闭服务并完成资源清理") # 实例化app 并使用生命周期上下文管理器进行app初始化 app = FastAPI( title="Agent智能体后端API接口服务", description="基于LangGraph提供AI Agent服务", lifespan=lifespan # 生命周期函数 ) # API接口:创建智能体并调用,直接返回结果或中断数据 @app.post("/agent/invoke", response_model=AgentResponse) async def invoke_agent(request: AgentRequest): logger.info(f"invoke_agent接口,接受到前端用户请求:{request}") # 获取用户请求中的user_id user_id = request.user_id # 判断当前用户会话是否存在 exists = await app.state.session_manager.user_id_exists(user_id) # 若用户不存在 则只在创建新会话时生成新的会话ID if not exists: session_id = str(uuid.uuid4()) status = "idle" last_query = None last_response = None last_updated = time.time() # 创建会话并存储到redis中 await app.state.session_manager.create_session(user_id, session_id, status, last_query, last_response, last_updated) # 若用户存在 则使用现有会话的ID else: session = await app.state.session_manager.get_session(user_id) session_id = session.get("session_id") # 新请求统一更新会话信息 status = "running" last_query = request.query last_response = None last_updated = time.time() await app.state.session_manager.update_session(user_id, status, last_query, last_response, last_updated) # 构造智能体输入消息体 messages = [ {"role": "system", "content": request.system_message}, {"role": "user", "content": request.query} ] try: # 先调用智能体 result = await app.state.agent.ainvoke({"messages": messages}, config={"configurable": {"thread_id": session_id}}) # 将返回的messages进行格式化输出 方便查看调试 await parse_messages(result['messages']) # 再处理结果并更新会话状态 return await process_agent_result(session_id, result, user_id) except Exception as e: # 异常处理 error_response = AgentResponse( session_id=session_id, status="error", message=f"处理请求时出错: {str(e)}" ) logger.error(f"处理请求时出错: {error_response}") # 更新会话状态 status = "error" last_query = None last_response = error_response last_updated = time.time() await app.state.session_manager.update_session(user_id, status, last_query, last_response, last_updated) return error_response # API接口:恢复被中断的智能体执行,等待执行完成或再次中断 @app.post("/agent/resume", response_model=AgentResponse) async def resume_agent(response: InterruptResponse): logger.info(f"resume_agent接口,接受到前端用户请求:{response}") # 获取用户请求中的user_id和session_id user_id = response.user_id client_session_id = response.session_id # 判断当前用户会话是否存在 exists = await app.state.session_manager.user_id_exists(user_id) # 若用户不存在 则抛出异常 if not exists: logger.error(f"status_code=404,用户会话 {user_id} 不存在") raise HTTPException(status_code=404, detail=f"用户会话 {user_id} 不存在") # 然后判断会话ID是否匹配 若不匹配则抛出异常 session = await app.state.session_manager.get_session(user_id) server_session_id = session.get("session_id") if server_session_id != client_session_id: logger.error(f"status_code=400,会话ID不匹配,可能是过期的请求") raise HTTPException(status_code=400, detail="会话ID不匹配,可能是过期的请求") # 检查会话状态是否为中断 若不是中断则抛出异常 session = await app.state.session_manager.get_session(user_id) status = session.get("status") if status != "interrupted": logger.error(f"status_code=400,会话当前状态为 {status},无法恢复非中断状态的会话") raise HTTPException(status_code=400, detail=f"会话当前状态为 {status},无法恢复非中断状态的会话") # 更新会话状态 status = "running" last_query = None last_response = None last_updated = time.time() await app.state.session_manager.update_session(user_id, status, last_query, last_response, last_updated) # 构造响应数据 command_data = { "type": response.response_type } # 如果提供了参数,添加到响应数据中 if response.args: command_data["args"] = response.args try: # 先恢复智能体执行 result = await app.state.agent.ainvoke(Command(resume=command_data), config={"configurable": {"thread_id": server_session_id}}) # 将返回的messages进行格式化输出 方便查看调试 await parse_messages(result['messages']) # 再处理结果并更新会话状态 return await process_agent_result(server_session_id, result, user_id) except Exception as e: # 异常处理 error_response = AgentResponse( session_id=server_session_id, status="error", message=f"恢复执行时出错: {str(e)}" ) logger.error(f"处理请求时出错: {error_response}") # 更新会话状态 status = "error" last_query = None last_response = error_response last_updated = time.time() await app.state.session_manager.update_session(user_id, status, last_query, last_response, last_updated) return error_response # API接口:获取当前用户的状态 @app.get("/agent/status/{user_id}", response_model=SessionStatusResponse) async def get_agent_status(user_id: str): logger.info(f"get_agent_status接口,接受到前端用户请求:{user_id}") # 判断当前用户会话是否存在 exists = await app.state.session_manager.user_id_exists(user_id) # 若用户不存在 构造SessionStatusResponse对象 if not exists: logger.error(f"用户 {user_id} 的会话不存在") return SessionStatusResponse( user_id=user_id, status="not_found", message=f"用户 {user_id} 的会话不存在" ) # 若用户存在 构造SessionStatusResponse对象 session = await app.state.session_manager.get_session(user_id) response = SessionStatusResponse( user_id=user_id, session_id=session.get("session_id"), status=session.get("status"), last_query=session.get("last_query"), last_updated=session.get("last_updated"), last_response=session.get("last_response") ) logger.info(f"返回当前用户的状态:{response}") return response # API接口:获取系统状态信息 @app.get("/system/info", response_model=SystemInfoResponse) async def get_system_info(): logger.info(f"get_system_info接口,接受到前端用户请求") # 构造SystemInfoResponse对象 response = SystemInfoResponse( # 当前系统内会话总数 sessions_count=await app.state.session_manager.get_session_count(), # 系统内当前活跃的用户 active_users=await app.state.session_manager.get_all_user_ids() ) logger.info(f"返回当前系统状态信息:{response}") return response # API接口:删除用户会话 @app.delete("/agent/session/{user_id}") async def delete_agent_session(user_id: str): logger.info(f"delete_agent_session接口,接受到前端用户请求:{user_id}") # 判断当前用户会话是否存在 exists = await app.state.session_manager.user_id_exists(user_id) # 如果不存在 则抛出异常 if not exists: logger.error(f"status_code=404,用户 {user_id} 的会话不存在") raise HTTPException(status_code=404, detail=f"用户会话 {user_id} 不存在") # 如果存在 则删除会话 await app.state.session_manager.delete_session(user_id) response = { "status": "success", "message": f"用户 {user_id} 的会话已删除" } logger.info(f"用户会话已经删除:{response}") return response # 启动服务器 if __name__ == "__main__": if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # 使用新的启动方式 async def main(): config = uvicorn.Config( app, host=Config.HOST, port=Config.PORT, log_level="info" ) server = uvicorn.Server(config) await server.serve() asyncio.run(main()) -

-
运行前端服务
python 02_frontendServer.pyimport requests import json import traceback from typing import Dict, Any, Optional import time from rich.console import Console from rich.prompt import Prompt from rich.panel import Panel from rich.markdown import Markdown from rich.theme import Theme from rich.progress import Progress # 创建自定义主题 custom_theme = Theme({ "info": "cyan bold", "warning": "yellow bold", "success": "green bold", "error": "red bold", "heading": "magenta bold underline", "highlight": "blue bold", }) # 初始化Rich控制台 console = Console(theme=custom_theme) # 后端API地址 API_BASE_URL = "http://localhost:8001" # 调用智能体处理查询,并等待完成或中断 def invoke_agent(user_id: str, query: str, system_message: str = "你会使用工具来帮助用户。如果工具使用被拒绝,请提示用户。"): """ 调用智能体处理查询,并等待完成或中断 Args: user_id: 用户唯一标识 query: 用户待查询的问题 system_message: 系统提示词 Returns: 服务端返回的结果 """ # 发送请求到后端API payload = { "user_id": user_id, "query": query, "system_message": system_message } console.print("[info]正在发送请求到智能体,请稍候...[/info]") with Progress() as progress: task = progress.add_task("[cyan]处理中...", total=None) response = requests.post(f"{API_BASE_URL}/agent/invoke", json=payload) progress.update(task, completed=100) if response.status_code == 200: return response.json() else: raise Exception(f"API调用失败: {response.status_code} - {response.text}") # 发送响应以恢复智能体执行 def resume_agent(user_id: str, session_id: str, response_type: str, args: Optional[Dict[str, Any]] = None): """ 发送响应以恢复智能体执行 Args: user_id: 用户唯一标识 session_id: 用户的会话唯一标识 response_type: 响应类型:accept(允许调用), edit(调整工具参数,此时args中携带修改后的调用参数), response(直接反馈信息,此时args中携带修改后的调用参数),reject(不允许调用) args: 如果是edit, response类型,可能需要额外的参数 Returns: 服务端返回的结果 """ payload = { "user_id": user_id, "session_id": session_id, "response_type": response_type, "args": args } console.print("[info]正在恢复智能体执行,请稍候...[/info]") with Progress() as progress: task = progress.add_task("[cyan]恢复执行中...", total=None) response = requests.post(f"{API_BASE_URL}/agent/resume", json=payload) progress.update(task, completed=100) if response.status_code == 200: return response.json() else: raise Exception(f"恢复智能体执行失败: {response.status_code} - {response.text}") # 获取智能体状态 def get_agent_status(user_id: str): """ 获取智能体状态 Args: user_id: 用户唯一标识 Returns: 服务端返回的结果 """ response = requests.get(f"{API_BASE_URL}/agent/status/{user_id}") if response.status_code == 200: return response.json() else: raise Exception(f"获取智能体状态失败: {response.status_code} - {response.text}") # 获取系统信息 def get_system_info(): """ 获取系统信息 Args: Returns: 服务端返回的结果 """ response = requests.get(f"{API_BASE_URL}/system/info") if response.status_code == 200: return response.json() else: raise Exception(f"获取系统信息失败: {response.status_code} - {response.text}") # 删除用户会话 def delete_agent_session(user_id: str): """ 删除用户会话 Args: user_id: 用户唯一标识 Returns: 服务端返回的结果 """ response = requests.delete(f"{API_BASE_URL}/agent/session/{user_id}") if response.status_code == 200: return response.json() elif response.status_code == 404: # 会话不存在也算成功 return {"status": "success", "message": f"用户 {user_id} 的会话不存在"} else: raise Exception(f"删除会话失败: {response.status_code} - {response.text}") # 显示会话的详细信息,包括会话状态、上次查询、响应数据等 def display_session_info(status_response): """ 显示会话的详细信息,包括会话状态、上次查询、响应数据等 参数: status_response: 会话状态响应数据 """ # 基本会话信息面板 user_id = status_response["user_id"] session_id = status_response.get("session_id", "未知") status = status_response["status"] last_query = status_response.get("last_query", "无") last_updated = status_response.get("last_updated") # 构建信息面板内容 panel_content = [ f"用户ID: {user_id}", f"会话ID: {session_id}", f"状态: [bold]{status}[/bold]", f"上次查询: {last_query}" ] # 添加时间戳 if last_updated: time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_updated)) panel_content.append(f"上次更新: {time_str}") # 根据状态设置合适的面板样式 if status == "interrupted": border_style = "yellow" title = "[warning]中断会话[/warning]" elif status == "completed": border_style = "green" title = "[success]完成会话[/success]" elif status == "error": border_style = "red" title = "[error]错误会话[/error]" elif status == "running": border_style = "blue" title = "[info]运行中会话[/info]" elif status == "idle": border_style = "cyan" title = "[info]空闲会话[/info]" else: border_style = "white" title = "[info]未知状态会话[/info]" # 显示基本面板 console.print(Panel( "\n".join(panel_content), title=title, border_style=border_style )) # 显示额外的响应数据 if status_response.get("last_response"): last_response = status_response["last_response"] # 根据会话状态显示不同的响应数据 if status == "completed" and last_response.get("result"): result = last_response["result"] if "messages" in result: final_message = result["messages"][-1] console.print(Panel( Markdown(final_message["content"]), title="[success]上次智能体回答[/success]", border_style="green" )) elif status == "interrupted" and last_response.get("interrupt_data"): interrupt_data = last_response["interrupt_data"] message = interrupt_data.get("description", "需要您的输入") console.print(Panel( message, title=f"[warning]中断消息[/warning]", border_style="yellow" )) elif status == "error": error_msg = last_response.get("message", "未知错误") console.print(Panel( error_msg, title="[error]错误信息[/error]", border_style="red" )) # 检查用户会话状态并尝试恢复 def check_and_restore_session(user_id: str): """ 检查用户会话状态并尝试恢复 参数: user_id: 用户ID 返回: tuple: (是否有活跃会话, 会话状态响应) """ try: # 获取用户会话状态 status_response = get_agent_status(user_id) # 如果没有找到会话 if status_response["status"] == "not_found": console.print("[info]没有找到现有会话,将创建新会话[/info]") return False, None # 显示会话详细信息 console.print(Panel( f"用户ID: {user_id}\n" f"会话ID: {status_response.get('session_id', '未知')}\n" f"状态: [bold]{status_response['status']}[/bold]\n" f"上次查询: {status_response.get('last_query', '无')}\n" f"上次更新: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(status_response['last_updated'])) if status_response.get('last_updated') else '未知'}\n", title="[info]发现现有会话[/info]", border_style="cyan" )) # 显示会话的详细信息 display_session_info(status_response) # 根据会话状态进行自动处理 if status_response["status"] == "interrupted": console.print(Panel( "会话处于中断状态,需要您的响应才能继续。\n" "系统将自动恢复上次的中断点,您需要提供决策。", title="[warning]会话已中断[/warning]", border_style="yellow" )) # 如果有上次的响应且包含中断数据 if (status_response.get("last_response") and status_response["last_response"].get("interrupt_data")): # 显示中断类型和相关信息 interrupt_data = status_response["last_response"]["interrupt_data"] action_request = interrupt_data.get("action_request","未知中断") tool = action_request.get("action","未知工具") args = action_request.get("args","未知参数") console.print(f"[info]相关工具: {tool}[/info]") console.print(f"[info]工具参数: {args}[/info]") # 自动恢复中断处理 console.print("[info]自动恢复中断处理...[/info]") return True, status_response else: console.print("[warning]中断状态会话缺少必要的中断数据,无法恢复[/warning]") console.print("[info]将创建新会话[/info]") return False, None elif status_response["status"] == "completed": console.print(Panel( "会话已完成,上次响应结果可用。\n" "系统将显示上次结果并自动开启新会话。", title="[success]会话已完成[/success]", border_style="green" )) # 显示上次的结果 if (status_response.get("last_response") and status_response["last_response"].get("result")): # 提取并显示结果 last_result = status_response["last_response"]["result"] if "messages" in last_result: final_message = last_result["messages"][-1] console.print(Panel( Markdown(final_message["content"]), title="[success]上次智能体回答[/success]", border_style="green" )) console.print("[info]基于当前会话开始继续...[/info]") return False, None elif status_response["status"] == "error": # 获取错误信息 error_msg = "未知错误" if status_response.get("last_response"): error_msg = status_response["last_response"].get("message", "未知错误") console.print(Panel( f"上次会话发生错误: {error_msg}\n" "系统将自动开始新会话。", title="[error]会话错误[/error]", border_style="red" )) console.print("[info]自动开始新会话...[/info]") return False, None elif status_response["status"] == "running": console.print(Panel( "会话正在运行中,这可能是因为:\n" "1. 另一个客户端正在使用此会话\n" "2. 上一次会话异常终止,状态未更新\n" "系统将自动等待会话状态变化。", title="[warning]会话运行中[/warning]", border_style="yellow" )) # 自动等待会话状态变化 console.print("[info]自动等待会话状态变化...[/info]") with Progress() as progress: task = progress.add_task("[cyan]等待会话完成...", total=None) max_attempts = 30 # 最多等待30秒 attempt_count = 0 for i in range(max_attempts): attempt_count = i # 检查状态 current_status = get_agent_status(user_id) if current_status["status"] != "running": progress.update(task, completed=100) console.print(f"[success]会话状态已更新为: {current_status['status']}[/success]") break time.sleep(1) # 如果等待超时 if attempt_count >= max_attempts - 1: console.print("[warning]等待超时,会话可能仍在运行[/warning]") console.print("[info]为避免冲突,将创建新会话[/info]") return False, None # 获取最新状态(递归调用) return check_and_restore_session(user_id) elif status_response["status"] == "idle": console.print(Panel( "会话处于空闲状态,准备接收新查询。\n" "系统将自动使用现有会话。", title="[info]会话空闲[/info]", border_style="blue" )) # 自动使用现有会话 console.print("[info]自动使用现有会话[/info]") return True, status_response else: # 未知状态 console.print(Panel( f"会话处于未知状态: {status_response['status']}\n" "系统将自动创建新会话以避免潜在问题。", title="[warning]未知状态[/warning]", border_style="yellow" )) console.print("[info]自动创建新会话...[/info]") return False, None except Exception as e: console.print(f"[error]检查会话状态时出错: {str(e)}[/error]") console.print(traceback.format_exc()) console.print("[info]将创建新会话[/info]") return False, None # 处理工具使用审批类型的中断 def handle_tool_interrupt(interrupt_data, user_id, session_id): """ 处理工具使用审批类型的中断 参数: interrupt_data: 中断数据 user_id: 用户ID session_id: 会话ID 返回: 处理后的响应 """ message = interrupt_data.get("description", "需要您的输入") # 显示工具使用审批提示 console.print(Panel( f"{message}", title=f"[warning]智能体需要您的决定[/warning]", border_style="yellow" )) # 获取用户输入 user_input = Prompt.ask("[highlight]您的选择[/highlight]") # 处理用户输入 try: while True: if user_input.lower() == "yes": response = resume_agent(user_id, session_id, "accept") break elif user_input.lower() == "no": response = resume_agent(user_id, session_id, "reject") break elif user_input.lower() == "edit": # 获取新的查询内容 new_query = Prompt.ask("[highlight]请调整新的参数[/highlight]") response = resume_agent(user_id, session_id, "edit", args={"args": json.loads(new_query)}) break elif user_input.lower() == "response": # 获取新的查询内容 new_query = Prompt.ask("[highlight]不调用工具直接反馈信息[/highlight]") response = resume_agent(user_id, session_id, "response", args={"args": new_query}) break else: console.print("[error]无效输入,请输入 'yes'、'no' 、'edit' 或 'response'[/error]") user_input = Prompt.ask("[highlight]您的选择[/highlight]") # 重新获取用户输入(维持当前响应不变) return process_agent_response(response, user_id) except Exception as e: console.print(f"[error]处理响应时出错: {str(e)}[/error]") return None # 处理智能体响应,包括处理中断和显示结果 def process_agent_response(response, user_id): # 防御性检查,确保response不为空 if not response: console.print("[error]收到空响应,无法处理[/error]") return None try: session_id = response["session_id"] status = response["status"] timestamp = response.get("timestamp", time.time()) # 显示时间戳和会话ID(便于调试和跟踪) time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp)) console.print(f"[info]响应时间: {time_str} | 会话ID: {session_id}[/info]") # 处理不同状态 if status == "interrupted": # 获取中断数据 interrupt_data = response.get("interrupt_data", {}) try: # 进入中断处理函数 return handle_tool_interrupt(interrupt_data, user_id, session_id) except Exception as e: console.print(f"[error]处理中断响应时出错: {str(e)}[/error]") console.print(f"[info]中断状态已保存,您可以稍后恢复会话[/info]") console.print(traceback.format_exc()) return None elif status == "completed": # 显示结果 result = response.get("result", {}) if result and "messages" in result: final_message = result["messages"][-1] console.print(Panel( Markdown(final_message["content"]), title="[success]智能体回答[/success]", border_style="green" )) else: console.print("[warning]智能体没有返回有效的消息[/warning]") if isinstance(result, dict): console.print("[info]原始结果数据结构:[/info]") console.print(result) return result elif status == "error": # 显示错误信息 error_msg = response.get("message", "未知错误") console.print(Panel( f"{error_msg}", title="[error]处理过程中出错[/error]", border_style="red" )) return None elif status == "running": # 处理正在运行状态 console.print("[info]智能体正在处理您的请求,请稍候...[/info]") return response elif status == "idle": # 处理空闲状态 console.print("[info]智能体处于空闲状态,准备接收新的请求[/info]") return response else: # 其他未知状态 console.print(f"[warning]智能体处于未知状态: {status} - {response.get('message', '无消息')}[/warning]") return response except KeyError as e: console.print(f"[error]响应格式错误,缺少关键字段 {e}[/error]") return None except Exception as e: console.print(f"[error]处理智能体响应时出现未预期错误: {str(e)}[/error]") console.print(traceback.format_exc()) return None # 主函数,运行客户端 def main(): console.print(Panel( "前端客户端", title="[heading]智能体交互系统[/heading]", border_style="magenta" )) # 获取系统信息 try: system_info = get_system_info() console.print(f"[info]系统活跃会话: {system_info['sessions_count']}[/info]") if system_info['active_users']: console.print(f"[info]活跃用户: {', '.join(system_info['active_users'])}[/info]") except Exception: console.print("[warning]无法获取系统状态,但这不影响使用[/warning]") # 获取用户ID(在真实应用中可能是登录后的用户标识) default_user_id = f"user_{int(time.time())}" user_id = Prompt.ask("[info]请输入用户ID[/info] (新ID创建新会话,已有ID自动恢复会话)", default=default_user_id) # 检查并尝试自动恢复现有会话 has_active_session, session_status = check_and_restore_session(user_id) # 主交互循环 while True: try: # 会话恢复处理 - 根据状态自动处理 if has_active_session and session_status: # 如果是中断状态,自动处理中断 if session_status["status"] == "interrupted": console.print("[info]自动处理中断的会话...[/info]") if "last_response" in session_status and session_status["last_response"]: # 使用process_agent_response处理之前的中断 result = process_agent_response(session_status["last_response"], user_id) # 重新检查状态 current_status = get_agent_status(user_id) # 如果通过处理中断后完成了会话,自动创建新会话 if current_status["status"] == "completed": # 显示完成消息 console.print("[success]会话已完成[/success]") console.print("[info]自动开始新会话...[/info]") has_active_session = False session_status = None else: # 更新会话状态 has_active_session = True session_status = current_status # 获取用户查询 query = Prompt.ask( "\n[info]请输入您的问题[/info] (输入 'exit' 退出,输入 'status' 查询状态,输入 'new' 开始新会话)", default="你好") # 处理特殊命令 if query.lower() == 'exit': console.print("[info]感谢使用,再见![/info]") break elif query.lower() == 'status': # 查询当前会话状态 status_response = get_agent_status(user_id) console.print(Panel( f"用户ID: {status_response['user_id']}\n" f"会话ID: {status_response.get('session_id', '未知')}\n" f"会话状态: {status_response['status']}\n" f"上次查询: {status_response['last_query'] or '无'}\n" f"上次更新: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(status_response['last_updated'])) if status_response.get('last_updated') else '未知'}\n", title="[info]当前会话状态[/info]", border_style="cyan" )) continue elif query.lower() == 'new': # 主动开始新会话 - 先删除服务端的旧会话 console.print("[info]正在开始新会话...[/info]") try: delete_response = delete_agent_session(user_id) console.print(f"[success]已清理旧会话: {delete_response['message']}[/success]") except Exception as e: console.print(f"[warning]清理旧会话时出错: {str(e)}[/warning]") console.print("[info]将继续创建新会话...[/info]") # 重置本地状态 has_active_session = False session_status = None console.print("[success]新会话已准备就绪![/success]") continue # 调用智能体 console.print("[info]正在提交查询到智能体...[/info]") response = invoke_agent(user_id, query) # 处理智能体响应 result = process_agent_response(response, user_id) # 获取最新状态 latest_status = get_agent_status(user_id) # 根据响应状态自动处理 if latest_status["status"] == "completed": # 处理已完成状态 - 自动开始新会话 console.print("[info]会话已完成,准备接收新的查询[/info]") has_active_session = False session_status = None elif latest_status["status"] == "error": # 处理错误状态 - 自动开始新会话 console.print("[info]会话发生错误,将开始新会话[/info]") has_active_session = False session_status = None else: # 其他状态(idle、interrupted)- 保持会话活跃 has_active_session = True session_status = latest_status except KeyboardInterrupt: console.print("\n[warning]用户中断,正在退出...[/warning]") # 保存当前状态,使会话可以在下次启动时恢复 console.print("[info]会话状态已保存,可以在下次使用相同用户ID恢复[/info]") break except Exception as e: console.print(f"[error]运行过程中出错: {str(e)}[/error]") console.print(traceback.format_exc()) # 尝试自动恢复或创建新会话 has_active_session, session_status = check_and_restore_session(user_id) continue if __name__ == "__main__": main()

使用python实现的一个模拟酒店预订的工具 book_hotel
-
其需传入的参数为:{hotel_name}
使用python实现的一个计算两个数的乘积的工具 multiply
-
其需传入的参数为:{a:float, b:float}
测试流程:
-
输入 'yes' 接受工具调用
调用工具预定如家酒店
-
输入 'no' 拒绝工具调用
调用工具预定桔子酒店
-
输入 'edit' 修改工具参数后调用工具
调用工具预定全季酒店
{"hotel_name": "全季酒店(软件园店)"}
-
输入 'response' 不调用工具直接反馈信息
调用工具预定汉庭酒店
把酒店名称换为:汉庭酒店(软件园店),再调用工具预定

5.3 测试 status、new、exit指令
-
status 查看会话状态,用于客户端故障恢复
-
new 新建会话
-
exit 退出当前会话
5.4 测试客户端和服务端故障恢复
客户端故障恢复:会话管理
-
用户ID(test2):上海天气如何?
-
强制关闭客户端(意外退出)
-
再次启动客户端,输入用户ID(test2),会话自动恢复到中断的状态
服务端故障恢复:LangGraph节点的状态恢复(checkpointer)

5.5 测试动态调整会话的过期时间
5.6 测试历史会话管理和历史会话恢复
5.7 测试异步模式调用Agent服务
-
进入
06_ReActAgentHILApiMultiSessionTask文件夹下运行脚本进行测试,支持多用户访问 -
首先运行
celery -A 01_backendServer.celery_app worker --loglevel=info启动 celery 服务 -
再运行后端服务
python 01_backendServer.py -
最后运行前端服务
python 02_frontendServer.py
6、扩展学习
NirDiamant:https://github.com/NirDiamant

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)