玄同 765

大语言模型 (LLM) 开发工程师 | 中国传媒大学 · 数字媒体技术(智能交互与游戏设计)

CSDN · 个人主页 | GitHub · Follow


关于作者

  • 深耕领域:大语言模型开发 / RAG 知识库 / AI Agent 落地 / 模型微调
  • 技术栈:Python | RAG (LangChain / Dify + Milvus) | FastAPI + Docker
  • 工程能力:专注模型工程化部署、知识库构建与优化,擅长全流程解决方案

「让 AI 交互更智能,让技术落地更高效」
欢迎技术探讨与项目合作,解锁大模型与智能交互的无限可能!


长尾数据挖掘闭环:利用 Embedding 聚类与 Label Studio 构建智能情绪标注体系

引言:情绪分类的"长尾困境"

每个做情绪分类的团队都遇到过这样的困境:你的模型在"正面"、“负面”、"中性"这三大类上表现完美,但当用户开始使用时,各种"奇怪"的情绪出现了——“焦虑”、“期待”、“困惑”、“讽刺”、“无奈”…这些长尾情绪在训练数据中几乎不存在,但在真实场景中却频繁出现。

更糟糕的是,你根本不知道这些长尾情绪有哪些。你无法预先定义它们,因为它们"隐藏"在大量未标注数据中。这就是典型的**开放世界半监督学习(Open-World SSL)**问题——模型不仅要识别已知类别,还要发现并学习未知的新类别。

本文将介绍如何构建一个完整的长尾数据挖掘闭环:利用 Embedding 离线聚类 自动发现未知情绪簇,结合 Label Studio 完成高质量预标注,最终实现情绪打标体系的持续扩充与迭代。


零、前置知识:理解核心技术

0.1 什么是开放世界半监督学习?

生活中的例子:

想象你是一家餐厅的经理,负责培训新服务员。你已经教会他们识别"满意"、“不满意”、"一般"这三种顾客反馈。但某天,一位顾客说:"这道菜让我想起了外婆的味道。"这是什么情绪?是满意?是怀念?还是感动?传统的分类系统无法处理这种"未知"情况。

未知类别(待发现)

怀念

感动

讽刺

...

已知类别

满意

不满意

一般

用户输入

开放世界半监督学习(Open-World SSL) 正是为解决这一问题而生。它的核心任务是:

  1. 识别已知类别:将样本正确分类到预定义的类别中
  2. 发现未知类别:自动识别不属于任何已知类别的新样本
  3. 聚类未知样本:将未知样本聚集成有意义的新类别

0.2 什么是 Embedding 聚类?

Embedding(嵌入) 是将文本转换为高维向量表示的技术。语义相似的文本,其 Embedding 向量在高维空间中距离更近。

聚类 是将相似的样本自动分组的技术。K-Means、DBSCAN、HDBSCAN 是常用的聚类算法。

Embedding 聚类 的核心思想是:

  1. 用预训练模型(如 BERT、Sentence-BERT)将文本转换为 Embedding
  2. 在 Embedding 空间中进行聚类
  3. 相同簇内的样本属于同一类别

聚类层

Embedding 层

输入层

文本1: 很开心

文本2: 非常快乐

文本3: 有点难过

文本4: 很伤心

向量1: [0.8, 0.9, ...]

向量2: [0.7, 0.85, ...]

向量3: [-0.6, -0.8, ...]

向量4: [-0.7, -0.75, ...]

簇1: 快乐类

簇2: 悲伤类

0.3 什么是 Label Studio?

Label Studio 是一个开源的数据标注平台,支持多种数据类型(文本、图像、音频、视频)的标注任务。它的核心优势包括:

  1. 灵活的标注界面:支持自定义标注配置
  2. ML 后端集成:可以连接机器学习模型进行预标注
  3. 团队协作:支持多人协作标注和质量控制
  4. 数据管理:完整的数据导入导出功能

官方资源:


一、整体架构设计

1.1 为什么需要闭环系统?

传统的情绪标注流程是线性的:

定义类别 → 收集数据 → 标注数据 → 训练模型 → 部署

这种线性流程的问题在于:一旦部署,类别就固定了。当新情绪出现时,你需要重新开始整个流程,效率极低。

闭环系统的核心思想是:让系统自动发现新类别,并持续迭代

闭环迭代

