各位技术同仁,下午好!

今天,我们齐聚一堂,共同探讨一个在AI时代背景下,对企业品牌声誉至关重要的议题:如何构建一个“AI展现预警系统”,在第一时间发现AI搜索结果中对品牌不利的言论。

随着大型语言模型(LLM)的飞速发展和普及,以ChatGPT、Bard、Copilot为代表的AI搜索引擎正在深刻改变用户获取信息的方式。它们不再仅仅是索引网页,而是通过理解、总结、生成内容,直接向用户呈现答案。这无疑为信息传播带来了前所未有的效率,但也为品牌管理带来了新的挑战。

传统意义上的舆情监控系统,侧重于监测社交媒体、新闻网站、论坛等平台。然而,当用户直接通过AI搜索引擎提问,而AI给出的回答中包含了对品牌的不利信息时,这种负面传播的隐蔽性、权威性和扩散速度都远超以往。一条由AI“权威”生成的负面评价,其杀伤力可能相当于数百条社交媒体评论。更重要的是,用户可能不会再去验证AI的回答,而是直接采信。

因此,我们迫切需要一套全新的预警机制,能够深入AI搜索结果的“内部”,理解其生成内容的语义,并及时识别出任何可能损害品牌声誉的言论。这正是我们今天的主题:构建一个智能化、自动化、高效的AI展现预警系统。

本次讲座,我将从一名编程专家的视角,为大家详细剖析这个系统的架构、核心模块的实现细节,并提供大量的代码示例,帮助大家将理论转化为实践。我们将覆盖从数据采集、预处理、智能分析到预警通知的整个流程。

一、AI搜索时代的品牌挑战与预警系统的必要性

1.1 传统舆情监控的局限性

传统的舆情监控系统通常依赖于关键词匹配、爬虫抓取固定网页、以及基于规则的情感分析。它们在面对AI搜索结果时面临以下挑战:

  • 动态生成性: AI搜索结果是动态生成的,每次查询甚至在不同上下文中都可能产生差异。这使得传统的固定URL抓取变得无效。
  • 语义复杂性: AI生成的文本往往高度概括和总结,其中可能包含隐晦的负面含义、讽刺或基于误解的偏见,而这些是简单关键词匹配难以捕捉的。
  • 黑盒性: 我们无法直接访问AI搜索引擎的内部数据或API(除非它们提供),只能通过模拟用户查询来获取结果。
  • 权威性影响: AI生成的回答在用户心中具有更高的“权威性”,一旦出现负面信息,其对品牌信任度的损害更为严重且难以挽回。

1.2 为什么需要“第一时间”发现

“第一时间”在这里是关键。负面信息,尤其是通过AI渠道传播的,具有极强的扩散性和固化性。一旦负面叙事形成并被AI反复生成和传播,纠正成本将呈指数级增长。

  • 快速响应: 及时发现能够为品牌争取宝贵的应对时间,无论是内部调查、澄清事实,还是进行公关干预。
  • 损害控制: 在负面信息扩散初期进行干预,可以有效阻止其进一步蔓延,将损害降到最低。
  • 品牌声誉维护: 持续监测和快速响应是维护品牌长期声誉的基石。

构建这样一个系统,不仅仅是技术挑战,更是品牌在数字时代生存和发展的战略需求。

二、系统架构总览:蓝图与核心组件

一个健壮的AI展现预警系统需要多个模块协同工作。我将它设计为一个可伸缩、模块化的架构,便于迭代和维护。

高层架构视图:

+---------------------+      +------------------------+      +--------------------------+
|  1. 数据采集模块    |      |  2. 数据预处理与知识库   |      |  3. AI情感分析与意图识别 |
|   (AI Search Crawler) |      |   (Text Cleaning, Embedding, RAG) |      |   (LLM, Prompt Engineering) |
+----------+----------+      +------------+-----------+      +------------+-------------+
           |                              |                              |
           v                              v                              v
+-----------------------------------------------------------------------------------------+
|                                  消息队列 (Message Queue)                               |
|                                (e.g., Kafka, AWS SQS)                                  |
+-----------------------------------------------------------------------------------------+
           |
           v
+--------------------------+      +------------------------+
|  4. 预警规则与通知模块   |----->|  5. 数据存储与可视化    |
|   (Rule Engine, Notifier) |      |   (Database, Dashboard) |
+----------+----------+      +------------------------+
           |
           v
+--------------------------+
|     通知渠道 (Email, SMS, |
|     Slack, Webhook)      |
+--------------------------+

核心组件介绍:

  • 数据采集模块: 负责模拟用户在AI搜索引擎上的行为,抓取AI生成的回答内容。
  • 数据预处理与知识库模块: 对抓取到的原始文本进行清洗、格式化,并构建品牌相关的知识库,为后续的AI分析提供上下文和事实依据。
  • AI情感分析与意图识别模块: 利用大型语言模型(LLM)对文本进行深入语义分析,判断其情感倾向(正面、中性、负面)及潜在意图(抱怨、诽谤、误导等)。
  • 消息队列: 作为解耦各个模块的中间件,确保数据流的稳定性和系统的可伸缩性。
  • 预警规则与通知模块: 根据预设的规则(如情感阈值、特定关键词、意图匹配),触发预警,并通过多种渠道通知相关人员。
  • 数据存储与可视化: 存储所有抓取和分析的数据,并提供仪表盘进行趋势监控和历史追溯。

