在这里插入图片描述


引言:为什么微调数据的质量至关重要?

当前,开源大模型(如Llama、Qwen、DeepSeek等)的预训练已相当成熟,其通识能力主要来源于海量的无标注语料。然而,要让模型成为一名专业的“法律顾问”、“代码助手”或“客服专家”,就需要通过有监督微调,向其注入特定领域的指令-响应对。这个过程的核心假设是:模型能从高质量的示范中“学会”遵循指令、生成合规格式、运用领域知识并进行严谨推理。

低质量的微调数据会引入多种风险:

  1. 知识/事实性错误: 导致模型“学习”错误信息,产生“幻觉”。
  2. 格式与风格污染: 使模型输出变得混乱、不一致或带有不良风格。
  3. 偏见放大: 强化数据中存在的性别、种族、文化等偏见。
  4. 过拟合与泛化性差: 模型只记住了数据中的噪声和特例,而非通用模式。
  5. 训练不稳定: 包含大量异常值或冲突样本的数据会拖慢收敛,甚至导致训练崩溃。

因此,构建一套严谨、自动化与可复现的数据处理流水线,是任何严肃的SFT项目成功的先决条件。本文接下来的章节将围绕这一流水线展开。


一、 数据清洗:从“脏数据”到“干净样本”的炼金术

数据清洗是预处理的第一步,旨在剔除噪声、纠正错误、统一格式,为后续所有操作打下坚实基础。其本质是对数据的“外科手术”。

1.1 数据格式标准化

在集成来自不同来源(爬虫、API、人工标注、开源数据集)的数据时,格式混乱是首要问题。

  • 1.1.1 统一输入输出格式(JSON/JSONL/CSV): 大模型微调,尤其是对话或指令数据,普遍采用JSON Lines (JSONL)格式,即每行是一个独立的JSON对象。这便于流式读取、并行处理和容错。标准字段通常包括:instruction(指令)、input(可选输入)、output(期望输出),以及可能的systemconversations(多轮)等。将所有数据统一转换为JSONL格式是第一步。
  • 1.1.2 字段校验与缺失字段补全: 对每个JSON对象的必需字段(如instructionoutput)进行非空检查。对于缺失的次要字段,可采取策略:若input字段缺失但上下文隐含,可置为空字符串““;若system角色缺失,可根据数据集全局设定一个默认值(如“你是一个有帮助的AI助手”)。核心目标是保证后续处理逻辑能稳定访问每个字段。
  • 1.1.3 编码统一(UTF-8、BOM处理): 不同来源的文本可能使用不同的字符编码(如GBK、ISO-8859-1)。必须将所有文本统一转换为UTF-8 without BOM编码。BOM(Byte Order Mark)在某些编辑器(如Windows记事本)中会被添加到UTF-8文件开头,可能导致解析错误,需在读取时去除。

1.2 噪声过滤

噪声过滤旨在移除对模型学习无益甚至有害的内容。

  • 1.2.1 无意义内容去除
    • 纯标点/数字/符号: 如“……”、“!!!”、“12345”。
    • 无意义重复字符: 如“好好好好好”、“testtesttest”。可通过正则表达式检测字符重复模式。
    • 乱码与无效Unicode: 如“���”、“ä½ å¥½”,使用ftfy等库可修复部分常见乱码,无法修复的应剔除。
  • 1.2.2 低质量文本过滤
    • 过长/过短文本: 过短的文本(如少于3个有效词)信息量不足;过长的文本(如超过模型上下文窗口的80%)在SFT中可能导致截断损失。需根据任务设定合理的长度阈值。
    • 困惑度阈值: 使用一个在干净语料上训练的小型语言模型(如KenLMn-gram模型或一个小型GPT-2)计算每个样本的困惑度(Perplexity, PPL)。PPL过高的样本,通常语法混乱、语义不通,应被过滤。PPL阈值需在验证集上通过实验确定。
  • 1.2.3 语言识别与无关语言剔除: 针对多语言或特定语言任务,使用langdetectfasttext语言识别模型判断文本主语言。若目标是微调中文模型,则过滤掉非中文占主导的样本,除非任务明确需要多语言能力。
  • 1.2.4 敏感信息与隐私内容脱敏: 对包含个人可识别信息(PII)的数据进行脱敏是法律和伦理要求。
    • 规则匹配: 使用正则表达式匹配邮箱、手机号、身份证号、信用卡号等。
    • 命名实体识别(NER): 使用模型识别姓名、地址、组织机构名等,并用占位符(如[NAME][ADDRESS])替换。
    • 差分隐私: 在某些高隐私要求的场景,可在文本表示层面引入差分隐私噪声,但会轻微影响数据效用。

1.3 异常值与离群点处理

异常值并非总是噪声,但可能对模型训练产生不成比例的影响。

  • 1.3.1 统计特征检测: 计算整个数据集在字符数、词数、句子数、标点比例、停用词比例等方面的分布。通过Z-scoreIQR(四分位距)方法,识别在某个或多个特征上显著偏离整体分布的样本。例如,一个字符数超过99.9%分位数的样本,可能是一个未被正确分割的超长文档。
  • 1.3.2 基于模型的异常检测: 使用在高质量数据上微调的小型模型(如Sentence-BERT)计算所有样本的语义嵌入,然后在嵌入空间中使用Isolation ForestLOF(局部离群因子)算法检测语义异常点。这些点可能与主体数据分布差异巨大,需人工复核决定去留。

1.4 标签质量校验(监督微调场景)

在指令微调中,output可视为“标签”。其质量至关重要。

  • 1.4.1 标签一致性检查: 对于分类或结构化生成任务,检查output是否符合预定义的格式(如JSON、YAML)或取值集合。使用解析器(如json.loads)验证,解析失败的样本需修正或剔除。
  • 1.4.2 标签噪声识别与修正
    • 自洽性检查: 对于同一instruction有多种可能答案的任务,检查是否存在矛盾的output。可利用大模型(如GPT-4)作为评判员,评估不同output之间的一致性,或将它们聚类,找出离群的、可能错误的标注。
    • 基于模型的校验: 训练一个轻量级的“校验模型”(如DeBERTa),预测给定(instruction, input, output)三元组的正确概率。概率过低的样本被标记为潜在噪声,供人工复核。
  • 1.4.3 人工复核流程设计: 自动化只能发现疑似问题。必须建立高效的人工复核流程,如开发一个简单的Web界面,将疑似问题样本(自动化标记+原因)展示给标注员进行快速确认、修正或丢弃。

1.5 缺失数据处理

  • 1.5.1 缺失样本剔除: 对于关键字段(instruction, output)完全缺失或严重损坏的样本,最直接的方法是剔除。在数据量充足时这是首选。
  • 1.5.2 基于上下文的重构补全: 对于部分字段缺失但上下文信息丰富的样本,可尝试用模型补全。例如,如果instruction缺失但有inputoutput,可以用一个大模型(如Qwen2.5-7B-Instruct)根据inputoutput反推出可能的instruction。但此方法生成的数据需谨慎验证。

1.6 文本深层清洗

处理文本内部的格式和字符问题。

  • 1.6.1 HTML/XML标签去除: 从网页抓取的数据常包含HTML标签。使用BeautifulSouplxml库提取纯文本,并注意处理 等HTML实体。
  • 1.6.2 特殊符号与不可见字符清理: 移除控制字符(\x00-\x1F)、方向控制字符(\u202A-\u202E)等。但需保留有意义的符号,如数学符号、货币符号等。
  • 1.6.3 统一空白字符与换行符: 将连续的空格、制表符等合并为单个空格。将不同操作系统的换行符(\r\n, \r, \n)统一为\n。这能减少模型在格式上的困惑。

经过以上清洗,我们得到了一个格式统一、噪声较低、相对干净的数据集。但这可能还不够,尤其当数据量不足或多样性不够时。


二、 数据增强:以有限数据,创无限可能

数据增强旨在不改变样本核心语义的前提下,通过变换生成新的、多样化的训练样本,提升模型的泛化能力和鲁棒性,尤其适用于数据稀缺的场景。

2.1 基于规则的增强方法