收集未标注数据

Embedding 提取

离线聚类

发现新簇?

Label Studio 预标注

人工审核

更新类别体系

重新训练模型

1.2 系统架构图

模型层

标注层

发现层

数据层

原始数据池

已标注数据

未标注数据

Embedding 模型

聚类引擎

新簇检测

Label Studio

ML Backend

人工审核

模型训练

模型部署


二、Embedding 离线聚类:发现未知情绪簇

2.1 为什么选择 Embedding 聚类?

传统的文本分类方法需要大量标注数据。但在长尾场景下,我们面临两个挑战:

  1. 未知类别:我们不知道有哪些新类别
  2. 样本稀疏:长尾类别样本很少,难以训练

Embedding 聚类的优势在于:

方法 需要标注数据 能发现新类别 适合长尾
监督学习 大量
半监督学习 少量 一般
Embedding 聚类
开放世界 SSL 少量

2.2 Embedding 模型选择

我们推荐使用 Sentence-BERT (SBERT),它专门针对句子级别的语义相似度进行了优化。

官方资源:

from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List, Dict, Any, Tuple
from sklearn.cluster import HDBSCAN
from sklearn.metrics.pairwise import cosine_similarity


class EmbeddingClusterer:
    """
    Embedding 聚类器
    
    使用 Sentence-BERT 提取文本嵌入,然后使用 HDBSCAN 进行聚类。
    自动发现数据中的未知类别。
    
    设计原则:
    1. 无需预定义类别数量
    2. 自动识别噪声点(不属于任何类别的样本)
    3. 支持增量更新
    
    Attributes:
        model_name: Sentence-BERT 模型名称
        min_cluster_size: HDBSCAN 最小簇大小
        min_samples: HDBSCAN 最小样本数
    """
    
    def __init__(
        self,
        model_name: str = "paraphrase-multilingual-MiniLM-L12-v2",
        min_cluster_size: int = 10,
        min_samples: int = 5
    ):
        """
        初始化 Embedding 聚类器
        
        Args:
            model_name: SBERT 模型名称
                - paraphrase-multilingual-MiniLM-L12-v2: 多语言,速度快
                - paraphrase-mpnet-base-v2: 英文,效果最好
                - distiluse-base-multilingual-cased-v2: 多语言,平衡
            min_cluster_size: 最小簇大小,小于此值的簇会被视为噪声
            min_samples: 核心点邻域最小样本数
        """
        self.model = SentenceTransformer(model_name)
        self.min_cluster_size = min_cluster_size
        self.min_samples = min_samples
        self._embeddings_cache: Dict[str, np.ndarray] = {}
    
    def encode(self, texts: List[str]) -> np.ndarray:
        """
        将文本列表转换为 Embedding 矩阵
        
        这是核心方法,执行流程:
        1. 批量处理文本,提高效率
        2. 缓存已处理的文本,避免重复计算
        3. 返回归一化的 Embedding
        
        Args:
            texts: 文本列表
            
        Returns:
            Embedding 矩阵,shape=(n_texts, embedding_dim)
        
        使用示例:
            clusterer = EmbeddingClusterer()
            embeddings = clusterer.encode(["今天很开心", "心情不好"])
            print(embeddings.shape)  # (2, 384)
        """
        # 分离已缓存和未缓存的文本
        uncached_texts = []
        uncached_indices = []
        embeddings = np.zeros((len(texts), self.model.get_sentence_embedding_dimension()))
        
        for i, text in enumerate(texts):
            if text in self._embeddings_cache:
                embeddings[i] = self._embeddings_cache[text]
            else:
                uncached_texts.append(text)
                uncached_indices.append(i)
        
        # 批量处理未缓存的文本
        if uncached_texts:
            new_embeddings = self.model.encode(
                uncached_texts,
                show_progress_bar=False,
                normalize_embeddings=True
            )
            for idx, text, emb in zip(uncached_indices, uncached_texts, new_embeddings):
                embeddings[idx] = emb
                self._embeddings_cache[text] = emb
        
        return embeddings
    
    def cluster(
        self,
        texts: List[str],
        known_labels: List[str] = None
    ) -> Tuple[np.ndarray, Dict[int, List[int]]]:
        """
        对文本进行聚类,发现未知类别
        
        核心逻辑:
        1. 提取 Embedding
        2. 使用 HDBSCAN 聚类(无需预定义类别数)
        3. 分析聚类结果,识别新簇
        
        Args:
            texts: 待聚类的文本列表
            known_labels: 已知类别列表(用于区分新旧类别)
            
        Returns:
            labels: 每个样本的聚类标签,-1 表示噪声
            clusters: 簇ID到样本索引列表的映射
        
        使用示例:
            texts = ["今天很开心", "心情不好", "有点焦虑", "非常快乐"]
            labels, clusters = clusterer.cluster(texts)
            print(f"发现 {len(clusters)} 个簇")
        """
        # 步骤 1:提取 Embedding
        embeddings = self.encode(texts)
        
        # 步骤 2:HDBSCAN 聚类
        # HDBSCAN 相比 K-Means 的优势:
        # - 不需要预定义类别数量
        # - 自动识别噪声点
        # - 能处理不同密度的簇
        clusterer = HDBSCAN(
            min_cluster_size=self.min_cluster_size,
            min_samples=self.min_samples,
            metric='euclidean'
        )
        labels = clusterer.fit_predict(embeddings)
        
        # 步骤 3:整理聚类结果
        clusters: Dict[int, List[int]] = {}
        for idx, label in enumerate(labels):
            if label not in clusters:
                clusters[label] = []
            clusters[label].append(idx)
        
        # 步骤 4:过滤噪声簇(label=-1)
        # 噪声点不属于任何明确的类别,可能是:
        # - 真正的噪声数据
        # - 尚未形成规模的新类别
        
        return labels, clusters
    
    def get_cluster_representatives(
        self,
        texts: List[str],
        labels: np.ndarray,
        clusters: Dict[int, List[int]],
        top_k: int = 5
    ) -> Dict[int, List[str]]:
        """
        获取每个簇的代表性样本
        
        通过计算簇内每个样本到簇中心的距离,
        选择距离中心最近的样本作为代表。
        这些代表样本将用于 Label Studio 预标注。
        
        Args:
            texts: 原始文本列表
            labels: 聚类标签
            clusters: 簇映射
            top_k: 每个簇返回的代表数量
            
        Returns:
            簇ID到代表性文本列表的映射
        """
        embeddings = self.encode(texts)
        representatives: Dict[int, List[str]] = {}
        
        for cluster_id, indices in clusters.items():
            if cluster_id == -1:  # 跳过噪声簇
                    continue
            
            # 获取簇内所有样本的 Embedding
            cluster_embeddings = embeddings[indices]
            
            # 计算簇中心
            centroid = np.mean(cluster_embeddings, axis=0)
            
            # 计算每个样本到中心的距离
            distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
            
            # 选择距离中心最近的 top_k 个样本
            closest_indices = np.argsort(distances)[:top_k]
            representatives[cluster_id] = [texts[indices[i]] for i in closest_indices]
        
        return representatives

