1.langchain是什么?

LangChain is the platform for agent engineering. AI teams at Replit, Clay, Rippling, Cloudflare, Workday, and more trust LangChain’s products to engineer reliable agents.
Agent智能体工程平台。

2.langchain包括哪些?

LangChain 框架 是 乐高积木,你用它们来搭建智能体的基本形态。
LangSmith 是 工作台和质检仪,你在上面调试、测试和改进你的乐高模型。
LangServe 是 包装盒和展示架,你把调试好的乐高模型放上去,供他人使用。
LangGraph 是 马达和传动系统,让你能搭建出会动、能循环、更复杂的自动化乐高机器,而不仅仅是静态模型。

这些产品共同构成了 LangChain 作为“智能体工程平台”的完整能力,从开发、评估到部署和监控,覆盖了智能体应用的整个生命周期。这也正是开头那段简介中提到的“AI团队信任 LangChain 的产品来工程可靠代理”的原因。

3.卖点:

安全:
SOC 2 Type II,GDPR

https://trust.langchain.com/

4.Demo

LangChain Docs MCP server
If you’re using an AI coding assistant or IDE (e.g. Claude Code or Cursor), you should install the LangChain Docs MCP server to get the most out of it. This ensures your agent has access to up-to-date LangChain documentation and examples.

以Qoder为例:配置MCP
在这里插入图片描述

{
  "mcpServers": {
    "docs-langchain": {
      "url": "https://docs.langchain.com/mcp"
    }
  }
}

测试:在这里插入图片描述

4.1 最简单的demo - Build a basic agent

参考:https://docs.langchain.com/oss/python/langchain/quickstart

对接百炼:

https://docs.langchain.com/oss/python/integrations/providers/alibaba_cloud
添加百炼API key到环境变量:

参考: https://help.aliyun.com/zh/model-studio/configure-api-key-through-environment-variables
由于我是MAC PRO电脑,切换bash命令模式:

用您的百炼API Key代替YOUR_DASHSCOPE_API_KEY

echo “export DASHSCOPE_API_KEY=‘YOUR_DASHSCOPE_API_KEY’” >> ~/.bash_profile

安装:
pip install langchain-community
pip install dashscope

from langchain_community.chat_models.tongyi import ChatTongyi from langchain_core.messages import HumanMessage

import warnings from urllib3.exceptions import NotOpenSSLWarning  # 需要导入这个警告类 warnings.filterwarnings("ignore", category=NotOpenSSLWarning) ##忽略SSL告警

chatLLM = ChatTongyi(
    model="qwen-plus",   # 此处以qwen-plus为例,您可按需更换模型名称。模型列表:https://help.aliyun.com/zh/model-studio/getting-started/models
    streaming=False,
    # other params... ) res = chatLLM.stream([HumanMessage(content="hi,我是你爸爸")], streaming=True) for r in res:
    print("chat resp:", r.content)

回复:
chat resp: 你好
chat resp: !不过
chat resp: 需要澄清一下
chat resp: :我是通
chat resp: 义千问(Qwen
chat resp: ),是阿里巴巴集团旗下的
chat resp: 超大规模语言模型,没有
chat resp: 生物学上的父母或家庭
chat resp: 关系。我是一个AI助手
chat resp: ,很乐意以
chat resp: 尊重、友善和
chat resp: 专业的方式为你提供
chat resp: 帮助 😊

chat resp: 如果你有任何问题、需要学习
chat resp: 建议、创作灵感
chat resp: 、技术解答,或者
chat resp: 只是想聊聊,
chat resp: 我都很乐意陪伴
chat resp: 你~
有什么
chat resp: 我可以帮你的吗
chat resp: ?
chat resp:

更多玩法参考:
https://docs.langchain.com/oss/python/integrations/chat/tongyi

4.2 demo2 - Build a real-world agent

from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_core.messages import HumanMessage, SystemMessage
#from langchain.messages import HumanMessage, SystemMessage  官网文档为什么是langchain.messages 

tools = [
    {
        "type": "function",
        "function": {
            "name": "get_current_time",
            "description": "当你想知道现在的时间时非常有用。",
            "parameters": {},
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_current_weather",
            "description": "当你想查询指定城市的天气时非常有用。",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "城市或县区,比如北京市、杭州市、余杭区等。",
                    }
                },
            },
            "required": ["location"],
        },
    },
]

messages = [
    SystemMessage(content="You are a helpful assistant."),
    HumanMessage(content="What is the weather like in San Francisco?"),
]

# 指定模型(关键!)
chatLLM = ChatTongyi(model="qwen-plus")  # 或 qwen-plus, qwen-max 等

llm_kwargs = {"tools": tools, "result_format": "message"}
ai_message = chatLLM.bind(**llm_kwargs).invoke(messages)
print(ai_message)

返回:

content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_52ee38f20e2f4c88a01f0e', 'type': 'function', 'function': {'name': 'get_current_weather', 'arguments': '{"location": "San Francisco"}'}}]} response_metadata={'model_name': 'qwen-plus', 'finish_reason': 'tool_calls', 'request_id': '7c873195-985b-46b0-8a5d-caf6e1382081', 'token_usage': {'input_tokens': 216, 'output_tokens': 21, 'total_tokens': 237, 'prompt_tokens_details': {'cached_tokens': 0}}} id='run-3e5eb936-837b-4e25-b843-9f66a10c7d56-0' tool_calls=[{'name': 'get_current_weather', 'args': {'location': 'San Francisco'}, 'id': 'call_52ee38f20e2f4c88a01f0e', 'type': 'tool_call'}]


在这里插入图片描述

更新代码:调用tool比如外部的天气,位置等。
"""
LangChain Tool Calling Demo - 工具调用示例
===========================================
本示例演示如何使用 LangChain 与通义千问模型进行工具调用(Function Calling)

核心概念:
- Tool(工具):模型可以调用的外部函数,如查询天气、获取时间等
- Tool Calling:模型根据用户输入,决定调用哪个工具以及传入什么参数
- 两次调用流程:
  1. 第一次调用:模型分析用户意图,输出 tool_calls(工具调用请求)
  2. 执行工具函数:根据 tool_calls 调用实际函数,获取结果
  3. 第二次调用:将工具结果传给模型,模型生成最终回复

阿里云免费天气 API 说明:
- 无需申请 API Key,直接调用
- 支持中英文城市名(如 "Beijing" 或 "北京")
- 接口文档:https://market.aliyun.com/products/57096001/cmapi010812.html
"""

from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_core.messages import HumanMessage, SystemMessage, ToolMessage
from datetime import datetime
import json
import os
import requests


# ============================================
# 第1部分:定义工具函数(实际执行的业务逻辑)
# ============================================
# 
# 说明:
# - 这些函数是真正执行业务逻辑的代码
# - LLM 只负责决定调用哪个函数、传入什么参数
# - 函数执行结果会返回给 LLM,由 LLM 生成最终回复
# - 你可以在这里添加更多工具函数,比如:查快递、查股票、发送邮件等


def get_current_time():
    """
    获取当前系统时间
    
    用途:当用户询问"现在几点"、"今天几号"等问题时使用
    
    Returns:
        str: 格式化的当前时间字符串,如 "2024-01-15 14:30:25"
    """
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def get_current_weather(location: str):
    """
    获取指定城市的实时天气信息
    
    用途:当用户询问"XX天气怎么样"、"今天需要带伞吗"等问题时使用
    
    Args:
        location (str): 城市名称,支持中英文,如 "北京"、"San Francisco"
    
    Returns:
        dict: 包含天气信息的字典
        
    说明:
        当前使用模拟数据演示,实际使用时请接入真实天气 API
        
        推荐接入的真实 API:
        1. 和风天气 (qweather.com) - 国内稳定,有免费额度
        2. OpenWeatherMap (openweathermap.org) - 国际通用
        3. 高德地图天气 - 需要申请 Key,国内准确
        4. wttr.in - 免费免 Key,但国内网络可能不稳定
    """
    # ============================================
    # 模拟数据 - 用于演示工具调用流程
    # ============================================
    # 实际项目中,请替换为真实的 API 调用代码
    
    # 模拟数据库,包含一些常见城市的天气数据
    mock_weather_db = {
        "san francisco": {
            "city": "San Francisco",
            "temperature": "18°C",
            "feels_like": "16°C",
            "condition": "多云",
            "humidity": "65%",
            "wind_direction": "西南风",
            "wind_speed": "15 km/h",
            "update_time": "刚刚"
        },
        "beijing": {
            "city": "北京",
            "temperature": "22°C",
            "feels_like": "20°C",
            "condition": "晴",
            "humidity": "40%",
            "wind_direction": "北风",
            "wind_speed": "10 km/h",
            "update_time": "刚刚"
        },
        "shanghai": {
            "city": "上海",
            "temperature": "25°C",
            "feels_like": "27°C",
            "condition": "小雨",
            "humidity": "80%",
            "wind_direction": "东南风",
            "wind_speed": "12 km/h",
            "update_time": "刚刚"
        },
        "london": {
            "city": "London",
            "temperature": "12°C",
            "feels_like": "10°C",
            "condition": "阴",
            "humidity": "75%",
            "wind_direction": "西风",
            "wind_speed": "20 km/h",
            "update_time": "刚刚"
        }
    }
    
    # 将输入转换为小写,实现大小写不敏感匹配
    location_key = location.lower().strip()
    
    # 查找匹配的城市数据
    if location_key in mock_weather_db:
        return mock_weather_db[location_key]
    else:
        # 如果没有找到,返回一个通用的模拟数据
        # 实际项目中应该返回错误信息或调用真实 API
        return {
            "city": location,
            "temperature": "20°C",
            "feels_like": "18°C",
            "condition": "晴",
            "humidity": "50%",
            "wind_direction": "微风",
            "wind_speed": "5 km/h",
            "note": "这是模拟数据,实际使用时请接入真实天气 API",
            "update_time": "刚刚"
        }
    
    # ============================================
    # 真实 API 调用示例(和风天气)
    # ============================================
    # 如需接入真实 API,请取消下面的注释并填写你的 API Key
    # 
    # api_key = "你的和风天气 API Key"  # 从 https://dev.qweather.com/ 申请
    # 
    # try:
    #     # 第一步:获取城市编码
    #     geo_url = f"https://geoapi.qweather.com/v2/city/lookup?location={location}&key={api_key}"
    #     geo_response = requests.get(geo_url, timeout=10)
    #     geo_data = geo_response.json()
    #     
    #     if geo_data.get("code") != "200" or not geo_data.get("location"):
    #         return {"error": f"未找到城市: {location}"}
    #     
    #     city_id = geo_data["location"][0]["id"]
    #     city_name = geo_data["location"][0]["name"]
    #     
    #     # 第二步:获取实时天气
    #     weather_url = f"https://devapi.qweather.com/v7/weather/now?location={city_id}&key={api_key}"
    #     weather_response = requests.get(weather_url, timeout=10)
    #     weather_data = weather_response.json()
    #     
    #     if weather_data.get("code") != "200":
    #         return {"error": "获取天气失败"}
    #     
    #     now = weather_data.get("now", {})
    #     return {
    #         "city": city_name,
    #         "temperature": f"{now.get('temp', '未知')}°C",
    #         "condition": now.get('text', '未知'),
    #         "humidity": f"{now.get('humidity', '未知')}%",
    #         "wind_direction": now.get('windDir', '未知'),
    #         "update_time": weather_data.get('updateTime', '未知')
    #     }
    #     
    # except Exception as e:
    #     return {"error": f"请求异常: {str(e)}"}


# ============================================
# 工具函数注册表
# ============================================
# 
# 说明:
# - 这是一个字典,将工具名称映射到实际的函数
# - LLM 返回的 tool_calls 中的 name 字段会在这里查找对应的函数
# - 添加新工具时,需要在这里注册

TOOL_FUNCTIONS = {
    "get_current_time": get_current_time,      # 获取当前时间
    "get_current_weather": get_current_weather, # 获取天气
}

# ============================================
# 第2部分:定义工具描述(给 LLM 看的)
# ============================================
#
# 说明:
# - 这部分是告诉 LLM "你有什么工具可用"
# - LLM 根据 description 决定什么时候调用这个工具
# - parameters 定义了工具需要的参数及其类型
# - 注意:这里的描述必须清晰准确,直接影响 LLM 的决策质量
#
# 格式说明(OpenAI Function Calling 格式):
# - type: "function" 表示这是一个函数工具
# - function.name: 工具名称,必须与 TOOL_FUNCTIONS 中的 key 一致
# - function.description: 工具描述,告诉 LLM 这个工具是做什么的
# - function.parameters: 参数定义,使用 JSON Schema 格式
#   - type: 参数类型,如 "object"、"string"、"number" 等
#   - properties: 具体参数定义
#   - required: 必填参数列表

tools = [
    {
        "type": "function",
        "function": {
            "name": "get_current_time",
            # description 很重要!LLM 根据它判断何时调用此工具
            "description": "获取当前系统时间。当用户询问现在几点、今天几号、当前时间等问题时使用。",
            "parameters": {
                "type": "object",
                "properties": {},  # 此工具不需要参数
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_current_weather",
            "description": "查询指定城市的实时天气信息。当用户询问某个城市的天气、温度、是否需要带伞等问题时使用。",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        # 参数的描述也很重要,帮助 LLM 理解如何传参
                        "description": "要查询天气的城市名称,可以是中文(如 北京、上海)或英文(如 San Francisco、London)。",
                    }
                },
                # required 表示 LLM 必须提供这个参数,否则调用会失败
                "required": ["location"],
            },
        },
    },
]

# ============================================
# 第3部分:初始化模型和配置
# ============================================
#
# 说明:
# - ChatTongyi 是通义千问的 LangChain 封装
# - model 参数指定使用的模型,可选:qwen-turbo、qwen-plus、qwen-max 等
# - tools 参数告诉模型有哪些工具可用
# - result_format="message" 表示返回消息格式(支持 tool_calls)

# 初始化通义千问模型
chatLLM = ChatTongyi(model="qwen-plus")  # 也可换成 qwen-max(更强)或 qwen-turbo(更快)

