本文来源于数据从业者全栈知识库,更多体系化内容请访问知识库。
目录
- #一、合成数据的必要性
- #二、三类合成数据的生成方法
- #三、数据飞轮机制
- #四、主动学习:聪明地花标注预算
- #五、合成数据的质量陷阱
- #六、实战案例:企业知识问答系统的数据飞轮
一、合成数据的必要性
1.1 真实数据的三大困境
做过数据工程的人都知道,数据的问题从来不是”没有数据”,而是”有用的数据要么贵,要么违规,要么根本不存在”。
困境一:标注成本高
| 标注类型 | 单样本成本 | 万样本预算 |
|---|---|---|
| 文本分类(众包) | 0.3 | 3K |
| 命名实体识别 | 2 | 20K |
| 对话意图标注 | 5 | 50K |
| 专业领域问答对 | 15 | 150K |
| 医疗/法律专业标注 | 100 | $200K+ |
医疗领域的标注需要执业医师,一个问答对的成本可能高达 $50。对初创公司来说,这个数字直接把”自研专业领域模型”这条路堵死了。
困境二:隐私合规
GDPR 在欧洲,数据本地化要求在中国,这两个合规要求把大量用户数据锁在了”不可出境”的盒子里。你的模型想学习用户行为?先过法务这关。
更隐蔽的问题:即使数据合规可用,里面包含的 PII(个人可识别信息)也让数据共享变得极其麻烦。合成数据天然不含真实用户信息,这是它最重要的合规价值。
困境三:长尾场景数据稀少
真实世界的数据分布是长尾的。银行风控系统里,欺诈交易可能只占 0.1%;医疗诊断里,罕见病案例可能一年只有几十个。用真实数据训练,模型永远学不好长尾场景——因为样本根本不够。
1.2 合成数据的质量演进(2020-2025)
timeline
title 合成数据质量演进时间线
2020 : GAN生成图像
: 合成图像用于CV任务
: 效果普遍比真实数据差5-15%
2021 : 文本数据增强
: 回译、同义词替换
: 效果有限,多样性不足
2022 : GPT-3 自我指令
: Self-Instruct 论文发布
: 指令数据合成成为可能
2023 : Stanford Alpaca
: 52K合成指令数据微调LLaMA
: 合成数据首次追平真实数据
: WizardLM Evol-Instruct
: 合成数据开始超越真实数据
2024 : 数据合成规模化
: Phi系列:高质量合成数据训练小模型
: 合成数据超越真实数据成主流
2025 : 合成数据成标配
: 主流大模型训练均包含合成数据
: 数据飞轮闭环工程化
Stanford Alpaca 的意义不只是”合成数据有用”,而是证明了一件事:用强模型(GPT-3.5)生成的数据,可以有效训练弱模型(LLaMA-7B)。这开了一个口子,从此合成数据在 NLP 领域就再也没有被关上。
1.3 什么时候用合成数据
一个简洁的判断标准:
当标注成本 > 合成成本 × (1/质量折扣系数) 时,用合成数据
质量折扣系数(0 到 1 之间)反映合成数据与真实数据的质量差距。如果合成数据质量是真实数据的 80%(折扣系数 0.8),那么合成成本只要低于标注成本的 80%,就值得用合成数据。
实践中,GPT-4o API 生成一条高质量问答对的成本约为 0.05,比人工标注便宜 1-2 个数量级。多数场景下,这个公式的结论是显而易见的。
二、三类合成数据的生成方法
2.1 类型一:指令微调数据(SFT Data)
SFT(Supervised Fine-Tuning)数据的核心是”指令-回答对”:给模型一个任务描述,告诉它应该怎么回答。
Self-Instruct 方法:让 LLM 自己出题
Self-Instruct 的思路很朴素:先手写 175 条”种子指令”,然后让 LLM 用这些种子生成更多指令,再用 LLM 回答这些指令,最后过滤掉低质量的。Stanford Alpaca 就是这个思路的经典实现。
Evol-Instruct(WizardLM):让指令”进化”
Evol-Instruct 的核心操作是把简单指令改造成复杂指令。进化操作包括:
- 深度进化:要求更详细的解释、增加约束条件、增加推理步骤
- 广度进化:生成全新的、相关但不同的指令
- 具体化:把抽象任务变成具体场景
一个例子:
- 原始指令:「写一个 Python 函数,计算两个数的和」
- 深度进化:「写一个 Python 函数,计算两个数的和,要求:1)处理输入不是数字的情况,2)支持复数,3)用类型注解,4)写单元测试」
完整代码:批量生成数据工程领域 SFT 数据
import openaiimport jsonimport hashlibfrom typing import Optionalfrom datasketch import MinHash, MinHashLSH
client = openai.OpenAI()
# 数据工程领域的种子指令(手写 20-30 条)SEED_INSTRUCTIONS = [ "解释 Apache Spark 中 RDD、DataFrame 和 Dataset 的区别", "用 PySpark 写一个读取 Parquet 文件并做聚合统计的示例", "设计一个处理每日 10 亿条日志的数据管道架构", "解释 Kafka 中 Partition、Offset 和 Consumer Group 的关系", "什么是数据湖、数据仓库、数据湖仓一体(Lakehouse)?各自适用场景?", "用 Flink 实现一个实时计算用户活跃度的窗口任务", "解释 Hive 分区表和分桶表的区别及使用场景", "如何优化一个慢查询的 Spark Job?",]
def generate_instructions(seed_instructions: list[str], n: int = 10) -> list[str]: """用 LLM 基于种子指令生成新指令""" seed_sample = "\n".join([f"- {inst}" for inst in seed_instructions[:8]])
prompt = f"""你是一位资深数据工程师。请参考以下示例指令的风格和难度,生成 {n} 条全新的、不重复的数据工程领域指令。
示例指令:{seed_sample}
要求:1. 涵盖 Spark、Flink、Kafka、Hive、数据仓库、数据湖等主题2. 难度适中到高级3. 每条指令具体、可操作4. 直接输出指令列表,每行一条,不加编号
生成 {n} 条指令:"""
response = client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": prompt}], temperature=0.9, # 提高温度增加多样性 )
instructions = response.choices[0].message.content.strip().split("\n") return [inst.strip() for inst in instructions if inst.strip()]
def generate_answer(instruction: str) -> Optional[str]: """为指令生成高质量答案""" prompt = f"""你是一位有 10 年经验的数据工程师,请回答以下问题。要求:1. 回答准确、专业2. 包含具体的代码示例(如果适用)3. 说明优缺点和适用场景4. 控制在 300-800 字
问题:{instruction}"""
try: response = client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": prompt}], temperature=0.3, # 降低温度保证准确性 ) return response.choices[0].message.content.strip() except Exception as e: print(f"生成答案失败: {e}") return None
def score_quality(instruction: str, answer: str) -> float: """用 LLM 对问答对打质量分(0-10)""" prompt = f"""请评估以下问答对的质量,返回 0-10 的分数(只返回数字)。
评估标准:- 指令清晰度(2分):指令是否清晰、具体、可操作- 答案准确性(3分):技术内容是否正确- 答案完整性(3分):是否覆盖了问题的主要方面- 实用性(2分):是否包含代码或具体示例
问题:{instruction}答案:{answer[:500]}...
质量分(0-10):"""
response = client.chat.completions.create( model="gpt-4o-mini", # 用小模型评分,省成本 messages=[{"role": "user", "content": prompt}], temperature=0, )
try: return float(response.choices[0].message.content.strip()) except: return 5.0
def deduplicate_minhash(instructions: list[str], threshold: float = 0.8) -> list[str]: """使用 MinHash LSH 去除相似指令""" lsh = MinHashLSH(threshold=threshold, num_perm=128) unique_instructions = []
for i, inst in enumerate(instructions): minhash = MinHash(num_perm=128) for word in inst.split(): minhash.update(word.encode("utf-8"))
if not lsh.query(minhash): lsh.insert(str(i), minhash) unique_instructions.append(inst)
return unique_instructions
def build_sft_dataset( n_instructions: int = 500, quality_threshold: float = 7.0, output_file: str = "data_engineering_sft.jsonl") -> list[dict]: """完整的 SFT 数据生成流程"""
# Step 1: 生成指令 print("Step 1: 生成指令...") all_instructions = list(SEED_INSTRUCTIONS)
while len(all_instructions) < n_instructions * 2: # 生成 2 倍数量,过滤后取目标数量 new_instructions = generate_instructions(all_instructions, n=20) all_instructions.extend(new_instructions) print(f" 当前指令数: {len(all_instructions)}")
# Step 2: 去重 print("Step 2: MinHash 去重...") unique_instructions = deduplicate_minhash(all_instructions) print(f" 去重后: {len(unique_instructions)} 条")
# Step 3: 生成答案 + 质量过滤 print("Step 3: 生成答案并过滤...") dataset = []
for i, instruction in enumerate(unique_instructions[:n_instructions * 2]): answer = generate_answer(instruction) if answer is None: continue
score = score_quality(instruction, answer)
if score >= quality_threshold: dataset.append({ "instruction": instruction, "output": answer, "quality_score": score, }) print(f" [{i+1}] 得分 {score:.1f} ") else: print(f" [{i+1}] 得分 {score:.1f} 过滤")
if len(dataset) >= n_instructions: break
# Step 4: 保存 with open(output_file, "w", encoding="utf-8") as f: for item in dataset: f.write(json.dumps(item, ensure_ascii=False) + "\n")
print(f"\n最终数据集: {len(dataset)} 条,保存至 {output_file}") return dataset
if __name__ == "__main__": dataset = build_sft_dataset(n_instructions=500, quality_threshold=7.0)2.2 类型二:RAG 训练数据
RAG 系统需要两种数据:训练数据(微调 Embedding 模型)和评估数据(衡量检索质量)。从文档自动生成问答对,是最高效的方案。
关于 RAG 工程的完整实现,参见 RAG检索增强生成实战。
三步生成流程
文档切片 → LLM 生成问题 → 验证答案可从文档中找到完整代码:批量生成 RAG 问答对
import openaiimport jsonfrom typing import Optionalfrom langchain.text_splitter import RecursiveCharacterTextSplitter
client = openai.OpenAI()
def chunk_document(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]: """文档切片""" splitter = RecursiveCharacterTextSplitter( chunk_size=chunk_size, chunk_overlap=overlap, separators=["\n\n", "\n", "。", "!", "?", " "], ) return splitter.split_text(text)
def generate_questions_from_chunk(chunk: str, n_questions: int = 3) -> list[str]: """从文档片段生成问题""" prompt = f"""基于以下文档片段,生成 {n_questions} 个有价值的问题。
要求:1. 问题必须可以从文档中找到答案2. 问题要具体,不要泛泛而问3. 问题难度适中,有实际意义4. 每行一个问题,不加编号
文档片段:{chunk}
生成的问题:"""
response = client.chat.completions.create( model="gpt-4o-mini", messages=[{"role": "user", "content": prompt}], temperature=0.7, )
questions = response.choices[0].message.content.strip().split("\n") return [q.strip() for q in questions if q.strip()]
def verify_and_generate_answer(question: str, chunk: str) -> Optional[dict]: """验证问题可回答,并生成参考答案""" prompt = f"""请基于以下文档片段回答问题。如果文档中没有足够信息回答,请回复"UNANSWERABLE"。
文档:{chunk}
问题:{question}
答案:"""
response = client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": prompt}], temperature=0.1, )
answer = response.choices[0].message.content.strip()
if "UNANSWERABLE" in answer.upper(): return None
return { "question": question, "answer": answer, "context": chunk, # RAGAS 兼容格式 "ground_truth": answer, "contexts": [chunk], }
def build_rag_dataset( documents: list[str], questions_per_chunk: int = 2, output_file: str = "rag_eval_dataset.jsonl") -> list[dict]: """ 从文档列表构建 RAG 评估数据集 输出格式与 RAGAS 框架兼容,可直接用于评估 """ dataset = []
for doc_idx, doc in enumerate(documents): chunks = chunk_document(doc) print(f"文档 {doc_idx + 1}: {len(chunks)} 个片段")
for chunk_idx, chunk in enumerate(chunks): if len(chunk) < 100: # 过滤太短的片段 continue
questions = generate_questions_from_chunk(chunk, n_questions=questions_per_chunk)
for question in questions: qa_pair = verify_and_generate_answer(question, chunk) if qa_pair: qa_pair["doc_id"] = doc_idx qa_pair["chunk_id"] = chunk_idx dataset.append(qa_pair) print(f" 问答对 {len(dataset)}: {question[:50]}...")
# 保存为 RAGAS 兼容格式 with open(output_file, "w", encoding="utf-8") as f: for item in dataset: f.write(json.dumps(item, ensure_ascii=False) + "\n")
print(f"\n生成 {len(dataset)} 个问答对,保存至 {output_file}") return dataset
# 使用示例if __name__ == "__main__": # 加载企业文档(以 Spark 官方文档为例) with open("spark_documentation.txt", "r") as f: documents = [f.read()]
dataset = build_rag_dataset( documents=documents, questions_per_chunk=2, output_file="spark_rag_eval.jsonl" )
# 直接用 RAGAS 评估 # from ragas import evaluate # from datasets import Dataset # eval_dataset = Dataset.from_list(dataset) # results = evaluate(eval_dataset)2.3 类型三:Embedding 训练数据
微调 Embedding 模型需要两类数据:正例对(语义相似的文本对)和难负例(语义相近但实际不相关)。这是 Embedding工程实践 中模型微调部分的数据来源。
import openaiimport randomfrom sentence_transformers import SentenceTransformer, utilimport torch
client = openai.OpenAI()
def generate_positive_pairs(chunk: str, n: int = 2) -> list[tuple[str, str]]: """从同一文档片段生成语义相似的文本对""" prompt = f"""基于以下文档,生成 {n} 个语义相同但表达不同的句子对。
原文档:{chunk[:300]}
要求:改写成不同的表达方式,保持语义一致。每行一对,用 ||| 分隔。
示例格式:Spark 是分布式计算框架 ||| Apache Spark 是用于大规模数据处理的分布式系统
生成的句子对:"""
response = client.chat.completions.create( model="gpt-4o-mini", messages=[{"role": "user", "content": prompt}], temperature=0.7, )
pairs = [] for line in response.choices[0].message.content.strip().split("\n"): if "|||" in line: parts = line.split("|||") if len(parts) == 2: pairs.append((parts[0].strip(), parts[1].strip()))
return pairs
def mine_hard_negatives( query: str, candidate_texts: list[str], model_name: str = "BAAI/bge-m3", top_k: int = 10, hard_negative_range: tuple = (2, 6), # 取第2-6名作为难负例) -> list[str]: """ 难负例挖掘:找语义相近但实际不相关的样本
hard_negative_range: 跳过第1名(真正相关),取2-6名作为难负例 """ model = SentenceTransformer(model_name)
query_emb = model.encode(query, convert_to_tensor=True) cand_embs = model.encode(candidate_texts, convert_to_tensor=True)
scores = util.cos_sim(query_emb, cand_embs)[0] top_indices = torch.argsort(scores, descending=True)
start, end = hard_negative_range hard_negatives = [ candidate_texts[idx] for idx in top_indices[start:end] ]
return hard_negatives
def build_embedding_training_data( chunks: list[str], output_file: str = "embedding_train.jsonl") -> list[dict]: """ 构建 Embedding 微调数据集 格式:{"query": ..., "positive": ..., "negative": ...} """ dataset = []
for i, chunk in enumerate(chunks): # 生成正例对 pos_pairs = generate_positive_pairs(chunk, n=2)
for query, positive in pos_pairs: # 挖掘难负例(从其他 chunk 中找) other_chunks = [c for j, c in enumerate(chunks) if j != i] if len(other_chunks) > 5: sample_chunks = random.sample(other_chunks, min(50, len(other_chunks))) hard_negs = mine_hard_negatives(query, sample_chunks)
for neg in hard_negs[:2]: # 每个正例配 2 个难负例 dataset.append({ "query": query, "positive": positive, "negative": neg, })
with open(output_file, "w", encoding="utf-8") as f: for item in dataset: f.write(json.dumps(item, ensure_ascii=False) + "\n")
return dataset三、数据飞轮机制
3.1 什么是数据飞轮
经典定义来自亚马逊:用户使用产品 → 产生数据 → 改进推荐算法 → 产品体验更好 → 吸引更多用户。这是一个正向循环,一旦转动起来,竞争对手很难从外部打断它。
AI 时代的数据飞轮有一个关键强化:用户反馈(点赞、点踩、直接修改答案)可以直接作为训练信号,不需要额外的人工标注环节。每一个”这个回答不好,我改成这样”的操作,都是一条 DPO(Direct Preference Optimization)训练数据。
flowchart LR
A["用户使用产品<br>提问、对话、搜索"] --> B["收集交互数据<br>问答记录、点赞点踩"]
B --> C["数据处理<br>清洗、去重、过滤"]
C --> D["模型训练<br>SFT / RLHF / DPO"]
D --> E["更好的模型<br>准确率更高、回答更好"]
E --> F["产品体验提升<br>用户粘性增加"]
F --> G["更多用户、更多数据<br>DAU增长"]
G --> A
style A fill:#4A90D9,color:#fff
style D fill:#E67E22,color:#fff
style E fill:#27AE60,color:#fff
style G fill:#8E44AD,color:#fff
3.2 数据飞轮的四个阶段
阶段一:冷启动(合成数据驱动)
没有用户,没有历史数据,只有一堆文档和 API Key。这时候的目标是用合成数据训练出一个”够用”的初始模型,让产品能够上线。
- 用企业文档生成 RAG 评估集(见 2.2 节代码)
- 用 Self-Instruct 生成垂直领域 SFT 数据(见 2.1 节代码)
- 基于开源 Embedding 模型微调,让检索更贴合领域术语
- 目标:不追求完美,追求”能用”
阶段二:探索期(数据收集)
产品上线,开始有真实用户交互。这个阶段的核心任务是建立数据收集基础设施,不是优化模型。
- 每个问答记录完整存储(问题、召回的文档块、生成的答案、时间戳)
- 用户反馈收集(至少:点赞/点踩;理想:允许用户编辑答案)
- 无反馈的会话按会话时长和深度推断隐式满意度
- 关键指标:日活跃用户数 × 平均交互次数 = 数据收集速度
阶段三:精调期(模型迭代)
积累了足够的真实数据(通常 500-2000 条高质量标注)后,开始第一次真正的模型迭代。
- 优先处理低评分记录(用主动学习策略,见第四节)
- 用人工修正的记录做 DPO/RLHF 训练
- A/B 测试新旧模型,用数据说话
- 目标:每次迭代能看到可测量的提升
阶段四:飞轮转动(自我强化)
模型效果提升 → 用户满意度提升 → 用户更愿意使用和反馈 → 数据质量提升 → 再次迭代。进入这个阶段后,竞争对手的最大障碍不是技术,而是数据积累的时间差。
3.3 数据飞轮的关键指标
| 指标 | 定义 | 健康范围 | 预警信号 |
|---|---|---|---|
| 数据收集速度 | DAU × 平均交互次数/天 | 与业务规模相关 | 增长停滞 |
| 有效反馈率 | 有明确反馈的会话比例 | > 15% | < 5% |
| 标注效率 | 每小时完成有效标注数 | > 30条/人/时 | < 10条 |
| 模型迭代周期 | 两次生产模型更新的间隔 | 1-4 周 | > 3 个月 |
| 飞轮增益 | 每次迭代后核心指标的提升 | > 2% | < 0.5% |
四、主动学习:聪明地花标注预算
4.1 核心思想
随机标注是最低效的标注策略。主动学习的思路是:优先标注模型最不确定的样本。
直觉上很好理解:模型对于已经”学会了”的简单样本,再标注 100 条也没什么提升;但对于模型”拿不准”的困难样本,标注 10 条就可能显著改变决策边界。
主动学习可以用更少的标注预算达到同等甚至更好的模型效果——实验表明,主动学习通常可以用 20%-40% 的标注量达到随机标注 100% 时的性能。
4.2 不确定性采样策略
Entropy 采样:信息熵最大的样本
import numpy as npfrom openai import OpenAI
client = OpenAI()
def get_token_probs(text: str, model: str = "gpt-4o-mini") -> dict: """ 获取模型对某个问题的输出概率分布 使用 logprobs 参数获取 token 级别的概率 """ response = client.chat.completions.create( model=model, messages=[ {"role": "system", "content": "请将以下文本分类为:正面/负面/中性。只输出分类标签。"}, {"role": "user", "content": text}, ], logprobs=True, top_logprobs=5, max_tokens=5, )
# 提取第一个 token 的概率分布 top_logprobs = response.choices[0].logprobs.content[0].top_logprobs
probs = {} for lp in top_logprobs: probs[lp.token] = np.exp(lp.logprob) # logprob -> prob
return probs
def entropy_score(probs: dict) -> float: """计算信息熵。熵越高,模型越不确定""" values = np.array(list(probs.values())) values = values / values.sum() # 归一化 return float(-np.sum(values * np.log(values + 1e-10)))
def margin_score(probs: dict) -> float: """计算 Margin。最高概率与次高概率之差越小,模型越不确定""" sorted_probs = sorted(probs.values(), reverse=True) if len(sorted_probs) < 2: return 0.0 return sorted_probs[0] - sorted_probs[1]
def active_learning_selection( unlabeled_samples: list[str], budget: int, strategy: str = "entropy" # "entropy" 或 "margin") -> list[tuple[str, float]]: """ 主动学习:从未标注数据中选出最值得标注的 budget 条
返回:(样本文本, 不确定性分数) 的列表,按分数降序排列 """ scored_samples = []
for sample in unlabeled_samples: probs = get_token_probs(sample)
if strategy == "entropy": score = entropy_score(probs) else: # margin(分数越低越不确定) score = -margin_score(probs) # 取负,统一为越高越不确定
scored_samples.append((sample, score, probs))
# 按不确定性排序,取前 budget 条 scored_samples.sort(key=lambda x: x[1], reverse=True) selected = [(sample, score) for sample, score, _ in scored_samples[:budget]]
print(f"从 {len(unlabeled_samples)} 条未标注数据中选出 {budget} 条高价值样本") print(f"平均不确定性分数: {np.mean([s for _, s in selected]):.3f}")
return selected
# 使用示例if __name__ == "__main__": # 假设有 1000 条用户问题待标注 unlabeled = [ "这个数据产品用起来还行", "Spark 的 shuffle 为什么这么慢", "还可以吧", # ... 更多样本 ]
# 只有 50 条的标注预算,优先选最有价值的 selected = active_learning_selection( unlabeled_samples=unlabeled, budget=50, strategy="entropy" )
print("\n最应该标注的前5条:") for text, score in selected[:5]: print(f" [{score:.3f}] {text}")4.3 在数据飞轮中的实际应用
主动学习在数据飞轮的精调期(阶段三)最有价值。具体操作:
- 每天积累的低评分会话中,用 Entropy 采样选出最不确定的 20 条
- 将这 20 条发给人工标注员(而不是随机选 20 条)
- 用新标注数据更新模型
- 循环
这个策略在实践中的效果是:用同样的标注预算,模型收敛速度快 2-3 倍。
五、合成数据的质量陷阱
5.1 三大陷阱
陷阱一:模式坍塌(Pattern Collapse)
LLM 生成的数据存在系统性偏见。用 GPT-4 生成 1000 条”优质回答示例”,你会发现它们惊人地相似:都喜欢用”首先…其次…最后”的结构,都倾向于给出”三点建议”,都有类似的措辞习惯。
这不是 GPT-4 的问题,而是所有 LLM 的共性:它们对同类型的输出有固定偏好。用这样的数据训练出来的模型,会继承这种模式坍塌。
陷阱二:幻觉传播(Hallucination Propagation)
LLM 会在合成数据里产生幻觉。如果你用合成数据训练模型,模型不仅学会了你想教的内容,也学会了 LLM 的幻觉模式。这比没有幻觉的模型更危险,因为它的幻觉更自信。
GPT-4 在 2024 年的幻觉率大约是 3-8%(取决于领域)。合成 1000 条数据,可能有 30-80 条包含错误信息。这些错误会被模型当成”真理”学进去。
陷阱三:领域偏移(Domain Shift)
合成数据和真实用户数据的分布几乎不可能完全一致。你用”数据工程师会怎么问问题”来生成数据,但真实用户可能是”刚接触大数据的产品经理”。两者的问法、用词、知识背景差异很大。
5.2 缓解方法
| 陷阱 | 主要缓解方法 | 实施成本 |
|---|---|---|
| 模式坍塌 | 提高生成温度(0.7-1.0);使用多个不同模型生成 | 低 |
| 模式坍塌 | 多样性采样(最大化生成数据与种子数据的编辑距离) | 中 |
| 幻觉传播 | 人工抽检(5-10%比例);用检索验证事实声明 | 高 |
| 幻觉传播 | 用多个模型交叉验证(A模型生成,B模型审核) | 中 |
| 领域偏移 | 真实数据混合(建议合成:真实 = 3:1 到 1:1) | 低 |
| 领域偏移 | 收集真实用户问题作为种子指令 | 低 |
六、实战案例:企业知识问答系统的数据飞轮
6.1 场景描述
一家制造企业想做内部知识问答系统,让员工可以直接问”某型号设备的维护周期是多久”之类的问题。初始资产:500 份技术手册(PDF)、没有任何历史问答数据。
6.2 完整飞轮方案
Step 1:用企业文档生成初始评估数据集
# 使用 2.2 节的 build_rag_dataset 函数documents = load_pdf_documents("technical_manuals/") # 500 份手册
# 目标:生成 500 个有代表性的问答对eval_dataset = build_rag_dataset( documents=documents, questions_per_chunk=1, # 每个片段1个问题,避免重复 output_file="equipment_qa_eval.jsonl")Step 2:部署初版系统,建立数据收集基础设施
from datetime import datetimeimport sqlite3
class InteractionLogger: """记录用户交互,为数据飞轮提供燃料"""
def __init__(self, db_path: str = "interactions.db"): self.conn = sqlite3.connect(db_path) self._init_db()
def _init_db(self): self.conn.execute(""" CREATE TABLE IF NOT EXISTS interactions ( id INTEGER PRIMARY KEY, session_id TEXT, question TEXT, retrieved_chunks TEXT, -- JSON 数组 answer TEXT, user_feedback INTEGER, -- 1=满意, -1=不满意, 0=无反馈 user_correction TEXT, -- 用户修改后的答案(如果有) timestamp TEXT, latency_ms INTEGER ) """) self.conn.commit()
def log( self, session_id: str, question: str, retrieved_chunks: list[str], answer: str, latency_ms: int, ) -> int: """记录一次问答交互""" import json cursor = self.conn.execute( """INSERT INTO interactions (session_id, question, retrieved_chunks, answer, user_feedback, timestamp, latency_ms) VALUES (?, ?, ?, ?, 0, ?, ?)""", (session_id, question, json.dumps(retrieved_chunks, ensure_ascii=False), answer, datetime.now().isoformat(), latency_ms) ) self.conn.commit() return cursor.lastrowid
def update_feedback(self, interaction_id: int, feedback: int, correction: str = None): """更新用户反馈(异步)""" self.conn.execute( "UPDATE interactions SET user_feedback=?, user_correction=? WHERE id=?", (feedback, correction, interaction_id) ) self.conn.commit()
def get_low_quality_interactions(self, limit: int = 100) -> list[dict]: """获取低质量交互(用于主动学习选择)""" cursor = self.conn.execute( """SELECT * FROM interactions WHERE user_feedback = -1 OR (user_feedback = 0 AND julianday('now') - julianday(timestamp) > 1) ORDER BY timestamp DESC LIMIT ?""", (limit,) ) columns = [desc[0] for desc in cursor.description] return [dict(zip(columns, row)) for row in cursor.fetchall()]Step 3:主动学习选择标注样本
def select_samples_for_annotation(logger: InteractionLogger, budget: int = 20) -> list[dict]: """ 从低质量交互中,用主动学习策略选出最值得人工标注的样本 """ low_quality = logger.get_low_quality_interactions(limit=200)
# 用不确定性评分排序 questions = [item["question"] for item in low_quality] selected_with_scores = active_learning_selection(questions, budget=budget)
# 对应回原始记录 selected_questions = {q for q, _ in selected_with_scores} selected_items = [item for item in low_quality if item["question"] in selected_questions]
return selected_items[:budget]Step 4:用修正数据微调 Embedding 模型
from sentence_transformers import SentenceTransformer, InputExample, lossesfrom torch.utils.data import DataLoader
def finetune_embedding_with_corrections( corrections: list[dict], # {"question": ..., "correct_answer": ..., "retrieved_chunks": ...} base_model: str = "BAAI/bge-m3", output_path: str = "finetuned_embedding",): """ 用人工修正数据微调 Embedding 模型
核心思路:正确答案所在的 chunk 应该与问题更相似(正例) 被错误检索的 chunk 应该与问题不那么相似(负例) """ model = SentenceTransformer(base_model)
train_examples = [] for item in corrections: question = item["question"] correct_chunk = item.get("correct_chunk", item["correct_answer"])
# 从错误检索的 chunks 中选难负例 wrong_chunks = item.get("retrieved_chunks", [])
if wrong_chunks: # MultipleNegativesRankingLoss 格式 example = InputExample(texts=[question, correct_chunk] + wrong_chunks[:3]) train_examples.append(example)
train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=16) train_loss = losses.MultipleNegativesRankingLoss(model)
model.fit( train_objectives=[(train_dataloader, train_loss)], epochs=3, warmup_steps=100, output_path=output_path, )
print(f"微调完成,模型保存至 {output_path}") return modelStep 5:评估效果,决定是否发布新模型
def evaluate_rag_improvement( eval_dataset_path: str, old_embedding_model: str, new_embedding_model: str,) -> dict: """ 对比新旧 Embedding 模型在 RAG 评估集上的表现 结合 33-LLM评估体系 中的评估框架 """ from ragas import evaluate from ragas.metrics import faithfulness, answer_relevancy, context_recall from datasets import Dataset
with open(eval_dataset_path) as f: eval_data = [json.loads(line) for line in f]
results = {} for model_name, is_new in [(old_embedding_model, False), (new_embedding_model, True)]: # 用对应 Embedding 模型重新检索 dataset = Dataset.from_list(eval_data) scores = evaluate( dataset=dataset, metrics=[faithfulness, answer_relevancy, context_recall], ) results["new" if is_new else "old"] = scores
improvement = { metric: results["new"][metric] - results["old"][metric] for metric in results["new"] }
print("\n评估结果对比:") for metric, diff in improvement.items(): direction = "+" if diff > 0 else "" print(f" {metric}: {direction}{diff:.3f}")
return improvement6.3 飞轮节奏建议
| 时间节点 | 行动 | 目标 |
|---|---|---|
| 第0周 | 生成合成评估集,部署初版系统 | 上线,开始收集数据 |
| 第2周 | 收集到 200+ 条交互,首次人工标注 50 条 | 建立标注基准 |
| 第4周 | 用 50 条标注数据微调 Embedding,A/B 测试 | 验证飞轮可行性 |
| 第8周 | 积累 200+ 条标注,第二次微调 + DPO | 模型质量可见提升 |
| 第12周 | 自动化标注流程,缩短迭代周期 | 飞轮开始加速 |
总结
合成数据不是数据质量问题的终极解法,但它是起步阶段最现实的选择。
判断一个团队对合成数据的理解是否到位,可以问三个问题:
- 你们的合成数据有多样性控制吗?(抵抗模式坍塌)
- 你们有真实数据混合策略吗?(抵抗领域偏移)
- 你们的数据飞轮什么时候开始收集第一条真实反馈?(冷启动意识)
能回答这三个问题的团队,基本上不会在”合成数据有没有用”这个问题上浪费时间——他们已经在”怎么用好”的路上了。
#合成数据 #数据飞轮 #主动学习 #LLM #RAG训练 #数据工程 #SFT #Embedding微调
本文节选自数据从业者全栈知识库。知识库包含 2300+ 篇体系化技术文档,覆盖数据分析、数据工程、数据治理、AI 等全栈领域。了解更多 ->