简单、可控、速度快,但可能生成不自然的文本。

  • 2.1.1 同义词替换: 使用WordNetHowNet(中文)或领域自定义的同义词词表,随机替换句子中的非停用词。注意避免替换关键实体(如专有名词)导致事实错误。
  • 2.1.2 回译: 将文本翻译成一种中间语言(如英语),再翻译回原语言。这能有效改写句式,增加语言表达的多样性。质量依赖于翻译模型(如Google Translate API, mBART)的性能。通常中->英->中是最稳定的组合之一。
  • 2.1.3 随机插入/删除/交换: 对句子中的词或instruction中的短语进行随机插入(同义词)、删除(非关键信息词)或交换(相邻词顺序)。这种方法更适用于文本分类,在指令数据中需谨慎,避免破坏指令的完整性。
  • 2.1.4 模板化改写: 为常见的指令类型(如“总结以下文本:”、“将X转换为Y:”)设计多种表述模板,然后进行填充替换。例如,“请概括下文”可改写为“为下面的文章写一个摘要:”、“用一句话总结这段内容:”等。

2.2 基于模型的增强方法

能生成更流畅、更自然的文本变体,但成本更高,且可能引入模型自身的偏见。

  • 2.2.1 使用小型生成模型: 使用在通用语料上训练的T5GPT-2,以“对以下句子进行复述:{原始文本}”为提示,生成多个复述版本。可控制生成温度(temperature)来调节多样性。
  • 2.2.2 基于大模型蒸馏的改写: 使用一个强大的教师模型(如GPT-4Claude),对原始指令进行高质量的重写、扩写或风格转换。生成的数据可用于微调较小的学生模型,这是知识蒸馏的一种形式。例如,用Llama-3.1-8B-Instruct改写自身训练数据的一部分,以提升其指令跟随的多样性。
  • 2.2.3 对抗样本生成: 在安全对齐或鲁棒性微调中,有意识地生成对抗性样本。例如,使用TextAttack等库,在input中插入轻微的扰动(拼写错误、同音字替换),而output保持不变,以增强模型对噪声的抵抗力。或生成具有挑衅、诱导性的instruction,并配以安全、拒绝性的output,以强化模型的安全护栏。

2.3 面向指令微调(SFT)的增强策略

专门针对指令-响应对结构的设计。

  • 2.3.1 指令多样性扩增
    • 重述: 用不同方式表达同一指令意图。“写一首关于春天的诗” -> “请创作一首描绘春日景象的诗歌”。
    • 角色变换: 为指令添加不同的角色设定。“以历史学家的口吻解释法国大革命” -> “假设你是一位中学生,用通俗的语言解释法国大革命”。
  • 2.3.2 输入-输出对的反向生成: 给定高质量的output(如一段优秀的代码、一篇严谨的分析),让模型(如GPT-4)反推出可能触发这个outputinstructioninput。这能有效挖掘高质量响应对应的多样化提问方式。
  • 2.3.3 多轮对话的对话链增强: 对于一个多轮对话样本,可以从中截取不同的子对话链作为新的训练样本。例如,一个5轮的对话,可以生成(第1轮, 第2轮)(第1+2轮, 第3轮)(第1+2+3轮, 第4轮)等多个样本,增加对话历史上下文的多样性。

2.4 多模态微调数据增强(若有)

对于图文多模态模型(如LLaVA)的微调,数据增强需同时考虑图像和文本。

  • 2.4.1 图像字幕的文本变体: 对图文对中的文本描述(字幕)进行上述的文本增强(同义词替换、回译、模型复述),生成同一张图片的多种描述。
  • 2.4.2 图文对的扰动增强
    • 图像变换: 对图像进行保持语义的轻微变换,如随机裁剪、旋转、颜色抖动、添加噪声等。这能提升模型对图像扰动的鲁棒性。
    • 局部替换: 在图像中遮盖非关键物体区域,并修改文本描述以匹配遮盖后的内容(或反之),用于增强细粒度理解。

数据增强在扩大数据集的同时,也可能引入重复或近似重复的样本,这引出了下一个关键步骤——去重。


三、 数据去重:提升效率与防止记忆

重复数据不仅浪费计算资源,还可能导致模型对高频模式产生过拟合,损害其泛化能力,并可能在生成时输出训练数据中的原文,引发隐私和版权问题。

3.1 精确去重

处理完全相同的样本。

  • 3.1.1 基于哈希的方法: 为每个样本的instruction+input+output拼接字符串计算哈希值(如MD5SHA-256)。哈希值相同的样本视为完全重复,仅保留一份。这是最高效的去重方法,可剔除因收集错误导致的完全重复。
  • 3.1.2 基于字段匹配: 针对特定字段(如仅对instruction字段)进行精确匹配去重。适用于instruction是唯一标识的场景(如问答对)。

3.2 近似去重

处理语义相似或高度重叠的样本,是去重的核心和重点。

  • 3.2.1 MinHash + LSH(局部敏感哈希): 这是处理海量数据近似去重的行业标准。首先,将文本表示为n-gram(如5-gram)集合,然后通过MinHash算法将每个集合压缩成一个固定大小的签名(signature)。LSH则能快速地将签名相似的样本哈希到相同的“桶”中,从而只需在桶内进行精细比较。此方法在大规模去重中效率极高。
  • 3.2.2 编辑距离: 适用于短文本(如句子)的字符或词级别相似度计算。Levenshtein距离计算将一个字符串转换为另一个所需的最少单字符编辑次数。Jaro-Winkler距离对前缀相同的字符串给予更高相似度。设定一个编辑距离阈值(如,长度50的文本,编辑距离<5)来判断是否重复。
  • 3.2.3 SimHash: Google用于网页去重的算法。它将文本哈希为一个固定长度的指纹(如64位整数),其特性是相似文本的指纹只有少量不同位。通过计算汉明距离(Hamming Distance)即可快速判断相似度,效率极高,尤其适合长文档。
  • 3.2.4 语义向量相似度: 使用句子编码模型(如Sentence-BERT, SimCSE, BGE)将文本转换为高维语义向量,然后计算向量间的余弦相似度。这种方法能捕捉语义上的重复,即使措辞不同。例如,“猫坐在垫子上”和“一只猫咪在毯子上”的语义相似度会很高。设定一个相似度阈值(如>0.9)进行去重。计算成本较高,常用于对MinHash筛选后的候选对进行精筛。
  • 3.2.5 聚类去重: 先对所有样本的语义向量进行聚类(如使用K-means或基于密度的DBSCAN)。然后,在每个簇内,可以只保留一个代表性样本(如最靠近簇中心的样本),或者对簇内所有样本进行融合/去重处理。这种方法能发现全局的冗余模式。

3.3 不同粒度的去重

  • 3.3.1 句子级去重: 在构建预训练语料时尤为重要,防止模型记忆重复的句子片段。通常使用MinHashn-gram级别进行操作。
  • 3.3.2 文档/样本级去重: 即对整个训练样本(指令-响应对)进行去重。这是SFT数据去重的主要粒度。
  • 3.3.3 数据集内部 vs 跨数据集去重
    • 内部去重: 在一个数据集中去除重复样本。
    • 跨数据集去重: 在合并多个来源的数据集(如Alpaca, ShareGPT, 自研数据)时,必须进行跨数据集去重,防止同一样本在不同数据集中出现,从而被过度加权。更重要的是,必须与模型的预训练数据进行去重,以防止“数据泄漏”——模型在微调时再次见到预训练时已见过的数据,这会导致对微调效果的虚假高估。

3.4 去重对模型性能的影响评估

去重不是越激进越好,需要权衡。

  • 3.4.1 保留多样性的去重策略: 过于严格的去重(如很高的语义相似度阈值)可能剔除有价值的表达变体,损害数据多样性。可以采用分层去重策略:先用MinHash等高效方法去除高度重复项,再用较宽松的语义相似度阈值去除语义冗余,但保留表达上的差异。
  • 3.4.2 去重比例与下游任务精度的权衡: 必须进行消融实验。比较不同程度去重(如去重0%、5%、20%、50%的数据)后,模型在保留的验证集未见过的测试任务上的性能。目标是找到一个“甜蜜点”,在显著减少数据量、提升训练效率的同时,模型性能不下降甚至略有提升(因为去除了噪声和冗余)。

在数据极其稀缺或获取成本极高的领域,仅靠清洗、增强和去重可能仍无法获得足够数据。此时,需要借助合成数据生成。


四、 合成数据生成:当真实数据不够用时

利用大模型自身或更强的模型来创造训练数据,已成为解决数据瓶颈的关键技术,尤其是在专业领域。