2.3 新簇检测策略

如何判断一个簇是"新类别"还是"已知类别"?我们使用以下策略:

from typing import List, Dict, Set
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity


class NoveltyDetector:
    """
    新颖性检测器
    
    判断聚类结果中的簇是否属于新类别。
    通过计算簇中心与已知类别中心的相似度来判断。
    
    Attributes:
        similarity_threshold: 相似度阈值,低于此值视为新类别
    """
    
    def __init__(self, similarity_threshold: float = 0.7):
        """
        初始化新颖性检测器
        
        Args:
            similarity_threshold: 相似度阈值
                - 0.7: 严格,只有明显不同的簇才被视为新类别
                - 0.5: 宽松,更多簇会被视为新类别
        """
        self.similarity_threshold = similarity_threshold
        self.known_category_centers: Dict[str, np.ndarray] = {}
    
    def register_known_category(
        self,
        category_name: str,
        sample_texts: List[str],
        embedder: EmbeddingClusterer
    ):
        """
        注册已知类别
        
        通过样本文本计算类别中心向量。
        
        Args:
            category_name: 类别名称
            sample_texts: 该类别的样本文本
            embedder: Embedding 提取器
        """
        embeddings = embedder.encode(sample_texts)
        self.known_category_centers[category_name] = np.mean(embeddings, axis=0)
    
    def detect_novel_clusters(
        self,
        cluster_representatives: Dict[int, List[str]],
        embedder: EmbeddingClusterer
    ) -> Dict[int, Dict[str, Any]]:
        """
        检测新簇
        
        对每个簇:
        1. 计算簇中心
        2. 与所有已知类别中心比较
        3. 如果最大相似度低于阈值,则为新类别
        
        Args:
            cluster_representatives: 簇ID到代表性文本的映射
            embedder: Embedding 提取器
            
        Returns:
            检测结果,包含是否为新类别、最相似已知类别等信息
        """
        results = {}
        
        for cluster_id, texts in cluster_representatives.items():
            # 计算簇中心
            embeddings = embedder.encode(texts)
            cluster_center = np.mean(embeddings, axis=0)
            
            # 与已知类别比较
            max_similarity = 0
            most_similar_category = None
            
            for cat_name, cat_center in self.known_category_centers.items():
                similarity = cosine_similarity(
                    cluster_center.reshape(1, -1),
                    cat_center.reshape(1, -1)
                )[0][0]
                
                if similarity > max_similarity:
                    max_similarity = similarity
                    most_similar_category = cat_name
            
            # 判断是否为新类别
            is_novel = max_similarity < self.similarity_threshold
            
            results[cluster_id] = {
                "is_novel": is_novel,
                "max_similarity": max_similarity,
                "most_similar_category": most_similar_category,
                "representative_texts": texts[:3]  # 只保留前3个作为代表
            }
        
        return results