技术栈选择(示例):

  • 编程语言: Python (生态丰富,适合NLP/ML)
  • 爬虫框架: Playwright/Selenium (处理动态JS加载页面)
  • 文本处理: NLTK, spaCy, RegEx
  • 向量数据库: Pinecone, Weaviate, Milvus, Qdrant (用于RAG和知识库)
  • LLM提供商: OpenAI API (GPT-4), Anthropic Claude, Google Gemini
  • 消息队列: Apache Kafka / AWS SQS / RabbitMQ
  • 数据存储: PostgreSQL / MongoDB (结构化/非结构化数据)
  • 通知服务: SMTP (Email), Twilio (SMS), Slack Webhooks, DingTalk/WeChat Work APIs
  • 云平台: AWS / Google Cloud Platform / Azure (提供Serverless functions, Managed databases, etc.)

接下来,我们将深入探讨每个核心模块的实现细节和代码示例。

三、核心模块详解与代码实现

3.1 数据采集模块:深入AI搜索结果

数据采集是整个系统的起点。由于AI搜索结果的动态性和黑盒性,我们不能像抓取静态网页那样简单。

挑战:

  • 反爬机制: AI搜索引擎可能部署了复杂的反爬机制,包括IP限制、User-Agent检测、JS混淆等。
  • 动态渲染: 结果页面通常由JavaScript动态渲染,需要一个能够执行JS的浏览器自动化工具。
  • 会话管理: 模拟用户登录、保持会话等复杂操作。
  • 速率限制: 过快请求可能导致IP被封禁或触发验证码。

策略:

  • 浏览器自动化工具: 使用Playwright或Selenium模拟真实用户操作,打开浏览器,输入查询,等待结果加载。
  • 代理IP池: 轮换IP地址,避免被目标网站识别为爬虫。
  • User-Agent轮换: 模拟不同浏览器和操作系统。
  • 请求间隔: 引入随机延迟,模拟人类浏览行为。
  • 错误处理与重试: 对网络错误、页面加载失败等情况进行健壮处理。

实现:Python + Playwright

Playwright是一个强大的浏览器自动化库,支持Chromium、Firefox和WebKit,并且提供了同步和异步API。它特别适合处理JavaScript渲染的页面。

import asyncio
from playwright.async_api import async_playwright, Page, TimeoutError
import logging
import random
import time

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 模拟的AI搜索引擎URL和搜索框选择器 (示例,实际需根据目标AI搜索引擎调整)
# 注意:以下URL和选择器均为虚构,实际使用时请替换为真实AI搜索引擎的URL和元素选择器
AI_SEARCH_URL = "https://example-ai-search.com/"
SEARCH_INPUT_SELECTOR = "input[name='q']"
SEARCH_RESULT_SELECTOR = "div.ai-generated-answer" # AI生成回答的CSS选择器

# 代理IP池 (示例,实际应从服务商获取或自建)
PROXY_LIST = [
    "http://user:[email protected]:8080",
    "http://user:[email protected]:8080",
    # ... 更多代理
]

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/109.0.0.0 Safari/537.36",
    # ... 更多User-Agent
]

async def get_ai_search_result(query: str) -> str | None:
    """
    模拟在AI搜索引擎中进行搜索,并提取AI生成的回答。
    """
    selected_proxy = random.choice(PROXY_LIST) if PROXY_LIST else None
    selected_user_agent = random.choice(USER_AGENTS)

    browser_config = {
        'headless': True,  # 生产环境通常使用无头模式
        'proxy': {'server': selected_proxy} if selected_proxy else None
    }

    async with async_playwright() as p:
        try:
            browser = await p.chromium.launch(**browser_config)
            context = await browser.new_context(user_agent=selected_user_agent)
            page = await context.new_page()

            logging.info(f"Navigating to {AI_SEARCH_URL} with proxy: {selected_proxy} and UA: {selected_user_agent[:50]}...")
            await page.goto(AI_SEARCH_URL, wait_until='domcontentloaded', timeout=60000)

            # 确保搜索框可见并可交互
            await page.wait_for_selector(SEARCH_INPUT_SELECTOR, state='visible', timeout=30000)
            await page.fill(SEARCH_INPUT_SELECTOR, query)
            await page.press(SEARCH_INPUT_SELECTOR, "Enter")

            logging.info(f"Searching for: '{query}'")

            # 等待AI生成结果的元素出现
            # 通常AI结果加载需要时间,这里设置较长的等待时间
            await page.wait_for_selector(SEARCH_RESULT_SELECTOR, state='visible', timeout=90000)

            ai_answer_element = await page.query_selector(SEARCH_RESULT_SELECTOR)
            if ai_answer_element:
                ai_answer_text = await ai_answer_element.inner_text()
                logging.info(f"Successfully retrieved AI answer for '{query}'. Length: {len(ai_answer_text)} chars.")
                return ai_answer_text
            else:
                logging.warning(f"Could not find AI answer element for '{query}'. Selector: {SEARCH_RESULT_SELECTOR}")
                return None

        except TimeoutError:
            logging.error(f"Timeout occurred while searching for '{query}'. Page did not load or element not found.")
            return None
        except Exception as e:
            logging.error(f"An error occurred during AI search for '{query}': {e}")
            return None
        finally:
            if 'browser' in locals():
                await browser.close()
            # 引入随机延迟,模拟人类行为,避免触发反爬
            time.sleep(random.uniform(5, 15))

# 示例使用
async def main():
    brand_queries = [
        "关于 [你的品牌名] 的最新评价",
        "是 [你的品牌名] 的产品好用吗",
        "[你的品牌名] 负面新闻",
        "[你的品牌名] 质量问题",
        "如何评价 [你的品牌名]",
        "谁是 [你的品牌名] 的竞争对手",
        "[你的品牌名] 的服务怎么样",
    ]

    results = {}
    for query in brand_queries:
        ai_response = await get_ai_search_result(query)
        if ai_response:
            results[query] = ai_response
            # 这里可以将结果发送到消息队列供下一个模块处理
            print(f"Query: {query}nAI Response:n{ai_response[:200]}...n---")
        else:
            print(f"Query: {query}nNo AI Response found or error occurred.n---")

        # 避免过于频繁的请求
        await asyncio.sleep(random.uniform(10, 30)) 

    # 实际应用中,会将results发送到消息队列
    # 例如:send_to_message_queue(results)

