A sentence-complexity analyzer from a syntactic-structure perspective (based on HIT's LTP)

The script runs dependency parsing over each sentence with HIT's LTP toolkit and, when finished, writes an automatically generated analysis report into the folder that holds the corpus TXT files. Required packages: ltp, tabulate, pandas, and openpyxl (for the Excel summary).
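
For orientation, the sketch below shows the LTP call the whole script is built around, assuming the pip `ltp` 4.x package (`LTP()` downloads a default pretrained model on first use). The `dep` task returns, per sentence, a list of 1-based head indices (0 = ROOT) and the matching relation labels:

from ltp import LTP

ltp = LTP()  # default pretrained model; a local model path can be passed instead
result = ltp.pipeline(["语言学是一门研究语言的科学。"], tasks=["cws", "dep"])
words = result.cws[0]            # segmented words
heads = result.dep[0]["head"]    # 1-based index of each word's head, 0 = ROOT
labels = result.dep[0]["label"]  # dependency relation of each word to its head
for i, (w, h, rel) in enumerate(zip(words, heads, labels), start=1):
    print(i, w, "->", "ROOT" if h == 0 else words[h - 1], rel)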

from ltp import LTP
from tabulate import tabulate
from collections import defaultdict
import time
import os
import glob
import pandas as pd
from datetime import datetime

class BatchSentenceAnalyzer:
    """批量句子分析器"""
    
    def __init__(self, ltp_model_path=None, config=None):
        """初始化分析器"""
        self.ltp = LTP() if not ltp_model_path else LTP(ltp_model_path)
        self.config = config or {
            'punctuation_relations': {"WP", "MP"},
            'nested_detection_mode': "direct",
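            # 专项嵌套模式为 (本节点关系, 子节点关系) 二元组,例如 ("VOB", "ATT") 表示宾语节点下再挂定语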
            'special_nested_patterns': [("ATT", "ATT"), ("VOB", "ATT"), ("SBV", "ATT"), ("COO", "VOB")],
            'include_table_in_report': True,
            'summary_only': False,
        }
        
        self.output_dir = None 
        
        print("分析器初始化完成,输出目录将在处理时确定。")
    
    def analyze_sentence(self, sentence, sentence_id=None):
        """分析单个句子"""
        start_time = time.time()
        
        # 执行分析
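        # cws 返回分词结果;dep 返回每个词的依存头下标(1 基,0 表示 ROOT)及依存关系标签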
        result = self.ltp.pipeline([sentence], tasks=["cws", "dep"])
        
        seg = result.cws[0]
        dep = result.dep[0]
        heads = dep["head"]
        labels = dep["label"]
        
        analysis_time = time.time() - start_time
        
        # 构建快速查找的数据结构
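        # children_dict[p] 收集以第 p 个词为依存头的子节点下标(词下标从 1 开始)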
        children_dict = {i: [] for i in range(len(seg) + 1)}
        for child_idx, parent_idx in enumerate(heads, start=1):
            children_dict[parent_idx].append(child_idx)
        
        # 辅助函数
        def is_punctuation(relation_label):
            return relation_label in self.config['punctuation_relations']
        
        def is_nested_direct(node_index, relation_label):
            if is_punctuation(relation_label):
                return "标点"
            for child in children_dict.get(node_index, []):
                if not is_punctuation(labels[child - 1]):
                    return "是"
            return "否"
        
        def is_nested_recursive(node_index, relation_label):
            if is_punctuation(relation_label):
                return "标点"
            stack = list(children_dict.get(node_index, []))  # 复制一份,避免弹栈时改动 children_dict
            while stack:
                current_child = stack.pop()
                if not is_punctuation(labels[current_child - 1]):
                    return "是"
                stack.extend(children_dict.get(current_child, []))
            return "否"
        
        # 选择检测模式:direct 只检查直接子节点,recursive 沿整个子树查找
        if self.config['nested_detection_mode'] == "direct":
            is_nested = is_nested_direct
        else:
            is_nested = is_nested_recursive
        
        def check_special_nested(node_index, relation_label, pattern):
            if is_punctuation(relation_label):
                return False
            target_child_rel = pattern[1]
            for child in children_dict.get(node_index, []):
                if labels[child - 1] == target_child_rel and not is_punctuation(labels[child - 1]):
                    if self.config['nested_detection_mode'] == "recursive":
                        stack = [(child, [child])]
                        while stack:
                            current, path = stack.pop()
                            if labels[current - 1] == pattern[0] and not is_punctuation(labels[current - 1]) and len(path) > 1:
                                return True
                            for grandchild in children_dict.get(current, []):
                                stack.append((grandchild, path + [grandchild]))
                    else:
                        return True
            return False
        
        # 构建表格数据
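        # 每行依次为:[词语, 位置, 依存头词, 头位置, 关系, 依存距离, 是否嵌套, 专项嵌套标签]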
        table_data = []
        special_nested_info = defaultdict(list)
        all_relations = set(labels)
        
        for i, word in enumerate(seg):
            word_idx = i + 1
            head_idx = heads[i]
            if head_idx == 0:
                head_word = "ROOT"
                dep_pos = "0"
                distance = 0
            else:
                head_word = seg[head_idx - 1]
                dep_pos = str(head_idx)
                distance = abs(word_idx - head_idx)
            
            current_relation = labels[i]
            nested_status = is_nested(word_idx, current_relation)
            
            special_tags = []
            if not is_punctuation(current_relation):
                for pattern in self.config['special_nested_patterns']:
                    if current_relation == pattern[0] and check_special_nested(word_idx, current_relation, pattern):
                        tag = f"{pattern[0]}{pattern[1]}"
                        special_tags.append(tag)
                        special_nested_info[tag].append(f"{word}({word_idx})")
            
            special_field = "、".join(special_tags) if special_tags else "/"
            table_data.append([word, word_idx, head_word, dep_pos, current_relation, distance, nested_status, special_field])
        
        # 分类统计
        punct_nodes = [row for row in table_data if row[6] == "标点"]
        nested_nodes = [row for row in table_data if row[6] == "是"]
        non_nested_nodes = [row for row in table_data if row[6] == "否"]
        non_punct_nodes = [row for row in table_data if row[6] != "标点"]
        
        # 计算复杂度评分
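        # 评分 = 嵌套节点数 × 嵌套节点的平均依存距离,数值越大表示嵌套越多、跨度越远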
        if nested_nodes:
            avg_dist_nested = sum(row[5] for row in nested_nodes) / len(nested_nodes)
            complexity_score = len(nested_nodes) * avg_dist_nested
        else:
            complexity_score = 0
        
        # 返回结果
        stats = {
            'sentence_id': sentence_id or 0,
            'sentence': sentence,
            'word_count': len(seg),
            'analysis_time': analysis_time,
            'total_nodes': len(table_data),
            'punct_nodes': len(punct_nodes),
            'non_punct_nodes': len(non_punct_nodes),
            'nested_nodes': len(nested_nodes),
            'nested_ratio': len(nested_nodes) / len(non_punct_nodes) if len(non_punct_nodes) > 0 else 0,
            'complexity_score': complexity_score,
            'special_patterns_found': sum(len(v) for v in special_nested_info.values()),
        }
        
        details = {
            'table_data': table_data,
            'special_nested_info': dict(special_nested_info),
            'all_relations': list(all_relations),
            'seg': seg,
            'heads': heads,
            'labels': labels,
        }
        
        return stats, details
    
    def generate_report(self, stats, details, sentence_id=None):
        """生成详细报告"""
        output_lines = []
        
        def add_line(text):
            output_lines.append(text)

        if 'seg' in details:
            add_line("\n【分词结果】")
            add_line("-" * 60)  # 一条简洁的总分隔线
            
            words = details['seg']
            items_per_line = 10  # 每行显示的 “索引.词语” 单元数,可自由调整
            indexed_items = [f"{i+1}.{word}" for i, word in enumerate(words)]
            
            # 将转换后的列表按指定数量分组,逐行打印
            for i in range(0, len(indexed_items), items_per_line):
                # 获取当前行的一组数据
                line_items = indexed_items[i:i+items_per_line]
                # 用两个空格连接并输出当前行
                add_line("  ".join(line_items))
            
            add_line("-" * 60) # 结束分隔线,与开头呼应

        if self.config['include_table_in_report']:
            add_line("\n【详细分析表格】")
            headers = ["词语", "位置", "依存头", "头位置", "关系", "距离", "是否嵌套", "专项嵌套"]
            table_str = tabulate(details['table_data'], headers=headers, 
                               tablefmt='grid', stralign="center", numalign="center")
            add_line(table_str)
        
        # 统计分析
        add_line("\n" + "=" * 70)
        add_line("嵌套结构深度分析报告")
        add_line("=" * 70)
        
        add_line(f"1. 节点总体统计:")
        add_line(f"   • 总节点数: {stats['total_nodes']}")
        add_line(f"   • 标点符号节点: {stats['punct_nodes']}")
        add_line(f"   • 非标点节点: {stats['non_punct_nodes']}")
        
        add_line(f"\n2. 非标点节点嵌套情况:")
        add_line(f"   • 形成嵌套的节点数: {stats['nested_nodes']}")
        add_line(f"   • 未形成嵌套的节点数: {stats['total_nodes'] - stats['punct_nodes'] - stats['nested_nodes']}")
        add_line(f"   • 嵌套节点占比: {stats['nested_ratio']:.1%}")
        
        # 句子复杂度评估
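        # 阈值:<10 简单,[10, 25) 中等,[25, 50) 复杂,≥50 非常复杂(与汇总报告中的分箱一致)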
        complexity_level = "句子结构较为简单"
        if stats['complexity_score'] >= 50:
            complexity_level = "句子结构非常复杂,嵌套层次深"
        elif stats['complexity_score'] >= 25:
            complexity_level = "句子结构较为复杂"
        elif stats['complexity_score'] >= 10:
            complexity_level = "句子结构中等复杂"
        
        add_line(f"\n3. 句子复杂度评估:")
        add_line(f"   • 嵌套复杂度评分: {stats['complexity_score']:.1f} (嵌套节点数 × 平均嵌套距离)")
        add_line(f"   • 评估: {complexity_level}")
        
        # 专项模式统计
        if stats['special_patterns_found'] > 0:
            add_line(f"\n4. 专项嵌套模式检测:")
            add_line(f"   • 发现专项模式数量: {stats['special_patterns_found']}处")
        
        add_line("\n" + "=" * 100)
        add_line("分析报告结束")
        add_line("=" * 100)
        
        return "\n".join(output_lines)
    
    def process_single_file(self, file_path, encoding='utf-8'):
        """处理单个TXT文件(每行一个句子)"""
        print(f"正在处理文件: {file_path}")
        
        sentences = []
        with open(file_path, 'r', encoding=encoding, errors='ignore') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if line and not line.startswith('#'):  # 跳过空行和注释行
                    sentences.append((line_num, line))
        
        print(f"从文件读取到 {len(sentences)} 个句子")
        return self.process_sentences(sentences, os.path.basename(file_path))
    
    def process_directory(self, dir_path, file_pattern="*.txt", encoding='utf-8'):
        """处理目录下的所有TXT文件"""
        print(f"正在处理目录: {dir_path}")
        
        all_sentences = []
        file_info = []
        
        # 查找所有txt文件
        txt_files = glob.glob(os.path.join(dir_path, file_pattern))
        print(f"找到 {len(txt_files)} 个TXT文件")
        
        for file_path in txt_files:
            file_name = os.path.basename(file_path)
            try:
                with open(file_path, 'r', encoding=encoding, errors='ignore') as f:
                    for line_num, line in enumerate(f, 1):
                        line = line.strip()
                        if line and not line.startswith('#'):
                            all_sentences.append((line_num, line, file_name))
                            file_info.append({'file': file_name, 'line': line_num})
            except Exception as e:
                print(f"读取文件 {file_name} 时出错: {e}")
        
        print(f"从所有文件读取到 {len(all_sentences)} 个句子")
        return self.process_sentences(all_sentences, "批量分析")
    
    def process_sentences(self, sentences, source_name="语料"):
        """处理句子列表"""
        all_stats = []
        all_details = []
        
        print(f"开始分析 {len(sentences)} 个句子...")
        
        # 输出目录由调用方在处理前设置(见 main);未设置时给出明确错误,而不是晦涩的 TypeError
        if not self.output_dir:
            raise ValueError("请先设置 output_dir 再调用 process_sentences")

        # 创建详细报告子目录
        detailed_reports_dir = os.path.join(self.output_dir, "详细报告")
        os.makedirs(detailed_reports_dir, exist_ok=True)
        
        # 处理每个句子
        for i, sentence_info in enumerate(sentences):
            if len(sentence_info) == 3:
                line_num, sentence, file_name = sentence_info
                sentence_id = f"{file_name}_L{line_num}"
            else:
                line_num, sentence = sentence_info
                sentence_id = f"{source_name}_L{line_num}"
            
            try:
                print(f"分析句子 {i+1}/{len(sentences)}: {sentence[:50]}...")
                
                # 分析句子
                stats, details = self.analyze_sentence(sentence, sentence_id)
                all_stats.append(stats)
                all_details.append(details)
                
                # 生成详细报告
                if not self.config['summary_only']:
                    report = self.generate_report(stats, details, sentence_id)
                    
                    # 保存单个句子的详细报告
                    report_filename = f"报告_{sentence_id.replace(':', '_').replace('/', '_')}.txt"
                    report_path = os.path.join(detailed_reports_dir, report_filename)
                    
                    with open(report_path, 'w', encoding='utf-8') as f:
                        f.write(report)
                
                # 进度显示
                if (i + 1) % 10 == 0:
                    print(f"  已处理 {i+1} 个句子...")
                    
            except Exception as e:
                print(f"分析句子时出错({sentence_id}): {e}")
                # 添加错误记录
                error_stats = {
                    'sentence_id': sentence_id,
                    'sentence': sentence[:100] if sentence else "",
                    'word_count': 0,
                    'analysis_time': 0,
                    'total_nodes': 0,
                    'punct_nodes': 0,
                    'non_punct_nodes': 0,
                    'nested_nodes': 0,
                    'nested_ratio': 0,
                    'complexity_score': 0,
                    'special_patterns_found': 0,
                    'error': str(e)
                }
                all_stats.append(error_stats)
        
        print(f"句子分析完成,开始生成汇总报告...")
        
        # 生成汇总报告
        self.generate_summary_reports(all_stats, all_details, source_name)
        
        return all_stats, all_details
    
    def generate_summary_reports(self, all_stats, all_details, source_name):
        """生成汇总报告"""
        # 创建DataFrame
        df_stats = pd.DataFrame(all_stats)
        
        # 1. 基本统计汇总
        summary_file = os.path.join(self.output_dir, f"统计分析汇总_{source_name}.csv")
        df_stats.to_csv(summary_file, index=False, encoding='utf-8-sig')
        print(f"统计分析汇总已保存到: {summary_file}")
        
        # 2. 详细统计报告
        summary_report_file = os.path.join(self.output_dir, f"详细统计报告_{source_name}.txt")
        self._create_summary_txt_report(df_stats, summary_report_file, source_name)
        
        # 3. 统计摘要(Excel格式,含图表)
        excel_file = os.path.join(self.output_dir, f"统计摘要_{source_name}.xlsx")
        self._create_excel_summary(df_stats, excel_file, source_name)
        
        # 4. 配置文件备份
        config_file = os.path.join(self.output_dir, "分析配置.txt")
        with open(config_file, 'w', encoding='utf-8') as f:
            f.write(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"数据来源: {source_name}\n")
            f.write(f"分析句子总数: {len(all_stats)}\n")
            f.write(f"嵌套检测模式: {self.config['nested_detection_mode']}\n")
            f.write(f"标点关系: {', '.join(self.config['punctuation_relations'])}\n")
            f.write(f"专项嵌套模式: {self.config['special_nested_patterns']}\n")
        
        print(f"所有报告已保存到目录: {self.output_dir}")
    
    def _create_summary_txt_report(self, df_stats, output_file, source_name):
        """创建文本格式的汇总报告"""
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write("=" * 80 + "\n")
            f.write(f"依存句法分析汇总报告\n")
            f.write(f"数据来源: {source_name}\n")
            f.write(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write("=" * 80 + "\n\n")
            
            f.write(f"1. 总体统计\n")
            f.write(f"   分析句子总数: {len(df_stats)}\n")
            f.write(f"   总词语数量: {df_stats['word_count'].sum()}\n")
            f.write(f"   平均每句词语数: {df_stats['word_count'].mean():.1f}\n")
            f.write(f"   总分析时间: {df_stats['analysis_time'].sum():.2f}\n")
            f.write(f"   平均每句分析时间: {df_stats['analysis_time'].mean():.3f}\n\n")
            
            f.write(f"2. 嵌套结构统计\n")
            f.write(f"   总嵌套节点数: {df_stats['nested_nodes'].sum()}\n")
            f.write(f"   平均每句嵌套节点数: {df_stats['nested_nodes'].mean():.1f}\n")
            f.write(f"   平均嵌套比例: {df_stats['nested_ratio'].mean():.1%}\n")
            f.write(f"   总复杂度评分: {df_stats['complexity_score'].sum():.1f}\n")
            f.write(f"   平均复杂度评分: {df_stats['complexity_score'].mean():.1f}\n\n")
            
            f.write(f"3. 句子复杂度分布\n")
            # right=False 使区间左闭右开,与单句报告的阈值一致,且评分为 0 的句子也能落入“简单”档
            bins = [0, 10, 25, 50, float('inf')]
            labels = ["简单", "中等", "复杂", "非常复杂"]
            df_stats['complexity_level'] = pd.cut(df_stats['complexity_score'], bins=bins, labels=labels, right=False)
            
            for level in labels:
                count = len(df_stats[df_stats['complexity_level'] == level])
                percentage = count / len(df_stats) * 100
                f.write(f"   {level}: {count}句 ({percentage:.1f}%)\n")
            
            f.write("\n4. 前10个最复杂的句子\n")
            top_complex = df_stats.nlargest(10, 'complexity_score')[['sentence_id', 'sentence', 'complexity_score', 'nested_nodes']]
            for idx, row in top_complex.iterrows():
                f.write(f"   {row['sentence_id']}: {row['complexity_score']:.1f}分 ({row['nested_nodes']}个嵌套)\n")
                f.write(f"     内容: {row['sentence'][:80]}...\n")
    
    def _create_excel_summary(self, df_stats, output_file, source_name):
        """创建Excel格式的统计摘要"""
        try:
            with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
                # 写入基本统计数据
                df_stats.to_excel(writer, sheet_name='详细数据', index=False)
                
                # 创建统计摘要工作表
                summary_data = {
                    '统计项': ['分析句子总数', '总词语数量', '平均每句词语数', 
                            '总嵌套节点数', '平均每句嵌套节点数', '平均嵌套比例',
                            '总复杂度评分', '平均复杂度评分', '平均分析时间(秒)'],
                    '数值': [len(df_stats), df_stats['word_count'].sum(), df_stats['word_count'].mean(),
                           df_stats['nested_nodes'].sum(), df_stats['nested_nodes'].mean(), df_stats['nested_ratio'].mean(),
                           df_stats['complexity_score'].sum(), df_stats['complexity_score'].mean(), df_stats['analysis_time'].mean()]
                }
                df_summary = pd.DataFrame(summary_data)
                df_summary.to_excel(writer, sheet_name='统计摘要', index=False)
                
                # 创建复杂度分布工作表
                bins = [0, 10, 25, 50, float('inf')]
                labels = ["简单", "中等", "复杂", "非常复杂"]
                df_stats['complexity_level'] = pd.cut(df_stats['complexity_score'], bins=bins, labels=labels, right=False)
                
                complexity_dist = df_stats['complexity_level'].value_counts().reindex(labels).fillna(0)
                df_complexity = pd.DataFrame({
                    '复杂度等级': complexity_dist.index,
                    '句子数量': complexity_dist.values,
                    '百分比': (complexity_dist.values / len(df_stats) * 100).round(1)
                })
                df_complexity.to_excel(writer, sheet_name='复杂度分布', index=False)
                
            print(f"Excel统计摘要已保存到: {output_file}")
        except Exception as e:
            print(f"创建Excel文件时出错: {e}")

# 使用示例
def main():
    """主函数:演示批量分析功能"""
    print("依存句法批量分析系统")
    print("=" * 50)
    
    # 创建分析器实例 (暂不初始化输出目录)
    analyzer = BatchSentenceAnalyzer()
    
    # 用户选择处理模式
    print("\n请选择处理模式:")
    print("1. 处理单个TXT文件(每行一个句子)")
    print("2. 处理目录下的所有TXT文件")
    print("3. 直接分析示例句子")
    
    choice = input("请选择 (1/2/3): ").strip()
    
    if choice == "1":
        # 处理单个文件
        file_path = input("请输入TXT文件路径: ").strip()
        if os.path.exists(file_path):
            input_dir = os.path.dirname(file_path)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            analyzer.output_dir = os.path.join(input_dir, f"依存分析结果_{timestamp}")
            os.makedirs(analyzer.output_dir, exist_ok=True)
            
            print(f"结果将保存至: {analyzer.output_dir}")
            analyzer.process_single_file(file_path)
        else:
            print(f"文件不存在: {file_path}")
    
    elif choice == "2":
        # 处理目录
        dir_path = input("请输入目录路径: ").strip()
        if os.path.exists(dir_path):
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            analyzer.output_dir = os.path.join(dir_path, f"依存分析结果_{timestamp}")
            os.makedirs(analyzer.output_dir, exist_ok=True)
            
            print(f"结果将保存至: {analyzer.output_dir}")
            analyzer.process_directory(dir_path)
        else:
            print(f"目录不存在: {dir_path}")
    
    elif choice == "3":
        # 直接分析示例句子(保持原逻辑,输出到程序所在目录)
        sample_sentences = [
            (1, "语言学是一门研究语言的科学,它关注语言的各个层面。"),
            (2, "通过分析大量的语料,语言学家可以揭示语言的规律和变化。"),
            (3, "自然语言处理技术为语言学研究提供了新的工具和方法。")
        ]
        print(f"\n分析 {len(sample_sentences)} 个示例句子...")
        # 对于示例句子,默认输出到当前脚本所在目录
        analyzer.output_dir = os.path.join(os.path.dirname(__file__), f"依存分析结果_示例_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
        os.makedirs(analyzer.output_dir, exist_ok=True)
        print(f"结果将保存至: {analyzer.output_dir}")
        analyzer.process_sentences(sample_sentences, "示例句子")
    
    else:
        print("无效选择,程序退出。")
    
    print("\n分析完成!")

if __name__ == "__main__":
    main()