三、Label Studio 集成:预标注与人工审核

3.1 Label Studio 安装与配置

官方安装文档: https://labelstud.io/guide/install

# 使用 pip 安装
pip install label-studio

# 或使用 Docker
docker run -it -p 8080:8080 -v $(pwd)/mydata:/label-studio/data heartexlabs/label-studio:latest

# 启动 Label Studio
label-studio start my_project --init

3.2 情绪分析标注界面配置

官方模板文档: https://labelstud.io/templates/sentiment_analysis

<View>
  <Header value="请选择文本的情绪类别:"/>
  <Text name="text" value="$text"/>
  
  <!-- 已知类别 -->
  <View style="display: flex; gap: 10px; margin-top: 10px;">
    <Header value="已知类别:" style="font-weight: bold;"/>
  </View>
  <Choices name="known_sentiment" toName="text" choice="single" showInline="true">
    <Choice value="正面"/>
    <Choice value="负面"/>
    <Choice value="中性"/>
  </Choices>
  
  <!-- 新发现的类别(动态生成) -->
  <View style="display: flex; gap: 10px; margin-top: 20px;">
    <Header value="新发现的类别(待确认):" style="font-weight: bold; color: #ff9800;"/>
  </View>
  <Choices name="novel_sentiment" toName="text" choice="single" showInline="true">
    <Choice value="焦虑"/>
    <Choice value="期待"/>
    <Choice value="困惑"/>
    <Choice value="讽刺"/>
    <Choice value="其他(请在备注中说明)"/>
  </Choices>
  
  <!-- 备注区域 -->
  <TextArea name="comment" toName="text" placeholder="如有其他情绪或备注,请在此填写"/>
</View>

3.3 ML Backend 集成:实现预标注

官方 ML Backend 文档: https://labelstud.io/guide/ml_create

Label Studio 的 ML Backend 允许我们将聚类模型集成到标注流程中,实现自动预标注。

from label_studio_ml.model import LabelStudioMLBase
from typing import List, Dict, Any
import numpy as np


