Sentence complexity analyzer from a syntactic-structure perspective (based on HIT's LTP)
The script uses the HIT LTP toolkit to run dependency parsing over each sentence and then writes an automatic analysis report to the folder that contains the corpus TXT files.
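Before the full script, here is a minimal sketch of the parse output the analyzer is built on. It mirrors the pipeline call used inside BatchSentenceAnalyzer below and assumes the ltp package (4.x) is installed; the example sentence is one of the samples from main().

from ltp import LTP

ltp = LTP()
out = ltp.pipeline(["语言学是一门研究语言的科学,它关注语言的各个层面。"], tasks=["cws", "dep"])
words = out.cws[0]             # segmented tokens; positions in the dependency tree are 1-based
heads = out.dep[0]["head"]     # heads[i]: 1-based index of the governor of token i+1 (0 = ROOT)
labels = out.dep[0]["label"]   # labels[i]: dependency relation of token i+1 (e.g. SBV, VOB, ATT, WP)
for i, (w, h, rel) in enumerate(zip(words, heads, labels), start=1):
    print(i, w, "->", h, rel)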

from ltp import LTP
from tabulate import tabulate
from collections import defaultdict
import time
import os
import glob
import pandas as pd
from datetime import datetime

class BatchSentenceAnalyzer:
    """Batch sentence analyzer."""

    def __init__(self, ltp_model_path=None, config=None):
        """Initialize the analyzer."""
        self.ltp = LTP() if not ltp_model_path else LTP(ltp_model_path)
        self.config = config or {
            'punctuation_relations': {"WP", "MP"},
            'nested_detection_mode': "direct",
            'special_nested_patterns': [("ATT", "ATT"), ("VOB", "ATT"), ("SBV", "ATT"), ("COO", "VOB")],
            'include_table_in_report': True,
            'summary_only': False,
        }
        self.output_dir = None
        print("分析器初始化完成,输出目录将在处理时确定。")
    def analyze_sentence(self, sentence, sentence_id=None):
        """Analyze a single sentence."""
        start_time = time.time()
        # Run word segmentation and dependency parsing
        result = self.ltp.pipeline([sentence], tasks=["cws", "dep"])
        seg = result.cws[0]
        dep = result.dep[0]
        heads = dep["head"]
        labels = dep["label"]
        analysis_time = time.time() - start_time
        # Build a fast lookup: parent index (1-based, 0 = ROOT) -> list of child indices
        children_dict = {i: [] for i in range(len(seg) + 1)}
        for child_idx, parent_idx in enumerate(heads, start=1):
            children_dict[parent_idx].append(child_idx)

        # Helper functions
        def is_punctuation(relation_label):
            return relation_label in self.config['punctuation_relations']
        def is_nested_direct(node_index, relation_label):
            if is_punctuation(relation_label):
                return "标点"
            for child in children_dict.get(node_index, []):
                if not is_punctuation(labels[child - 1]):
                    return "是"
            return "否"

        def is_nested_recursive(node_index, relation_label):
            if is_punctuation(relation_label):
                return "标点"
            # Copy the child list so popping does not mutate children_dict
            stack = list(children_dict.get(node_index, []))
            while stack:
                current_child = stack.pop()
                if not is_punctuation(labels[current_child - 1]):
                    return "是"
                stack.extend(children_dict.get(current_child, []))
            return "否"

        # Select the detection mode
        if self.config['nested_detection_mode'] == "direct":
            is_nested = is_nested_direct
        else:
            is_nested = is_nested_recursive
        def check_special_nested(node_index, relation_label, pattern):
            # Direct mode: a non-punctuation child bearing pattern[1] is enough.
            # Recursive mode: additionally require pattern[0] to reappear somewhere below that child.
            if is_punctuation(relation_label):
                return False
            target_child_rel = pattern[1]
            for child in children_dict.get(node_index, []):
                if labels[child - 1] == target_child_rel and not is_punctuation(labels[child - 1]):
                    if self.config['nested_detection_mode'] == "recursive":
                        stack = [(child, [child])]
                        while stack:
                            current, path = stack.pop()
                            if labels[current - 1] == pattern[0] and not is_punctuation(labels[current - 1]) and len(path) > 1:
                                return True
                            for grandchild in children_dict.get(current, []):
                                stack.append((grandchild, path + [grandchild]))
                    else:
                        return True
            return False
        # Build the per-word table
        table_data = []
        special_nested_info = defaultdict(list)
        all_relations = set(labels)
        for i, word in enumerate(seg):
            word_idx = i + 1
            head_idx = heads[i]
            if head_idx == 0:
                head_word = "ROOT"
                dep_pos = "0"
                distance = 0
            else:
                head_word = seg[head_idx - 1]
                dep_pos = str(head_idx)
                distance = abs(word_idx - head_idx)
            current_relation = labels[i]
            nested_status = is_nested(word_idx, current_relation)
            special_tags = []
            if not is_punctuation(current_relation):
                for pattern in self.config['special_nested_patterns']:
                    if current_relation == pattern[0] and check_special_nested(word_idx, current_relation, pattern):
                        tag = f"{pattern[0]}→{pattern[1]}"
                        special_tags.append(tag)
                        special_nested_info[tag].append(f"{word}({word_idx})")
            special_field = "、".join(special_tags) if special_tags else "/"
            table_data.append([word, word_idx, head_word, dep_pos, current_relation, distance, nested_status, special_field])
        # Classify nodes
        punct_nodes = [row for row in table_data if row[6] == "标点"]
        nested_nodes = [row for row in table_data if row[6] == "是"]
        non_nested_nodes = [row for row in table_data if row[6] == "否"]
        non_punct_nodes = [row for row in table_data if row[6] != "标点"]
        # Complexity score = number of nested nodes × their mean dependency distance
        if nested_nodes:
            avg_dist_nested = sum(row[5] for row in nested_nodes) / len(nested_nodes)
            complexity_score = len(nested_nodes) * avg_dist_nested
        else:
            complexity_score = 0
        # Assemble the results
        stats = {
            'sentence_id': sentence_id or 0,
            'sentence': sentence,
            'word_count': len(seg),
            'analysis_time': analysis_time,
            'total_nodes': len(table_data),
            'punct_nodes': len(punct_nodes),
            'non_punct_nodes': len(non_punct_nodes),
            'nested_nodes': len(nested_nodes),
            'nested_ratio': len(nested_nodes) / len(non_punct_nodes) if len(non_punct_nodes) > 0 else 0,
            'complexity_score': complexity_score,
            'special_patterns_found': sum(len(v) for v in special_nested_info.values()),
        }
        details = {
            'table_data': table_data,
            'special_nested_info': dict(special_nested_info),
            'all_relations': list(all_relations),
            'seg': seg,
            'heads': heads,
            'labels': labels,
        }
        return stats, details
    def generate_report(self, stats, details, sentence_id=None):
        """Generate the detailed per-sentence report."""
        output_lines = []

        def add_line(text):
            output_lines.append(text)

        if 'seg' in details:
            add_line("\n【分词结果】")
            add_line("-" * 60)  # opening separator
            words = details['seg']
            items_per_line = 10  # number of "index.word" items per line; adjust freely
            indexed_items = [f"{i+1}.{word}" for i, word in enumerate(words)]
            # Print the indexed items in groups of items_per_line
            for i in range(0, len(indexed_items), items_per_line):
                line_items = indexed_items[i:i + items_per_line]
                # Join the current group with two spaces
                add_line("  ".join(line_items))
            add_line("-" * 60)  # closing separator
        if self.config['include_table_in_report']:
            add_line("\n【详细分析表格】")
            headers = ["词语", "位置", "依存头", "头位置", "关系", "距离", "是否嵌套", "专项嵌套"]
            table_str = tabulate(details['table_data'], headers=headers,
                                 tablefmt='grid', stralign="center", numalign="center")
            add_line(table_str)
        # Summary statistics
        add_line("\n" + "=" * 70)
        add_line("嵌套结构深度分析报告")
        add_line("=" * 70)
        add_line(f"1. 节点总体统计:")
        add_line(f" • 总节点数: {stats['total_nodes']}")
        add_line(f" • 标点符号节点: {stats['punct_nodes']}")
        add_line(f" • 非标点节点: {stats['non_punct_nodes']}")
        add_line(f"\n2. 非标点节点嵌套情况:")
        add_line(f" • 形成嵌套的节点数: {stats['nested_nodes']}")
        add_line(f" • 未形成嵌套的节点数: {stats['total_nodes'] - stats['punct_nodes'] - stats['nested_nodes']}")
        add_line(f" • 嵌套节点占比: {stats['nested_ratio']:.1%}")
        # Sentence complexity assessment
        complexity_level = "句子结构较为简单"
        if stats['complexity_score'] >= 50:
            complexity_level = "句子结构非常复杂,嵌套层次深"
        elif stats['complexity_score'] >= 25:
            complexity_level = "句子结构较为复杂"
        elif stats['complexity_score'] >= 10:
            complexity_level = "句子结构中等复杂"
        add_line(f"\n3. 句子复杂度评估:")
        add_line(f" • 嵌套复杂度评分: {stats['complexity_score']:.1f} (嵌套节点数 × 平均嵌套距离)")
        add_line(f" • 评估: {complexity_level}")
        # Special nesting pattern statistics
        if stats['special_patterns_found'] > 0:
            add_line(f"\n4. 专项嵌套模式检测:")
            add_line(f" • 发现专项模式数量: {stats['special_patterns_found']}处")
        add_line("\n" + "=" * 100)
        add_line("分析报告结束")
        add_line("=" * 100)
        return "\n".join(output_lines)
    def process_single_file(self, file_path, encoding='utf-8'):
        """Process a single TXT file (one sentence per line)."""
        print(f"正在处理文件: {file_path}")
        sentences = []
        with open(file_path, 'r', encoding=encoding, errors='ignore') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if line and not line.startswith('#'):  # skip blank lines and comment lines
                    sentences.append((line_num, line))
        print(f"从文件读取到 {len(sentences)} 个句子")
        return self.process_sentences(sentences, os.path.basename(file_path))

    def process_directory(self, dir_path, file_pattern="*.txt", encoding='utf-8'):
        """Process all TXT files in a directory."""
        print(f"正在处理目录: {dir_path}")
        all_sentences = []
        file_info = []
        # Find all TXT files
        txt_files = glob.glob(os.path.join(dir_path, file_pattern))
        print(f"找到 {len(txt_files)} 个TXT文件")
        for file_path in txt_files:
            file_name = os.path.basename(file_path)
            try:
                with open(file_path, 'r', encoding=encoding, errors='ignore') as f:
                    for line_num, line in enumerate(f, 1):
                        line = line.strip()
                        if line and not line.startswith('#'):
                            all_sentences.append((line_num, line, file_name))
                            file_info.append({'file': file_name, 'line': line_num})
            except Exception as e:
                print(f"读取文件 {file_name} 时出错: {e}")
        print(f"从所有文件读取到 {len(all_sentences)} 个句子")
        return self.process_sentences(all_sentences, "批量分析")
    def process_sentences(self, sentences, source_name="语料"):
        """Process a list of sentences. self.output_dir must be set by the caller first."""
        all_stats = []
        all_details = []
        print(f"开始分析 {len(sentences)} 个句子...")
        # Create the sub-directory for per-sentence reports
        detailed_reports_dir = os.path.join(self.output_dir, "详细报告")
        os.makedirs(detailed_reports_dir, exist_ok=True)
        # Process each sentence
        for i, sentence_info in enumerate(sentences):
            if len(sentence_info) == 3:
                line_num, sentence, file_name = sentence_info
                sentence_id = f"{file_name}_L{line_num}"
            else:
                line_num, sentence = sentence_info
                sentence_id = f"{source_name}_L{line_num}"
            try:
                print(f"分析句子 {i+1}/{len(sentences)}: {sentence[:50]}...")
                # Analyze the sentence
                stats, details = self.analyze_sentence(sentence, sentence_id)
                all_stats.append(stats)
                all_details.append(details)
                # Generate the detailed report
                if not self.config['summary_only']:
                    report = self.generate_report(stats, details, sentence_id)
                    # Save the per-sentence report
                    report_filename = f"报告_{sentence_id.replace(':', '_').replace('/', '_')}.txt"
                    report_path = os.path.join(detailed_reports_dir, report_filename)
                    with open(report_path, 'w', encoding='utf-8') as f:
                        f.write(report)
                # Progress display
                if (i + 1) % 10 == 0:
                    print(f" 已处理 {i+1} 个句子...")
            except Exception as e:
                print(f"分析句子时出错({sentence_id}): {e}")
                # Record the failure in the summary stats
                error_stats = {
                    'sentence_id': sentence_id,
                    'sentence': sentence[:100] if sentence else "",
                    'word_count': 0,
                    'analysis_time': 0,
                    'total_nodes': 0,
                    'punct_nodes': 0,
                    'non_punct_nodes': 0,
                    'nested_nodes': 0,
                    'nested_ratio': 0,
                    'complexity_score': 0,
                    'special_patterns_found': 0,
                    'error': str(e)
                }
                all_stats.append(error_stats)
        print(f"句子分析完成,开始生成汇总报告...")
        # Generate the summary reports
        self.generate_summary_reports(all_stats, all_details, source_name)
        return all_stats, all_details
    def generate_summary_reports(self, all_stats, all_details, source_name):
        """Generate the summary reports."""
        # Build a DataFrame over the per-sentence stats
        df_stats = pd.DataFrame(all_stats)
        # 1. CSV summary
        summary_file = os.path.join(self.output_dir, f"统计分析汇总_{source_name}.csv")
        df_stats.to_csv(summary_file, index=False, encoding='utf-8-sig')
        print(f"统计分析汇总已保存到: {summary_file}")
        # 2. Detailed text report
        summary_report_file = os.path.join(self.output_dir, f"详细统计报告_{source_name}.txt")
        self._create_summary_txt_report(df_stats, summary_report_file, source_name)
        # 3. Excel summary
        excel_file = os.path.join(self.output_dir, f"统计摘要_{source_name}.xlsx")
        self._create_excel_summary(df_stats, excel_file, source_name)
        # 4. Back up the analysis configuration
        config_file = os.path.join(self.output_dir, "分析配置.txt")
        with open(config_file, 'w', encoding='utf-8') as f:
            f.write(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"数据来源: {source_name}\n")
            f.write(f"分析句子总数: {len(all_stats)}\n")
            f.write(f"嵌套检测模式: {self.config['nested_detection_mode']}\n")
            f.write(f"标点关系: {', '.join(self.config['punctuation_relations'])}\n")
            f.write(f"专项嵌套模式: {self.config['special_nested_patterns']}\n")
        print(f"所有报告已保存到目录: {self.output_dir}")
    def _create_summary_txt_report(self, df_stats, output_file, source_name):
        """Create the plain-text summary report."""
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write("=" * 80 + "\n")
            f.write(f"依存句法分析汇总报告\n")
            f.write(f"数据来源: {source_name}\n")
            f.write(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write("=" * 80 + "\n\n")
            f.write(f"1. 总体统计\n")
            f.write(f" 分析句子总数: {len(df_stats)}\n")
            f.write(f" 总词语数量: {df_stats['word_count'].sum()}\n")
            f.write(f" 平均每句词语数: {df_stats['word_count'].mean():.1f}\n")
            f.write(f" 总分析时间: {df_stats['analysis_time'].sum():.2f}秒\n")
            f.write(f" 平均每句分析时间: {df_stats['analysis_time'].mean():.3f}秒\n\n")
            f.write(f"2. 嵌套结构统计\n")
            f.write(f" 总嵌套节点数: {df_stats['nested_nodes'].sum()}\n")
            f.write(f" 平均每句嵌套节点数: {df_stats['nested_nodes'].mean():.1f}\n")
            f.write(f" 平均嵌套比例: {df_stats['nested_ratio'].mean():.1%}\n")
            f.write(f" 总复杂度评分: {df_stats['complexity_score'].sum():.1f}\n")
            f.write(f" 平均复杂度评分: {df_stats['complexity_score'].mean():.1f}\n\n")
            f.write(f"3. 句子复杂度分布\n")
            bins = [0, 10, 25, 50, float('inf')]
            labels = ["简单", "中等", "复杂", "非常复杂"]
            # right=False makes the bins left-closed ([0,10), [10,25), ...), matching the >= thresholds
            # used in generate_report and keeping zero-score sentences counted in the first bin
            df_stats['complexity_level'] = pd.cut(df_stats['complexity_score'], bins=bins, labels=labels, right=False)
            for level in labels:
                count = len(df_stats[df_stats['complexity_level'] == level])
                percentage = count / len(df_stats) * 100
                f.write(f" {level}: {count}句 ({percentage:.1f}%)\n")
            f.write("\n4. 前10个最复杂的句子\n")
            top_complex = df_stats.nlargest(10, 'complexity_score')[['sentence_id', 'sentence', 'complexity_score', 'nested_nodes']]
            for idx, row in top_complex.iterrows():
                f.write(f" {row['sentence_id']}: {row['complexity_score']:.1f}分 ({row['nested_nodes']}个嵌套)\n")
                f.write(f" 内容: {row['sentence'][:80]}...\n")
    def _create_excel_summary(self, df_stats, output_file, source_name):
        """Create the Excel summary (requires openpyxl)."""
        try:
            with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
                # Full per-sentence data
                df_stats.to_excel(writer, sheet_name='详细数据', index=False)
                # Summary sheet
                summary_data = {
                    '统计项': ['分析句子总数', '总词语数量', '平均每句词语数',
                               '总嵌套节点数', '平均每句嵌套节点数', '平均嵌套比例',
                               '总复杂度评分', '平均复杂度评分', '平均分析时间(秒)'],
                    '数值': [len(df_stats), df_stats['word_count'].sum(), df_stats['word_count'].mean(),
                             df_stats['nested_nodes'].sum(), df_stats['nested_nodes'].mean(), df_stats['nested_ratio'].mean(),
                             df_stats['complexity_score'].sum(), df_stats['complexity_score'].mean(), df_stats['analysis_time'].mean()]
                }
                df_summary = pd.DataFrame(summary_data)
                df_summary.to_excel(writer, sheet_name='统计摘要', index=False)
                # Complexity distribution sheet (same left-closed bins as the text report)
                bins = [0, 10, 25, 50, float('inf')]
                labels = ["简单", "中等", "复杂", "非常复杂"]
                df_stats['complexity_level'] = pd.cut(df_stats['complexity_score'], bins=bins, labels=labels, right=False)
                complexity_dist = df_stats['complexity_level'].value_counts().reindex(labels).fillna(0)
                df_complexity = pd.DataFrame({
                    '复杂度等级': complexity_dist.index,
                    '句子数量': complexity_dist.values,
                    '百分比': (complexity_dist.values / len(df_stats) * 100).round(1)
                })
                df_complexity.to_excel(writer, sheet_name='复杂度分布', index=False)
            print(f"Excel统计摘要已保存到: {output_file}")
        except Exception as e:
            print(f"创建Excel文件时出错: {e}")

# Usage example
def main():
    """Main entry point: demonstrates the batch analysis workflow."""
    print("依存句法批量分析系统")
    print("=" * 50)
    # Create the analyzer instance (the output directory is set later, once the input location is known)
    analyzer = BatchSentenceAnalyzer()
    # Let the user choose a processing mode
    print("\n请选择处理模式:")
    print("1. 处理单个TXT文件(每行一个句子)")
    print("2. 处理目录下的所有TXT文件")
    print("3. 直接分析示例句子")
    choice = input("请选择 (1/2/3): ").strip()
    if choice == "1":
        # Single file
        file_path = input("请输入TXT文件路径: ").strip()
        if os.path.exists(file_path):
            input_dir = os.path.dirname(file_path)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            analyzer.output_dir = os.path.join(input_dir, f"依存分析结果_{timestamp}")
            os.makedirs(analyzer.output_dir, exist_ok=True)
            print(f"结果将保存至: {analyzer.output_dir}")
            analyzer.process_single_file(file_path)
        else:
            print(f"文件不存在: {file_path}")
    elif choice == "2":
        # Whole directory
        dir_path = input("请输入目录路径: ").strip()
        if os.path.exists(dir_path):
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            analyzer.output_dir = os.path.join(dir_path, f"依存分析结果_{timestamp}")
            os.makedirs(analyzer.output_dir, exist_ok=True)
            print(f"结果将保存至: {analyzer.output_dir}")
            analyzer.process_directory(dir_path)
        else:
            print(f"目录不存在: {dir_path}")
    elif choice == "3":
        # Analyze the built-in sample sentences (results go next to this script)
        sample_sentences = [
            (1, "语言学是一门研究语言的科学,它关注语言的各个层面。"),
            (2, "通过分析大量的语料,语言学家可以揭示语言的规律和变化。"),
            (3, "自然语言处理技术为语言学研究提供了新的工具和方法。")
        ]
        print(f"\n分析 {len(sample_sentences)} 个示例句子...")
        # For the sample sentences, write the results to the directory containing this script
        analyzer.output_dir = os.path.join(os.path.dirname(__file__), f"依存分析结果_示例_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
        os.makedirs(analyzer.output_dir, exist_ok=True)
        print(f"结果将保存至: {analyzer.output_dir}")
        analyzer.process_sentences(sample_sentences, "示例句子")
    else:
        print("无效选择,程序退出。")
    print("\n分析完成!")
if __name__ == "__main__":
    main()
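The main() menu above is interactive. To drive the analyzer from another script or a scheduled job, the class can also be used directly; a minimal sketch in which the corpus path and the config overrides are placeholders:

# Non-interactive driver sketch; "corpus/sample.txt" is a placeholder path.
import os
from datetime import datetime

analyzer = BatchSentenceAnalyzer(config={
    'punctuation_relations': {"WP", "MP"},
    'nested_detection_mode': "recursive",   # or "direct" (the default above)
    'special_nested_patterns': [("ATT", "ATT"), ("VOB", "ATT"), ("SBV", "ATT"), ("COO", "VOB")],
    'include_table_in_report': True,
    'summary_only': False,
})
analyzer.output_dir = os.path.join("corpus", f"依存分析结果_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
os.makedirs(analyzer.output_dir, exist_ok=True)
all_stats, all_details = analyzer.process_single_file(os.path.join("corpus", "sample.txt"))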