4.1 主流合成数据生成方法

  • 4.1.1 基于种子示例的生成: 提供少量高质量的种子示例(few-shot examples),提示大模型按照给定的格式和风格生成新的类似数据。这是最简单直接的方法。
  • 4.1.2 Self-Instruct: 由Stanford团队提出的经典方法。其核心是引导一个大模型(最初是GPT-3)从一组基础指令种子出发,通过“生成指令 -> 生成输入(可选)-> 生成输出”的步骤,自动扩展指令数据集。该方法打开了指令数据自动化生成的大门。
  • 4.1.3 Evol-Instruct: 在Self-Instruct基础上,通过让大模型对现有指令进行“演化”操作来提升数据复杂度。演化操作包括:深化(增加约束、增加推理步骤)、具体化(使问题更具体)、增加推理步骤复杂化输入等。通过多轮演化,可以生成难度远高于初始种子的高质量数据,WizardLM系列模型即基于此方法。
  • 4.1.4 基于知识图谱的推理路径生成: 在需要事实性、逻辑性的领域(如医疗、金融),可以从结构化知识库(知识图谱)中抽取实体和关系,自动生成(问题, 推理链, 答案)形式的数据。例如,从(药物A, 治疗, 疾病B)(疾病B, 症状, 咳嗽)可以生成问题“药物A可以用来治疗咳嗽吗?”,并附上推理链。
  • 4.1.5 多模型协作生成
    • 生成-判别-过滤流程: 一个“生成器”模型(如GPT-4)负责创造候选数据;一个“判别器”模型(可以是另一个较小的模型,或一组规则/启发式方法)负责评估候选数据的质量(相关性、正确性、安全性);只有通过判别的数据才会被加入训练集。
    • 批评-修订流程: 生成器生成数据后,另一个“批评器”模型指出其不足,生成器再根据批评进行修订,循环多次以提高质量。

4.2 合成数据的质量控制

合成数据的质量是其有效性的生命线。

  • 4.2.1 多样性控制: 在调用生成模型时,通过调节temperaturetop-pfrequency_penalty等生成参数,鼓励输出多样性。可以结合不同的生成提示模板,从多个角度生成数据。
  • 4.2.2 真实性与正确性验证
    • 自动事实核查: 对于涉及事实的合成数据,可利用检索增强生成(RAG)技术。生成答案时,模型同时引用相关来源(如维基百科片段),或事后用一个检索系统验证生成内容与可靠来源的一致性。
    • 自洽性检查: 对于数学、代码、逻辑问题,可以执行代码或逻辑推导来验证答案的正确性。
    • 人工抽样审计: 必须定期对合成数据进行人工抽样检查,评估其事实准确性、逻辑性和有用性,并根据检查结果调整生成提示或过滤规则。
  • 4.2.3 避免模式坍塌与重复生成: 大模型在生成大量数据时,容易陷入重复的、模板化的模式。可以通过检测生成数据的n-gram重复率、语义嵌入的聚类中心密度等指标来监控模式坍塌,并及时调整生成策略(如使用更丰富的种子、增加随机性)。
  • 4.2.4 对抗性过滤: 使用一个在高质量人类标注数据上训练的“鉴别器”模型(分类器),对合成数据进行打分,过滤掉低分样本。甚至可以训练一个GAN式的生成器-判别器对,迭代提升合成数据的质量。

4.3 合成数据与真实数据的混合策略

如何混合使用合成数据与真实数据,是影响最终效果的关键。

  • 4.3.1 比例寻优实验: 没有固定黄金比例。需要在验证集上实验不同混合比例(如100%真实、70%真实+30%合成、50%/50%、30%真实+70%合成等)对模型性能的影响。通常,加入一定比例的高质量合成数据能提升性能,但合成数据比例过高可能导致性能饱和甚至下降。
  • 4.3.2 渐进式混合
    • 课程学习: 先使用较简单的、高质量的真实数据训练模型,待模型具备一定基础能力后,再引入更复杂、更具挑战性的合成数据进行训练。
    • 交替训练: 在训练过程中,交替使用真实数据批次和合成数据批次,有助于模型同时学习真实分布和泛化模式。
  • 4.3.3 困难样本挖掘 + 合成增强: 在模型训练过程中,识别出模型预测错误的样本(困难样本)。针对这些样本所属的类型或主题,有针对性地合成更多类似的、具有挑战性的数据,并将其加入训练集,实现“对症下药”式的增强。

4.4 合成数据的潜在风险与缓解措施

  • 4.4.1 模型偏见放大: 生成模型会继承并放大其训练数据中的社会偏见。如果用于生成数据的种子或提示存在偏见,生成的合成数据会进一步放大它。缓解: 对生成数据进行偏见评估(使用HONEST等指标),并对识别出的偏见数据进行降采样或重加权。在生成提示中明确要求“生成公平、中立的内容”。
  • 4.4.2 幻觉内容引入: 生成模型会编造看似合理但错误的事实。这是合成数据的最大风险之一。缓解: 严格实施4.2.2中的事实核查流程。优先采用基于知识图谱或检索增强的生成方法。在合成数据中明确区分“事实性知识”和“创造性内容”。
  • 4.4.3 合成数据循环退化: 如果持续使用模型自己生成的数据来训练其后续版本(或同类模型),可能导致模型分布逐渐收缩,性能退化,陷入“模型崩溃”或“退化循环”。缓解: 始终坚持在训练数据中保持一定比例的、新鲜的、高质量的人类真实数据。定期用外部基准和人类评估来监控模型性能,而非仅依赖合成数据驱动的评估。

五、常用开源数据清洗工具对比

5.1 DataJuicer (阿里)

一个功能强大、可配置性高的大模型数据预处理工具包。它将各种清洗、过滤、去重、分析算子(operator)模块化,用户可以通过配置文件像搭积木一样组合数据处理流水线。支持分布式处理,非常适用于处理TB级数据。推荐用于大规模、端到端的数据处理

5.2 TextSphere (腾讯)

同样是一个面向大模型的数据处理系统,提供丰富的文本处理工具和可视化界面。

Pandas

Python数据分析的瑞士军刀。在数据清洗的早期探索阶段、对小规模数据进行快速的原型验证和脚本编写时不可或缺。但对于大规模(GB级以上)数据,其内存和性能可能成为瓶颈。

PySpark

适用于超大规模数据集的分布式处理。当数据量达到PB级别,需要在Hadoop或云上集群运行时,PySpark是标准选择。但其开发复杂度高于单机工具。

六、微调数据质量评估指标

6.1 IFD (Instruction Following Density)

一种评估指令数据质量的自动化指标。其核心思想是,一个好的指令-响应对,其响应应紧密依赖指令中的信息。通过计算去除指令后对响应进行预测的困惑度增加程度来量化。IFD值越高,通常表示响应与指令的关联性越强,数据质量可能越高。

6.2 指令复杂度分数 (Instruction Complexity Score)

可以通过一些启发式规则来计算,如:指令的长度、包含的约束条件数量(如“用Python写”、“不超过100字”、“以表格形式列出”)、是否包含多步骤推理关键词(如“首先”、“然后”、“因此”)等。高复杂度的指令数据有助于训练出能力更强的模型。

七、实际案例:从原始日志到微调数据集的完整流程

假设我们要为一个“客服对话摘要”任务微调模型。

7.1 原始数据

从客服系统导出的JSON格式对话日志,包含用户和客服的多轮对话记录,以及一个手动填写的“摘要”字段(部分为空或质量差)。

7.2 清洗

  • 格式标准化: 将所有日志转换为标准JSONL,字段包括:dialog(对话历史列表), summary(摘要)。
  • 噪声过滤: 过滤掉summary为空的记录;过滤掉对话过短(少于3轮)或过长(超过20轮)的记录;使用语言识别过滤非目标语言的对话。
  • 异常值处理: 计算summary长度与dialog长度的比例,剔除比例异常高(如摘要比对话还长)或异常低(如只有一个词)的样本。
  • 标签质量校验: 因为summary是标签,我们使用ROUGE-LBERTScore自动评估每个摘要与对话的关联质量,对低分样本进行人工复核和重写。
  • 文本清洗: 去除对话中的特殊系统标记、统一换行符。

7.3 增强

  • 基于规则: 对高质量的(dialog, summary)对,通过同义词替换和回译,为相同的对话生成2-3个不同表述的摘要。
  • 基于模型: 使用GPT-4,以“请用另一种方式总结以下对话:{dialog}”为提示,生成更多样化的摘要。
  • 指令化: 将样本转换为指令格式:instruction: “请总结以下的客服对话:”, input: {dialog}, output: {summary}。并对指令进行多样性扩增(如“概括对话核心问题”、“为用户咨询提炼要点”)。

7.4 去重

  • dialog字段计算SimHash,去除高度相似的重复对话。
  • instruction + dialog的拼接文本上使用MinHash+LSH进行近似去重,去除语义和结构高度相似的样本。

7.5 合成数据生成(可选)

  • 从已有高质量数据中,抽取dialog的结构化信息(如用户问题类型、解决方案、涉及产品),利用模板或GPT-4生成新的、虚构但合理的客服对话及摘要。