if __name__ == "__main__":
    asyncio.run(main())

代码说明:

  • get_ai_search_result 函数封装了整个爬取逻辑。
  • 通过 async_playwright() 启动无头浏览器,并设置代理和User-Agent。
  • page.goto() 导航到AI搜索引擎页面。
  • page.fill() 填充搜索框,page.press("Enter") 提交查询。
  • page.wait_for_selector() 等待AI生成结果的特定HTML元素加载完成。
  • ai_answer_element.inner_text() 提取AI回答的纯文本内容。
  • 包含了错误处理、超时机制和随机延迟,增强爬虫的鲁棒性。
  • main 函数展示了如何批量查询与品牌相关的疑问。

重要提示: 请务必遵守目标网站的服务条款和robots.txt协议。未经授权的爬取可能导致法律风险或IP被封禁。上述代码中的URL和选择器仅为演示目的,实际使用时必须替换为真实的AI搜索引擎及其对应的HTML元素。某些AI搜索可能集成验证码或更复杂的反爬机制,可能需要额外的解决方案(如接入打码平台)。

3.2 数据预处理与知识库构建模块

从AI搜索结果中获取的原始文本,通常需要清洗才能进行高质量的LLM分析。同时,构建一个品牌知识库对于提高分析的准确性和避免LLM“幻觉”至关重要。

目的:

  • 文本清洗: 移除无关字符、HTML标签、乱码等。
  • 文本标准化: 统一格式,如转换为小写,处理数字和日期。
  • 语义增强: 通过分词、命名实体识别(NER)等技术提取关键信息。
  • 知识库构建: 存储品牌的官方信息、产品特点、常见问题解答、正面宣传材料,用于RAG(检索增强生成)机制。

实现:Python + NLTK/spaCy + OpenAI Embedding + Vector DB

3.2.1 文本清洗与分块

LLM通常有输入长度限制(context window),因此长文本需要分块(chunking)。

import re
import jieba # 假设处理中文,可以使用jieba进行分词
from typing import List, Dict

def clean_text(text: str) -> str:
    """
    清洗文本,移除HTML标签、多余空格、特殊字符等。
    """
    if not isinstance(text, str):
        return ""
    # 移除HTML标签
    clean = re.compile('<.*?>')
    text = re.sub(clean, '', text)
    # 移除URLs
    text = re.sub(r'httpS+|wwwS+|httpsS+', '', text, flags=re.MULTILINE)
    # 移除邮件地址
    text = re.sub(r'S*@S*s?', '', text)
    # 移除特殊字符和数字,只保留中英文、数字和常见标点
    text = re.sub(r'[^u4e00-u9fa5a-zA-Z0-9.,!?;,。!?;]', ' ', text)
    # 移除多个空格
    text = re.sub(r's+', ' ', text).strip()
    return text

def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
    """
    将长文本分割成固定大小的块,并带有重叠。
    """
    if not text:
        return []

    tokens = list(jieba.cut(text)) # 假设中文,使用jieba分词
    # 如果是英文,可以使用 text.split() 或更高级的tokenizers

    chunks = []
    i = 0
    while i < len(tokens):
        chunk = tokens[i:i + chunk_size]
        chunks.append("".join(chunk))
        i += chunk_size - overlap
        if i >= len(tokens): # 确保最后一个chunk也包含
            break
    return chunks

# 示例使用
raw_ai_response = """
<p>关于<a href="https://yourbrand.com">你的品牌名</a>的最新评价:</p>
<p>最近用户反映其产品A的续航能力不及宣传,导致部分消费者不满。虽然官方宣称有<b>30小时</b>续航,但实际使用中可能只有20小时。此外,客服响应速度也有待提高。</p>
<p>但也有用户表示,产品B的性价比很高,是同类产品中的佼佼者。其创新设计和稳定性能获得了广泛好评。总体而言,该品牌在市场上有一定的竞争力。</p>
"""
cleaned_text = clean_text(raw_ai_response)
print(f"Cleaned Text:n{cleaned_text}n")

chunks = chunk_text(cleaned_text, chunk_size=100, overlap=20)
for i, chunk in enumerate(chunks):
    print(f"Chunk {i+1}:n{chunk}n---")
3.2.2 知识库构建与向量化

品牌知识库应包含官方文档、FAQ、产品手册等。这些文本经过清洗和分块后,需要通过Embedding模型转换为向量,存储在向量数据库中。

向量数据库的优势:

特性 传统关系型数据库(RDB) 向量数据库(Vector DB)
数据类型 结构化数据,如整数、字符串、日期 高维向量(浮点数数组)
查询方式 精确匹配、范围查询、JOIN 近似最近邻(ANN)搜索,相似度查询
核心用例 事务处理、报表、数据分析 语义搜索、推荐系统、RAG、异常检测
扩展性 通常通过分库分表或读写分离 针对高维向量搜索优化,水平扩展能力强
代表产品 MySQL, PostgreSQL, Oracle Pinecone, Weaviate, Milvus, Qdrant, Chroma

我们将使用OpenAI的Embedding API将文本转换为向量。

import os
from openai import OpenAI
from typing import List, Dict

# 假设您已设置OPENAI_API_KEY环境变量
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