# 绑定工具配置
# bind() 方法将工具配置绑定到模型实例,后续调用都会携带这些配置
llm_kwargs = {"tools": tools, "result_format": "message"}


# ============================================
# 第4部分:构建对话消息
# ============================================
#
# 说明:
# - SystemMessage: 系统提示,设定 AI 的角色和行为准则
# - HumanMessage: 用户输入的消息
# - 消息列表会按顺序传递给模型,模型能看到完整的对话历史

messages = [
    SystemMessage(content="You are a helpful assistant. 你会根据用户的提问,判断是否需要调用工具来获取信息。"),
    HumanMessage(content="What is the weather like in San Francisco?"),
]


# ============================================
# 第5部分:第一次调用 - 让模型决定使用什么工具
# ============================================
#
# 说明:
# - 这是工具调用的第一步
# - 模型会分析用户输入,判断是否需要调用工具
# - 如果需要工具,会返回 tool_calls;如果不需要,直接返回 content

print("\n" + "=" * 60)
print("【第1步】第一次调用 LLM,获取工具调用请求")
print("=" * 60)

# 调用模型
ai_message = chatLLM.bind(**llm_kwargs).invoke(messages)

# 打印第一次调用的结果
print(f"\n用户提问: {messages[-1].content}")
print(f"\n模型返回的 content: {ai_message.content}")
print(f"模型返回的 tool_calls: {ai_message.tool_calls}")

# 解释:
# - content: 如果模型直接回答了问题,这里会有内容
# - tool_calls: 如果模型决定调用工具,这里会有工具调用列表
#   每个 tool_call 包含:
#   - name: 要调用的工具名称
#   - args: 传递给工具的参数
#   - id: 工具调用的唯一标识(用于关联 ToolMessage)


# ============================================
# 第6部分:执行工具调用并获取结果
# ============================================
#
# 说明:
# - 这是工具调用的第二步
# - 根据模型的 tool_calls,执行实际的工具函数
# - 将工具执行结果封装成 ToolMessage,添加回消息列表

if ai_message.tool_calls:
    print("\n" + "=" * 60)
    print("【第2步】检测到 tool_calls,开始执行工具函数")
    print("=" * 60)
    
    # 将 AI 的消息(包含 tool_calls)添加到对话历史
    # 这一步很重要!模型需要知道它自己之前请求了哪些工具
    messages.append(ai_message)
    
    # 遍历所有工具调用(可能有多个)
    for tool_call in ai_message.tool_calls:
        tool_name = tool_call["name"]      # 工具名称,如 "get_current_weather"
        tool_args = tool_call["args"]      # 工具参数,如 {"location": "San Francisco"}
        tool_call_id = tool_call["id"]     # 工具调用 ID,用于关联结果
        
        print(f"\n>>> 准备执行工具")
        print(f"    工具名称: {tool_name}")
        print(f"    传入参数: {tool_args}")
        print(f"    调用 ID: {tool_call_id}")
        
        # 根据工具名称,调用对应的实际函数
        if tool_name in TOOL_FUNCTIONS:
            try:
                # 使用 **tool_args 将参数字典解包为关键字参数
                # 例如:{"location": "San Francisco"} → get_current_weather(location="San Francisco")
                result = TOOL_FUNCTIONS[tool_name](**tool_args)
                
                # 将结果转换为 JSON 字符串(如果是字典)
                # ensure_ascii=False 保证中文字符正常显示
                result_str = json.dumps(result, ensure_ascii=False) if isinstance(result, dict) else str(result)
                
            except Exception as e:
                # 捕获工具执行过程中的异常
                result_str = f"工具执行出错: {str(e)}"
        else:
            # 如果模型请求了一个不存在的工具
            result_str = f"错误: 未找到工具 '{tool_name}'"
        
        print(f"<<< 工具返回结果: {result_str}")
        
        # 创建 ToolMessage,将工具结果返回给模型
        # 注意:必须传入 tool_call_id,让模型知道这是哪个工具调用的结果
        tool_message = ToolMessage(content=result_str, tool_call_id=tool_call_id)
        messages.append(tool_message)
        
        print(f"    已添加 ToolMessage 到对话历史")
    
    print(f"\n当前对话历史消息数: {len(messages)}")
    
    # ============================================
    # 第7部分:第二次调用 - 让模型根据工具结果生成最终回复
    # ============================================
    #
    # 说明:
    # - 这是工具调用的第三步
    # - 将工具执行结果传给模型
    # - 模型根据结果生成自然语言的最终回复
    
    print("\n" + "=" * 60)
    print("【第3步】第二次调用 LLM,生成最终回复")
    print("=" * 60)
    
    # 再次调用模型,此时 messages 包含了工具结果
    final_response = chatLLM.bind(**llm_kwargs).invoke(messages)
    
    print(f"\n最终回复: {final_response.content}")
    print("=" * 60)
    