7.6 最终输出

一个高质量的、指令格式的客服对话摘要微调数据集,可用于训练LlamaQwen等模型,使其具备专业的对话摘要能力。

通过以上系统性的工程流程,我们便能将原始的、粗糙的数据,转化为驱动大模型精准微调的优质“燃料”,为构建高性能、高可靠的领域大模型应用奠定坚实的数据基石。

7.7 代码实现

下面是一个完整的、可运行的Python代码示例,展示如何从原始客服日志构建高质量的微调数据集。代码包含数据加载、清洗、增强、去重和保存的完整流程。

"""
AI大模型微调数据收集和清洗完整流程实现
本代码展示了从原始客服日志到微调数据集的完整处理流程
包含:数据清洗、增强、去重、格式转换等步骤
作者: Thomas.Sir
日期:2026年4月
"""

import json
import pandas as pd
import numpy as np
from pathlib import Path
import re
import hashlib
import langdetect
from langdetect import detect, DetectorFactory
from tqdm import tqdm
import random
from collections import defaultdict
import logging
from typing import List, Dict, Tuple, Any, Optional
import warnings
warnings.filterwarnings('ignore')

# 设置随机种子确保可重复性
random.seed(42)
np.random.seed(42)

# 设置logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# 设置langdetect的确定性行为
DetectorFactory.seed = 42