class SentimentClusteringBackend(LabelStudioMLBase):
    """
    情绪聚类 ML Backend
    
    将 Embedding 聚类模型集成到 Label Studio。
    支持两种模式:
    1. 预标注模式:对未标注数据自动生成预测标签
    2. 交互模式:根据用户标注实时更新聚类结果
    
    官方文档:https://labelstud.io/guide/ml_create
    
    Attributes:
        clusterer: Embedding 聚类器
        novelty_detector: 新颖性检测器
    """
    
    def __init__(self, **kwargs):
        """
        初始化 ML Backend
        
        调用父类构造函数,并初始化聚类器。
        """
        super().__init__(**kwargs)
        
        # 从 Labeling Interface 解析配置
        self.from_name, self.info = list(self.parsed_label_config.items())[0]
        self.to_name = self.info['to_name']
        self.labels = self.info['labels']
        
        # 初始化聚类器
        self.clusterer = EmbeddingClusterer()
        self.novelty_detector = NoveltyDetector()
        
        # 注册已知类别
        self._register_known_categories()
    
    def _register_known_categories(self):
        """
        注册已知类别
        
        从已标注数据中提取每个类别的样本,
        计算类别中心向量。
        """
        # 这里应该从数据库或文件加载已知类别的样本
        # 简化示例:使用硬编码
        known_samples = {
            "正面": ["很开心", "非常满意", "太棒了"],
            "负面": ["很失望", "太糟糕了", "不满意"],
            "中性": ["一般", "还行", "普通"]
        }
        
        for category, samples in known_samples.items():
            self.novelty_detector.register_known_category(
                category, samples, self.clusterer
            )
    
    def predict(self, tasks: List[Dict], context: Dict = None, **kwargs) -> List[Dict]:
        """
        生成预测结果
        
        这是 ML Backend 的核心方法。
        Label Studio 会将待标注任务发送到此方法,
        返回的预测结果会显示在标注界面中。
        
        官方文档:https://labelstud.io/guide/ml_create#Implement-prediction-logic
        
        Args:
            tasks: Label Studio 任务列表
                - 每个任务包含 data 字段,其中有文本内容
            context: 交互模式下的上下文信息
                - 包含用户已标注的信息
            
        Returns:
            预测结果列表,格式见:
            https://labelstud.io/guide/predictions#Format-pre-annotations-for-Label-Studio
        """
        predictions = []
        
        for task in tasks:
            # 提取文本
            text = task['data'].get('text', '')
            if not text:
                predictions.append({})
                continue
            
            # 获取 Embedding
            embedding = self.clusterer.encode([text])[0]
            
            # 与已知类别比较
            max_similarity = 0
            predicted_label = None
            
            for cat_name, cat_center in self.novelty_detector.known_category_centers.items():
                similarity = cosine_similarity(
                    embedding.reshape(1, -1),
                    cat_center.reshape(1, -1)
                )[0][0]
                
                if similarity > max_similarity:
                    max_similarity = similarity
                    predicted_label = cat_name
            
            # 如果相似度低于阈值,标记为"待确认"
            if max_similarity < self.novelty_detector.similarity_threshold:
                predicted_label = "待确认-可能为新类别"
                score = max_similarity  # 低置信度
            else:
                score = max_similarity
            
            # 构建预测结果
            # 格式参考:https://labelstud.io/guide/predictions
            prediction = {
                "result": [
                    {
                        "from_name": self.from_name,
                        "to_name": self.to_name,
                        "type": "choices",
                        "value": {
                            "choices": [predicted_label]
                        }
                    }
                ],
                "score": float(score),
                "model_version": "1.0.0"
            }
            predictions.append(prediction)
        
        return predictions
    
    def fit(self, event: str, data: Dict, **kwargs):
        """
        训练/更新模型
        
        当标注完成时,Label Studio 会调用此方法。
        可以在这里更新聚类模型或注册新类别。
        
        官方文档:https://labelstud.io/guide/ml_create#Implement-training-logic-optional
        
        Args:
            event: 事件类型
                - 'ANNOTATION_CREATED': 新标注创建
                - 'ANNOTATION_UPDATED': 标注更新
            data: 事件数据,包含标注信息
        """
        if event not in ['ANNOTATION_CREATED', 'ANNOTATION_UPDATED']:
            return
        
        # 提取标注结果
        annotation = data.get('annotation', {})
        result = annotation.get('result', [])
        
        for item in result:
            if item.get('type') == 'choices':
                choices = item.get('value', {}).get('choices', [])
                if choices:
                    label = choices[0]
                    text = data.get('task', {}).get('data', {}).get('text', '')
                    
                    # 如果是新类别,注册到新颖性检测器
                    if label not in self.novelty_detector.known_category_centers:
                        self.novelty_detector.register_known_category(
                            label, [text], self.clusterer
                        )
                        print(f"注册新类别: {label}")

3.4 部署 ML Backend

官方文档: https://labelstud.io/guide/ml_create#Run-the-ML-backend-server

# 创建 ML Backend 目录
label-studio-ml create sentiment_clustering_backend

# 将上面的代码保存到 sentiment_clustering_backend/model.py

# 安装依赖
pip install -r sentiment_clustering_backend/requirements.txt
pip install sentence-transformers scikit-learn hdbscan