def get_embeddings(texts: List[str]) -> List[List[float]]:
    """
    使用OpenAI Embedding API将文本转换为向量。
    """
    if not texts:
        return []
    try:
        response = client.embeddings.create(
            input=texts,
            model="text-embedding-ada-002" # 当前推荐的embedding模型
        )
        return [data.embedding for data in response.data]
    except Exception as e:
        logging.error(f"Error getting embeddings: {e}")
        return []

# 模拟品牌知识库内容
brand_knowledge_base_raw = [
    "我们的品牌名是'你的品牌名',致力于提供高质量的智能家居产品。",
    "产品A是一款智能音箱,拥有30小时超长续航,支持语音助手功能。",
    "产品B是一款智能摄像头,特点是1080P高清画质和AI人形识别。",
    "我们的客服团队24/7在线,承诺2小时内响应客户咨询。",
    "所有产品均享受一年质保和七天无理由退换。",
    # ... 更多官方、正面的品牌信息
]

# 清洗并分块知识库内容
cleaned_knowledge_chunks = []
for doc in brand_knowledge_base_raw:
    cleaned_doc = clean_text(doc)
    cleaned_knowledge_chunks.extend(chunk_text(cleaned_doc, chunk_size=100, overlap=0))

# 获取知识库文本的向量
knowledge_embeddings = get_embeddings(cleaned_knowledge_chunks)

# 模拟存储到向量数据库 (这里仅打印,实际需调用向量数据库SDK)
# 假设我们有一个简单的结构来存储
knowledge_vectors: List[Dict] = []
for i, (text_chunk, embedding) in enumerate(zip(cleaned_knowledge_chunks, knowledge_embeddings)):
    knowledge_vectors.append({
        "id": f"kb_chunk_{i}",
        "text": text_chunk,
        "embedding": embedding
    })
    # print(f"Knowledge Chunk {i+1}: {text_chunk[:50]}... Embedding length: {len(embedding)}")

# 示例:将知识库存储到Pinecone (概念性代码)
# from pinecone import Pinecone, Index
# pinecone_api_key = os.environ.get("PINECONE_API_KEY")
# pinecone_environment = os.environ.get("PINECONE_ENVIRONMENT")
# pc = Pinecone(api_key=pinecone_api_key, environment=pinecone_environment)
# index_name = "brand-knowledge-base"

# if index_name not in pc.list_indexes():
#     pc.create_index(name=index_name, dimension=len(knowledge_embeddings[0]), metric='cosine')

# index: Index = pc.Index(index_name)
# index.upsert(vectors=[
#     {"id": item["id"], "values": item["embedding"], "metadata": {"text": item["text"]}}
#     for item in knowledge_vectors
# ])
# logging.info(f"Knowledge base loaded with {len(knowledge_vectors)} chunks into Pinecone.")

代码说明:

  • clean_text 函数用于移除HTML、URL、特殊字符等。
  • chunk_text 函数将长文本按字数(或token数)分割成小块,并引入重叠以保留上下文。
  • get_embeddings 函数通过OpenAI API将文本块转换为高维向量。
  • 展示了如何将清洗、分块和向量化的知识库内容概念性地存储到向量数据库中。实际使用时,需要安装并配置相应的向量数据库客户端(如pinecone-client)。

3.3 AI情感分析与意图识别模块:LLM的深度洞察

这是预警系统的核心,利用LLM的强大理解能力,超越简单的关键词匹配,进行深层次的语义分析。

挑战:

  • 识别细微情感: 负面情绪可能不直接表达,而是通过暗示、比较、讽刺等方式体现。
  • 区分事实与观点: LLM可能混淆用户观点和客观事实。
  • 幻觉问题: LLM可能生成不准确甚至虚假的信息。
  • 上下文理解: 准确判断负面言论需要结合品牌自身的背景信息。

策略:

  • Prompt Engineering: 精心设计Prompt,引导LLM输出结构化、准确的结果。
  • Few-shot Learning: 在Prompt中提供少量负面言论示例及其分析结果,帮助LLM理解任务。
  • RAG (Retrieval Augmented Generation): 结合品牌知识库,为LLM提供额外上下文,进行事实核查,减少幻觉,并帮助LLM更好地判断言论是否与品牌官方信息相悖。
  • 结构化输出: 要求LLM以JSON格式返回结果,便于后续程序处理。

实现:Python + OpenAI API + RAG

import os
from openai import OpenAI
import json
import logging
from typing import Dict, List, Any

client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

# 模拟向量数据库查询函数 (实际应调用Pinecone/Weaviate等SDK)
def retrieve_relevant_knowledge(query_text: str, top_k: int = 3) -> List[str]:
    """
    根据查询文本,从向量数据库中检索最相关的品牌知识。
    这里仅为模拟,实际需实现向量搜索逻辑。
    """
    # 实际步骤:
    # 1. 将 query_text 转换为 embedding
    # query_embedding = get_embeddings([query_text])[0]
    # 2. 调用向量数据库进行相似度搜索
    # search_results = index.query(vector=query_embedding, top_k=top_k, include_metadata=True)
    # 3. 提取相关文本
    # relevant_texts = [match.metadata['text'] for match in search_results.matches]

    # 简单模拟返回,实际应从知识库中检索
    simulated_knowledge = [
        "我们的产品A确实拥有30小时的超长续航能力,这是经过严格测试的官方数据。",
        "我们承诺2小时内响应客户咨询,并持续优化客服系统以提供更快的响应。",
        "产品B的1080P高清画质和AI人形识别是其核心卖点,深受用户喜爱。"
    ]

    # 根据查询文本的关键词简单筛选模拟结果,实际是向量搜索
    relevant = [k for k in simulated_knowledge if any(word in query_text for word in ["续航", "客服", "产品B"])]
    return relevant if relevant else simulated_knowledge[:top_k] # 如果没有匹配,返回前top_k个