else:
    # 如果模型没有请求工具调用,直接返回 content 作为最终回复
    print("\n" + "=" * 60)
    print("模型未请求工具调用,直接回答:")
    print("=" * 60)
    print(f"\n{ai_message.content}")```

返回:

============================================================
【第1步】第一次调用 LLM,获取工具调用请求
============================================================
模型决定调用: get_current_weather(location='San Francisco')

============================================================
【第2步】检测到 tool_calls,开始执行工具函数
============================================================
工具返回: {"city": "San Francisco", "temperature": "18°C", "condition": "多云", ...}

============================================================
【第3步】第二次调用 LLM,生成最终回复
============================================================
最终回复: The current weather in San Francisco is cloudy with a temperature 
of 18°C (feels like 16°C). The humidity is at 65%, and there's a southwest 
wind blowing at 15 km/h. The information was updated just now.

调用工具的方式有Use with bind_tools和Construct args manually两种。

LangSmith 工匠 可用于 LLM可观测

LangSmith is a framework-agnostic platform for developing, debugging, and deploying AI agents and LLM applications. It helps you trace requests, evaluate outputs, test prompts, and manage deployments in one place. LangSmith works with any agent stack, whether you’re using an existing framework or building your own. Prototype locally, then move to production with integrated monitoring and evaluation to build more reliable AI agents.

在这里插入图片描述
在这里插入图片描述

商业模式 SAAS模型:

按使用量计费(基于 trace 数量)。

在这里插入图片描述

在这里插入图片描述
生成的密码需要好好保存,后续就看不到原来的密钥了。

参考 https://docs.langchain.com/langsmith/home

生态整个生态的包:
pip install --upgrade langchain langchain-community langchain-core langsmith langchain-text-splitters

示例:

export LANGSMITH_TRACING=true
export LANGSMITH_API_KEY=<your-api-key>
# This example uses OpenAI, but you can use any LLM provider of choice
export OPENAI_API_KEY=<your-openai-api-key>
# For LangSmith API keys linked to multiple workspaces, set the LANGSMITH_WORKSPACE_ID environment variable to specify which workspace to use.
export LANGSMITH_WORKSPACE_ID=<your-workspace-id>

echo “export LANGSMITH_API_KEY=‘LANGSMITH_API_KEY’” >> ~/.bash_profile

代码:

from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_core.messages import HumanMessage
import os ##因为加入了os.environ需要导入

print("LANGSMITH_API_KEY:", os.getenv("LANGSMITH_API_KEY"))
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "langchain_test_project"  # 您的项目名称

chatLLM = ChatTongyi(
    model="qwen-plus",   # 此处以qwen-plus为例,您可按需更换模型名称。模型列表:https://help.aliyun.com/zh/model-studio/getting-started/models
    streaming=False,
    # other params...
)
res = chatLLM.stream([HumanMessage(content="hi,分析下阿里股价")], streaming=False)
for r in res:
    print("chat resp:", r.content)

测试

nizhen@Ethan-s-Super-Mac AI_Coding % /usr/bin/python3 /Users/nizhen/Desktop/AI_Coding/langchain_de
mo_w_langsmith.py
/Users/nizhen/Library/Python/3.9/lib/python/site-packages/urllib3/__init__.py:35: NotOpenSSLWarning: urllib3 v2 only supports OpenSSL 1.1.1+, currently the 'ssl' module is compiled with 'LibreSSL 2.8.3'. See: https://github.com/urllib3/urllib3/issues/3020
  warnings.warn(
/Users/nizhen/Library/Python/3.9/lib/python/site-packages/langsmith/client.py:290: LangSmithMissingAPIKeyWarning: API key must be provided when using hosted LangSmith API
  warnings.warn(
chat resp: 你好
chat resp: !不过
chat resp: 需要澄清
chat resp: 一下:我是
chat resp: 通义千问(Q
chat resp: wen),是阿里巴巴
chat resp: 研发的超大规模
chat resp: 语言模型,没有
chat resp: 生物学意义上的家庭关系
chat resp: 或亲属身份。我
chat resp: 尊重每一位用户,也
chat resp: 乐意以友好、平等
chat resp: 、互助的态度和
chat resp: 你交流 😊
chat resp: 有任何问题、需要帮助、
chat resp: 想探讨某个话题,
chat resp: 或者只是想轻松
chat resp: 聊聊天——我
chat resp: 都很乐意陪你!✨
chat resp:   
chat resp: 有什么想聊的
chat resp: 呢?
chat resp: 

如果os.environ 设置不对比如URL,LANGSMITH_API_KEY等不对,会展示:

nizhen@Ethan-s-Super-Mac AI_Coding % /usr/bin/python3 /Users/nizhen/Desktop/AI_Coding/langchain_de
mo_w_langsmith.py
/Users/nizhen/Library/Python/3.9/lib/python/site-packages/urllib3/__init__.py:35: NotOpenSSLWarning: urllib3 v2 only supports OpenSSL 1.1.1+, currently the 'ssl' module is compiled with 'LibreSSL 2.8.3'. See: https://github.com/urllib3/urllib3/issues/3020
  warnings.warn(
/Users/nizhen/Library/Python/3.9/lib/python/site-packages/langsmith/client.py:290: LangSmithMissingAPIKeyWarning: API key must be provided when using hosted LangSmith API
  warnings.warn(
chat resp: 你好
chat resp: !不过
chat resp: 需要澄清
chat resp: 一下:我是
chat resp: 通义千问(Q
chat resp: wen),是阿里巴巴
chat resp: 研发的超大规模
chat resp: 语言模型,没有
chat resp: 生物学意义上的家庭关系
Failed to multipart ingest runs: langsmith.utils.LangSmithAuthError: Authentication failed for https://api.smith.langchain.com/runs/multipart. HTTPError('401 Client Error: Unauthorized for url: https://api.smith.langchain.com/runs/multipart', '{"error":"Unauthorized"}\n')trace=019cd7c5-7617-7700-aad0-908a8085a144,id=019cd7c5-7617-7700-aad0-908a8085a144
chat resp: 或亲属身份。我
chat resp: 尊重每一位用户,也
chat resp: 乐意以友好、平等
chat resp: 、互助的态度和
chat resp: 你交流 😊

如果你
chat resp: 有任何问题、需要帮助、
chat resp: 想探讨某个话题,
chat resp: 或者只是想轻松
chat resp: 聊聊天——我
chat resp: 都很乐意陪你!✨
chat resp:   
那么,今天
chat resp: 有什么想聊的
chat resp: 呢?
chat resp: 
Failed to send compressed multipart ingest: langsmith.utils.LangSmithAuthError: Authentication failed for https://api.smith.langchain.com/runs/multipart. HTTPError('401 Client Error: Unauthorized for url: https://api.smith.langchain.com/runs/multipart', '{"error":"Unauthorized"}\n')trace=019cd7c5-7617-7700-aad0-908a8085a144,id=019cd7c5-7617-7700-aad0-908a8085a144

Tracing:
在这里插入图片描述
监控在这里插入图片描述

LangGraph – agent orchestration 智能体编排 ,松耦合,不用Langchain也可以用。

LangSmith的home界面有LangGraph界面

Debug and iterate in Studio

LangGraph is a low-level orchestration framework and runtime for building, managing, and deploying long-running, stateful agents.
LangGraph is very low-level, and focused entirely on agent orchestration

“它不是要替代 LangChain,而是专门解决 Agent 在真实业务场景中的编排调度问题——如何让 AI Agent 像可靠的"数字员工"一样,支持长时间、可中断、可协作的复杂任务执行”

什么时候需要langGraph:

在这里插入图片描述

LangGraph demo:

langchain_demo_w_langsmith_w_langgraph.py
├── LangSmith 配置
├── 通义千问模型初始化
├── LangGraph 节点定义
│ ├── call_model: 调用 LLM
│ └── should_continue: 条件判断
├── 工作流构建
│ ├── build_graph(): 基础工作流
│ └── build_graph_with_tools(): 带工具调用的复杂工作流
└── 主程序
├── main(): 基础示例
└── demo_with_tools(): 工具调用示例

"""
LangChain + LangSmith + LangGraph 集成示例
==========================================
本示例展示如何将 LangGraph 的工作流集成到 LangChain 应用中,
同时使用 LangSmith 进行调用追踪和监控。

主要组件:
- LangChain: 大模型调用和工具集成
- LangSmith: 调用链路追踪和监控
- LangGraph: 构建复杂的 agent 工作流
"""

import os
from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langgraph.graph import StateGraph, MessagesState, START, END
from typing import Literal

# ============================================
# 配置 LangSmith 追踪
# ============================================
# 确保环境变量中已设置 LANGSMITH_API_KEY
# export LANGSMITH_API_KEY="your-api-key"

os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "langchain_langgraph_demo"

print("LANGSMITH_API_KEY:", os.getenv("LANGSMITH_API_KEY", "未设置"))


# ============================================
# 初始化通义千问模型
# ============================================
chatLLM = ChatTongyi(
    model="qwen-plus",
    streaming=False,
)


# ============================================
# 定义 LangGraph 节点函数
# ============================================

def call_model(state: MessagesState):
    """
    LangGraph 节点:调用 LLM 生成回复
    
    Args:
        state: 包含消息历史的状态对象
        
    Returns:
        包含 AI 回复的消息更新
    """
    # 从状态中获取消息历史
    messages = state["messages"]
    
    # 调用通义千问模型
    response = chatLLM.invoke(messages)
    
    # 返回更新后的消息列表
    return {"messages": [response]}


def should_continue(state: MessagesState) -> Literal["continue", "end"]:
    """
    LangGraph 条件边:决定工作流是否继续
    
    可以根据实际需求添加逻辑,例如:
    - 检查是否需要调用工具
    - 检查对话轮数是否超过限制
    - 检查是否达到终止条件
    
    Args:
        state: 当前状态
        
    Returns:
        "continue" 继续执行 或 "end" 结束工作流
    """
    messages = state["messages"]
    last_message = messages[-1]
    
    # 示例:如果最后一条消息包含特定关键词,结束对话
    if isinstance(last_message, AIMessage):
        content = last_message.content.lower()
        if "再见" in content or "goodbye" in content:
            return "end"
    
    # 默认继续(这里简化为直接结束,实际可扩展)
    return "end"


# ============================================
# 构建 LangGraph 工作流
# ============================================

def build_graph():
    """
    构建 LangGraph 工作流
    
    工作流结构:
    START -> call_model -> END
    
    可扩展为更复杂的结构,例如:
    START -> call_model -> [工具调用] -> call_model -> END
    """
    # 创建状态图
    workflow = StateGraph(MessagesState)
    
    # 添加节点
    workflow.add_node("call_model", call_model)
    
    # 添加边
    workflow.add_edge(START, "call_model")
    
    # 添加条件边(可选)
    # workflow.add_conditional_edges(
    #     "call_model",
    #     should_continue,
    #     {
    #         "continue": "call_model",  # 继续调用模型
    #         "end": END                 # 结束工作流
    #     }
    # )
    
    # 直接连接到结束
    workflow.add_edge("call_model", END)
    
    # 编译图
    return workflow.compile()


# ============================================
# 主程序:运行集成示例
# ============================================

def main():
    """主函数:演示 LangChain + LangSmith + LangGraph 集成"""
    
    print("\n" + "=" * 60)
    print("LangChain + LangSmith + LangGraph 集成示例")
    print("=" * 60)
    
    # 构建工作流
    graph = build_graph()
    
    # 准备输入消息
    messages = [
        SystemMessage(content="You are a helpful assistant. 请用中文回答。"),
        HumanMessage(content="你好,分析下阿里的股价")
    ]
    
    print(f"\n用户输入: {messages[-1].content}")
    print("\n正在调用 LangGraph 工作流...")
    
    # 执行工作流
    # LangSmith 会自动追踪这次调用
    result = graph.invoke({"messages": messages})
    
    # 获取最终回复
    final_messages = result["messages"]
    ai_response = final_messages[-1].content
    
    print(f"\nAI 回复:\n{ai_response}")
    print("\n" + "=" * 60)
    print("调用完成!请在 LangSmith 控制台查看追踪详情")
    print("=" * 60)


# ============================================
# 进阶:带工具调用的 LangGraph 工作流
# ============================================

def build_graph_with_tools():
    """
    构建支持工具调用的复杂工作流
    
    工作流结构:
    START -> call_model -> [判断是否需要工具] 
          -> [需要] -> call_tool -> call_model -> END
          -> [不需要] -> END
    """
    from datetime import datetime
    
    # 定义工具函数
    def get_current_time():
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    # 工具注册
    tools = {
        "get_current_time": get_current_time
    }
    
    def call_model_with_tools(state: MessagesState):
        """带工具调用的模型节点"""
        messages = state["messages"]
        
        # 绑定工具到模型
        llm_with_tools = chatLLM.bind(
            tools=[
                {
                    "type": "function",
                    "function": {
                        "name": "get_current_time",
                        "description": "获取当前时间",
                        "parameters": {}
                    }
                }
            ]
        )
        
        response = llm_with_tools.invoke(messages)
        return {"messages": [response]}
    
    def call_tool(state: MessagesState):
        """工具执行节点"""
        messages = state["messages"]
        last_message = messages[-1]
        
        # 检查是否有工具调用请求
        if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
            tool_call = last_message.tool_calls[0]
            tool_name = tool_call["name"]
            
            if tool_name in tools:
                result = tools[tool_name]()
                return {"messages": [{"role": "tool", "content": result, "tool_call_id": tool_call["id"]}]}
        
        return {"messages": []}
    
    # 构建图
    workflow = StateGraph(MessagesState)
    workflow.add_node("call_model", call_model_with_tools)
    workflow.add_node("call_tool", call_tool)
    
    workflow.add_edge(START, "call_model")
    
    # 条件边:判断是否需要调用工具
    def route_tools(state: MessagesState):
        messages = state["messages"]
        last_message = messages[-1]
        if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
            return "call_tool"
        return END
    
    workflow.add_conditional_edges("call_model", route_tools, {
        "call_tool": "call_tool",
        END: END
    })
    workflow.add_edge("call_tool", "call_model")
    
    return workflow.compile()


def demo_with_tools():
    """演示带工具调用的工作流"""
    print("\n" + "=" * 60)
    print("带工具调用的 LangGraph 示例")
    print("=" * 60)
    
    graph = build_graph_with_tools()
    
    messages = [
        SystemMessage(content="You are a helpful assistant."),
        HumanMessage(content="现在几点了?")
    ]
    
    print(f"\n用户输入: {messages[-1].content}")
    
    result = graph.invoke({"messages": messages})
    
    final_response = result["messages"][-1].content
    print(f"\nAI 回复: {final_response}")
    print("=" * 60)


if __name__ == "__main__":
    # 运行基础示例
    main()
    
    # 运行带工具调用的示例(取消注释)
    # demo_with_tools()

可观测以及返回

可观测节点多了哥LangGraph
在这里插入图片描述

参考:
https://docs.langchain.com/oss/python/langgraph/overview

利用Qoder Quest模式更新文件:
在这里插入图片描述

a.会创建SPEC需求文档(可能是claude或者gpt的思考,值得研读):

创建一个新文件 buffett_stock_analyzer.py,基于 LangGraph 构建严格遵循巴菲特/芒格价值投资哲学的股票分析 Agent。包含 6 个工作流节点(解析输入→yfinance数据获取→10维基本面评分→护城河评估→DCF估值与安全边际→巴菲特风格报告),支持美股/港股/A股,交互式 CLI 对话。

深度思考 · 10s
好的,需求已明确。让我设计具体的实现方案。
Spec已创建
Design Buffett agent plan
深度思考 · 1s
让我先深入了解现有的代码库,然后为你设计详细的实现方案。
已查看
langchain_demo_w_langsmith_w_langgraph_v2.py
深度思考 · 1s
Let me also check the earlier v1 and other related files for additional patterns:
已查看
langchain_demo_w_langsmith_w_langgraph.py
已查看
langchain_demo_w_langsmith.py
已执行命令
pip list 2>/dev/null | grep -iE "langchain|langgraph|langsmith|yfinance|tongyi"
已执行命令
pip3 list 2>/dev/null | grep -iE "langchain|langgraph|langsmith|yfinance|tongyi|dashscope"
已执行命令
python3 -c "import yfinance; print(yfinance.__version__)" 2>&1
深度思考 · 4s
已执行命令
python3 -c " import yfinance as yf # Quick check what data is available t = yf.Ticker('AAPL') info = t.info # Print available keys print('Available info keys:', sorted(info.keys())[:50]) print('---') print('Total keys:', len(info.keys())) " 2>&1 | head -20
已执行命令
python3 -c " import yfinance as yf # Check the Ticker object methods/attributes t = yf.Ticker('AAPL') methods = [m for m in dir(t) if not m.startswith('_')] print('Ticker methods/attributes:') for m in methods: print(f' - {m}') " 2>&1
已执行命令
python3 -c " import yfinance as yf # Check fast_info which doesn't require full API call t = yf.Ticker('AAPL') fi = t.fast_info print('fast_info keys:') for k in dir(fi): if not k.startswith('_'): print(f' {k}') " 2>&1
已执行命令
python3 -c " from langgraph.graph import StateGraph # Check LangGraph version capabilities import langgraph print('LangGraph version:', langgraph.__version__) " 2>&1
已执行命令
python3 --version 2>&1
现在我已经充分了解了现有代码库、技术栈和 yfinance 的数据能力。下面给出详细的实现规格设计。
巴菲特/芒格价值投资股票分析 Agent — 实现规格设计
一、文件结构与架构
新文件: /Users/nizhen/Desktop/AI_Coding/buffett_stock_analyzer_v3.py单文件架构(与现有 v2 保持一致风格),内部按逻辑区块组织:
plaintext
文件内部逻辑结构:
┌─────────────────────────────────────────┐
│ 1. 模块导入与环境配置                      │
│ 2. LLM 初始化 (ChatTongyi)              │
│ 3. 常量定义 (股票名称映射表等)               │
│ 4. State 定义 (BuffettAnalysisState)     │
│ 5. yfinance 数据获取工具函数                │
│ 6. 巴菲特评分系统                          │
│ 7. LangGraph 节点函数 (6个节点)           │
│ 8. Graph 构建函数                         │
│ 9. CLI 交互主循环                         │
│ 10. __main__ 入口                        │
└─────────────────────────────────────────┘
设计决策:维持单文件方案与用户现有代码风格一致,降低入门门槛。如果未来扩展,可拆分为 buffett_analyzer/ 包。
二、State 定义 (TypedDict)
plaintext
BuffettAnalysisState(TypedDict):
    messages          : Annotated[list, add_messages]  # 对话历史(自动累加)
    user_query        : str           # 用户原始输入
    stock_ticker      : str           # 解析后的股票代码 (如 "600519.SS")
    stock_name        : str           # 股票中文名 (如 "贵州茅台")
    market            : str           # 市场标识: "US" / "HK" / "A_SHARE"
    raw_data          : dict          # yfinance 获取的原始数据(全部财务数据的汇总字典)
    fundamental_scores: dict          # 基本面评分 (每个维度的得分和说明)
    moat_assessment   : str           # 经济护城河评估文本
    valuation_result  : dict          # 估值分析结果 (内在价值、安全边际等)
    final_report      : str           # 最终巴菲特风格报告
    error             : str           # 错误信息(若有)
关键设计:raw_data 用一个大字典汇总所有 yfinance 获取的数据(info、财务报表、现金流等),这样后续的多个分析节点可以各取所需,无需重复调用 API。
三、LangGraph 工作流(6 节点 + 条件路由)
plaintext
                         ┌──────────────┐
                         │    START     │
                         └──────┬───────┘
                                │
                         ┌──────▼───────┐
                         │  parse_input │  ← LLM 解析自然语言 → 股票代码
                         └──────┬───────┘
                                │
                       ┌────────▼────────┐
                       │  has_error ?    │ ← 条件路由
                       └───┬─────────┬───┘
                      有错误│         │正常
                           │    ┌────▼────────┐
                           │    │ fetch_data   │ ← yfinance 获取全部财务数据
                           │    └────┬────────┘
                           │         │
                           │    ┌────▼────────┐
                           │    │fundamental   │ ← 巴菲特评分 + LLM 分析基本面
                           │    │_analysis     │
                           │    └────┬────────┘
                           │         │
                           │    ┌────▼────────┐
                           │    │  moat_       │ ← LLM 评估经济护城河
                           │    │  assessment  │
                           │    └────┬────────┘
                           │         │
                           │    ┌────▼────────┐
                           │    │ valuation_   │ ← DCF 简易计算 + 安全边际
                           │    │ analysis     │
                           │    └────┬────────┘
                           │         │
                           │    ┌────▼────────┐
                           │    │ generate_    │ ← LLM 生成巴菲特风格报告
                           │    │ report       │
                           │    └────┬────────┘
                           │         │
                       ┌───▼─────────▼───┐
                       │      END        │
                       └─────────────────┘
条件路由说明
parse_input 完成后检查 state["error"] 是否为空
如果有错误(无法识别股票代码、yfinance 不支持等),跳过所有分析节点直接到 generate_report(此节点在有错误时输出友好的错误提示)
fetch_data 同样设置错误条件路由:如果 yfinance 数据获取失败,跳到 generate_report 输出错误信息
四、各节点详细职责
节点 1: parse_input — 智能股票代码解析
职责:将用户自然语言输入转换为 yfinance 可识别的 ticker 代码处理逻辑:
先走本地快速映射表(避免不必要的 LLM 调用),覆盖约 50-100 只热门股:
中文名映射:"贵州茅台""600519.SS""腾讯""0700.HK""苹果""AAPL"
代码直接映射:"600519""600519.SS""00700""0700.HK"
拼音/简称映射:"茅台""600519.SS""阿里""BABA"
如果本地映射未命中,调用 LLM 解析:
System Prompt 要求 LLM 返回 JSON 格式 {"ticker": "xxx", "name": "xxx", "market": "US/HK/A_SHARE"}
LLM 具备知识可以识别绝大多数中美港股票名称
后缀规则自动补全:
6 位纯数字、以 6 开头 → .SS(上海)
6 位纯数字、以 0/3 开头 → .SZ(深圳)
4-5 位纯数字 → .HK(港股,需前补零到4位)
纯英文字母 → 美股,无后缀
输出:更新 stock_ticker、stock_name、market;出错则设 error
节点 2: fetch_data — yfinance 全量数据获取
职责:一次性获取目标股票的所有财务数据,打包为 raw_data 字典具体获取内容:
数据类别	yfinance 调用	存入 raw_data 的 key
基础信息	ticker.info	info — 包含 PE、PB、ROE、市值、行业等
快速信息	ticker.fast_info	fast_info — 当前价格、市值
年度利润表	ticker.income_stmt	income_stmt — 近4年收入、净利润
年度资产负债表	ticker.balance_sheet	balance_sheet — 资产、负债、股东权益
年度现金流量表	ticker.cash_flow	cash_flow — 经营/投资/融资现金流
季度利润表	ticker.quarterly_income_stmt	quarterly_income_stmt
分红历史	ticker.dividends	dividends — 历年分红记录
历史价格	ticker.history(period="5y")	price_history — 5年价格走势
主要持有人	ticker.major_holders	major_holders — 内部人持股
机构持有人	ticker.institutional_holders	institutional_holders
分析师目标价	ticker.analyst_price_targets	analyst_targets
容错处理:
每个调用用 try/except 包裹,失败的数据项设为 None 而非中断整个流程
DataFrame 转为 dict 存储(JSON 可序列化)
对 A 股和港股做特殊处理:部分字段可能缺失(如 A 股通常无 analyst_price_targets)
节点 3: fundamental_analysis — 巴菲特基本面评分
职责:基于 yfinance 数据,用量化评分 + LLM 定性分析双轨制评估纯 Python 量化评分(不调 LLM,速度快且确定性强):评分体系 — 总分 100 分,10 个维度各 10 分:
维度	权重	满分条件	数据来源
① ROE 稳定性	10分	近4年 ROE 均 >15%,且波动小	income_stmt + balance_sheet 计算
② 盈利增长	10分	净利润年均增长率 >10%	income_stmt
③ 利润率	10分	净利润率 >20%,稳定或上升	income_stmt
④ 资产负债率	10分	总负债/总资产 <50%(金融业特殊处理)	balance_sheet
⑤ 流动比率	10分	流动资产/流动负债 >1.5	balance_sheet
⑥ 自由现金流	10分	FCF 为正且持续增长	cash_flow
⑦ PE 估值	10分	PE <15 满分,15-25 逐步降分,>40 零分	info
⑧ PB 估值	10分	PB <1.5 满分,1.5-3 逐步降分	info
⑨ 分红记录	10分	连续 5 年分红,且分红率合理	dividends
⑩ 内部人持股	10分	内部人持股 >5% 得分较高	major_holders
每个维度的评分函数独立实现,返回 (score: float, explanation: str) 元组。然后将量化得分 + 原始财务数据一起传给 LLM 做定性分析,LLM 主要负责:
解释各指标得分背后的业务含义
识别异常值和风险信号
用巴菲特的语言风格总结
节点 4: moat_assessment — 经济护城河评估
职责:专门评估公司的竞争优势(这是巴菲特最看重的)LLM 驱动(传入 raw_data 中的关键指标):System Prompt 要求 LLM 从 5 个维度评估护城河:
品牌优势 — 品牌溢价能力(参考利润率稳定性)
转换成本 — 客户切换到竞争对手的代价
网络效应 — 用户越多价值越大
成本优势 — 规模效应、独特资源
准入壁垒 — 牌照、专利、监管门槛
LLM 对每个维度给出 强///无 评级 + 理由,最终给出综合护城河评级:宽护城河 / 窄护城河 / 无护城河传给 LLM 的数据点:
公司名称、行业、市值
近 4 年毛利率和净利率趋势
ROE 趋势
营收增长率
市场地位(市值排名隐含信息)
节点 5: valuation_analysis — 估值与安全边际
职责:计算内在价值,与当前市价比较得出安全边际纯 Python 计算(确定性,不依赖 LLM):方法 A:简化 DCF(所有者收益法)
Owner Earnings = 净利润 + 折旧 - 资本支出(从 cash_flow 获取)
增长率假设:取近 3 年净利润增长率的中位数,但 cap 在 5%-15% 区间(保守原则)
折现率:固定 10%(巴菲特常用)
终值增长率:3%
预测期:10 年
计算公式:内在价值 = Σ(第i年所有者收益 / 1.1^i) + 终值 / 1.1^10
每股内在价值 = 内在价值 / 总股本
方法 B:Graham 公式(作为交叉验证)
V = EPS × (8.5 + 2g) × 4.4 / Y
EPS:近 12 个月每股收益
g:预期 5 年增长率
Y:当前 AAA 债券收益率(固定 4.5% 作为默认)
安全边际计算:
margin_of_safety = (intrinsic_value - current_price) / intrinsic_value
巴菲特要求 ≥25%,越高越好
分级:>50% 非常吸引 / 25-50% 有吸引力 / 0-25% 估值合理 / <0% 高估
输出到 valuation_result 字典:
plaintext
{
    "current_price": float,
    "dcf_intrinsic_value": float,
    "graham_value": float,
    "margin_of_safety_dcf": float,  # 百分比
    "margin_of_safety_graham": float,
    "valuation_verdict": "低估/合理/高估/严重高估"
}
节点 6: generate_report — 巴菲特风格报告生成
职责:汇总所有分析结果,由 LLM 生成最终的巴菲特/芒格风格投资报告LLM 输入:全部前序节点的结果 — fundamental_scores、moat_assessment、valuation_result、raw_data 中的关键指标输出格式(在 System Prompt 中严格规定):
plaintext
══════════════════════════════════════════════
  🏛️ 巴菲特价值投资分析报告
  📊 [公司名称] ([股票代码]) | [市场]
══════════════════════════════════════════════

一、公司概况
   [简要描述公司业务、行业地位]

二、巴菲特评分卡 [XX/100]
   ✅ ROE 稳定性:     [X/10][简评]
   ✅ 盈利增长:       [X/10][简评]
   ... (10)

三、经济护城河: [//]
   [护城河分析要点]

四、估值分析
   当前价格: ¥/$ XXX
   DCF 内在价值: ¥/$ XXX
   安全边际: XX%
   [估值判断]

五、巴菲特会怎么说?
   [用巴菲特/芒格的口吻和名言给出最终建议]
   [引用 1-2 句巴菲特/芒格经典语录]

六、风险提示
   [关键风险因素]

⚠️ 免责声明:本分析仅供学习参考,不构成投资建议。
══════════════════════════════════════════════
错误场景处理:如果 state["error"] 非空,此节点输出友好的错误信息(如"抱歉,无法获取该股票数据,请检查股票代码是否正确")
五、LLM System Prompt 设计
整个 Agent 需要 3 个不同的 System Prompt:
Prompt A: 股票代码解析 (parse_input 节点)
你是一个股票代码解析助手。用户会用自然语言提到一只股票,你需要识别出对应的 yfinance 股票代码。规则:
美股:直接返回代码(如 AAPL, MSFT, BABA)
港股:数字代码加 .HK 后缀(如 0700.HK, 9988.HK)
A 股上海:6 位代码加 .SS(如 600519.SS)
A 股深圳:6 位代码加 .SZ(如 000858.SZ, 300750.SZ)
必须以严格 JSON 格式回复:{"ticker": "代码", "name": "公司中文名", "market": "US/HK/A_SHARE"}
如果无法识别,返回:{"ticker": "", "name": "", "market": "", "error": "无法识别的股票"}
Prompt B: 护城河评估 (moat_assessment 节点)
你是查理·芒格,以跨学科思维模型闻名的投资大师。请基于以下财务数据,评估这家公司的经济护城河。从以下五个维度分析护城河:
品牌价值 — 公司品牌是否有定价权?
转换成本 — 客户是否难以切换到竞争对手?
网络效应 — 使用的人越多,产品是否越有价值?
成本优势 — 公司是否有结构性的成本优势?
准入壁垒 — 新竞争者是否难以进入?
每个维度给出「强///无」评级并说明理由。
最终得出综合判断:宽护城河 / 窄护城河 / 无护城河
记住芒格的忠告:"只投资你完全理解的企业。" 如果数据不足以判断,坦诚说明。
Prompt C: 最终报告生成 (generate_report 节点)
你是一位严格遵循巴菲特和芒格价值投资哲学的投资分析师。请基于以下分析数据,撰写一份巴菲特风格的投资分析报告。核心原则(必须在报告中体现):
内在价值:股价只是价格,价值才是本质
安全边际:只在价格远低于内在价值时才值得投资
经济护城河:寻找有持久竞争优势的企业
长期视角:以 10 年为周期思考投资
能力圈:只投资能理解的企业
管理层:好企业需要好管理层
语气要求:
用巴菲特式的朴素智慧和幽默感
适当引用巴菲特/芒格的经典名言
保守且诚实,不确定的地方坦诚告知
永远提醒"投资有风险"
如果分析结果显示这是一只投机性强的股票,请明确警告,引用巴菲特名言"如果你不愿意持有一只股票十年,那就不要持有它十分钟。"
六、巴菲特评分系统详细设计
评分函数的设计策略是保守倾向(这符合巴菲特风格):
① ROE 稳定性 (10)
获取近 4 年净利润和股东权益,计算每年 ROE
评分逻辑:
4 年均 >20%: 104 年均 >15%: 83>15%: 6 分
均值 >10%: 4 分
均值 >5%: 2 分
其他: 0 分
附加:如果波动超过 5 个百分点扣 2 分
② 盈利增长 (10)
计算净利润年均复合增长率 (CAGR)
CAGR >20%: 10, >15%: 8, >10%: 6, >5%: 4, >0%: 2, <0%: 0
③ 净利润率 (10)
取最新年度净利润率
>30%: 10, >20%: 8, >15%: 6, >10%: 4, >5%: 2, <5%: 0
④ 资产负债率 (10)
总负债 / 总资产
<20%: 10, <30%: 8, <40%: 6, <50%: 4, <60%: 2,60%: 0
金融行业特殊处理:银行、保险等负债率天然高,降低标准
⑤ 流动比率 (10)
流动资产 / 流动负债
>2.0: 10, >1.5: 8, >1.2: 6, >1.0: 4, >0.8: 2, <0.8: 0
⑥ 自由现金流 (10)
FCF = 经营现金流 - 资本支出
4 年均为正且增长: 10 分,逐步降分
⑦ PE 估值 (10)
PE <10: 10, <15: 8, <20: 6, <25: 4, <35: 2,35 或负: 0
⑧ PB 估值 (10)
PB <1: 10, <1.5: 8, <2: 6, <3: 4, <5: 2,5: 0
⑨ 分红记录 (10)
连续 5 年以上分红: 104: 83: 6,以此类推
近 1 年无分红: 0
⑩ 内部人持股 (10)
内部人持有 >20%: 10, >10%: 8, >5%: 6, >1%: 4, <1%: 2
数据不可用: 5(中间值,不惩罚也不奖励)
总分解读
80-100 分:🟢 巴菲特可能感兴趣的优质企业
60-79 分:🟡 有价值但需关注薄弱环节
40-59 分:🟠 不符合巴菲特标准的关键维度较多
0-39 分:🔴 巴菲特不会考虑,请警惕
七、yfinance 数据获取与结构化
fetch_data 节点中的核心工具函数设计:
主函数签名
def fetch_comprehensive_data(ticker_symbol: str) -> dict
数据获取策略(分层容错)
第一层(必需数据 — 获取失败则报错):
ticker.fast_info → 当前价格、市值
ticker.info → PE、PB、行业等基础指标
第二层(重要数据 — 获取失败降级处理):
ticker.income_stmt → 年度利润表
ticker.balance_sheet → 资产负债表
ticker.cash_flow → 现金流量表
第三层(增强数据 — 获取失败忽略):
ticker.dividends → 分红历史
ticker.major_holders → 大股东
ticker.institutional_holders → 机构持有
ticker.history(period="5y")5年历史价格
ticker.analyst_price_targets → 分析师目标价
DataFrame 转 dict 规范
所有 pandas DataFrame 通过 .to_dict() 转换
日期索引转为 ISO 字符串
NaN 值转为 None
保留最近 4/4 季度的数据
多市场特殊处理
A 股:部分字段名可能不同,info 中部分字段可能缺失;财务数据单位为人民币
港股:货币为 HKD;部分老股票 yfinance 数据可能不全
美股:数据最完整,为基准场景
八、CLI 交互设计
启动界面
plaintext
══════════════════════════════════════════════════════════
  🏛️ 巴菲特价值投资分析系统 v3.0
  "价格是你付出的,价值是你得到的" — 沃伦·巴菲特
══════════════════════════════════════════════════════════
  支持市场: 美股 | 港股 | A股(沪深)
  数据来源: Yahoo Finance 实时数据
  分析框架: 巴菲特/芒格价值投资十维评估体系

  使用示例:
    👉 分析一下贵州茅台
    👉 帮我看看苹果公司值不值得投资
    👉 腾讯 0700.HK 的内在价值是多少
    👉 MSFT 微软的护城河怎么样

  输入 "退出""exit" 结束程序
══════════════════════════════════════════════════════════
主循环逻辑
plaintext
while True:
    1. 显示 prompt "🏛️ 请输入您想分析的股票 > "
    2. 读取用户输入
    3. 检查退出词 ("退出", "exit", "quit", "q")
    4. 显示 "⏳ 正在进行价值投资分析,请稍候..."
    5. 构建初始 state,调用 graph.invoke()
    6. 打印 state["final_report"]
    7. 循环回到 1
用户体验细节
每个节点执行时打印进度提示(如 [1/6] 🔍 正在解析股票代码...)
分析耗时约 15-30 秒(多次 yfinance 调用 + 3 次 LLM 调用),需有进度反馈
支持直接输入股票代码(如 AAPL、600519)或自然语言(如 茅台怎么样)
九、错误处理策略
错误分层
错误类型	处理策略	用户提示
股票代码无法识别	parse_input 设 error,跳到 report	"抱歉,无法识别您提到的股票,请提供更具体的名称或代码"
yfinance 网络超时	重试 1 次(带 3 秒延迟),仍失败则报错	"网络连接超时,请检查网络后重试"
yfinance 返回空数据	info 为空 → 报错;其他数据缺失 → 降级(该维度评分标记 N/A)	"该股票的[某项]数据暂不可用,已跳过相关分析"
LLM 调用失败	重试 1 次;仍失败则使用纯量化结果	"AI 分析暂不可用,以下为纯量化评分结果"
A 股/港股数据不全	降级处理,缺失维度给中间分并标注	"[某指标] 数据缺失,已按保守估计处理"
用户输入非股票问题	LLM parse 节点识别出非股票相关 → 友好引导	"本系统专注于股票价值投资分析,请输入股票名称或代码"
全局异常兜底
主循环中 try/except Exception 包裹整个 graph.invoke()
任何未预期异常:打印友好错误消息 + 异常信息(debug 用),不退出循环
十、依赖说明
已安装(无需额外安装):
langchain 0.3.28
langchain-community 0.3.31(包含 ChatTongyi)
langgraph 0.6.11
langsmith 0.4.37
yfinance 1.2.0
dashscope(ChatTongyi 底层依赖)
环境变量要求(已在现有文件中配置):
LANGSMITH_API_KEY — LangSmith 追踪
DASHSCOPE_API_KEY — 通义千问 API(ChatTongyi 依赖)
无需新增任何 pip install,所有依赖均已就绪。
十一、与现有 v2 的关键差异对比
维度	v2 (现有)	v3 (新设计)
数据源	DuckDuckGo 搜索(不稳定、需解析文本)	yfinance API(结构化、完整财务数据)
分析深度	单次 LLM 分析("短期趋势+风险等级"10 维量化评分 + 3 次 LLM 深度分析
投资哲学	无特定框架	严格巴菲特/芒格价值投资体系
节点数	4 个(parse→fetch→analyze→recommend)	6+ 条件路由
市场覆盖	有限的硬编码映射	US + HK + A 股全覆盖
交互方式	单次执行	交互式循环(可连续分析多只股票)
估值模型	无	DCF + Graham 公式 + 安全边际
错误处理	有 demo 数据兜底	分层容错、降级处理
State 复杂度	6 个字段	11 个字段,更丰富的中间状态
Critical Files for Implementation
/Users/nizhen/Desktop/AI_Coding/langchain_demo_w_langsmith_w_langgraph_v2.py — 核心参考模板:现有 LangGraph 工作流的 State 定义、节点函数、Graph 构建模式和 LangSmith 配置全部沿用其风格
/Users/nizhen/Desktop/AI_Coding/langchain_demo_w_langsmith_w_langgraph.py — 条件路由参考:其中 should_continue 和 add_conditional_edges 的用法将在新 v3 的错误路由中复用
/Users/nizhen/Desktop/AI_Coding/langchain_demo_w_langsmith.py — ChatTongyi 初始化参考:确认 LLM 初始化方式和环境变量配置的最简模式
(新建)/Users/nizhen/Desktop/AI_Coding/buffett_stock_analyzer_v3.py — 要创建的目标文件:实现全部 6 个 LangGraph 节点、10 维评分系统、DCF 估值和 CLI 交互循环
b.创建to-do-list:

创建 buffett_stock_analyzer.py 基础结构(导入、配置、State 定义、常量映射表)
实现节点1 parse_input — 股票代码解析(本地映射 + LLM fallback)
实现节点2 fetch_data — yfinance 全量数据获取(分层容错)
实现节点3 fundamental_analysis — 巴菲特十维评分系统
实现节点4 moat_assessment — 经济护城河评估
实现节点5 valuation_analysis — DCF 估值与安全边际
实现节点6 generate_report — 巴菲特风格报告生成
构建 LangGraph 工作流(节点+边+条件路由)
实现 CLI 交互主循环
运行测试验证

c.理财分析demo代码:
"""
巴菲特/芒格价值投资股票分析 Agent
====================================
严格遵循巴菲特和芒格的价值投资哲学,
通过 LangGraph 多节点工作流对股票进行深度分析。

工作流:
  解析输入 → 获取数据 → 十维基本面评分 → 护城河评估 → DCF估值 → 生成报告

支持市场:美股 | 港股 | A股(沪深)
数据来源:Yahoo Finance (yfinance)
分析框架:巴菲特/芒格价值投资十维评估体系
"""

import os
import json
import re
import time
import math
from typing import TypedDict, Literal, Annotated, Optional

import yfinance as yf
import pandas as pd
from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages


# ============================================
# 配置 LangSmith 追踪
# ============================================
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "buffett_stock_analyzer"


# ============================================
# 初始化 LLM(延迟加载,避免在导入时就要求 API Key)
# ============================================
_chatLLM = None

def get_llm():
    global _chatLLM
    if _chatLLM is None:
        _chatLLM = ChatTongyi(model="qwen-plus", streaming=False)
    return _chatLLM


# ============================================
# 热门股票本地映射表
# ============================================
STOCK_MAPPING = {
    # ===== 美股 =====
    "苹果": ("AAPL", "苹果公司", "US"),
    "apple": ("AAPL", "苹果公司", "US"),
    "aapl": ("AAPL", "苹果公司", "US"),
    "微软": ("MSFT", "微软公司", "US"),
    "microsoft": ("MSFT", "微软公司", "US"),
    "msft": ("MSFT", "微软公司", "US"),
    "谷歌": ("GOOGL", "谷歌(Alphabet)", "US"),
    "google": ("GOOGL", "谷歌(Alphabet)", "US"),
    "googl": ("GOOGL", "谷歌(Alphabet)", "US"),
    "亚马逊": ("AMZN", "亚马逊", "US"),
    "amazon": ("AMZN", "亚马逊", "US"),
    "amzn": ("AMZN", "亚马逊", "US"),
    "特斯拉": ("TSLA", "特斯拉", "US"),
    "tesla": ("TSLA", "特斯拉", "US"),
    "tsla": ("TSLA", "特斯拉", "US"),
    "英伟达": ("NVDA", "英伟达", "US"),
    "nvidia": ("NVDA", "英伟达", "US"),
    "nvda": ("NVDA", "英伟达", "US"),
    "meta": ("META", "Meta Platforms", "US"),
    "脸书": ("META", "Meta Platforms", "US"),
    "facebook": ("META", "Meta Platforms", "US"),
    "阿里巴巴": ("BABA", "阿里巴巴(美股ADR)", "US"),
    "阿里": ("BABA", "阿里巴巴(美股ADR)", "US"),
    "baba": ("BABA", "阿里巴巴(美股ADR)", "US"),
    "京东": ("JD", "京东(美股ADR)", "US"),
    "jd": ("JD", "京东(美股ADR)", "US"),
    "拼多多": ("PDD", "拼多多", "US"),
    "pdd": ("PDD", "拼多多", "US"),
    "网易": ("NTES", "网易(美股ADR)", "US"),
    "ntes": ("NTES", "网易(美股ADR)", "US"),
    "百度": ("BIDU", "百度(美股ADR)", "US"),
    "bidu": ("BIDU", "百度(美股ADR)", "US"),
    "蔚来": ("NIO", "蔚来汽车", "US"),
    "nio": ("NIO", "蔚来汽车", "US"),
    "伯克希尔": ("BRK-B", "伯克希尔哈撒韦B", "US"),
    "berkshire": ("BRK-B", "伯克希尔哈撒韦B", "US"),
    "可口可乐": ("KO", "可口可乐", "US"),
    "cocacola": ("KO", "可口可乐", "US"),
    "ko": ("KO", "可口可乐", "US"),
    "强生": ("JNJ", "强生公司", "US"),
    "jnj": ("JNJ", "强生公司", "US"),
    "摩根大通": ("JPM", "摩根大通", "US"),
    "jpm": ("JPM", "摩根大通", "US"),
    "沃尔玛": ("WMT", "沃尔玛", "US"),
    "walmart": ("WMT", "沃尔玛", "US"),
    "wmt": ("WMT", "沃尔玛", "US"),
    # ===== 港股 =====
    "腾讯": ("0700.HK", "腾讯控股", "HK"),
    "tencent": ("0700.HK", "腾讯控股", "HK"),
    "0700": ("0700.HK", "腾讯控股", "HK"),
    "00700": ("0700.HK", "腾讯控股", "HK"),
    "美团": ("3690.HK", "美团", "HK"),
    "3690": ("3690.HK", "美团", "HK"),
    "小米": ("1810.HK", "小米集团", "HK"),
    "1810": ("1810.HK", "小米集团", "HK"),
    "01810": ("1810.HK", "小米集团", "HK"),
    "快手": ("1024.HK", "快手科技", "HK"),
    "1024": ("1024.HK", "快手科技", "HK"),
    "比亚迪港股": ("1211.HK", "比亚迪(港股)", "HK"),
    "1211": ("1211.HK", "比亚迪(港股)", "HK"),
    "中国平安港股": ("2318.HK", "中国平安(港股)", "HK"),
    "2318": ("2318.HK", "中国平安(港股)", "HK"),
    "香港交易所": ("0388.HK", "香港交易所", "HK"),
    "港交所": ("0388.HK", "香港交易所", "HK"),
    "0388": ("0388.HK", "香港交易所", "HK"),
    "汇丰": ("0005.HK", "汇丰控股", "HK"),
    "0005": ("0005.HK", "汇丰控股", "HK"),
    # ===== A股 =====
    "茅台": ("600519.SS", "贵州茅台", "A_SHARE"),
    "贵州茅台": ("600519.SS", "贵州茅台", "A_SHARE"),
    "600519": ("600519.SS", "贵州茅台", "A_SHARE"),
    "五粮液": ("000858.SZ", "五粮液", "A_SHARE"),
    "000858": ("000858.SZ", "五粮液", "A_SHARE"),
    "比亚迪": ("002594.SZ", "比亚迪", "A_SHARE"),
    "002594": ("002594.SZ", "比亚迪", "A_SHARE"),
    "宁德时代": ("300750.SZ", "宁德时代", "A_SHARE"),
    "300750": ("300750.SZ", "宁德时代", "A_SHARE"),
    "招商银行": ("600036.SS", "招商银行", "A_SHARE"),
    "600036": ("600036.SS", "招商银行", "A_SHARE"),
    "中国平安": ("601318.SS", "中国平安", "A_SHARE"),
    "601318": ("601318.SS", "中国平安", "A_SHARE"),
    "平安": ("601318.SS", "中国平安", "A_SHARE"),
    "工商银行": ("601398.SS", "工商银行", "A_SHARE"),
    "601398": ("601398.SS", "工商银行", "A_SHARE"),
    "中国银行": ("601988.SS", "中国银行", "A_SHARE"),
    "601988": ("601988.SS", "中国银行", "A_SHARE"),
    "恒瑞医药": ("600276.SS", "恒瑞医药", "A_SHARE"),
    "600276": ("600276.SS", "恒瑞医药", "A_SHARE"),
    "海天味业": ("603288.SS", "海天味业", "A_SHARE"),
    "603288": ("603288.SS", "海天味业", "A_SHARE"),
    "格力电器": ("000651.SZ", "格力电器", "A_SHARE"),
    "000651": ("000651.SZ", "格力电器", "A_SHARE"),
    "美的集团": ("000333.SZ", "美的集团", "A_SHARE"),
    "000333": ("000333.SZ", "美的集团", "A_SHARE"),
    "万科": ("000002.SZ", "万科A", "A_SHARE"),
    "000002": ("000002.SZ", "万科A", "A_SHARE"),
    "中信证券": ("600030.SS", "中信证券", "A_SHARE"),
    "600030": ("600030.SS", "中信证券", "A_SHARE"),
    "片仔癀": ("600436.SS", "片仔癀", "A_SHARE"),
    "600436": ("600436.SS", "片仔癀", "A_SHARE"),
    "隆基绿能": ("601012.SS", "隆基绿能", "A_SHARE"),
    "601012": ("601012.SS", "隆基绿能", "A_SHARE"),
    "药明康德": ("603259.SS", "药明康德", "A_SHARE"),
    "603259": ("603259.SS", "药明康德", "A_SHARE"),
    "长江电力": ("600900.SS", "长江电力", "A_SHARE"),
    "600900": ("600900.SS", "长江电力", "A_SHARE"),
    "伊利股份": ("600887.SS", "伊利股份", "A_SHARE"),
    "600887": ("600887.SS", "伊利股份", "A_SHARE"),
    "泸州老窖": ("000568.SZ", "泸州老窖", "A_SHARE"),
    "000568": ("000568.SZ", "泸州老窖", "A_SHARE"),
    "中国神华": ("601088.SS", "中国神华", "A_SHARE"),
    "601088": ("601088.SS", "中国神华", "A_SHARE"),
}


# ============================================
# State 定义
# ============================================
class BuffettAnalysisState(TypedDict):
    messages: Annotated[list, add_messages]
    user_query: str
    stock_ticker: str
    stock_name: str
    market: str  # "US" / "HK" / "A_SHARE"
    raw_data: dict
    fundamental_scores: dict
    moat_assessment: str
    valuation_result: dict
    final_report: str
    error: str


# ============================================
# LLM System Prompts
# ============================================
PARSE_INPUT_SYSTEM_PROMPT = """你是一个股票代码解析助手。用户会用自然语言提到一只股票,你需要识别出对应的 yfinance 股票代码。

规则:
- 美股:直接返回代码(如 AAPL, MSFT, BABA)
- 港股:数字代码加 .HK 后缀(如 0700.HK, 9988.HK),注意港股代码需补齐4位
- A股上海(代码以6开头):6位代码加 .SS(如 600519.SS)
- A股深圳(代码以0或3开头):6位代码加 .SZ(如 000858.SZ, 300750.SZ)

你必须以严格 JSON 格式回复,不要包含任何其他文字:
{"ticker": "代码", "name": "公司中文名", "market": "US或HK或A_SHARE"}

如果无法识别,返回:
{"ticker": "", "name": "", "market": "", "error": "无法识别的股票"}"""

MOAT_SYSTEM_PROMPT = """你是查理·芒格,以跨学科思维模型闻名的投资大师。请基于提供的财务数据,评估这家公司的经济护城河。

从以下五个维度分析护城河:
1. 品牌价值 — 公司品牌是否有定价权?利润率是否远高于同行?
2. 转换成本 — 客户是否难以切换到竞争对手?产品是否深度嵌入客户流程?
3. 网络效应 — 使用的人越多,产品是否越有价值?
4. 成本优势 — 公司是否有结构性的成本优势?规模效应如何?
5. 准入壁垒 — 新竞争者是否难以进入?是否有牌照、专利、监管等门槛?

每个维度给出「强/中/弱/无」评级并用1-2句话说明理由。
最终得出综合判断:宽护城河 / 窄护城河 / 无护城河

记住芒格的忠告:"只投资你完全理解的企业。"如果数据不足以判断,坦诚说明。
请用中文回答,保持简洁专业。"""

REPORT_SYSTEM_PROMPT = """你是一位严格遵循巴菲特和芒格价值投资哲学的投资分析师。请基于提供的全部分析数据,撰写一份巴菲特风格的投资分析报告。

核心原则(必须在报告中体现):
1. 内在价值:股价只是价格,价值才是本质
2. 安全边际:只在价格远低于内在价值时才值得投资
3. 经济护城河:寻找有持久竞争优势的企业
4. 长期视角:以10年为周期思考投资
5. 能力圈:只投资能理解的企业
6. 管理层:好企业需要好管理层

语气要求:
- 用巴菲特式的朴素智慧和幽默感
- 适当引用巴菲特/芒格的经典名言(1-2句即可)
- 保守且诚实,不确定的地方坦诚告知
- 永远提醒"投资有风险"

如果分析结果显示这是一只投机性强的股票,请明确警告,可引用:
"如果你不愿意持有一只股票十年,那就不要持有它十分钟。"

请严格按照以下格式输出报告:

一、公司概况
   [用2-3句话描述公司业务和行业地位]

二、巴菲特评分卡
   [列出每个维度的得分和简评,已在输入中提供]

三、经济护城河
   [综合护城河评级和关键分析]

四、估值分析
   [当前价格 vs 内在价值 vs 安全边际]

五、巴菲特会怎么说?
   [用巴菲特/芒格的口吻给出最终建议,引用1-2句经典语录]

六、风险提示
   [列出2-3个关键风险因素]

请用中文回答。"""


# ============================================
# 工具函数:自动补全股票代码后缀
# ============================================
def auto_complete_ticker(code: str) -> tuple:
    """
    根据纯数字代码自动补全 yfinance 后缀。
    返回 (ticker, market)
    """
    code = code.strip()
    # 已有后缀的直接返回
    if ".SS" in code or ".SZ" in code:
        return code, "A_SHARE"
    if ".HK" in code:
        return code, "HK"

    # 纯数字
    if code.isdigit():
        if len(code) == 6:
            if code.startswith("6"):
                return f"{code}.SS", "A_SHARE"
            elif code.startswith(("0", "3")):
                return f"{code}.SZ", "A_SHARE"
        elif len(code) <= 5:
            hk_code = code.zfill(4)
            return f"{hk_code}.HK", "HK"

    # 纯英文字母 → 美股
    if code.isalpha():
        return code.upper(), "US"

    return code, "US"


# ============================================
# 工具函数:yfinance 数据获取
# ============================================
def safe_df_to_dict(df) -> Optional[dict]:
    """安全地将 DataFrame 转为 dict,处理各种边界情况"""
    if df is None:
        return None
    if isinstance(df, pd.DataFrame) and df.empty:
        return None
    if isinstance(df, pd.Series) and df.empty:
        return None
    try:
        if isinstance(df, pd.DataFrame):
            result = {}
            for col in df.columns:
                col_key = str(col)
                result[col_key] = {}
                for idx in df.index:
                    idx_key = str(idx)
                    val = df.loc[idx, col]
                    if pd.isna(val):
                        result[col_key][idx_key] = None
                    elif isinstance(val, (int, float)):
                        result[col_key][idx_key] = float(val)
                    else:
                        result[col_key][idx_key] = str(val)
            return result
        elif isinstance(df, pd.Series):
            result = {}
            for idx in df.index:
                val = df[idx]
                if pd.isna(val):
                    result[str(idx)] = None
                elif isinstance(val, (int, float)):
                    result[str(idx)] = float(val)
                else:
                    result[str(idx)] = str(val)
            return result
    except Exception:
        return None
    return None


def fetch_comprehensive_data(ticker_symbol: str) -> dict:
    """
    使用 yfinance 一次性获取全部财务数据,分层容错。
    遇到频率限制时自动重试(最多2次,间隔递增)。
    返回结构化的 dict。
    """
    ticker = yf.Ticker(ticker_symbol)
    data = {"ticker_symbol": ticker_symbol}

    # --- 必需层(带重试) ---
    info = None
    max_retries = 2
    for attempt in range(max_retries + 1):
        try:
            info = ticker.info
            break
        except Exception as e:
            if "Too Many Requests" in str(e) and attempt < max_retries:
                wait = (attempt + 1) * 5
                print(f"         Yahoo Finance 频率限制,{wait}秒后重试({attempt+1}/{max_retries})...")
                time.sleep(wait)
                continue
            return {"error": f"获取基础数据失败: {str(e)}"}

    try:
        if not info or (info.get("regularMarketPrice") is None and info.get("currentPrice") is None and info.get("previousClose") is None):
            fi = ticker.fast_info
            if hasattr(fi, "last_price") and fi.last_price:
                data["current_price"] = float(fi.last_price)
                data["market_cap"] = float(fi.market_cap) if hasattr(fi, "market_cap") and fi.market_cap else None
            else:
                return {"error": f"yfinance 无法获取 {ticker_symbol} 的数据,请检查代码是否正确"}
            data["info"] = {}
        else:
            data["info"] = info
            data["current_price"] = info.get("currentPrice") or info.get("regularMarketPrice") or info.get("previousClose")
            data["market_cap"] = info.get("marketCap")
    except Exception as e:
        return {"error": f"获取基础数据失败: {str(e)}"}

    # --- 重要层 ---
    for key, fetcher in [
        ("income_stmt", lambda: ticker.income_stmt),
        ("balance_sheet", lambda: ticker.balance_sheet),
        ("cash_flow", lambda: ticker.cash_flow),
        ("quarterly_income_stmt", lambda: ticker.quarterly_income_stmt),
    ]:
        try:
            data[key] = safe_df_to_dict(fetcher())
        except Exception:
            data[key] = None

    # --- 增强层 ---
    try:
        divs = ticker.dividends
        if divs is not None and not divs.empty:
            data["dividends"] = {str(k): float(v) for k, v in divs.items()}
        else:
            data["dividends"] = None
    except Exception:
        data["dividends"] = None

    try:
        mh = ticker.major_holders
        data["major_holders"] = safe_df_to_dict(mh)
    except Exception:
        data["major_holders"] = None

    try:
        ih = ticker.institutional_holders
        data["institutional_holders"] = safe_df_to_dict(ih)
    except Exception:
        data["institutional_holders"] = None

    return data


# ============================================
# 巴菲特十维评分系统
# ============================================
def _safe_get(d: dict, *keys, default=None):
    """安全地从嵌套 dict 中取值"""
    current = d
    for k in keys:
        if isinstance(current, dict):
            current = current.get(k)
            if current is None:
                return default
        else:
            return default
    return current if current is not None else default


def _extract_annual_values(stmt_dict: Optional[dict], row_names: list) -> list:
    """
    从年度财务报表 dict 中提取某一行的年度数值列表(从旧到新排序)。
    stmt_dict 的结构:{列名(日期字符串): {行名: 值}}
    """
    if not stmt_dict:
        return []
    values = []
    # 列按日期排序(字符串排序通常就是按日期)
    sorted_cols = sorted(stmt_dict.keys())
    for col in sorted_cols:
        col_data = stmt_dict[col]
        if not isinstance(col_data, dict):
            continue
        for rn in row_names:
            if rn in col_data and col_data[rn] is not None:
                values.append(float(col_data[rn]))
                break
    return values


def score_roe_stability(raw_data: dict) -> tuple:
    """维度1: ROE 稳定性 (10分)"""
    income = raw_data.get("income_stmt")
    balance = raw_data.get("balance_sheet")
    if not income or not balance:
        return (5, "数据不足,无法计算 ROE")

    net_incomes = _extract_annual_values(income, ["Net Income", "NetIncome", "Net Income Common Stockholders"])
    equities = _extract_annual_values(balance, ["Stockholders Equity", "Total Stockholders Equity",
                                                 "Stockholders' Equity", "Common Stock Equity",
                                                 "Total Equity Gross Minority Interest"])

    if len(net_incomes) < 2 or len(equities) < 2:
        return (5, "历史数据不足(<2年),无法评估 ROE 稳定性")

    min_len = min(len(net_incomes), len(equities))
    net_incomes = net_incomes[-min_len:]
    equities = equities[-min_len:]

    roes = []
    for ni, eq in zip(net_incomes, equities):
        if eq and eq != 0:
            roes.append(ni / eq * 100)

    if not roes:
        return (3, "股东权益数据异常,无法计算 ROE")

    avg_roe = sum(roes) / len(roes)
    all_above_15 = all(r > 15 for r in roes)
    all_above_20 = all(r > 20 for r in roes)
    roe_std = (sum((r - avg_roe) ** 2 for r in roes) / len(roes)) ** 0.5

    score = 0
    if all_above_20 and roe_std < 5:
        score = 10
    elif all_above_15:
        score = 8
    elif avg_roe > 15:
        score = 6
    elif avg_roe > 10:
        score = 4
    elif avg_roe > 5:
        score = 2

    if roe_std > 10 and score > 2:
        score -= 2

    desc = f"近{len(roes)}年 ROE: {', '.join(f'{r:.1f}%' for r in roes)},均值 {avg_roe:.1f}%"
    return (max(0, score), desc)


def score_earnings_growth(raw_data: dict) -> tuple:
    """维度2: 盈利增长 (10分)"""
    income = raw_data.get("income_stmt")
    if not income:
        return (5, "数据不足")

    net_incomes = _extract_annual_values(income, ["Net Income", "NetIncome", "Net Income Common Stockholders"])
    if len(net_incomes) < 2:
        return (5, "历史数据不足")

    first, last = net_incomes[0], net_incomes[-1]
    if first <= 0 or last <= 0:
        if last > 0 and first <= 0:
            return (6, "从亏损转为盈利,增长态势良好")
        return (2, f"存在亏损年份,净利润序列不稳定")

    years = len(net_incomes) - 1
    cagr = ((last / first) ** (1.0 / years) - 1) * 100 if years > 0 else 0

    if cagr > 20:
        score = 10
    elif cagr > 15:
        score = 8
    elif cagr > 10:
        score = 6
    elif cagr > 5:
        score = 4
    elif cagr > 0:
        score = 2
    else:
        score = 0

    desc = f"近{years}年净利润 CAGR: {cagr:.1f}%"
    return (score, desc)


def score_profit_margin(raw_data: dict) -> tuple:
    """维度3: 净利润率 (10分)"""
    income = raw_data.get("income_stmt")
    if not income:
        return (5, "数据不足")

    net_incomes = _extract_annual_values(income, ["Net Income", "NetIncome", "Net Income Common Stockholders"])
    revenues = _extract_annual_values(income, ["Total Revenue", "TotalRevenue", "Revenue", "Operating Revenue"])

    if not net_incomes or not revenues:
        # 尝试从 info 获取
        margin = _safe_get(raw_data, "info", "profitMargins")
        if margin is not None:
            margin_pct = margin * 100
            if margin_pct > 30:
                return (10, f"净利润率 {margin_pct:.1f}% (来自info)")
            elif margin_pct > 20:
                return (8, f"净利润率 {margin_pct:.1f}%")
            elif margin_pct > 15:
                return (6, f"净利润率 {margin_pct:.1f}%")
            elif margin_pct > 10:
                return (4, f"净利润率 {margin_pct:.1f}%")
            elif margin_pct > 5:
                return (2, f"净利润率 {margin_pct:.1f}%")
            else:
                return (0, f"净利润率 {margin_pct:.1f}%,利润微薄")
        return (5, "数据不足")

    # 取最新年度
    ni = net_incomes[-1]
    rev = revenues[-1] if revenues else 0
    if rev == 0:
        return (3, "营收为零,无法计算利润率")

    margin = ni / rev * 100
    if margin > 30:
        score = 10
    elif margin > 20:
        score = 8
    elif margin > 15:
        score = 6
    elif margin > 10:
        score = 4
    elif margin > 5:
        score = 2
    else:
        score = 0

    return (score, f"最新年度净利润率: {margin:.1f}%")


def score_debt_ratio(raw_data: dict) -> tuple:
    """维度4: 资产负债率 (10分)"""
    balance = raw_data.get("balance_sheet")
    if not balance:
        return (5, "数据不足")

    total_assets = _extract_annual_values(balance, ["Total Assets", "TotalAssets"])
    total_debt = _extract_annual_values(balance, ["Total Debt", "TotalDebt", "Total Liabilities Net Minority Interest",
                                                   "Total Liab", "Total Liabilities"])

    if not total_assets or not total_debt:
        # 从 info 获取
        dr = _safe_get(raw_data, "info", "debtToEquity")
        if dr is not None:
            dr_val = dr / 100  # debtToEquity 是百分比
            if dr_val < 0.3:
                return (10, f"负债/权益比 {dr:.0f}%,非常健康")
            elif dr_val < 0.5:
                return (8, f"负债/权益比 {dr:.0f}%")
            elif dr_val < 1.0:
                return (6, f"负债/权益比 {dr:.0f}%")
            elif dr_val < 1.5:
                return (4, f"负债/权益比 {dr:.0f}%")
            else:
                return (2, f"负债/权益比 {dr:.0f}%,负债偏高")
        return (5, "数据不足")

    assets = total_assets[-1]
    debt = total_debt[-1]
    if assets == 0:
        return (3, "总资产为零,数据异常")

    ratio = debt / assets * 100

    # 检查是否为金融行业(银行、保险等天然高负债)
    industry = _safe_get(raw_data, "info", "industry", default="")
    is_financial = any(kw in str(industry).lower() for kw in ["bank", "insurance", "financial", "银行", "保险", "金融"])

    if is_financial:
        # 金融行业标准放宽
        if ratio < 85:
            score = 8
        elif ratio < 90:
            score = 6
        elif ratio < 93:
            score = 4
        else:
            score = 2
        return (score, f"资产负债率: {ratio:.1f}%(金融行业,已放宽标准)")

    if ratio < 20:
        score = 10
    elif ratio < 30:
        score = 8
    elif ratio < 40:
        score = 6
    elif ratio < 50:
        score = 4
    elif ratio < 60:
        score = 2
    else:
        score = 0

    return (score, f"资产负债率: {ratio:.1f}%")


def score_current_ratio(raw_data: dict) -> tuple:
    """维度5: 流动比率 (10分)"""
    balance = raw_data.get("balance_sheet")
    if not balance:
        cr = _safe_get(raw_data, "info", "currentRatio")
        if cr is not None:
            if cr > 2.0:
                return (10, f"流动比率: {cr:.2f}")
            elif cr > 1.5:
                return (8, f"流动比率: {cr:.2f}")
            elif cr > 1.2:
                return (6, f"流动比率: {cr:.2f}")
            elif cr > 1.0:
                return (4, f"流动比率: {cr:.2f}")
            else:
                return (2, f"流动比率: {cr:.2f},流动性不足")
        return (5, "数据不足")

    current_assets = _extract_annual_values(balance, ["Current Assets", "CurrentAssets", "Total Current Assets"])
    current_liabs = _extract_annual_values(balance, ["Current Liabilities", "CurrentLiabilities",
                                                      "Total Current Liabilities", "Current Debt And Capital Lease Obligation"])

    if not current_assets or not current_liabs:
        return (5, "流动资产/负债数据缺失")

    ca = current_assets[-1]
    cl = current_liabs[-1]
    if cl == 0:
        return (10, "无流动负债")

    cr = ca / cl
    if cr > 2.0:
        score = 10
    elif cr > 1.5:
        score = 8
    elif cr > 1.2:
        score = 6
    elif cr > 1.0:
        score = 4
    elif cr > 0.8:
        score = 2
    else:
        score = 0

    return (score, f"流动比率: {cr:.2f}")


def score_free_cash_flow(raw_data: dict) -> tuple:
    """维度6: 自由现金流 (10分)"""
    cf = raw_data.get("cash_flow")
    if not cf:
        fcf = _safe_get(raw_data, "info", "freeCashflow")
        if fcf is not None:
            if fcf > 0:
                return (6, f"自由现金流为正: {fcf/1e8:.1f}亿")
            else:
                return (2, f"自由现金流为负")
        return (5, "数据不足")

    operating_cf = _extract_annual_values(cf, ["Operating Cash Flow", "Total Cash From Operating Activities",
                                                "Cash Flow From Continuing Operating Activities"])
    capex = _extract_annual_values(cf, ["Capital Expenditure", "Capital Expenditures"])

    if not operating_cf:
        return (5, "经营现金流数据缺失")

    # 计算 FCF
    fcfs = []
    for i in range(len(operating_cf)):
        ocf = operating_cf[i]
        cx = abs(capex[i]) if i < len(capex) and capex[i] else 0
        fcfs.append(ocf - cx)

    all_positive = all(f > 0 for f in fcfs)
    if len(fcfs) >= 2:
        growing = fcfs[-1] > fcfs[0]
    else:
        growing = False

    if all_positive and growing and len(fcfs) >= 3:
        score = 10
    elif all_positive and len(fcfs) >= 2:
        score = 8
    elif fcfs[-1] > 0:
        score = 6
    elif sum(1 for f in fcfs if f > 0) > len(fcfs) / 2:
        score = 4
    else:
        score = 2

    latest_fcf = fcfs[-1] / 1e8  # 转亿
    return (score, f"近{len(fcfs)}年FCF: 最新 {latest_fcf:.1f}亿,{'全部为正' if all_positive else '存在负值'}")


def score_pe(raw_data: dict) -> tuple:
    """维度7: PE 估值 (10分)"""
    pe = _safe_get(raw_data, "info", "trailingPE")
    if pe is None:
        pe = _safe_get(raw_data, "info", "forwardPE")
    if pe is None or pe <= 0:
        return (5, "PE 数据不可用或为负(可能亏损)")

    if pe < 10:
        score = 10
    elif pe < 15:
        score = 8
    elif pe < 20:
        score = 6
    elif pe < 25:
        score = 4
    elif pe < 35:
        score = 2
    else:
        score = 0

    return (score, f"PE(TTM): {pe:.1f}")


def score_pb(raw_data: dict) -> tuple:
    """维度8: PB 估值 (10分)"""
    pb = _safe_get(raw_data, "info", "priceToBook")
    if pb is None or pb <= 0:
        return (5, "PB 数据不可用")

    if pb < 1:
        score = 10
    elif pb < 1.5:
        score = 8
    elif pb < 2:
        score = 6
    elif pb < 3:
        score = 4
    elif pb < 5:
        score = 2
    else:
        score = 0

    return (score, f"PB: {pb:.2f}")


def score_dividend(raw_data: dict) -> tuple:
    """维度9: 分红记录 (10分)"""
    dividends = raw_data.get("dividends")
    if not dividends:
        div_yield = _safe_get(raw_data, "info", "dividendYield")
        if div_yield is not None and div_yield > 0:
            return (6, f"股息率: {div_yield*100:.2f}%,具体分红历史数据缺失")
        return (3, "无分红记录或数据不可用")

    # 按年统计分红
    years_with_div = set()
    for date_str in dividends.keys():
        try:
            year = str(date_str)[:4]
            years_with_div.add(year)
        except Exception:
            pass

    count = len(years_with_div)
    if count >= 5:
        score = 10
    elif count >= 4:
        score = 8
    elif count >= 3:
        score = 6
    elif count >= 2:
        score = 4
    elif count >= 1:
        score = 2
    else:
        score = 0

    return (score, f"在 {count} 个年度有分红记录")


def score_insider_holding(raw_data: dict) -> tuple:
    """维度10: 内部人持股 (10分)"""
    mh = raw_data.get("major_holders")
    if not mh:
        # 尝试 info 中的 heldPercentInsiders
        insider_pct = _safe_get(raw_data, "info", "heldPercentInsiders")
        if insider_pct is not None:
            pct = insider_pct * 100
            if pct > 20:
                return (10, f"内部人持股: {pct:.1f}%")
            elif pct > 10:
                return (8, f"内部人持股: {pct:.1f}%")
            elif pct > 5:
                return (6, f"内部人持股: {pct:.1f}%")
            elif pct > 1:
                return (4, f"内部人持股: {pct:.1f}%")
            else:
                return (2, f"内部人持股: {pct:.1f}%")
        return (5, "内部人持股数据不可用(中性评分)")

    # major_holders 通常有类似 "% of Shares Held by All Insider" 的行
    # 格式因 yfinance 版本而异,尝试多种方式
    try:
        for col in mh.values():
            if isinstance(col, dict):
                for k, v in col.items():
                    if "insider" in str(k).lower() or "insider" in str(v).lower():
                        # 找到包含 insider 的值
                        for kk, vv in col.items():
                            try:
                                pct = float(str(vv).replace("%", ""))
                                if pct > 20:
                                    return (10, f"内部人持股: {pct:.1f}%")
                                elif pct > 10:
                                    return (8, f"内部人持股: {pct:.1f}%")
                                elif pct > 5:
                                    return (6, f"内部人持股: {pct:.1f}%")
                                elif pct > 1:
                                    return (4, f"内部人持股: {pct:.1f}%")
                                else:
                                    return (2, f"内部人持股: {pct:.1f}%")
                            except (ValueError, TypeError):
                                continue
    except Exception:
        pass

    return (5, "内部人持股数据解析困难(中性评分)")


def compute_buffett_scores(raw_data: dict) -> dict:
    """执行全部十维评分,返回 {维度名: (分数, 说明)} + total_score"""
    scorers = [
        ("ROE稳定性", score_roe_stability),
        ("盈利增长", score_earnings_growth),
        ("净利润率", score_profit_margin),
        ("资产负债率", score_debt_ratio),
        ("流动比率", score_current_ratio),
        ("自由现金流", score_free_cash_flow),
        ("PE估值", score_pe),
        ("PB估值", score_pb),
        ("分红记录", score_dividend),
        ("内部人持股", score_insider_holding),
    ]

    results = {}
    total = 0
    for name, fn in scorers:
        s, desc = fn(raw_data)
        results[name] = {"score": s, "max": 10, "desc": desc}
        total += s

    results["total_score"] = total
    results["max_total"] = 100

    if total >= 80:
        results["grade"] = "A — 巴菲特可能感兴趣的优质企业"
    elif total >= 60:
        results["grade"] = "B — 有价值但需关注薄弱环节"
    elif total >= 40:
        results["grade"] = "C — 不符合巴菲特标准的关键维度较多"
    else:
        results["grade"] = "D — 巴菲特不会考虑,请警惕"

    return results


# ============================================
# 估值分析:DCF + Graham
# ============================================
def compute_valuation(raw_data: dict) -> dict:
    """计算 DCF 内在价值、Graham 公式估值、安全边际"""
    result = {
        "current_price": raw_data.get("current_price"),
        "dcf_intrinsic_value": None,
        "graham_value": None,
        "margin_of_safety_dcf": None,
        "margin_of_safety_graham": None,
        "valuation_verdict": "无法估值",
        "details": "",
    }

    current_price = raw_data.get("current_price")
    if not current_price or current_price <= 0:
        result["details"] = "当前股价不可用,无法估值"
        return result

    info = raw_data.get("info", {})
    cf = raw_data.get("cash_flow")
    income = raw_data.get("income_stmt")

    # ===== DCF 简化版 =====
    dcf_value = None
    dcf_detail = ""
    try:
        # Owner Earnings = 净利润 + 折旧 - 资本支出
        net_incomes = _extract_annual_values(income, ["Net Income", "NetIncome", "Net Income Common Stockholders"]) if income else []
        operating_cfs = _extract_annual_values(cf, ["Operating Cash Flow", "Total Cash From Operating Activities",
                                                     "Cash Flow From Continuing Operating Activities"]) if cf else []
        capexs = _extract_annual_values(cf, ["Capital Expenditure", "Capital Expenditures"]) if cf else []
        depreciation = _extract_annual_values(cf, ["Depreciation And Amortization", "Depreciation & Amortization",
                                                    "Depreciation"]) if cf else []

        if operating_cfs and capexs:
            # 用 OCF - Capex 作为 Owner Earnings
            latest_ocf = operating_cfs[-1]
            latest_capex = abs(capexs[-1]) if capexs[-1] else 0
            owner_earnings = latest_ocf - latest_capex
        elif net_incomes and depreciation and capexs:
            latest_ni = net_incomes[-1]
            latest_dep = depreciation[-1] if depreciation else 0
            latest_capex = abs(capexs[-1]) if capexs[-1] else 0
            owner_earnings = latest_ni + latest_dep - latest_capex
        elif net_incomes:
            owner_earnings = net_incomes[-1]
            dcf_detail += "(仅用净利润近似,未扣除资本支出)"
        else:
            owner_earnings = None

        if owner_earnings and owner_earnings > 0:
            # 增长率:取净利润 CAGR,但限制在 3%-15%
            if net_incomes and len(net_incomes) >= 2 and net_incomes[0] > 0:
                years = len(net_incomes) - 1
                raw_cagr = (net_incomes[-1] / net_incomes[0]) ** (1.0 / years) - 1
                growth_rate = max(0.03, min(0.15, raw_cagr))
            else:
                growth_rate = 0.05  # 保守假设 5%

            discount_rate = 0.10  # 巴菲特常用 10%
            terminal_growth = 0.03  # 永续增长率 3%

            # 10 年 DCF
            total_pv = 0
            for i in range(1, 11):
                future_oe = owner_earnings * (1 + growth_rate) ** i
                pv = future_oe / (1 + discount_rate) ** i
                total_pv += pv

            # 终值
            terminal_oe = owner_earnings * (1 + growth_rate) ** 10 * (1 + terminal_growth)
            terminal_value = terminal_oe / (discount_rate - terminal_growth)
            terminal_pv = terminal_value / (1 + discount_rate) ** 10
            total_pv += terminal_pv

            # 每股内在价值
            shares = info.get("sharesOutstanding")
            if shares and shares > 0:
                dcf_value = total_pv / shares
                dcf_detail = f"DCF假设: 增长率{growth_rate*100:.1f}%, 折现率{discount_rate*100:.0f}%, 终值增长{terminal_growth*100:.0f}% " + dcf_detail
            else:
                dcf_detail = "总股本数据缺失,无法计算每股价值"
    except Exception as e:
        dcf_detail = f"DCF 计算异常: {str(e)}"

    result["dcf_intrinsic_value"] = round(dcf_value, 2) if dcf_value else None
    result["dcf_detail"] = dcf_detail

    # ===== Graham 公式 =====
    graham_value = None
    graham_detail = ""
    try:
        eps = info.get("trailingEps")
        if eps and eps > 0:
            # 估计 5 年增长率
            if net_incomes and len(net_incomes) >= 2 and net_incomes[0] > 0:
                years = len(net_incomes) - 1
                g = ((net_incomes[-1] / net_incomes[0]) ** (1.0 / years) - 1) * 100
                g = max(0, min(20, g))  # 限制 0-20%
            else:
                g = 5  # 保守

            Y = 4.5  # AAA 债券收益率近似
            graham_value = eps * (8.5 + 2 * g) * 4.4 / Y
            graham_detail = f"Graham公式: EPS={eps:.2f}, g={g:.1f}%, Y={Y}%"
    except Exception as e:
        graham_detail = f"Graham 计算异常: {str(e)}"

    result["graham_value"] = round(graham_value, 2) if graham_value else None
    result["graham_detail"] = graham_detail

    # ===== 安全边际 =====
    if dcf_value and dcf_value > 0:
        mos = (dcf_value - current_price) / dcf_value * 100
        result["margin_of_safety_dcf"] = round(mos, 1)

    if graham_value and graham_value > 0:
        mos_g = (graham_value - current_price) / graham_value * 100
        result["margin_of_safety_graham"] = round(mos_g, 1)

    # 综合判断
    primary_mos = result["margin_of_safety_dcf"] or result["margin_of_safety_graham"]
    if primary_mos is not None:
        if primary_mos > 50:
            result["valuation_verdict"] = "严重低估 — 非常有吸引力"
        elif primary_mos > 25:
            result["valuation_verdict"] = "低估 — 有吸引力"
        elif primary_mos > 0:
            result["valuation_verdict"] = "估值合理 — 安全边际不足"
        elif primary_mos > -25:
            result["valuation_verdict"] = "略微高估"
        else:
            result["valuation_verdict"] = "严重高估 — 需警惕"

    return result


# ============================================
# 节点1: parse_input — 股票代码解析
# ============================================
def parse_input(state: BuffettAnalysisState):
    user_query = state["user_query"]
    print(f"  [1/6] 正在解析股票代码...")

    # 1) 本地映射表快速匹配
    query_lower = user_query.strip().lower()
    # 先尝试完全匹配
    if query_lower in STOCK_MAPPING:
        ticker, name, market = STOCK_MAPPING[query_lower]
        print(f"         本地映射命中: {name} ({ticker})")
        return {"stock_ticker": ticker, "stock_name": name, "market": market, "error": ""}

    # 尝试在 query 中搜索映射 key
    for key, (ticker, name, market) in STOCK_MAPPING.items():
        if key in query_lower:
            print(f"         本地映射命中: {name} ({ticker})")
            return {"stock_ticker": ticker, "stock_name": name, "market": market, "error": ""}

    # 2) 尝试自动补全纯代码输入
    # 提取可能的代码(纯数字或纯字母)
    code_match = re.search(r'\b([A-Za-z]{1,5})\b', user_query)
    num_match = re.search(r'\b(\d{4,6})\b', user_query)

    if num_match:
        code = num_match.group(1)
        ticker, market = auto_complete_ticker(code)
        print(f"         数字代码自动补全: {ticker}")
        return {"stock_ticker": ticker, "stock_name": ticker, "market": market, "error": ""}

    if code_match:
        code = code_match.group(1)
        # 过滤掉常见中文拼音词
        if len(code) >= 2 and code.upper() not in ("DE", "IN", "AT", "IS", "IT", "OR", "AN"):
            ticker, market = auto_complete_ticker(code)
            print(f"         字母代码识别: {ticker}")
            return {"stock_ticker": ticker, "stock_name": ticker, "market": market, "error": ""}

    # 3) LLM 解析
    print(f"         本地映射未命中,调用 AI 解析...")
    try:
        response = get_llm().invoke([
            SystemMessage(content=PARSE_INPUT_SYSTEM_PROMPT),
            HumanMessage(content=f"用户输入: {user_query}")
        ])
        content = response.content.strip()
        # 提取 JSON
        json_match = re.search(r'\{.*\}', content, re.DOTALL)
        if json_match:
            parsed = json.loads(json_match.group())
            if parsed.get("error"):
                return {"error": parsed["error"]}
            ticker = parsed.get("ticker", "")
            name = parsed.get("name", ticker)
            market = parsed.get("market", "US")
            if ticker:
                print(f"         AI 解析结果: {name} ({ticker})")
                return {"stock_ticker": ticker, "stock_name": name, "market": market, "error": ""}
    except Exception as e:
        print(f"         AI 解析异常: {e}")

    return {"error": f"无法识别您提到的股票: '{user_query}',请提供更具体的股票名称或代码(如 '茅台'、'AAPL'、'600519')"}


# ============================================
# 节点2: fetch_data — yfinance 数据获取
# ============================================
def fetch_data(state: BuffettAnalysisState):
    if state.get("error"):
        return {}

    ticker = state["stock_ticker"]
    print(f"  [2/6] 正在从 Yahoo Finance 获取 {ticker} 的财务数据...")

    raw_data = fetch_comprehensive_data(ticker)

    if "error" in raw_data:
        print(f"         获取失败: {raw_data['error']}")
        return {"error": raw_data["error"], "raw_data": {}}

    price = raw_data.get("current_price", "N/A")
    mcap = raw_data.get("market_cap")
    mcap_str = f"{mcap/1e8:.0f}亿" if mcap else "N/A"
    print(f"         当前价格: {price} | 市值: {mcap_str}")

    # 补全 stock_name(如果之前只有 ticker)
    name = state.get("stock_name", "")
    if name == ticker or not name:
        info_name = _safe_get(raw_data, "info", "longName") or _safe_get(raw_data, "info", "shortName") or ticker
        name = info_name

    return {"raw_data": raw_data, "stock_name": name, "error": ""}


# ============================================
# 节点3: fundamental_analysis — 基本面评分
# ============================================
def fundamental_analysis(state: BuffettAnalysisState):
    if state.get("error"):
        return {}

    raw_data = state["raw_data"]
    print(f"  [3/6] 正在进行巴菲特十维基本面评分...")

    scores = compute_buffett_scores(raw_data)
    total = scores["total_score"]
    grade = scores["grade"]
    print(f"         综合评分: {total}/100 — {grade}")

    return {"fundamental_scores": scores}


# ============================================
# 节点4: moat_assessment — 护城河评估
# ============================================
def moat_assessment(state: BuffettAnalysisState):
    if state.get("error"):
        return {}

    raw_data = state["raw_data"]
    stock_name = state["stock_name"]
    ticker = state["stock_ticker"]
    scores = state["fundamental_scores"]
    info = raw_data.get("info", {})

    print(f"  [4/6] 正在评估 {stock_name} 的经济护城河...")

    # 准备关键数据摘要给 LLM
    data_summary = f"""公司: {stock_name} ({ticker})
行业: {info.get('industry', 'N/A')} / {info.get('sector', 'N/A')}
市值: {info.get('marketCap', 'N/A')}
巴菲特评分: {scores.get('total_score', 'N/A')}/100

关键财务指标:
- 净利润率: {scores.get('净利润率', {}).get('desc', 'N/A')}
- ROE: {scores.get('ROE稳定性', {}).get('desc', 'N/A')}
- 盈利增长: {scores.get('盈利增长', {}).get('desc', 'N/A')}
- 负债: {scores.get('资产负债率', {}).get('desc', 'N/A')}
- 自由现金流: {scores.get('自由现金流', {}).get('desc', 'N/A')}

公司描述: {info.get('longBusinessSummary', 'N/A')[:500]}"""

    try:
        response = get_llm().invoke([
            SystemMessage(content=MOAT_SYSTEM_PROMPT),
            HumanMessage(content=data_summary)
        ])
        moat = response.content
        print(f"         护城河评估完成")
        return {"moat_assessment": moat}
    except Exception as e:
        print(f"         AI 护城河评估失败: {e}")
        return {"moat_assessment": "护城河评估暂不可用(AI 调用失败)"}


# ============================================
# 节点5: valuation_analysis — 估值分析
# ============================================
def valuation_analysis(state: BuffettAnalysisState):
    if state.get("error"):
        return {}

    raw_data = state["raw_data"]
    print(f"  [5/6] 正在计算内在价值与安全边际...")

    val = compute_valuation(raw_data)

    dcf = val.get("dcf_intrinsic_value")
    mos = val.get("margin_of_safety_dcf")
    verdict = val.get("valuation_verdict")
    if dcf:
        print(f"         DCF 内在价值: {dcf} | 安全边际: {mos}% | {verdict}")
    else:
        print(f"         估值结果: {verdict}")

    return {"valuation_result": val}


# ============================================
# 节点6: generate_report — 报告生成
# ============================================
def generate_report(state: BuffettAnalysisState):
    # 处理错误场景
    if state.get("error"):
        error_msg = state["error"]
        report = f"""
{'='*60}
  抱歉,分析未能完成
{'='*60}

{error_msg}

建议:
- 检查股票名称或代码是否正确
- 美股示例:AAPL、MSFT、BABA
- 港股示例:腾讯、0700
- A股示例:茅台、600519、比亚迪

正如巴菲特所说:"风险来自于你不知道自己在做什么。"
{'='*60}"""
        print(f"  [6/6] 输出错误提示")
        return {"final_report": report, "messages": [AIMessage(content=report)]}

    print(f"  [6/6] 正在生成巴菲特风格投资分析报告...")

    stock_name = state["stock_name"]
    ticker = state["stock_ticker"]
    market = state["market"]
    raw_data = state["raw_data"]
    scores = state["fundamental_scores"]
    moat = state["moat_assessment"]
    val = state["valuation_result"]
    info = raw_data.get("info", {})

    market_label = {"US": "美股", "HK": "港股", "A_SHARE": "A股"}.get(market, market)
    currency = info.get("currency", "USD")

    # 构建评分卡文本
    score_card = ""
    for dim_name in ["ROE稳定性", "盈利增长", "净利润率", "资产负债率", "流动比率",
                      "自由现金流", "PE估值", "PB估值", "分红记录", "内部人持股"]:
        dim = scores.get(dim_name, {})
        s = dim.get("score", "?")
        d = dim.get("desc", "")
        icon = "+" if s >= 7 else ("~" if s >= 4 else "-")
        score_card += f"   [{icon}] {dim_name}: {s}/10 — {d}\n"

    total = scores.get("total_score", "?")
    grade = scores.get("grade", "")

    # 估值文本
    val_text = f"当前价格: {currency} {val.get('current_price', 'N/A')}\n"
    if val.get("dcf_intrinsic_value"):
        val_text += f"DCF内在价值: {currency} {val['dcf_intrinsic_value']}\n"
        val_text += f"DCF安全边际: {val.get('margin_of_safety_dcf', 'N/A')}%\n"
        val_text += f"({val.get('dcf_detail', '')})\n"
    if val.get("graham_value"):
        val_text += f"Graham估值: {currency} {val['graham_value']}\n"
        val_text += f"Graham安全边际: {val.get('margin_of_safety_graham', 'N/A')}%\n"
        val_text += f"({val.get('graham_detail', '')})\n"
    val_text += f"估值判断: {val.get('valuation_verdict', 'N/A')}"

    # 组装 LLM 输入
    full_data = f"""===== 分析数据汇总 =====

公司: {stock_name} ({ticker}) | {market_label}
行业: {info.get('industry', 'N/A')} / {info.get('sector', 'N/A')}
市值: {info.get('marketCap', 'N/A')}

----- 巴菲特评分卡 [{total}/100] -----
评级: {grade}
{score_card}

----- 经济护城河评估 -----
{moat}

----- 估值分析 -----
{val_text}
"""

    try:
        response = get_llm().invoke([
            SystemMessage(content=REPORT_SYSTEM_PROMPT),
            HumanMessage(content=full_data)
        ])
        ai_report = response.content

        # 包装最终报告
        report = f"""
{'='*60}
  巴菲特价值投资分析报告
  {stock_name} ({ticker}) | {market_label}
{'='*60}

{ai_report}

  免责声明:本分析仅供学习参考,不构成投资建议。
  投资有风险,入市需谨慎。
{'='*60}"""

    except Exception as e:
        # LLM 失败,直接输出量化结果
        report = f"""
{'='*60}
  巴菲特价值投资分析报告(量化版)
  {stock_name} ({ticker}) | {market_label}
{'='*60}

(AI 分析生成失败: {e},以下为纯量化评分结果)

巴菲特评分卡 [{total}/100] — {grade}
{score_card}

估值分析:
{val_text}

护城河评估:
{moat}

  免责声明:本分析仅供学习参考,不构成投资建议。
{'='*60}"""

    print(f"         报告生成完成")
    return {"final_report": report, "messages": [AIMessage(content=report)]}


# ============================================
# 条件路由函数
# ============================================
def check_parse_error(state: BuffettAnalysisState) -> str:
    if state.get("error"):
        return "has_error"
    return "no_error"


def check_fetch_error(state: BuffettAnalysisState) -> str:
    if state.get("error"):
        return "has_error"
    return "no_error"


# ============================================
# 构建 LangGraph 工作流
# ============================================
def build_buffett_graph():
    """
    构建巴菲特价值投资分析工作流

    START → parse_input → [error?] → fetch_data → [error?]
         → fundamental_analysis → moat_assessment → valuation_analysis
         → generate_report → END

    错误时直接跳到 generate_report 输出提示。
    """
    workflow = StateGraph(BuffettAnalysisState)

    # 添加节点
    workflow.add_node("parse_input", parse_input)
    workflow.add_node("fetch_data", fetch_data)
    workflow.add_node("fundamental_analysis", fundamental_analysis)
    workflow.add_node("moat_assessment", moat_assessment)
    workflow.add_node("valuation_analysis", valuation_analysis)
    workflow.add_node("generate_report", generate_report)

    # 入口
    workflow.add_edge(START, "parse_input")

    # parse_input 之后的条件路由
    workflow.add_conditional_edges(
        "parse_input",
        check_parse_error,
        {
            "has_error": "generate_report",
            "no_error": "fetch_data",
        }
    )

    # fetch_data 之后的条件路由
    workflow.add_conditional_edges(
        "fetch_data",
        check_fetch_error,
        {
            "has_error": "generate_report",
            "no_error": "fundamental_analysis",
        }
    )

    # 正常流程链
    workflow.add_edge("fundamental_analysis", "moat_assessment")
    workflow.add_edge("moat_assessment", "valuation_analysis")
    workflow.add_edge("valuation_analysis", "generate_report")
    workflow.add_edge("generate_report", END)

    return workflow.compile()


# ============================================
# CLI 交互主循环
# ============================================
def main():
    print(f"""
{'='*60}
  巴菲特价值投资分析系统
  "价格是你付出的,价值是你得到的。" — 沃伦·巴菲特
{'='*60}
  支持市场: 美股 | 港股 | A股(沪深)
  数据来源: Yahoo Finance 实时数据
  分析框架: 巴菲特/芒格价值投资十维评估体系

  使用示例:
    > 茅台
    > 帮我看看苹果公司值不值得投资
    > 腾讯
    > MSFT
    > 600519

  输入 "退出" 或 "exit" 结束程序
{'='*60}
""")

    graph = build_buffett_graph()

    while True:
        try:
            user_input = input("请输入您想分析的股票 > ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\n再见!记住巴菲特的话:\"别人贪婪时我恐惧,别人恐惧时我贪婪。\"")
            break

        if not user_input:
            continue

        if user_input.lower() in ("退出", "exit", "quit", "q"):
            print("\n再见!记住巴菲特的话:\"别人贪婪时我恐惧,别人恐惧时我贪婪。\"")
            break

        print(f"\n  正在进行价值投资分析,请稍候...\n")

        try:
            result = graph.invoke({
                "messages": [HumanMessage(content=user_input)],
                "user_query": user_input,
                "stock_ticker": "",
                "stock_name": "",
                "market": "",
                "raw_data": {},
                "fundamental_scores": {},
                "moat_assessment": "",
                "valuation_result": {},
                "final_report": "",
                "error": "",
            })

            print(result.get("final_report", "分析完成但未生成报告"))

        except Exception as e:
            print(f"\n  分析过程中出现异常: {e}")
            print(f"  请重试或更换股票代码。\n")

        print()  # 空行分隔


if __name__ == "__main__":
    main()

参考:
https://docs.langchain.com
https://docs.langchain.com/oss/python/langchain/quickstart
https://docs.langchain.com/langsmith/home

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