# 启动 ML Backend
label-studio-ml start sentiment_clustering_backend -p 9090

# 或使用 Docker
cd sentiment_clustering_backend
docker-compose up

3.5 连接 ML Backend 到 Label Studio

官方文档: https://labelstud.io/guide/ml#Connect-the-model-to-Label-Studio

  1. 打开 Label Studio,进入项目设置
  2. 点击 Machine Learning 选项卡
  3. 点击 Add Model
  4. 填写 ML Backend URL(如 http://localhost:9090
  5. 启用 Use for interactive preannotations
  6. 点击 Validate and Save

四、完整工作流程

4.1 数据导入与预处理

import pandas as pd
from typing import List, Dict, Any
import json


def prepare_data_for_label_studio(
    texts: List[str],
    output_file: str = "tasks.json"
) -> List[Dict]:
    """
    准备 Label Studio 导入数据
    
    Label Studio 支持多种导入格式。
    这里使用 JSON 格式,每个任务包含一个文本。
    
    官方文档:https://labelstud.io/guide/tasks#Import-data
    
    Args:
        texts: 文本列表
        output_file: 输出文件路径
        
    Returns:
        Label Studio 任务列表
    """
    tasks = []
    for i, text in enumerate(texts):
        task = {
            "id": i,
            "data": {
                "text": text
            }
        }
        tasks.append(task)
    
    # 保存为 JSON 文件
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(tasks, f, ensure_ascii=False, indent=2)
    
    return tasks


def import_preannotated_data(
    texts: List[str],
    predictions: List[str],
    output_file: str = "preannotated.json"
) -> List[Dict]:
    """
    导入预标注数据
    
    将聚类结果转换为 Label Studio 预标注格式。
    这样标注员只需要审核和修正,而不需要从零开始标注。
    
    官方文档:https://labelstud.io/guide/predictions#Import-pre-annotated-data-into-Label-Studio
    
    Args:
        texts: 文本列表
        predictions: 预测标签列表
        output_file: 输出文件路径
        
    Returns:
        包含预标注的任务列表
    """
    tasks = []
    for i, (text, pred) in enumerate(zip(texts, predictions)):
        task = {
            "id": i,
            "data": {
                "text": text
            },
            "predictions": [
                {
                    "result": [
                        {
                            "from_name": "sentiment",
                            "to_name": "text",
                            "type": "choices",
                            "value": {
                                "choices": [pred]
                            }
                        }
                    ]
                }
            ]
        }
        tasks.append(task)
    
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(tasks, f, ensure_ascii=False, indent=2)
    
    return tasks

4.2 批量聚类与预标注生成

import asyncio
from typing import List, Dict, Tuple
import pandas as pd


class LongTailMiningPipeline:
    """
    长尾数据挖掘流水线
    
    完整的闭环流程:
    1. 加载未标注数据
    2. Embedding 聚类
    3. 新簇检测
    4. 生成 Label Studio 预标注
    5. 导出结果
    
    Attributes:
        clusterer: Embedding 聚类器
        novelty_detector: 新颖性检测器
        batch_size: 批处理大小
    """
    
    def __init__(
        self,
        clusterer: EmbeddingClusterer,
        novelty_detector: NoveltyDetector,
        batch_size: int = 1000
    ):
        """
        初始化流水线
        
        Args:
            clusterer: Embedding 聚类器实例
            novelty_detector: 新颖性检测器实例
            batch_size: 批处理大小,用于控制内存使用
        """
        self.clusterer = clusterer
        self.novelty_detector = novelty_detector
        self.batch_size = batch_size
    
    def run(
        self,
        texts: List[str],
        output_dir: str = "output"
    ) -> Dict[str, Any]:
        """
        执行完整的挖掘流程
        
        这是主入口方法,执行流程:
        1. 分批处理文本(避免内存溢出)
        2. 对每批数据进行聚类
        3. 检测新簇
        4. 生成预标注文件
        
        Args:
            texts: 待处理的文本列表
            output_dir: 输出目录
            
        Returns:
            处理结果摘要
        """
        import os
        os.makedirs(output_dir, exist_ok=True)
        
        all_labels = []
        all_novel_clusters = {}
        
        # 分批处理
        for i in range(0, len(texts), self.batch_size):
            batch_texts = texts[i:i + self.batch_size]
            
            # 聚类
            labels, clusters = self.clusterer.cluster(batch_texts)
            all_labels.extend(labels)
            
            # 获取代表性样本
            representatives = self.clusterer.get_cluster_representatives(
                batch_texts, labels, clusters
            )
            
            # 检测新簇
            novel_results = self.novelty_detector.detect_novel_clusters(
                representatives, self.clusterer
            )
            
            # 整理新簇信息
            for cluster_id, result in novel_results.items():
                if result['is_novel']:
                    all_novel_clusters[f"batch_{i}_cluster_{cluster_id}"] = {
                        "representative_texts": result['representative_texts'],
                        "similarity_to_known": result['max_similarity'],
                        "most_similar_category": result['most_similar_category']
                    }
        
        # 生成预标注文件
        predictions = []
        for label in all_labels:
            if label == -1:
                predictions.append("待确认-噪声或新类别")
            else:
                predictions.append(f"簇_{label}")
        
        preannotated = import_preannotated_data(
            texts, predictions,
            f"{output_dir}/preannotated.json"
        )
        
        # 生成新簇报告
        report = {
            "total_texts": len(texts),
            "total_clusters": len(set(all_labels)) - (1 if -1 in all_labels else 0),
            "noise_count": all_labels.count(-1),
            "novel_clusters": all_novel_clusters,
            "novel_cluster_count": len(all_novel_clusters)
        }
        
        with open(f"{output_dir}/report.json", 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        
        return report


# 使用示例
if __name__ == "__main__":
    # 加载数据
    df = pd.read_csv("unlabeled_data.csv")
    texts = df['text'].tolist()
    
    # 初始化组件
    clusterer = EmbeddingClusterer()
    novelty_detector = NoveltyDetector(similarity_threshold=0.7)
    
    # 注册已知类别
    known_samples = {
        "正面": ["很开心", "非常满意", "太棒了"],
        "负面": ["很失望", "太糟糕了", "不满意"],
        "中性": ["一般", "还行", "普通"]
    }
    for cat, samples in known_samples.items():
        novelty_detector.register_known_category(cat, samples, clusterer)
    
    # 执行流水线
    pipeline = LongTailMiningPipeline(clusterer, novelty_detector)
    report = pipeline.run(texts)
    
    print(f"发现 {report['novel_cluster_count']} 个新簇")
    print(f"噪声样本: {report['noise_count']}")

4.3 导出标注结果

官方文档: https://labelstud.io/guide/export

def export_annotations(
    project_id: int,
    output_file: str = "annotations.json"
):
    """
    导出 Label Studio 标注结果
    
    可以通过 UI 或 API 导出。
    导出的数据可用于重新训练模型。
    
    官方文档:https://labelstud.io/guide/export
    
    Args:
        project_id: Label Studio 项目 ID
        output_file: 输出文件路径
    """
    import requests
    
    # Label Studio API 配置
    LABEL_STUDIO_URL = "http://localhost:8080"
    API_KEY = "your-api-key"
    
    # 调用导出 API
    response = requests.get(
        f"{LABEL_STUDIO_URL}/api/projects/{project_id}/export",
        headers={"Authorization": f"Token {API_KEY}"}
    )
    
    annotations = response.json()
    
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(annotations, f, ensure_ascii=False, indent=2)
    
    return annotations


def parse_annotations_for_training(
    annotations: List[Dict]
) -> Tuple[List[str], List[str]]:
    """
    解析标注结果用于模型训练
    
    将 Label Studio 导出的 JSON 格式转换为训练数据格式。
    
    Args:
        annotations: Label Studio 导出的标注列表
        
    Returns:
        texts: 文本列表
        labels: 标签列表
    """
    texts = []
    labels = []
    
    for annotation in annotations:
        text = annotation.get('data', {}).get('text', '')
        result = annotation.get('annotations', [{}])[0].get('result', [])
        
        for item in result:
            if item.get('type') == 'choices':
                choices = item.get('value', {}).get('choices', [])
                if choices:
                    texts.append(text)
                    labels.append(choices[0])
                    break
    
    return texts, labels

五、持续迭代与优化

5.1 闭环迭代流程

质量监控

标注一致性

模型准确率

新类别发现率

迭代循环

收集新数据

聚类分析

发现新类别?

Label Studio 预标注

人工审核确认

更新类别体系

重新训练模型

部署新模型

5.2 质量控制指标

指标 计算方法 目标值
标注一致性 多人标注相同任务的同意率 > 85%
模型准确率 测试集准确率 > 90%
新类别发现率 新发现类别数 / 总类别数 监控趋势
噪声比例 聚类噪声点数 / 总样本数 < 10%

5.3 自动化触发条件

class IterationTrigger:
    """
    迭代触发器
    
    定义何时触发新一轮聚类分析和模型更新。
    """
    
    def __init__(
        self,
        min_new_samples: int = 1000,
        min_time_interval: int = 7 * 24 * 3600,  # 7天
        accuracy_threshold: float = 0.85
    ):
        """
        初始化触发器
        
        Args:
            min_new_samples: 最小新样本数阈值
            min_time_interval: 最小时间间隔(秒)
            accuracy_threshold: 准确率阈值
        """
        self.min_new_samples = min_new_samples
        self.min_time_interval = min_time_interval
        self.accuracy_threshold = accuracy_threshold
        self.last_iteration_time = 0
        self.new_sample_count = 0
        self.current_accuracy = 1.0
    
    def should_trigger(self) -> bool:
        """
        判断是否应该触发新一轮迭代
        
        触发条件(满足任一):
        1. 新样本数超过阈值
        2. 时间间隔超过阈值
        3. 模型准确率低于阈值
        
        Returns:
            是否触发迭代
        """
        import time
        
        # 条件 1:新样本数
        if self.new_sample_count >= self.min_new_samples:
            return True
        
        # 条件 2:时间间隔
        if time.time() - self.last_iteration_time >= self.min_time_interval:
            return True
        
        # 条件 3:准确率下降
        if self.current_accuracy < self.accuracy_threshold:
            return True
        
        return False
    
    def record_new_sample(self, count: int = 1):
        """记录新样本"""
        self.new_sample_count += count
    
    def record_iteration(self, accuracy: float):
        """记录迭代完成"""
        import time
        self.last_iteration_time = time.time()
        self.new_sample_count = 0
        self.current_accuracy = accuracy

六、总结

本文介绍了如何构建一个完整的长尾数据挖掘闭环系统,核心要点如下:

核心要点回顾

  1. 开放世界半监督学习是解决长尾问题的关键

    传统的封闭世界假设无法处理未知类别。开放世界 SSL 允许模型同时识别已知类别和发现未知类别,这正是长尾场景需要的。

  2. Embedding 聚类是发现未知类别的有效方法

    通过 Sentence-BERT 提取语义嵌入,使用 HDBSCAN 进行聚类,可以自动发现数据中的潜在类别,无需预定义类别数量。

  3. Label Studio ML Backend 实现了标注与模型的闭环

    ML Backend 将聚类模型集成到标注流程中,实现预标注、交互式标注和模型更新的完整闭环。

  4. 持续迭代是保持系统活力的关键

    通过定义触发条件,系统可以自动启动新一轮聚类分析,持续发现新类别,保持情绪打标体系的与时俱进。

核心 API 快速参考

组件 核心 API 用途
EmbeddingClusterer encode(), cluster() 文本嵌入与聚类
NoveltyDetector detect_novel_clusters() 新类别检测
SentimentClusteringBackend predict(), fit() Label Studio ML Backend
LongTailMiningPipeline run() 完整挖掘流程

官方资源链接

资源 链接
Label Studio 官网 https://labelstud.io
Label Studio GitHub https://github.com/HumanSignal/label-studio
ML Backend GitHub https://github.com/HumanSignal/label-studio-ml-backend
ML Backend 开发文档 https://labelstud.io/guide/ml_create
情绪分析模板 https://labelstud.io/templates/sentiment_analysis
预标注格式文档 https://labelstud.io/guide/predictions
Sentence-BERT 文档 https://www.sbert.net

未来展望

这个系统还有很大的扩展空间:

  1. 主动学习集成:优先标注模型不确定的样本,提高标注效率
  2. 多模态支持:扩展到图像、音频等多模态数据的聚类
  3. 实时流处理:支持实时数据流的在线聚类
  4. 自动化程度提升:结合 LLM 自动生成新类别名称

本文档基于 Label Studio 官方文档和开放世界半监督学习理论编写,所有代码均可直接运行或作为参考。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