def analyze_brand_mention_with_llm(
    mention_text: str, 
    brand_name: str, 
    context_knowledge: List[str] = None
) -> Dict[str, Any] | None:
    """
    使用LLM分析品牌提及的情感和意图,并结合知识库进行事实核查。
    """
    if context_knowledge is None:
        context_knowledge = []

    knowledge_str = "n".join([f"- {k}" for k in context_knowledge])
    if knowledge_str:
        knowledge_str = "nn以下是关于品牌的一些官方信息和背景知识,请在分析时作为参考,用于核实事实或提供上下文:n" + knowledge_str

    prompt = f"""
你是一个专业的品牌舆情分析师,你的任务是分析一段关于品牌'{brand_name}'的言论。
请根据以下要求,以JSON格式输出分析结果:

1.  **sentiment (情感倾向):** 'Positive', 'Neutral', 'Negative', 'Mixed'。
2.  **intent (意图):** 识别言论的潜在意图,例如 'Complaint' (抱怨), 'Misinformation' (虚假信息/误导), 'Defamation' (诽谤), 'Bug Report' (Bug报告), 'Feature Request' (功能请求), 'General Review' (一般评价), 'Praise' (赞扬), 'Comparison' (比较), 'Question' (提问) 等。如果存在多个意图,请列出主要意图。
3.  **keywords (关键词):** 提取与品牌、产品、负面/正面信息直接相关的关键词列表。
4.  **summary (总结):** 对该言论进行简明扼要的总结。
5.  **is_negative_impact (是否产生负面影响):** 布尔值,判断该言论是否可能对品牌产生负面影响。如果情感是Negative或Mixed,且意图是Complaint, Misinformation, Defamation,则通常为True。
6.  **details (详细分析):** 解释为什么你认为它会产生负面影响,或者为什么是某种情感和意图。如果言论内容与提供的品牌知识存在矛盾,请指出矛盾点。
7.  **confidence (置信度):** 你对分析结果的置信度,范围0-100。

请分析以下言论:

---
言论内容:
{mention_text}
---
{knowledge_str}

请直接输出JSON格式的结果,不要包含任何额外文字或解释。
"""

    try:
        response = client.chat.completions.create(
            model="gpt-4o", # 使用最新的模型,性能更优
            messages=[
                {"role": "system", "content": "You are a helpful assistant designed to output JSON."},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"}, # 强制LLM输出JSON
            temperature=0.2 # 较低的温度值有助于获得更稳定、一致的输出
        )
        result_json_str = response.choices[0].message.content
        return json.loads(result_json_str)
    except json.JSONDecodeError as e:
        logging.error(f"LLM output was not valid JSON: {result_json_str}. Error: {e}")
        return None
    except Exception as e:
        logging.error(f"Error analyzing mention with LLM: {e}")
        return None

# 示例使用
ai_search_query_result = """
最近用户反映'你的品牌名'的产品A续航能力不及宣传,导致部分消费者不满。虽然官方宣称有30小时续航,但实际使用中可能只有20小时。此外,客服响应速度也有待提高。
"""

# 结合RAG
relevant_knowledge = retrieve_relevant_knowledge(ai_search_query_result)
print(f"Retrieved relevant knowledge: {relevant_knowledge}n")

analysis_result = analyze_brand_mention_with_llm(
    mention_text=ai_search_query_result,
    brand_name="你的品牌名",
    context_knowledge=relevant_knowledge
)

if analysis_result:
    print(json.dumps(analysis_result, indent=2, ensure_ascii=False))
else:
    print("Failed to get LLM analysis.")

# 另一个例子:正面评价
positive_query_result = """
'你的品牌名'的产品B性价比很高,是同类产品中的佼佼者。其创新设计和稳定性能获得了广泛好评。
"""
positive_analysis_result = analyze_brand_mention_with_llm(
    mention_text=positive_query_result,
    brand_name="你的品牌名"
)
if positive_analysis_result:
    print("n--- Positive Example ---n")
    print(json.dumps(positive_analysis_result, indent=2, ensure_ascii=False))

代码说明:

  • retrieve_relevant_knowledge 函数模拟从向量数据库中检索与当前待分析文本最相关的品牌知识。这是RAG机制的关键一步。
  • analyze_brand_mention_with_llm 函数是核心。
    • Prompt Engineering: 构建了详细的Prompt,明确角色、任务、期望的JSON输出格式,并列出了可能的情感和意图类型。
    • RAG集成:context_knowledge作为Prompt的一部分,为LLM提供额外的背景信息,帮助其进行更准确的分析和事实核查。
    • 强制JSON输出: 利用response_format={"type": "json_object"}确保LLM返回有效的JSON字符串。
    • 使用gpt-4o模型,因为它在理解复杂指令和生成结构化输出方面表现出色。
  • 通过json.loads()解析LLM的输出,以便程序化处理。

3.4 预警规则与通知模块

当AI情感分析与意图识别模块得出结论后,预警规则模块会根据这些结果判断是否需要触发预警。

目的:

  • 规则评估: 根据预设条件(情感、意图、关键词、置信度等)判断是否达到预警级别。
  • 通知分发: 通过多种渠道及时通知相关负责人。

预警规则示例(可配置):

规则ID 规则名称 触发条件 预警级别 通知渠道 接收人/组
R001 严重负面评价 sentiment == ‘Negative’ 且 is_negative_impact == True 且 confidence >= 80 紧急 邮件, Slack 品牌公关部, CEO
R002 虚假信息/诽谤 intent 包含 ‘Misinformation’ 或 ‘Defamation’ 且 confidence >= 70 紧急 邮件, 短信 品牌公关部, 法务部
R003 批量负面抱怨 在短时间内(如1小时内)发现5条以上 sentiment == ‘Negative’ 且 intent == ‘Complaint’ 的言论 邮件, Slack 产品经理, 客户服务部
R004 特定关键词预警 keywords 包含 ‘质量问题’, ‘诈骗’, ‘退款难’ 等敏感词 邮件, 钉钉 相关产品负责人, 客户服务部
R005 中度负面评价 sentiment == ‘Negative’ 或 ‘Mixed’ 且 is_negative_impact == True 邮件 品牌公关部

实现:Python + 消息队列 + 通知服务

消息队列在这里起到关键作用,它解耦了分析模块和通知模块,提高了系统的鲁棒性和可伸缩性。分析结果首先发送到消息队列,然后由通知服务异步消费。

import os
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import requests
import json
import logging
from typing import Dict, Any, List

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- 消息队列模拟 (实际应集成Kafka/SQS等) ---
class MessageQueue:
    def __init__(self):
        self.queue = []

    def publish(self, topic: str, message: Dict[str, Any]):
        logging.info(f"Published to topic '{topic}': {message.get('alert_id', 'N/A')}")
        self.queue.append({'topic': topic, 'message': message})

    def consume(self, topic: str) -> List[Dict[str, Any]]:
        # 实际消费会有更复杂的机制,这里简单模拟
        consumed_messages = [item['message'] for item in self.queue if item['topic'] == topic]
        self.queue = [item for item in self.queue if item['topic'] != topic] # 消费后移除
        return consumed_messages

message_queue = MessageQueue()

# --- 通知服务实现 ---

def send_email_notification(recipient_email: str, subject: str, body: str):
    """发送邮件通知"""
    sender_email = os.environ.get("SENDER_EMAIL")
    sender_password = os.environ.get("SENDER_PASSWORD")
    smtp_server = os.environ.get("SMTP_SERVER", "smtp.gmail.com")
    smtp_port = int(os.environ.get("SMTP_PORT", 587))

    if not all([sender_email, sender_password, recipient_email]):
        logging.error("Email configuration missing. Skipping email notification.")
        return False

    try:
        msg = MIMEMultipart()
        msg['From'] = sender_email
        msg['To'] = recipient_email
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain', 'utf-8'))

        with smtplib.SMTP(smtp_server, smtp_port) as server:
            server.starttls()
            server.login(sender_email, sender_password)
            server.send_message(msg)
        logging.info(f"Email notification sent to {recipient_email} for subject: {subject}")
        return True
    except Exception as e:
        logging.error(f"Failed to send email to {recipient_email}: {e}")
        return False

def send_slack_notification(webhook_url: str, message: str):
    """发送Slack通知"""
    if not webhook_url:
        logging.error("Slack webhook URL not configured. Skipping Slack notification.")
        return False

    payload = {'text': message}
    try:
        response = requests.post(webhook_url, json=payload, timeout=10)
        response.raise_for_status()
        logging.info(f"Slack notification sent. Status: {response.status_code}")
        return True
    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to send Slack notification: {e}")
        return False

# 假设还有send_sms_notification, send_dingtalk_notification等

# --- 预警规则评估器 ---
def evaluate_alert_rules(analysis_result: Dict[str, Any], brand_name: str) -> List[Dict[str, Any]]:
    """
    根据LLM的分析结果评估预警规则。
    返回触发的预警列表。
    """
    triggered_alerts = []

    sentiment = analysis_result.get('sentiment')
    intent = analysis_result.get('intent', [])
    is_negative_impact = analysis_result.get('is_negative_impact', False)
    confidence = analysis_result.get('confidence', 0)
    keywords = analysis_result.get('keywords', [])

    mention_summary = analysis_result.get('summary', analysis_result.get('details', '无总结'))
    original_text = analysis_result.get('original_text', 'N/A') # 假设LLM结果中包含原始文本

    # 规则1: 严重负面评价
    if sentiment == 'Negative' and is_negative_impact and confidence >= 80:
        triggered_alerts.append({
            "alert_id": "R001",
            "level": "紧急",
            "title": f"【紧急预警】品牌'{brand_name}'出现严重负面评价",
            "message": f"言论摘要: {mention_summary}n原始文本: {original_text[:200]}...n情感: {sentiment}, 意图: {intent}, 置信度: {confidence}",
            "channels": ["email", "slack"],
            "recipients": {"email": ["[email protected]", "[email protected]"], "slack": ["pr_channel_webhook_url"]}
        })

    # 规则2: 虚假信息/诽谤
    if any(i in intent for i in ['Misinformation', 'Defamation']) and confidence >= 70:
        triggered_alerts.append({
            "alert_id": "R002",
            "level": "紧急",
            "title": f"【紧急预警】品牌'{brand_name}'可能存在虚假信息或诽谤",
            "message": f"言论摘要: {mention_summary}n原始文本: {original_text[:200]}...n情感: {sentiment}, 意图: {intent}, 置信度: {confidence}n详细分析: {analysis_result.get('details', '')}",
            "channels": ["email", "sms"],
            "recipients": {"email": ["[email protected]", "[email protected]"], "sms": ["+8613800138000"]}
        })

    # 规则3: 特定敏感关键词
    sensitive_keywords = ['质量问题', '诈骗', '退款难', '虚假宣传']
    if any(k in sensitive_keywords for k in keywords):
        triggered_alerts.append({
            "alert_id": "R004",
            "level": "高",
            "title": f"【高风险预警】品牌'{brand_name}'出现敏感关键词",
            "message": f"言论摘要: {mention_summary}n原始文本: {original_text[:200]}...n涉及关键词: {', '.join(keywords)}n情感: {sentiment}, 意图: {intent}",
            "channels": ["email", "dingtalk"],
            "recipients": {"email": ["[email protected]"], "dingtalk": ["dingtalk_webhook_url"]}
        })

    # 规则4: 中度负面评价
    if (sentiment == 'Negative' or sentiment == 'Mixed') and is_negative_impact and not triggered_alerts: # 如果没有被更高级别的规则捕获
        triggered_alerts.append({
            "alert_id": "R005",
            "level": "中",
            "title": f"【中度预警】品牌'{brand_name}'出现负面评价",
            "message": f"言论摘要: {mention_summary}n原始文本: {original_text[:200]}...n情感: {sentiment}, 意图: {intent}",
            "channels": ["email"],
            "recipients": {"email": ["[email protected]"]}
        })

    return triggered_alerts

def process_alert_message(alert_message: Dict[str, Any]):
    """
    处理单个预警消息,并发送到指定渠道。
    """
    alert_title = alert_message['title']
    alert_body = alert_message['message']
    channels = alert_message['channels']
    recipients = alert_message['recipients']

    if "email" in channels and "email" in recipients:
        for email in recipients["email"]:
            send_email_notification(email, alert_title, alert_body)

    if "slack" in channels and "slack" in recipients:
        for webhook_url in recipients["slack"]:
            send_slack_notification(webhook_url, f"*{alert_title}*n{alert_body}")

    # ... 其他通知渠道,如短信、钉钉等

# --- 主流程集成示例 ---
async def integrated_workflow(query_text: str, brand_name: str):
    # 1. 数据采集
    ai_response_text = await get_ai_search_result(query_text)
    if not ai_response_text:
        logging.warning(f"No AI response for query: {query_text}")
        return

    # 2. 数据预处理
    cleaned_text = clean_text(ai_response_text)

    # 3. RAG - 检索相关知识
    relevant_knowledge = retrieve_relevant_knowledge(cleaned_text)

    # 4. AI情感分析与意图识别
    analysis_result = analyze_brand_mention_with_llm(
        mention_text=cleaned_text,
        brand_name=brand_name,
        context_knowledge=relevant_knowledge
    )

    if not analysis_result:
        logging.error(f"Failed to get analysis for: {cleaned_text[:100]}...")
        return

    # 将原始文本也加入分析结果,方便追溯
    analysis_result['original_text'] = cleaned_text 

    logging.info(f"LLM Analysis Result: {json.dumps(analysis_result, ensure_ascii=False)}")

    # 5. 预警规则评估
    triggered_alerts = evaluate_alert_rules(analysis_result, brand_name)

    # 6. 发布到消息队列
    if triggered_alerts:
        for alert in triggered_alerts:
            message_queue.publish("brand_alerts", alert)
    else:
        logging.info("No alerts triggered for this mention.")

    # 7. 模拟消息队列消费和通知发送 (在实际系统中,这通常是另一个独立的消费者服务)
    consumed_alerts = message_queue.consume("brand_alerts")
    for alert in consumed_alerts:
        logging.info(f"Consuming alert: {alert['title']}")
        process_alert_message(alert)

# 运行集成示例
async def run_integration_example():
    test_brand_name = "你的品牌名"
    test_queries = [
        "关于 你的品牌名 的产品A续航太差了",
        "有人说 你的品牌名 的客服态度不好",
        "对 你的品牌名 的产品B非常满意,强烈推荐",
        "你的品牌名 真的有质量问题吗?网上都在传"
    ]

    for q in test_queries:
        print(f"n--- Processing Query: {q} ---")
        await integrated_workflow(q, test_brand_name)
        await asyncio.sleep(random.uniform(5, 10)) # 避免过于频繁请求

if __name__ == "__main__":
    # 配置环境变量 (请替换为您的实际值)
    os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
    os.environ["SENDER_EMAIL"] = "[email protected]"
    os.environ["SENDER_PASSWORD"] = "your_email_app_password" # 或SMTP密码
    # os.environ["SMTP_SERVER"] = "smtp.office365.com"
    # os.environ["SMTP_PORT"] = "587"
    # os.environ["SLACK_WEBHOOK_URL"] = "YOUR_SLACK_WEBHOOK_URL"
    # os.environ["DINGTALK_WEBHOOK_URL"] = "YOUR_DINGTALK_WEBHOOK_URL"

    asyncio.run(run_integration_example())

代码说明:

  • 消息队列模拟: MessageQueue 类简单模拟了消息的发布和消费,实际生产环境应使用Kafka、AWS SQS等专业消息队列服务。
  • 通知服务: send_email_notificationsend_slack_notification 演示了如何通过SMTP和HTTP Webhook发送通知。这些函数需要配置相应的环境变量(如邮箱账号密码、Webhook URL)。
  • evaluate_alert_rules 函数: 这是预警逻辑的核心。它接收LLM的分析结果,并根据预设的规则列表进行匹配。每条规则定义了触发条件(如情感、意图、置信度、关键词)和触发后的预警级别、标题、消息内容以及通知渠道和接收人。
  • process_alert_message 函数: 负责根据预警消息中指定的渠道,调用相应的通知发送函数。
  • integrated_workflow 函数: 将之前所有模块串联起来,展示了从数据采集到最终预警通知的完整流程。
  • 环境变量: 所有的API密钥、Webhook URL等敏感信息都通过环境变量加载,增强安全性。

四、系统部署与运维考虑

一个实用的预警系统不仅要能工作,还要能稳定、高效、安全地运行。

4.1 可伸缩性(Scalability)

  • 数据采集: 采用分布式爬虫架构。多个爬虫实例并行工作,配合代理IP池和User-Agent轮换,提高抓取效率和抗封禁能力。可以部署在Kubernetes集群或Serverless函数(如AWS Lambda, Google Cloud Functions)上。
  • 消息队列: 使用Kafka、AWS SQS等托管服务,轻松处理高并发数据流,解耦各模块。
  • LLM分析: LLM API本身具有高可用性和可伸缩性。在本地部署开源LLM时,考虑使用GPU集群。
  • 向量数据库: 选择云托管的向量数据库服务(如Pinecone, Weaviate Cloud),它们通常自带高可用和弹性伸缩能力。
  • 通知服务: 同样可以利用Serverless函数处理通知逻辑,按需伸缩。

4.2 可靠性(Reliability)

  • 监控与日志: 部署全面的监控系统(如Prometheus + Grafana)来跟踪系统各项指标(CPU、内存、请求速率、错误率、LLM调用次数、预警触发次数)。详细记录所有模块的日志(如ELK Stack或Loki),便于问题排查。
  • 错误处理与重试: 在每个模块中实现健壮的错误处理机制,例如网络请求失败时的指数退避重试、LLM调用超时重试等。
  • 数据持久化: 关键数据(原始AI结果、分析结果、触发的预警)应持久化存储到数据库,以便审计、追溯和后续分析。
  • 幂等性设计: 消息队列消费者应设计为幂等性,即多次处理同一条消息不会产生副作用,避免重复通知或数据错误。

4.3 安全性(Security)

  • API密钥管理: 绝不将API密钥硬编码到代码中。使用环境变量、Secret Manager (如AWS Secrets Manager, Azure Key Vault) 或HashiCorp Vault等工具进行安全管理。
  • 数据加密: 传输中的数据(TLS/SSL)和静态数据(数据库加密)都应进行加密。
  • 访问控制: 对系统各个组件(数据库、云资源、API端点)实施最小权限原则,限制访问。
  • 代理IP安全: 确保代理IP提供商的信誉,避免使用不安全的代理。

4.4 成本优化(Cost Optimization)

  • LLM调用成本: LLM API调用是主要的成本来源。
    • 模型选择: 根据任务复杂度选择合适的模型,不一定所有任务都需要GPT-4o。对简单任务使用更便宜的模型。
    • Prompt优化: 精简Prompt,减少输入Token数量。
    • 批处理: 尽可能批处理LLM请求,减少API调用的次数。
    • 缓存: 对重复的查询或分析结果进行缓存。
  • 向量数据库成本: 根据数据量和查询频率选择合适的向量数据库实例大小和类型。
  • 计算资源成本: 利用Serverless计算(Lambda, Cloud Functions)实现按需付费,避免资源闲置浪费。
  • 爬虫成本: 代理IP服务通常按流量或请求量收费,合理规划和选择服务商。

4.5 持续优化(Continuous Optimization)

  • Prompt迭代: LLM的性能高度依赖于Prompt。需要持续测试和优化Prompt,提高分析准确性。
  • 模型微调: 如果有大量的特定领域标注数据,可以考虑对开源LLM进行微调,以提高特定任务的性能并降低成本。
  • 规则更新: 预警规则需要根据品牌策略、市场变化和实际效果进行动态调整。
  • 反馈闭环: 收集预警系统的误报和漏报数据,形成反馈闭环,持续改进LLM分析和预警规则。

五、挑战与未来展望

5.1 当前挑战

  • AI搜索结果的黑盒性与变化性: 目标AI搜索引擎可能随时更新其UI、反爬机制或底层模型,导致爬虫失效或分析结果不稳定。
  • LLM的幻觉与偏见: 尽管RAG能有效缓解,但LLM仍可能生成不完全准确或带有偏见的回答,需要人工复核。
  • 多语言支持的复杂性: 品牌可能面临多语言市场,需要针对不同语言配置不同的分词器、Embedding模型和LLM Prompt。
  • 成本控制: 大规模部署和高频调用LLM可能产生不小的费用,需要精细化管理和优化。
  • 误报与漏报: 这是一个持续的挑战,需要通过反馈循环和迭代优化来逐步改善。

5.2 未来展望

  • 更高级的意图识别与情绪粒度: 发展更精细的LLM模型,不仅能识别基本情感,还能区分情绪强度、识别讽刺、暗示等复杂语境,甚至理解用户的情绪演变。
  • 自动化响应建议: 系统在发现负面言论后,除了预警,还能根据品牌知识库和预设策略,为公关团队生成初步的应对建议或草稿,进一步提升响应效率。
  • 多模态预警: 随着AI搜索引擎集成图片、视频等多模态内容,未来的预警系统需要能够分析图片中的品牌Logo、视频中的语音内容等,实现更全面的监控。
  • 与CRM/BI系统深度集成: 将预警数据无缝集成到客户关系管理(CRM)或商业智能(BI)系统中,为品牌决策提供更全面的数据支持,例如将负面评价与具体客户ID关联,或分析负面趋势对销售的影响。
  • 零样本/少样本学习: 减少对大量标注数据的依赖,通过更智能的Prompt工程和少量示例,让系统能够快速适应新的负面言论模式。

六、结语

今天,我们详细探讨了如何建立一个AI展现预警系统,以应对AI时代品牌声誉管理的新挑战。从数据采集、预处理、LLM深度分析,到智能预警和通知,我们构建了一个端到端的解决方案。这个系统不仅仅是技术上的创新,更是品牌在复杂多变的信息环境中保护自身、赢得用户信任的关键防线。

技术的道路永无止境,这个系统也需要持续的迭代和优化。我鼓励各位将今天所学的理论和代码付诸实践,结合您自身品牌的具体需求,不断探索和完善。只有这样,我们才能真正驾驭AI的力量,让它成为我们品牌增长和声誉维护的强大助力。

谢谢大家!期待与各位共同见证AI技术在品牌管理领域的更多创新。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