class DataCleaner:
    """数据清洗类,处理原始数据的各种质量问题"""
    
    def __init__(self, min_dialog_turns=3, max_dialog_turns=20, min_summary_words=5):
        """
        初始化数据清洗器
        
        参数:
            min_dialog_turns: 最小对话轮数
            max_dialog_turns: 最大对话轮数
            min_summary_words: 摘要最小词数
        """
        self.min_dialog_turns = min_dialog_turns
        self.max_dialog_turns = max_dialog_turns
        self.min_summary_words = min_summary_words
        
        # 编译常用正则表达式
        self.html_pattern = re.compile(r'<[^>]+>')
        self.url_pattern = re.compile(r'https?://\S+|www\.\S+')
        self.email_pattern = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b')
        self.phone_pattern = re.compile(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b')
        self.repeated_chars_pattern = re.compile(r'(.)\1{5,}')  # 重复6次以上的字符
        self.control_chars_pattern = re.compile(r'[\x00-\x1f\x7f-\x9f]')
        
    def load_raw_data(self, file_path: str) -> List[Dict]:
        """
        加载原始JSON格式的数据
        
        参数:
            file_path: 原始数据文件路径
            
        返回:
            raw_data: 原始数据列表
        """
        logger.info(f"正在加载原始数据: {file_path}")
        
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                raw_data = json.load(f)
            logger.info(f"成功加载 {len(raw_data)} 条原始记录")
            return raw_data
        except Exception as e:
            logger.error(f"加载数据失败: {e}")
            return []
    
    def format_standardization(self, raw_data: List[Dict]) -> List[Dict]:
        """
        数据格式标准化
        
        参数:
            raw_data: 原始数据
            
        返回:
            standardized_data: 标准化后的数据
        """
        logger.info("开始数据格式标准化")
        standardized_data = []
        skipped_count = 0
        
        for item in tqdm(raw_data, desc="格式标准化"):
            standardized_item = {}
            
            # 1.1.1 统一字段名
            # 处理不同可能的字段名
            if 'dialog' in item:
                standardized_item['dialog'] = item['dialog']
            elif 'conversation' in item:
                standardized_item['dialog'] = item['conversation']
            elif 'messages' in item:
                standardized_item['dialog'] = item['messages']
            else:
                # 尝试从其他常见字段中提取对话
                for key, value in item.items():
                    if isinstance(value, list) and len(value) > 0:
                        if isinstance(value[0], str) or (isinstance(value[0], dict) and 'content' in value[0]):
                            standardized_item['dialog'] = value
                            break
                
                if 'dialog' not in standardized_item:
                    skipped_count += 1
                    continue
            
            # 1.1.2 字段校验与缺失字段补全
            # 确保dialog是列表格式
            if not isinstance(standardized_item['dialog'], list):
                standardized_item['dialog'] = [standardized_item['dialog']]
            
            # 处理summary字段
            if 'summary' in item and item['summary']:
                standardized_item['summary'] = item['summary']
            elif 'answer' in item and item['answer']:
                standardized_item['summary'] = item['answer']
            elif 'response' in item and item['response']:
                standardized_item['summary'] = item['response']
            else:
                standardized_item['summary'] = ""
            
            # 添加唯一ID
            standardized_item['id'] = hashlib.md5(
                json.dumps(standardized_item['dialog'], ensure_ascii=False).encode('utf-8')
            ).hexdigest()[:16]
            
            # 添加原始数据来源标识
            standardized_item['source'] = item.get('source', 'unknown')
            
            standardized_data.append(standardized_item)
        
        logger.info(f"格式标准化完成: 处理 {len(standardized_data)} 条, 跳过 {skipped_count} 条")
        return standardized_data
    
    def clean_dialog_text(self, text: str) -> str:
        """
        清理单条对话文本
        
        参数:
            text: 原始文本
            
        返回:
            cleaned_text: 清理后的文本
        """
        if not isinstance(text, str):
            return ""
        
        # 1.6.1 移除HTML/XML标签
        text = self.html_pattern.sub(' ', text)
        
        # 1.6.2 移除URL
        text = self.url_pattern.sub(' [URL] ', text)
        
        # 1.6.2 移除邮箱
        text = self.email_pattern.sub(' [EMAIL] ', text)
        
        # 1.6.2 移除电话
        text = self.phone_pattern.sub(' [PHONE] ', text)
        
        # 1.2.1 移除过多的重复字符
        text = self.repeated_chars_pattern.sub(r'\1\1\1', text)  # 最多保留3个重复字符
        
        # 1.6.2 移除控制字符
        text = self.control_chars_pattern.sub(' ', text)
        
        # 1.6.3 统一空白字符
        # 替换各种空白字符为普通空格
        text = re.sub(r'[\t\r\f\v]', ' ', text)
        # 合并多个连续空格为单个空格
        text = re.sub(r' +', ' ', text)
        # 移除首尾空格
        text = text.strip()
        
        return text
    
    def noise_filtering(self, data: List[Dict]) -> List[Dict]:
        """
        噪声过滤
        
        参数:
            data: 标准化后的数据
            
        返回:
            filtered_data: 过滤后的数据
        """
        logger.info("开始噪声过滤")
        filtered_data = []
        stats = {
            'total': len(data),
            'no_dialog': 0,
            'too_short': 0,
            'too_long': 0,
            'no_summary': 0,
            'short_summary': 0,
            'wrong_language': 0,
            'cleaned': 0
        }
        
        for item in tqdm(data, desc="噪声过滤"):
            # 检查对话是否存在
            if 'dialog' not in item or not item['dialog']:
                stats['no_dialog'] += 1
                continue
            
            # 1.2.2 过滤过长/过短对话
            dialog_turns = len(item['dialog'])
            if dialog_turns < self.min_dialog_turns:
                stats['too_short'] += 1
                continue
            
            if dialog_turns > self.max_dialog_turns:
                stats['too_long'] += 1
                # 可以选择截断而不是完全丢弃
                item['dialog'] = item['dialog'][:self.max_dialog_turns]
            
            # 清理每条对话文本
            cleaned_dialog = []
            for turn in item['dialog']:
                if isinstance(turn, dict):
                    # 处理格式化的对话 {"role": "user", "content": "..."}
                    if 'content' in turn:
                        cleaned_content = self.clean_dialog_text(turn['content'])
                        if cleaned_content:
                            cleaned_turn = turn.copy()
                            cleaned_turn['content'] = cleaned_content
                            cleaned_dialog.append(cleaned_turn)
                elif isinstance(turn, str):
                    cleaned_turn = self.clean_dialog_text(turn)
                    if cleaned_turn:
                        # 如果是字符串格式,转换为标准格式
                        if cleaned_turn.startswith(('用户:', '顾客:', '客户:', 'user:', 'User:')):
                            role = 'user'
                            content = cleaned_turn.split(':', 1)[1].strip()
                        elif cleaned_turn.startswith(('客服:', '客服人员:', '客服代表:', 'assistant:', 'Assistant:')):
                            role = 'assistant'
                            content = cleaned_turn.split(':', 1)[1].strip()
                        else:
                            # 默认格式,尝试推断角色
                            if len(cleaned_dialog) % 2 == 0:
                                role = 'user'
                            else:
                                role = 'assistant'
                            content = cleaned_turn
                        
                        cleaned_dialog.append({'role': role, 'content': content})
            
            if len(cleaned_dialog) < 2:  # 至少需要一轮对话
                stats['too_short'] += 1
                continue
            
            item['dialog'] = cleaned_dialog
            
            # 检查摘要
            if not item.get('summary'):
                stats['no_summary'] += 1
                continue
            
            # 清理摘要
            cleaned_summary = self.clean_dialog_text(item['summary'])
            if len(cleaned_summary.split()) < self.min_summary_words:
                stats['short_summary'] += 1
                continue
            
            item['summary'] = cleaned_summary
            
            # 1.2.3 语言识别与过滤
            # 拼接对话文本进行语言检测
            dialog_text = ' '.join([turn['content'] for turn in item['dialog']])
            try:
                # 检测语言
                lang = detect(dialog_text)
                # 如果是中文对话,过滤非中文内容
                if 'zh' not in lang and 'cn' not in lang:
                    # 但摘要可能是中文,所以也检查摘要
                    summary_lang = detect(item['summary'])
                    if 'zh' not in summary_lang and 'cn' not in summary_lang:
                        stats['wrong_language'] += 1
                        continue
            except:
                # 语言检测失败,默认保留
                pass
            
            filtered_data.append(item)
            stats['cleaned'] += 1
        
        logger.info(f"噪声过滤完成: 原始 {stats['total']} 条, 保留 {stats['cleaned']} 条")
        logger.info(f"过滤统计: 无对话 {stats['no_dialog']}, 对话过短 {stats['too_short']}, 对话过长 {stats['too_long']}, "
                   f"无摘要 {stats['no_summary']}, 摘要过短 {stats['short_summary']}, 语言不符 {stats['wrong_language']}")
        
        return filtered_data
    
    def remove_duplicates(self, data: List[Dict]) -> List[Dict]:
        """
        移除重复数据 (精确去重)
        
        参数:
            data: 过滤后的数据
            
        返回:
            deduplicated_data: 去重后的数据
        """
        logger.info("开始精确去重")
        
        seen_hashes = set()
        deduplicated_data = []
        duplicate_count = 0
        
        for item in tqdm(data, desc="去重处理"):
            # 创建对话文本的哈希键
            dialog_text = ''.join([turn['content'] for turn in item['dialog']])
            dialog_hash = hashlib.md5(dialog_text.encode('utf-8')).hexdigest()
            
            # 检查是否已存在
            if dialog_hash not in seen_hashes:
                seen_hashes.add(dialog_hash)
                deduplicated_data.append(item)
            else:
                duplicate_count += 1
        
        logger.info(f"去重完成: 原始 {len(data)} 条, 去重后 {len(deduplicated_data)} 条, 移除 {duplicate_count} 条重复数据")
        
        return deduplicated_data

class DataAugmentor:
    """数据增强类,用于扩增训练数据"""
    
    def __init__(self, augmentation_methods=None):
        """
        初始化数据增强器
        
        参数:
            augmentation_methods: 启用的增强方法列表
        """
        if augmentation_methods is None:
            augmentation_methods = ['paraphrase', 'back_translate_simple', 'instruction_variation']
        
        self.augmentation_methods = augmentation_methods
        
        # 同义词词典 (简易版,实际应用中可以使用更完整的词典)
        self.synonyms = {
            "问题": ["疑问", "难题", "麻烦", "困难"],
            "解决": ["处理", "应对", "搞定", "化解"],
            "帮助": ["协助", "支援", "帮忙", "扶助"],
            "感谢": ["谢谢", "感激", "致谢", "道谢"],
            "抱歉": ["对不起", "不好意思", "歉意", "道歉"],
            "明白": ["理解", "懂得", "清楚", "知晓"],
            "请问": ["请教", "询问", "咨询", "问一下"],
            "产品": ["商品", "物品", "货品", "制品"],
            "服务": ["帮助", "支持", "协助", "服务"],
            "购买": ["买入", "购置", "采购", "买下"],
        }
    
    def paraphrase_with_synonyms(self, text: str, replacement_prob=0.3) -> str:
        """
        通过同义词替换进行复述
        
        参数:
            text: 原始文本
            replacement_prob: 替换概率
            
        返回:
            paraphrased_text: 复述后的文本
        """
        words = text.split()
        paraphrased_words = []
        
        for word in words:
            # 清洗单词,移除标点
            clean_word = re.sub(r'[^\w]', '', word)
            
            # 检查是否有同义词
            if clean_word in self.synonyms and random.random() < replacement_prob:
                synonyms = self.synonyms[clean_word]
                new_word = random.choice(synonyms)
                
                # 保持原单词的大小写和标点
                if word.istitle():
                    new_word = new_word.title()
                elif word.isupper():
                    new_word = new_word.upper()
                
                # 恢复标点
                if word != clean_word:
                    punctuation = word[len(clean_word):]
                    new_word = new_word + punctuation
                
                paraphrased_words.append(new_word)
            else:
                paraphrased_words.append(word)
        
        return ' '.join(paraphrased_words)
    
    def simple_back_translate(self, text: str) -> str:
        """
        简单的回译模拟 (实际应用中应调用翻译API)
        
        参数:
            text: 原始文本
            
        返回:
            back_translated_text: 回译后的文本
        """
        # 在实际应用中,这里应该调用翻译API
        # 如: 中文->英文->中文
        # 这里使用简单的规则模拟回译效果
        
        # 模拟翻译变化
        replacements = [
            ("客服", "客户服务"),
            ("用户", "客户"),
            ("问题", "疑问"),
            ("解决", "处理"),
            ("感谢", "谢谢"),
            ("请问", "想问一下"),
            ("如何", "怎样"),
            ("因为", "由于"),
            ("所以", "因此"),
            ("但是", "不过"),
        ]
        
        back_translated = text
        for old, new in replacements:
            if random.random() < 0.5:  # 50%概率应用每个替换
                back_translated = back_translated.replace(old, new)
        
        return back_translated
    
    def generate_instruction_variations(self, instruction: str) -> List[str]:
        """
        生成指令的多种变体
        
        参数:
            instruction: 原始指令
            
        返回:
            variations: 指令变体列表
        """
        variations = []
        
        # 基础指令模板
        base_instructions = [
            "请总结以下对话:",
            "请概括以下对话内容:",
            "请为以下对话生成摘要:",
            "请提炼以下对话的核心内容:",
            "请用一句话总结以下对话:",
            "请简要概述以下对话:",
        ]
        
        # 添加角色变换
        role_variations = [
            "作为客服主管,请总结以下对话:",
            "从客户的角度,总结以下对话:",
            "以专业客服的身份,概括以下对话:",
        ]
        
        # 添加详细程度变化
        detail_variations = [
            "请详细总结以下对话,包括问题描述和解决方案:",
            "请简洁地总结以下对话,突出关键点:",
            "请全面总结以下对话,涵盖问题、解决过程和结果:",
        ]
        
        # 从各种变体中选择
        all_variations = base_instructions + role_variations + detail_variations
        
        # 选择3-5个变体
        selected_variations = random.sample(all_variations, min(3, len(all_variations)))
        
        return selected_variations
    
    def augment_dialog(self, dialog: List[Dict], method: str) -> List[Dict]:
        """
        增强对话数据
        
        参数:
            dialog: 原始对话
            method: 增强方法
            
        返回:
            augmented_dialog: 增强后的对话
        """
        if method == 'paraphrase':
            # 对对话内容进行同义词替换
            augmented = []
            for turn in dialog:
                if turn['role'] == 'user':
                    # 对用户消息进行增强
                    augmented_turn = turn.copy()
                    augmented_turn['content'] = self.paraphrase_with_synonyms(turn['content'])
                    augmented.append(augmented_turn)
                else:
                    augmented.append(turn.copy())
            return augmented
        
        elif method == 'back_translate_simple':
            # 简单的回译模拟
            augmented = []
            for turn in dialog:
                augmented_turn = turn.copy()
                augmented_turn['content'] = self.simple_back_translate(turn['content'])
                augmented.append(augmented_turn)
            return augmented
        
        else:
            # 默认返回原始对话的副本
            return [turn.copy() for turn in dialog]
    
    def augment_summary(self, summary: str, method: str) -> str:
        """
        增强摘要数据
        
        参数:
            summary: 原始摘要
            method: 增强方法
            
        返回:
            augmented_summary: 增强后的摘要
        """
        if method == 'paraphrase':
            return self.paraphrase_with_synonyms(summary)
        
        elif method == 'back_translate_simple':
            return self.simple_back_translate(summary)
        
        else:
            return summary
    
    def augment_data(self, data: List[Dict], samples_per_item=2) -> List[Dict]:
        """
        对数据进行增强
        
        参数:
            data: 原始数据
            samples_per_item: 每个样本增强的数量
            
        返回:
            augmented_data: 增强后的数据
        """
        logger.info(f"开始数据增强,每个样本生成 {samples_per_item} 个增强样本")
        
        augmented_data = []
        
        for i, item in enumerate(tqdm(data, desc="数据增强")):
            # 添加原始数据
            augmented_data.append(item.copy())
            
            # 为每个样本生成多个增强版本
            for aug_idx in range(samples_per_item):
                # 随机选择增强方法
                method = random.choice(self.augmentation_methods)
                
                # 增强对话
                augmented_dialog = self.augment_dialog(item['dialog'], method)
                
                # 增强摘要
                augmented_summary = self.augment_summary(item['summary'], method)
                
                # 创建增强后的数据项
                augmented_item = {
                    'id': f"{item['id']}_aug{aug_idx}",
                    'dialog': augmented_dialog,
                    'summary': augmented_summary,
                    'source': f"{item.get('source', 'unknown')}_augmented",
                    'augmentation_method': method
                }
                
                augmented_data.append(augmented_item)
        
        logger.info(f"数据增强完成: 原始 {len(data)} 条, 增强后 {len(augmented_data)} 条")
        
        return augmented_data

class InstructionFormatter:
    """指令格式化类,将对话数据转换为指令微调格式"""
    
    def __init__(self, instruction_templates=None):
        """
        初始化指令格式化器
        
        参数:
            instruction_templates: 指令模板列表
        """
        if instruction_templates is None:
            instruction_templates = [
                "请总结以下对话:",
                "请概括以下对话内容:",
                "请为以下对话生成摘要:",
                "请提炼以下对话的核心内容:",
                "请用一段话总结以下对话:",
                "请简要概述以下对话的关键信息:",
            ]
        
        self.instruction_templates = instruction_templates
    
    def format_dialog_text(self, dialog: List[Dict]) -> str:
        """
        将对话列表格式化为文本
        
        参数:
            dialog: 对话列表
            
        返回:
            formatted_text: 格式化后的对话文本
        """
        formatted_lines = []
        
        for i, turn in enumerate(dialog):
            role = turn.get('role', 'unknown')
            content = turn.get('content', '')
            
            if role == 'user':
                formatted_lines.append(f"用户: {content}")
            elif role == 'assistant':
                formatted_lines.append(f"客服: {content}")
            else:
                formatted_lines.append(f"{role}: {content}")
        
        return '\n'.join(formatted_lines)
    
    def convert_to_instruction_format(self, data: List[Dict], 
                                     use_multiple_instructions=False) -> List[Dict]:
        """
        将对话数据转换为指令微调格式
        
        参数:
            data: 对话数据
            use_multiple_instructions: 是否为每个样本使用多个指令变体
            
        返回:
            formatted_data: 格式化后的指令数据
        """
        logger.info("开始转换为指令格式")
        
        formatted_data = []
        
        for item in tqdm(data, desc="格式化转换"):
            # 格式化对话
            dialog_text = self.format_dialog_text(item['dialog'])
            
            # 获取摘要
            summary = item.get('summary', '')
            
            if not summary:
                continue
            
            if use_multiple_instructions and 'augmentation_method' not in item:
                # 为原始数据生成多个指令变体
                instruction_variations = random.sample(
                    self.instruction_templates, 
                    min(3, len(self.instruction_templates))
                )
                
                for instruction in instruction_variations:
                    formatted_item = {
                        'instruction': instruction,
                        'input': dialog_text,
                        'output': summary,
                        'id': f"{item['id']}_{hashlib.md5(instruction.encode()).hexdigest()[:8]}",
                        'source': item.get('source', 'unknown'),
                        'dialog_length': len(item['dialog']),
                        'summary_length': len(summary.split())
                    }
                    
                    # 添加增强方法信息(如果有)
                    if 'augmentation_method' in item:
                        formatted_item['augmentation_method'] = item['augmentation_method']
                    
                    formatted_data.append(formatted_item)
            else:
                # 使用随机指令模板
                instruction = random.choice(self.instruction_templates)
                
                formatted_item = {
                    'instruction': instruction,
                    'input': dialog_text,
                    'output': summary,
                    'id': item['id'],
                    'source': item.get('source', 'unknown'),
                    'dialog_length': len(item['dialog']),
                    'summary_length': len(summary.split())
                }
                
                # 添加增强方法信息(如果有)
                if 'augmentation_method' in item:
                    formatted_item['augmentation_method'] = item['augmentation_method']
                
                formatted_data.append(formatted_item)
        
        logger.info(f"指令格式化完成: 生成 {len(formatted_data)} 条指令数据")
        
        return formatted_data

class DataQualityChecker:
    """数据质量检查类,用于评估和确保数据质量"""
    
    def __init__(self):
        """初始化数据质量检查器"""
        self.quality_metrics = {}
    
    def calculate_statistics(self, data: List[Dict]) -> Dict[str, Any]:
        """
        计算数据集统计信息
        
        参数:
            data: 数据集
            
        返回:
            stats: 统计信息字典
        """
        logger.info("计算数据统计信息")
        
        stats = {
            'total_samples': len(data),
            'instruction_lengths': [],
            'input_lengths': [],
            'output_lengths': [],
            'sources': defaultdict(int),
            'augmentation_methods': defaultdict(int)
        }
        
        for item in data:
            # 统计长度
            stats['instruction_lengths'].append(len(item.get('instruction', '').split()))
            stats['input_lengths'].append(len(item.get('input', '').split()))
            stats['output_lengths'].append(len(item.get('output', '').split()))
            
            # 统计来源
            stats['sources'][item.get('source', 'unknown')] += 1
            
            # 统计增强方法
            if 'augmentation_method' in item:
                stats['augmentation_methods'][item['augmentation_method']] += 1
        
        # 计算统计量
        for length_type in ['instruction_lengths', 'input_lengths', 'output_lengths']:
            lengths = stats[length_type]
            if lengths:
                stats[f'{length_type}_mean'] = np.mean(lengths)
                stats[f'{length_type}_std'] = np.std(lengths)
                stats[f'{length_type}_min'] = np.min(lengths)
                stats[f'{length_type}_max'] = np.max(lengths)
                stats[f'{length_type}_median'] = np.median(lengths)
            else:
                stats[f'{length_type}_mean'] = 0
                stats[f'{length_type}_std'] = 0
                stats[f'{length_type}_min'] = 0
                stats[f'{length_type}_max'] = 0
                stats[f'{length_type}_median'] = 0
        
        return stats
    
    def check_quality_issues(self, data: List[Dict]) -> Dict[str, List[Dict]]:
        """
        检查数据质量问题
        
        参数:
            data: 数据集
            
        返回:
            issues: 问题数据字典
        """
        logger.info("检查数据质量问题")
        
        issues = {
            'empty_input': [],
            'empty_output': [],
            'short_output': [],
            'identical_input_output': [],
            'repeated_phrases': []
        }
        
        for item in data:
            instruction = item.get('instruction', '')
            input_text = item.get('input', '')
            output = item.get('output', '')
            
            # 检查空输入
            if not input_text.strip():
                issues['empty_input'].append(item)
            
            # 检查空输出
            if not output.strip():
                issues['empty_output'].append(item)
            
            # 检查过短的输出
            if len(output.strip().split()) < 3:
                issues['short_output'].append(item)
            
            # 检查输入输出相同
            if input_text.strip() and output.strip() and input_text == output:
                issues['identical_input_output'].append(item)
            
            # 检查重复短语
            words = output.split()
            if len(words) > 10:
                word_count = {}
                for word in words:
                    if len(word) > 2:  # 忽略短词
                        word_count[word] = word_count.get(word, 0) + 1
                
                # 检查是否有词重复超过3次
                for word, count in word_count.items():
                    if count > 3 and len(word) > 1:
                        issues['repeated_phrases'].append(item)
                        break
        
        logger.info(f"质量问题统计: 空输入 {len(issues['empty_input'])}, 空输出 {len(issues['empty_output'])}, "
                   f"输出过短 {len(issues['short_output'])}, 输入输出相同 {len(issues['identical_input_output'])}, "
                   f"重复短语 {len(issues['repeated_phrases'])}")
        
        return issues
    
    def generate_quality_report(self, data: List[Dict], output_path: str):
        """
        生成数据质量报告
        
        参数:
            data: 数据集
            output_path: 报告输出路径
        """
        logger.info("生成数据质量报告")
        
        # 计算统计信息
        stats = self.calculate_statistics(data)
        
        # 检查质量问题
        issues = self.check_quality_issues(data)
        
        # 生成报告
        report = {
            'overview': {
                'total_samples': stats['total_samples'],
                'sources_distribution': dict(stats['sources']),
                'augmentation_distribution': dict(stats['augmentation_methods'])
            },
            'length_statistics': {
                'instruction': {
                    'mean': stats['instruction_lengths_mean'],
                    'std': stats['instruction_lengths_std'],
                    'min': stats['instruction_lengths_min'],
                    'max': stats['instruction_lengths_max'],
                    'median': stats['instruction_lengths_median']
                },
                'input': {
                    'mean': stats['input_lengths_mean'],
                    'std': stats['input_lengths_std'],
                    'min': stats['input_lengths_min'],
                    'max': stats['input_lengths_max'],
                    'median': stats['input_lengths_median']
                },
                'output': {
                    'mean': stats['output_lengths_mean'],
                    'std': stats['output_lengths_std'],
                    'min': stats['output_lengths_min'],
                    'max': stats['output_lengths_max'],
                    'median': stats['output_lengths_median']
                }
            },
            'quality_issues': {
                'empty_input': len(issues['empty_input']),
                'empty_output': len(issues['empty_output']),
                'short_output': len(issues['short_output']),
                'identical_input_output': len(issues['identical_input_output']),
                'repeated_phrases': len(issues['repeated_phrases'])
            },
            'sample_data': data[:5] if data else []  # 包含前5个样本作为示例
        }
        
        # 保存报告
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        
        logger.info(f"数据质量报告已保存到: {output_path}")
        
        return report

def create_sample_data(num_samples=100):
    """
    创建示例数据用于测试
    
    参数:
        num_samples: 样本数量
        
    返回:
        sample_data: 示例数据
    """
    logger.info(f"创建 {num_samples} 个示例数据")
    
    sample_data = []
    
    # 常见客服对话主题
    topics = [
        "产品咨询", "订单查询", "退货退款", "技术支持", "账户问题", 
        "物流查询", "价格咨询", "售后服务", "使用问题", "投诉建议"
    ]
    
    # 常见用户问题
    user_questions = [
        "我买的商品什么时候能发货?",
        "这个产品有保修吗?保修期多久?",
        "我想退货,流程是什么?",
        "我的账号登录不上了,提示密码错误",
        "订单显示已签收,但我没有收到",
        "这个商品现在有优惠吗?",
        "怎么使用这个产品的XX功能?",
        "我要投诉你们的配送员,态度太差了",
        "产品有质量问题,能换货吗?",
        "发票怎么开具?"
    ]
    
    # 常见客服回复
    assistant_responses = [
        "您好,您的订单预计明天发货,我们会尽快为您处理。",
        "这款产品提供一年保修,从您收到商品开始计算。",
        "退货流程是:1.申请退货 2.等待审核 3.寄回商品 4.退款处理。",
        "您可以尝试重置密码,如果还有问题,请联系客服专员。",
        "请提供订单号,我们会联系物流公司核实。",
        "目前有满300减30的活动,您可以关注店铺首页。",
        "您可以在设置中找到XX功能,具体操作是...",
        "非常抱歉给您带来不好的体验,我们会记录并处理。",
        "可以提供一下产品照片吗?我们会为您处理换货。",
        "发票会在订单完成后7个工作日内开具,请留意邮箱。"
    ]
    
    # 摘要模板
    summary_templates = [
        "用户咨询{}问题,客服提供了{}的解决方案。",
        "关于{},用户遇到了{},客服建议{}。",
        "用户反映{},客服回应将{}。",
        "用户询问{},客服详细解释了{}。",
        "用户投诉{},客服道歉并表示会{}。"
    ]
    
    for i in range(num_samples):
        # 随机生成对话轮数 (2-8轮)
        num_turns = random.randint(2, 8)
        dialog = []
        
        # 确保对话以用户开始
        for turn_idx in range(num_turns):
            if turn_idx % 2 == 0:  # 用户轮次
                role = "user"
                # 第一个用户消息用完整问题,后续可能用简短回复
                if turn_idx == 0:
                    content = random.choice(user_questions)
                else:
                    content = random.choice(["好的", "明白了", "那然后呢?", "具体怎么操作?", "谢谢"])
            else:  # 客服轮次
                role = "assistant"
                content = random.choice(assistant_responses)
            
            dialog.append({
                "role": role,
                "content": content
            })
        
        # 生成摘要
        topic = random.choice(topics)
        summary = random.choice(summary_templates).format(topic, random.choice(["详细", "具体", "相应"]))
        
        # 创建数据项
        sample_data.append({
            "id": f"sample_{i}",
            "dialog": dialog,
            "summary": summary,
            "source": "synthetic"
        })
    
    # 添加一些有问题的数据用于测试清洗
    problem_samples = [
        {
            "id": "problem_1",
            "dialog": [],  # 空对话
            "summary": "测试摘要",
            "source": "synthetic"
        },
        {
            "id": "problem_2",
            "dialog": [{"role": "user", "content": "你好"}],  # 过短对话
            "summary": "测试",
            "source": "synthetic"
        },
        {
            "id": "problem_3",
            "dialog": [{"role": "user", "content": "Hello, I have a question."}],  # 英文
            "summary": "This is a test summary.",
            "source": "synthetic"
        },
        {
            "id": "problem_4",
            "dialog": [{"role": "user", "content": "测试<script>alert('xss')</script>"}],  # 包含HTML
            "summary": "测试摘要",
            "source": "synthetic"
        }
    ]
    
    sample_data.extend(problem_samples)
    
    return sample_data

def save_sample_data(data, file_path):
    """保存示例数据到文件"""
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    logger.info(f"示例数据已保存到: {file_path}")

def main():
    """主函数:完整的数据处理流程"""
    logger.info("=" * 60)
    logger.info("开始完整的数据处理流程")
    logger.info("=" * 60)
    
    # 1. 创建或加载原始数据
    raw_data_file = "raw_customer_service_data.json"
    
    # 如果数据文件不存在,创建示例数据
    if not Path(raw_data_file).exists():
        logger.info("原始数据文件不存在,创建示例数据...")
        raw_data = create_sample_data(50)  # 创建50个示例样本
        save_sample_data(raw_data, raw_data_file)
    else:
        logger.info(f"加载现有数据文件: {raw_data_file}")
        with open(raw_data_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)
    
    # 2. 数据清洗
    logger.info("\n1. 数据清洗阶段")
    logger.info("-" * 40)
    
    cleaner = DataCleaner(
        min_dialog_turns=2,  # 最小对话轮数
        max_dialog_turns=10,  # 最大对话轮数
        min_summary_words=3   # 摘要最小词数
    )
    
    # 2.1 格式标准化
    standardized_data = cleaner.format_standardization(raw_data)
    
    # 2.2 噪声过滤
    cleaned_data = cleaner.noise_filtering(standardized_data)
    
    # 2.3 精确去重
    deduplicated_data = cleaner.remove_duplicates(cleaned_data)
    
    # 保存清洗后的数据
    cleaned_data_file = "cleaned_customer_service_data.json"
    with open(cleaned_data_file, 'w', encoding='utf-8') as f:
        json.dump(deduplicated_data, f, ensure_ascii=False, indent=2)
    logger.info(f"清洗后的数据已保存到: {cleaned_data_file}")
    
    # 3. 数据增强
    logger.info("\n2. 数据增强阶段")
    logger.info("-" * 40)
    
    augmentor = DataAugmentor(
        augmentation_methods=['paraphrase', 'back_translate_simple']
    )
    
    # 对清洗后的数据进行增强
    augmented_data = augmentor.augment_data(deduplicated_data, samples_per_item=1)
    
    # 保存增强后的数据
    augmented_data_file = "augmented_customer_service_data.json"
    with open(augmented_data_file, 'w', encoding='utf-8') as f:
        json.dump(augmented_data, f, ensure_ascii=False, indent=2)
    logger.info(f"增强后的数据已保存到: {augmented_data_file}")
    
    # 4. 转换为指令格式
    logger.info("\n3. 指令格式转换阶段")
    logger.info("-" * 40)
    
    formatter = InstructionFormatter()
    
    # 将数据转换为指令格式
    instruction_data = formatter.convert_to_instruction_format(
        augmented_data, 
        use_multiple_instructions=True
    )
    
    # 保存指令格式数据
    instruction_data_file = "instruction_formatted_data.json"
    with open(instruction_data_file, 'w', encoding='utf-8') as f:
        json.dump(instruction_data, f, ensure_ascii=False, indent=2)
    logger.info(f"指令格式数据已保存到: {instruction_data_file}")
    
    # 5. 数据质量检查
    logger.info("\n4. 数据质量检查阶段")
    logger.info("-" * 40)
    
    quality_checker = DataQualityChecker()
    
    # 生成质量报告
    quality_report = quality_checker.generate_quality_report(
        instruction_data, 
        "data_quality_report.json"
    )
    
    # 6. 保存为JSONL格式(用于微调)
    logger.info("\n5. 生成最终微调数据集")
    logger.info("-" * 40)
    
    # 将数据保存为JSONL格式
    jsonl_file = "customer_service_sft_data.jsonl"
    with open(jsonl_file, 'w', encoding='utf-8') as f:
        for item in instruction_data:
            # 创建标准格式
            sft_item = {
                "instruction": item["instruction"],
                "input": item["input"],
                "output": item["output"]
            }
            f.write(json.dumps(sft_item, ensure_ascii=False) + "\n")
    
    logger.info(f"最终微调数据(JSONL格式)已保存到: {jsonl_file}")
    logger.info(f"总共生成 {len(instruction_data)} 条训练样本")
    
    # 7. 数据统计
    logger.info("\n6. 数据统计")
    logger.info("-" * 40)
    
    # 读取JSONL文件并统计
    sample_count = 0
    with open(jsonl_file, 'r', encoding='utf-8') as f:
        for line in f:
            sample_count += 1
    
    logger.info(f"JSONL文件中的样本数量: {sample_count}")
    
    # 显示前3个样本
    logger.info("\n前3个样本示例:")
    with open(jsonl_file, 'r', encoding='utf-8') as f:
        for i, line in enumerate(f):
            if i < 3:
                sample = json.loads(line.strip())
                logger.info(f"\n样本 {i+1}:")
                logger.info(f"  指令: {sample['instruction'][:50]}...")
                logger.info(f"  输入长度: {len(sample['input'].split())} 词")
                logger.info(f"  输出长度: {len(sample['output'].split())} 词")
    
    logger.info("\n" + "=" * 60)
    logger.info("数据处理流程完成!")
    logger.info("=" * 60)
    
    return {
        "raw_data": raw_data_file,
        "cleaned_data": cleaned_data_file,
        "augmented_data": augmented_data_file,
        "instruction_data": instruction_data_file,
        "jsonl_data": jsonl_file,
        "quality_report": "data_quality_report.json",
        "total_samples": len(instruction_data)
    }

def test_pipeline():
    """测试数据处理流程的各个组件"""
    logger.info("开始测试数据处理流程...")
    
    # 创建测试数据
    test_data = [
        {
            "dialog": [
                {"role": "user", "content": "我买的手机什么时候能发货?"},
                {"role": "assistant", "content": "您的订单将在24小时内发货。"},
                {"role": "user", "content": "谢谢"},
                {"role": "assistant", "content": "不客气,有问题随时联系。"}
            ],
            "summary": "用户询问手机发货时间,客服回复24小时内发货。",
            "source": "test"
        },
        {
            "messages": [
                {"role": "customer", "content": "这个产品保修多久?"},
                {"role": "agent", "content": "保修期是一年。"}
            ],
            "answer": "用户咨询产品保修期,客服回复保修一年。"
        }
    ]
    
    # 测试数据清洗
    cleaner = DataCleaner()
    
    # 测试格式标准化
    logger.info("\n测试格式标准化...")
    standardized = cleaner.format_standardization(test_data)
    logger.info(f"标准化后数据: {len(standardized)} 条")
    logger.info(f"第一条数据格式: {standardized[0].keys()}")
    
    # 测试文本清理
    logger.info("\n测试文本清理...")
    test_text = "这是一段<script>alert('xss')</script>测试文本,包含 多个  空格。"
    cleaned_text = cleaner.clean_dialog_text(test_text)
    logger.info(f"清理前: {test_text}")
    logger.info(f"清理后: {cleaned_text}")
    
    # 测试数据增强
    logger.info("\n测试数据增强...")
    augmentor = DataAugmentor()
    test_item = {
        "id": "test_1",
        "dialog": [
            {"role": "user", "content": "请问这个产品怎么使用?"},
            {"role": "assistant", "content": "请参考产品说明书。"}
        ],
        "summary": "用户询问产品使用方法,客服建议参考说明书。"
    }
    
    augmented = augmentor.augment_data([test_item], samples_per_item=1)
    logger.info(f"增强后数据: {len(augmented)} 条")
    logger.info(f"原始摘要: {test_item['summary']}")
    if len(augmented) > 1:
        logger.info(f"增强摘要: {augmented[1]['summary']}")
    
    # 测试指令格式化
    logger.info("\n测试指令格式化...")
    formatter = InstructionFormatter()
    formatted = formatter.convert_to_instruction_format(augmented, use_multiple_instructions=False)
    logger.info(f"格式化后数据: {len(formatted)} 条")
    if formatted:
        logger.info(f"第一条指令: {formatted[0]['instruction']}")
        logger.info(f"输入预览: {formatted[0]['input'][:50]}...")
        logger.info(f"输出预览: {formatted[0]['output'][:50]}...")
    
    logger.info("\n组件测试完成!")
    return True

if __name__ == "__main__":
    # 运行测试
    test_pipeline()
    
    # 运行完整流程
    results = main()
    
    # 打印结果摘要
    print("\n" + "="*60)
    print("数据处理流程结果摘要")
    print("="*60)
    print(f"原始数据文件: {results['raw_data']}")
    print(f"清洗后数据: {results['cleaned_data']}")
    print(f"增强后数据: {results['augmented_data']}")
    print(f"指令格式数据: {results['instruction_data']}")
    print(f"最终微调数据(JSONL): {results['jsonl_data']}")
    print(f"数据质量报告: {results['quality_report']}")
    print(f"总训练样本数: {results['total_samples']}")
    print("="*60)

7.8 使用说明

(1)安装依赖

首先,确保安装了必要的Python库:

pip install pandas numpy langdetect tqdm

(2)运行完整流程

直接运行上述代码,它会:

  1. 创建示例数据(如果没有原始数据文件)
  2. 执行完整的数据处理流程:
    • 数据清洗(格式标准化、噪声过滤、去重)
    • 数据增强(同义词替换、回译模拟)
    • 指令格式转换
    • 质量检查和报告生成
  3. 保存处理后的数据为JSONL格式,可直接用于大模型微调

(3)使用自己的数据

要使用自己的数据,只需:

  1. 准备一个JSON格式的原始数据文件,结构如下:
[
  {
    "dialog": [
      {"role": "user", "content": "用户问题"},
      {"role": "assistant", "content": "客服回答"}
    ],
    "summary": "对话摘要"
  }
]
  1. 将文件命名为raw_customer_service_data.json,或修改代码中的文件名。

  2. 运行代码,处理后的数据将保存为JSONL格式,适合用于大模型微调。

(4)自定义处理步骤

可以根据需要调整以下参数:

  • 数据清洗:在DataCleaner类中调整参数

    • min_dialog_turns:最小对话轮数
    • max_dialog_turns:最大对话轮数
    • min_summary_words:摘要最小词数
  • 数据增强:在DataAugmentor类中调整

    • augmentation_methods:启用的增强方法
    • samples_per_item:每个样本生成的增强样本数
  • 指令格式:在InstructionFormatter类中调整instruction_templates

(5)输出文件说明

代码会生成以下文件:

  1. raw_customer_service_data.json - 原始数据(如果代码自动创建)
  2. cleaned_customer_service_data.json - 清洗后的数据
  3. augmented_customer_service_data.json - 增强后的数据
  4. instruction_formatted_data.json - 指令格式数据
  5. customer_service_sft_data.jsonl - 最终微调数据(JSONL格式)
  6. data_quality_report.json - 数据质量报告

(6)扩展功能

代码设计为模块化,可以轻松扩展:

  • 添加新的数据清洗规则
  • 实现更复杂的数据增强方法
  • 集成更高级的去重算法(如MinHash、SimHash)
  • 添加基于模型的增强方法
  • 实现合成数据生成

这个完整的代码实现展示了从原始日志到微调数据集的完整工业级流程,可以直接运行和测试,并可根据实际需求进行定制和扩展。


🌟 感谢您耐心阅读到这里!
💡 如果本文对您有所启发欢迎:
👍 点赞📌 收藏 📤 分享给更多需要的伙伴。
🗣️ 期待在评论区看到您的想法, 共同进步。
🔔 关注我,持续获取更多干货内容~
🤗 我们下篇文章见~

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