A-A+

Burp Suite敏感信息检测插件 – 检测手机号、IP地址、邮箱地址

2025年06月20日 12:52 学习笔记 暂无评论 共46996字 (阅读639 views次)

【注意:此文章为博主原创文章!转载需注意,请带原文链接,至少也要是txt格式!】

主要功能

  1. 敏感信息检测 - 手机号、IP地址、邮箱地址
  2. Base64解码检测 - 解码后再检测敏感信息
  3. 性能优化 - 大小限制、超时控制(避免返回的响应体过大burp卡死,避免检测数据过多影响程序返回数据)
  4. 编码处理 - 多种编码支持,解决乱码问题(utf-8、gbk、gb2312等自动适配)
  5. 界面展示 - 主标签页和响应标签页
  6. 配置选项 - 可开关各种检测功能
  7. 统计信息 - 详细的处理统计
  8. 导出功能 - 结果导出到控制台

主要类

  • BurpExtender - 主插件类
  • SensitiveInfoTab - 响应标签页类
  • ClearButtonListener - 清除按钮监听器
  • ExportButtonListener - 导出按钮监听器
  • ConfigChangeListener - 配置变更监听器

将下面的代码保存为 .py 文件,在 Burp 中配置好 Jython 环境后,以 Python 类型的扩展加载即可。

 

# -*- coding: utf-8 -*-
from burp import IBurpExtender
from burp import IHttpListener
from burp import IMessageEditorTabFactory
from burp import IMessageEditorTab
from burp import ITab
from burp import IContextMenuFactory

import re
import base64
import json
import time
from javax.swing import JPanel, JScrollPane, JTextArea, JLabel, BorderFactory, BoxLayout
from javax.swing.border import TitledBorder
from java.awt import Color, Font, BorderLayout, FlowLayout, Dimension
from java.awt.event import ActionListener
from javax.swing import JButton, JCheckBox, JSpinner, SpinnerNumberModel
import threading


class BurpExtender(IBurpExtender, IHttpListener, IMessageEditorTabFactory, ITab):
    """
    敏感信息检测插件 - 检测手机号、IP地址、邮箱地址
    """

    def __init__(self):
        """Initialise feature toggles, limits, colours, regexes and shared state."""
        # Detection switches; every category starts enabled.
        self.enable_base64_detection = True
        self.enable_phone_detection = True
        self.enable_ip_detection = True
        self.enable_email_detection = True

        # Performance limits.
        self.MAX_CONTENT_SIZE = 1024 * 1024 * 1  # 1 MB: largest body that is analysed
        self.MAX_BASE64_SIZE = 1024 * 100  # 100 KB: largest Base64 item that is decoded
        self.MAX_PROCESSING_TIME = 5.0  # seconds allowed per message
        self.MIN_BASE64_LENGTH = 20  # shortest run treated as a Base64 candidate
        self.MAX_DETECTION_ITEMS = 1000  # maximum number of detection items

        # Colour definitions, one per finding category.
        self.COLOR_PHONE = Color(255, 200, 200)  # light red - phone numbers
        self.COLOR_IP = Color(200, 255, 200)  # light green - IP addresses
        self.COLOR_EMAIL = Color(200, 200, 255)  # light blue - e-mail addresses
        self.COLOR_MIXED = Color(255, 255, 150)  # light yellow - mixed categories

        # Regular expressions. The phone pattern lists the accepted three-digit
        # prefixes explicitly and refuses matches embedded in longer digit runs.
        phone_regex = r'(?<!\d)(1(?:[358][0-9]|4[579]|66|7[0135678]|9[89])\d{8})(?!\d)'
        ip_regex = r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b'
        email_regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        base64_regex = r'[A-Za-z0-9+/]{' + str(self.MIN_BASE64_LENGTH) + r',}(?:==|=)?'
        self.phone_pattern = re.compile(phone_regex)
        self.ip_pattern = re.compile(ip_regex)
        self.email_pattern = re.compile(email_regex)
        self.base64_pattern = re.compile(base64_regex)

        # Candidate encodings, tried in this order.
        self.encoding_list = ['utf-8', 'gbk', 'gb2312', 'gb18030', 'big5', 'iso-8859-1', 'windows-1252', 'latin1']

        # Findings keyed by URL, running counters, and the lock guarding both.
        self.lock = threading.Lock()
        self.detection_results = {}
        self.stats = dict.fromkeys(
            ('total_processed', 'total_sensitive', 'skipped_large',
             'skipped_timeout', 'encoding_errors'), 0)

    def registerExtenderCallbacks(self, callbacks):
        """Burp entry point: keep the callbacks, register listeners and add the tab.

        Args:
            callbacks: the callbacks object handed over by Burp when the
                extension is loaded.
        """
        self._callbacks = callbacks
        self._helpers = callbacks.getHelpers()

        # Name shown for this extension.
        callbacks.setExtensionName("Sensitive Info Detector Pro")

        # This object acts as HTTP listener and as message-editor tab factory.
        for register in (callbacks.registerHttpListener,
                         callbacks.registerMessageEditorTabFactory):
            register(self)

        # The suite tab can only be added once its panel exists.
        self._create_ui()
        callbacks.addSuiteTab(self)

        print("Sensitive Info Detector Pro plugin loaded successfully!")
        size_in_mb = self.MAX_CONTENT_SIZE / 1024 / 1024
        print("Max content size: " + str(size_in_mb) + "MB")
        time_limit = self.MAX_PROCESSING_TIME
        print("Max processing time: " + str(time_limit) + "s")

    def _create_ui(self):
        """Build the main suite tab.

        Layout (BorderLayout): heading plus configuration panel on top, the
        scrollable results text area in the centre, buttons plus a statistics
        label at the bottom.
        """
        # Centre: read-only monospaced results view inside a titled scroll pane.
        results_view = JTextArea(20, 80)
        results_view.setEditable(False)
        results_view.setFont(Font("Monospaced", Font.PLAIN, 12))
        self._results_area = results_view
        results_scroller = JScrollPane(results_view)
        results_scroller.setBorder(TitledBorder("Detection Results"))

        # Top: heading above the configuration panel.
        heading = JLabel("Sensitive Information Detection Results")
        heading.setFont(Font("Dialog", Font.BOLD, 16))
        heading.setBorder(BorderFactory.createEmptyBorder(10, 10, 10, 10))
        header = JPanel(BorderLayout())
        header.add(heading, BorderLayout.NORTH)
        header.add(self._create_config_panel(), BorderLayout.SOUTH)

        # Bottom: "clear" and "export" buttons above the statistics label.
        buttons = JPanel(FlowLayout())
        for caption, listener in (("Clear Results", ClearButtonListener(self)),
                                  ("Export Results", ExportButtonListener(self))):
            button = JButton(caption)
            button.addActionListener(listener)
            buttons.add(button)

        self._stats_label = JLabel("Statistics: 0 requests processed")
        self._stats_label.setBorder(BorderFactory.createEmptyBorder(5, 10, 5, 10))

        footer = JPanel(BorderLayout())
        footer.add(buttons, BorderLayout.NORTH)
        footer.add(self._stats_label, BorderLayout.SOUTH)

        # Assemble the three regions.
        root = JPanel(BorderLayout())
        root.add(header, BorderLayout.NORTH)
        root.add(results_scroller, BorderLayout.CENTER)
        root.add(footer, BorderLayout.SOUTH)
        self._main_panel = root

    def _create_config_panel(self):
        """Build the "Configuration" panel with detection toggles and size limit.

        Creates one checkbox per detection category (initialised from the
        matching ``enable_*`` flag) and a spinner for the maximum content size
        in MB (range 1-50, step 1). Every control is wired to
        ``update_config`` so a change takes effect immediately.

        Returns:
            The populated JPanel.
        """
        # Function-scope import: ChangeListener is not among the module imports.
        from javax.swing.event import ChangeListener

        extender = self

        class _SizeSpinnerListener(ChangeListener):
            """Applies a new spinner value as soon as it changes."""

            def stateChanged(self, event):
                # update_config() re-reads the spinner on every call; the
                # 'size' tag matches none of its checkbox branches.
                extender.update_config('size')

        config_panel = JPanel(FlowLayout(FlowLayout.LEFT))
        config_panel.setBorder(TitledBorder("Configuration"))

        # Detection toggles.
        self._phone_checkbox = JCheckBox("Phone Numbers", self.enable_phone_detection)
        self._ip_checkbox = JCheckBox("IP Addresses", self.enable_ip_detection)
        self._email_checkbox = JCheckBox("Email Addresses", self.enable_email_detection)
        self._base64_checkbox = JCheckBox("Base64 Decoding", self.enable_base64_detection)

        # Size limit.
        size_label = JLabel("Max Size (MB):")
        self._size_spinner = JSpinner(SpinnerNumberModel(
            int(self.MAX_CONTENT_SIZE / 1024 / 1024), 1, 50, 1))

        # Components are laid out left to right in this order.
        config_panel.add(self._phone_checkbox)
        config_panel.add(self._ip_checkbox)
        config_panel.add(self._email_checkbox)
        config_panel.add(self._base64_checkbox)
        config_panel.add(JLabel("  |  "))
        config_panel.add(size_label)
        config_panel.add(self._size_spinner)

        # Listeners for the checkboxes.
        self._phone_checkbox.addActionListener(ConfigChangeListener(self, 'phone'))
        self._ip_checkbox.addActionListener(ConfigChangeListener(self, 'ip'))
        self._email_checkbox.addActionListener(ConfigChangeListener(self, 'email'))
        self._base64_checkbox.addActionListener(ConfigChangeListener(self, 'base64'))

        # Without this listener a new size limit was only picked up the next
        # time one of the checkboxes happened to be toggled.
        self._size_spinner.addChangeListener(_SizeSpinnerListener())

        return config_panel

    def update_config(self, config_type):
        """更新配置"""
        if config_type == 'phone':
            self.enable_phone_detection = self._phone_checkbox.isSelected()
        elif config_type == 'ip':
            self.enable_ip_detection = self._ip_checkbox.isSelected()
        elif config_type == 'email':
            self.enable_email_detection = self._email_checkbox.isSelected()
        elif config_type == 'base64':
            self.enable_base64_detection = self._base64_checkbox.isSelected()

        # 更新大小限制
        try:
            new_size = int(self._size_spinner.getValue()) * 1024 * 1024
            self.MAX_CONTENT_SIZE = new_size
        except:
            pass

    def getTabCaption(self):
        """Return the caption of the extension's main tab."""
        return "Sensitive Info"

    def getUiComponent(self):
        """Return the root panel assembled by ``_create_ui``."""
        return self._main_panel

    def processHttpMessage(self, toolFlag, messageIsRequest, messageInfo):
        """Scan one HTTP response and record any sensitive data found.

        Args:
            toolFlag: numeric id of the Burp tool that issued the message.
            messageIsRequest: true for requests, which are ignored.
            messageInfo: the request/response pair; highlighted when a
                finding is recorded.

        Oversized bodies, timeouts and decode failures only bump the matching
        counter in ``self.stats``. Any exception is logged and counted under
        'encoding_errors'.
        """
        # Only responses from Proxy(4), Scanner(16), Intruder(32), Repeater(64).
        if messageIsRequest or toolFlag not in [4, 16, 32, 64]:
            return

        started = time.time()

        def _bump(counter):
            # All counters are shared between listener threads.
            with self.lock:
                self.stats[counter] += 1

        def _out_of_time():
            return time.time() - started > self.MAX_PROCESSING_TIME

        try:
            raw_response = messageInfo.getResponse()
            if not raw_response:
                return

            # Split the response into header lines and body bytes.
            response_info = self._helpers.analyzeResponse(raw_response)
            header_lines = response_info.getHeaders()
            payload = raw_response[response_info.getBodyOffset():]
            if not payload or len(payload) == 0:
                return

            # Size guard.
            payload_size = len(payload)
            if payload_size > self.MAX_CONTENT_SIZE:
                _bump('skipped_large')
                self._print_debug("Skipping large content: " + str(payload_size) + " bytes")
                return

            # Time guard before decoding.
            if _out_of_time():
                _bump('skipped_timeout')
                return

            text = self._robust_decode_bytes(payload, header_lines)
            if text is None:
                _bump('encoding_errors')
                return

            # Time guard after decoding.
            if _out_of_time():
                _bump('skipped_timeout')
                return

            findings = self._detect_sensitive_info(text, started)
            _bump('total_processed')

            if not (findings and findings['has_sensitive_info']):
                return

            # Findings are stored under the request URL.
            request_info = self._helpers.analyzeRequest(
                messageInfo.getHttpService(), messageInfo.getRequest())
            target_url = str(request_info.getUrl())

            with self.lock:
                self.detection_results[target_url] = findings
                self.stats['total_sensitive'] += 1

            # Highlight the message according to what was found.
            messageInfo.setHighlight(self._determine_highlight_color(findings))

            # Refresh the results tab.
            self._update_results_display()

        except Exception as e:
            _bump('encoding_errors')
            self._print_error("Error processing HTTP message: " + str(e))

    def _robust_decode_bytes(self, byte_data, headers):
        """Decode a response body to text through a chain of fallbacks.

        Args:
            byte_data: the body bytes taken from the response.
            headers: the response header lines, used to look up a charset.

        Returns:
            The decoded text; "" for empty input or when the raw bytes cannot
            be extracted; None only when an unexpected exception escapes the
            whole chain.

        NOTE(review): step 1 returns whatever ``bytesToString`` produces as
        soon as it is non-empty, so steps 2-6 (charset header, encoding list)
        only run when that helper is missing, raises, or returns an empty
        value - confirm this is the intended priority.
        """
        if not byte_data:
            return ""

        try:
            # Step 1: use Burp's helpers
            try:
                if hasattr(self._helpers, 'bytesToString'):
                    decoded = self._helpers.bytesToString(byte_data)
                    if decoded and len(decoded) > 0:
                        return decoded
            except:
                pass

            # Step 2: work on the byte array directly
            try:
                if hasattr(byte_data, 'tostring'):
                    raw_bytes = byte_data.tostring()
                elif hasattr(byte_data, '__iter__'):
                    # Iterable of byte values: mask each one into 0-255 and
                    # build a string from the resulting characters
                    raw_bytes = ''.join(chr(b & 0xFF) for b in byte_data)
                else:
                    raw_bytes = str(byte_data)
            except Exception as e:
                self._print_debug("Failed to extract raw bytes: " + str(e))
                return ""

            # Cap the amount of data that gets decoded
            if len(raw_bytes) > self.MAX_CONTENT_SIZE:
                raw_bytes = raw_bytes[:self.MAX_CONTENT_SIZE]

            # Step 3: take the charset from the HTTP headers and try it
            header_encoding = self._get_encoding_from_headers(headers)
            if header_encoding:
                try:
                    if isinstance(raw_bytes, str):
                        return raw_bytes.decode(header_encoding, 'replace')
                    else:
                        return unicode(raw_bytes, header_encoding, 'replace')
                except Exception as e:
                    self._print_debug("Header encoding failed (" + header_encoding + "): " + str(e))

            # Step 4: try each candidate encoding in priority order
            for encoding in self.encoding_list:
                try:
                    if isinstance(raw_bytes, str):
                        decoded = raw_bytes.decode(encoding, 'replace')
                    else:
                        decoded = unicode(raw_bytes, encoding, 'replace')

                    # Accept the first result that passes the quality check
                    if self._is_good_decode(decoded):
                        self._print_debug("Successfully decoded with: " + encoding)
                        return decoded
                except Exception as e:
                    self._print_debug("Encoding " + encoding + " failed: " + str(e))
                    continue

            # Step 5: force Latin-1 (almost always succeeds)
            try:
                if isinstance(raw_bytes, str):
                    return raw_bytes.decode('latin1', 'replace')
                else:
                    return unicode(raw_bytes, 'latin1', 'replace')
            except:
                pass

            # Step 6: last resort - return the raw string as is
            try:
                return str(raw_bytes)
            except:
                return ""

        except Exception as e:
            self._print_error("Complete decode failure: " + str(e))
            return None

    def _is_good_decode(self, decoded_text):
        """检查解码质量"""
        try:
            if not decoded_text or len(decoded_text) == 0:
                return True

            # 检查替换字符比例
            if hasattr(decoded_text, 'count'):
                replacement_chars = decoded_text.count(u'\ufffd')
                if len(decoded_text) > 0 and float(replacement_chars) / len(decoded_text) > 0.1:
                    return False

            # 检查常见乱码模式
            bad_patterns = ['\ufffd\ufffd', '锟斤拷', '????']
            for pattern in bad_patterns:
                if pattern in decoded_text:
                    return False

            return True
        except:
            return True

    def _get_encoding_from_headers(self, headers):
        """从HTTP头获取编码"""
        try:
            for header in headers:
                if header and header.lower().startswith("content-type:"):
                    charset_match = re.search(r'charset=([^;\s]+)', header.lower())
                    if charset_match:
                        charset = charset_match.group(1).strip('"\'')
                        # 标准化编码名称
                        charset_mapping = {
                            'gb2312': 'gbk',
                            'gb18030': 'gbk',
                            'utf8': 'utf-8',
                            'iso-8859-1': 'latin1'
                        }
                        return charset_mapping.get(charset.lower(), charset)
        except Exception as e:
            self._print_debug("Header parsing error: " + str(e))
        return None

    def _detect_sensitive_info(self, content, start_time):
        """检测敏感信息(带性能限制)"""
        result = {
            'has_sensitive_info': False,
            'phones': [],
            'ips': [],
            'emails': [],
            'base64_decoded_phones': [],
            'base64_decoded_ips': [],
            'base64_decoded_emails': [],
            'processing_time': 0,
            'content_size': len(content) if content else 0
        }

        try:
            # 检查处理时间
            if time.time() - start_time > self.MAX_PROCESSING_TIME:
                return result

            # 安全地处理内容
            if not content:
                return result

            # 限制内容长度
            if len(content) > self.MAX_CONTENT_SIZE:
                content = content[:self.MAX_CONTENT_SIZE]

            # 确保content是字符串类型
            if not isinstance(content, (str, unicode)):
                try:
                    content = unicode(str(content), 'utf-8', 'replace')
                except:
                    content = str(content)

            # 直接检测
            if self.enable_phone_detection:
                try:
                    phones = self.phone_pattern.findall(content)
                    result['phones'] = list(set(phones))[:100]  # 限制结果数量
                except Exception as e:
                    self._print_debug("Phone detection error: " + str(e))

            if self.enable_ip_detection:
                try:
                    ips = self.ip_pattern.findall(content)
                    result['ips'] = list(set(ips))[:100]
                except Exception as e:
                    self._print_debug("IP detection error: " + str(e))

            if self.enable_email_detection:
                try:
                    emails = self.email_pattern.findall(content)
                    result['emails'] = list(set(emails))[:100]
                except Exception as e:
                    self._print_debug("Email detection error: " + str(e))

            # Base64解码后检测
            if self.enable_base64_detection:
                result = self._detect_base64_content(content, result, start_time)

            # 判断是否有敏感信息
            result['has_sensitive_info'] = (
                    len(result['phones']) > 0 or
                    len(result['ips']) > 0 or
                    len(result['emails']) > 0 or
                    len(result['base64_decoded_phones']) > 0 or
                    len(result['base64_decoded_ips']) > 0 or
                    len(result['base64_decoded_emails']) > 0
            )

            result['processing_time'] = time.time() - start_time

        except Exception as e:
            self._print_error("Detection error: " + str(e))

        return result

    def _detect_base64_content(self, content, result, start_time):
        """检测Base64编码内容"""
        try:
            base64_matches = self.base64_pattern.findall(content)

            # 限制Base64匹配数量和大小
            processed_count = 0
            max_base64_items = 50  # 最多处理50个Base64字符串

            for b64_string in base64_matches:
                # 检查处理时间和数量限制
                if (time.time() - start_time > self.MAX_PROCESSING_TIME or
                        processed_count >= max_base64_items or
                        len(b64_string) > self.MAX_BASE64_SIZE):
                    break

                try:
                    decoded_bytes = base64.b64decode(b64_string)
                    if len(decoded_bytes) > self.MAX_BASE64_SIZE:
                        continue

                    if self._is_printable_content(decoded_bytes):
                        decoded_string = self._safe_decode_base64_result(decoded_bytes)
                        if decoded_string and len(decoded_string) < 10000:  # 限制解码后内容大小
                            # 在解码内容中查找敏感信息
                            if self.enable_phone_detection:
                                try:
                                    b64_phones = self.phone_pattern.findall(decoded_string)
                                    result['base64_decoded_phones'].extend(b64_phones)
                                except:
                                    pass

                            if self.enable_ip_detection:
                                try:
                                    b64_ips = self.ip_pattern.findall(decoded_string)
                                    result['base64_decoded_ips'].extend(b64_ips)
                                except:
                                    pass

                            if self.enable_email_detection:
                                try:
                                    b64_emails = self.email_pattern.findall(decoded_string)
                                    result['base64_decoded_emails'].extend(b64_emails)
                                except:
                                    pass

                    processed_count += 1
                except Exception as e:
                    self._print_debug("Base64 decode error: " + str(e))
                    continue

            # 去重并限制数量
            result['base64_decoded_phones'] = list(set(result['base64_decoded_phones']))[:50]
            result['base64_decoded_ips'] = list(set(result['base64_decoded_ips']))[:50]
            result['base64_decoded_emails'] = list(set(result['base64_decoded_emails']))[:50]

        except Exception as e:
            self._print_error("Base64 detection error: " + str(e))

        return result

    def _safe_decode_base64_result(self, byte_data):
        """安全解码Base64结果"""
        if not byte_data or len(byte_data) == 0:
            return ""

        # 限制解码大小
        if len(byte_data) > 10000:
            byte_data = byte_data[:10000]

        for encoding in self.encoding_list:
            try:
                if isinstance(byte_data, str):
                    decoded = byte_data.decode(encoding, 'replace')
                else:
                    decoded = ''.join(chr(b & 0xFF) for b in byte_data).decode(encoding, 'replace')

                # 简单验证
                if self._is_good_decode(decoded):
                    return decoded
            except Exception:
                continue

        # 最后的fallback
        try:
            if isinstance(byte_data, str):
                return byte_data.decode('utf-8', 'replace')
            else:
                return ''.join(chr(b & 0xFF) for b in byte_data).decode('utf-8', 'replace')
        except:
            try:
                return str(byte_data)
            except:
                return ""

    def _is_printable_content(self, data):
        """检查是否为可打印内容"""
        if not data or len(data) == 0:
            return False

        try:
            # 限制检查长度
            check_length = min(len(data), 1000)

            printable_count = 0
            for i in range(check_length):
                try:
                    if isinstance(data, str):
                        char_code = ord(data[i])
                    else:
                        char_code = data[i] & 0xFF

                    if 32 <= char_code <= 126 or char_code in [9, 10, 13] or char_code >= 128:
                        printable_count += 1
                except:
                    continue

            return float(printable_count) / check_length > 0.7
        except:
            return False

    def _determine_highlight_color(self, result):
        """确定高亮颜色"""
        try:
            has_phone = len(result['phones']) > 0 or len(result['base64_decoded_phones']) > 0
            has_ip = len(result['ips']) > 0 or len(result['base64_decoded_ips']) > 0
            has_email = len(result['emails']) > 0 or len(result['base64_decoded_emails']) > 0

            # 计算类型数量
            type_count = sum([has_phone, has_ip, has_email])

            if type_count > 1:
                return "yellow"  # 混合类型
            elif has_phone:
                return "red"  # 手机号
            elif has_ip:
                return "green"  # IP地址
            elif has_email:
                return "blue"  # 邮箱
            else:
                return "gray"  # 默认
        except:
            return "gray"

    def _update_results_display(self):
        """Rebuild the results report and push it to the main tab.

        The report text and the statistics line are assembled while holding
        ``self.lock``. The widgets are then updated on the Swing event
        dispatch thread through ``SwingUtilities.invokeLater``: this method
        is called from ``processHttpMessage``, which runs outside that
        thread, and Swing components are not safe to modify from there.
        Errors are logged, never raised.
        """
        # Function-scope imports: neither name is among the module imports.
        from java.lang import Runnable
        from javax.swing import SwingUtilities

        # (result key, line template) in display order.
        sections = (
            ('phones', "  Phone Numbers: {}"),
            ('ips', "  IP Addresses: {}"),
            ('emails', "  Email Addresses: {}"),
            ('base64_decoded_phones', "  Phone Numbers (Base64): {}"),
            ('base64_decoded_ips', "  IP Addresses (Base64): {}"),
            ('base64_decoded_emails', "  Email Addresses (Base64): {}"),
        )

        try:
            with self.lock:
                stats = dict(self.stats)  # consistent snapshot of the counters

                results_text = [
                    "=" * 80,
                    "SENSITIVE INFORMATION DETECTION RESULTS",
                    "=" * 80,
                    "",
                    "STATISTICS:",
                    "  Total Processed: " + str(stats['total_processed']),
                    "  With Sensitive Info: " + str(stats['total_sensitive']),
                    "  Skipped (Large): " + str(stats['skipped_large']),
                    "  Skipped (Timeout): " + str(stats['skipped_timeout']),
                    "  Encoding Errors: " + str(stats['encoding_errors']),
                    "",
                    "-" * 80,
                    "",
                ]

                for i, (url, result) in enumerate(self.detection_results.items(), 1):
                    try:
                        results_text.append("[{}] URL: {}".format(i, url))
                        results_text.append("    Content Size: {} bytes, Processing Time: {:.3f}s".format(
                            result.get('content_size', 0), result.get('processing_time', 0)))
                        results_text.append("-" * 60)

                        # Direct findings first, then Base64-decoded ones.
                        for key, template in sections:
                            if result.get(key):
                                results_text.append(template.format(", ".join(result[key])))

                        results_text.append("")
                    except Exception as e:
                        # A broken entry must not hide the remaining ones.
                        results_text.append("Error displaying result for URL: " + str(e))
                        results_text.append("")

            report = "\n".join(results_text)
            stats_text = "Statistics: {} processed, {} sensitive, {} large, {} timeout, {} encoding errors".format(
                stats['total_processed'],
                stats['total_sensitive'],
                stats['skipped_large'],
                stats['skipped_timeout'],
                stats['encoding_errors']
            )

            extender = self

            class _ApplyToWidgets(Runnable):
                """Writes the prepared texts into the widgets."""

                def run(self):
                    try:
                        extender._results_area.setText(report)
                        extender._stats_label.setText(stats_text)
                    except Exception as e:
                        extender._print_error("Error updating results display: " + str(e))

            # The lock is already released here, so the event thread never
            # waits on it while painting.
            SwingUtilities.invokeLater(_ApplyToWidgets())

        except Exception as e:
            self._print_error("Error updating results display: " + str(e))

    def createNewInstance(self, controller, editable):
        """Create a new message-editor tab instance.

        Both arguments are passed through unchanged to ``SensitiveInfoTab``
        together with this extender.
        """
        return SensitiveInfoTab(self, controller, editable)

    def clear_results(self):
        """清除所有结果"""
        with self.lock:
            self.detection_results.clear()
            self.stats = {
                'total_processed': 0,
                'total_sensitive': 0,
                'skipped_large': 0,
                'skipped_timeout': 0,
                'encoding_errors': 0
            }
        self._results_area.setText("")
        self._stats_label.setText("Statistics: 0 requests processed")

    def export_results(self):
        """导出结果"""
        try:
            with self.lock:
                if not self.detection_results:
                    print("No results to export")
                    return

                # 简单的文本导出
                export_text = []
                export_text.append("Sensitive Information Detection Results")
                export_text.append("Generated at: " + str(time.time()))
                export_text.append("=" * 60)

                for url, result in self.detection_results.items():
                    export_text.append("\nURL: " + url)
                    if result.get('phones'):
                        export_text.append("Phones: " + ", ".join(result['phones']))
                    if result.get('ips'):
                        export_text.append("IPs: " + ", ".join(result['ips']))
                    if result.get('emails'):
                        export_text.append("Emails: " + ", ".join(result['emails']))
                    if result.get('base64_decoded_phones'):
                        export_text.append("Phones (B64): " + ", ".join(result['base64_decoded_phones']))
                    if result.get('base64_decoded_ips'):
                        export_text.append("IPs (B64): " + ", ".join(result['base64_decoded_ips']))
                    if result.get('base64_decoded_emails'):
                        export_text.append("Emails (B64): " + ", ".join(result['base64_decoded_emails']))

                print("\n".join(export_text))
                print("Results exported to console")

        except Exception as e:
            self._print_error("Export error: " + str(e))

    def _print_debug(self, message):
        """Print ``message`` prefixed with "[DEBUG] ".

        If converting or printing the message raises, a fixed placeholder
        line is printed instead.
        """
        try:
            print("[DEBUG] " + str(message))
        except:
            print("[DEBUG] <message display error>")

    def _print_error(self, message):
        """Print ``message`` prefixed with "[ERROR] ".

        If converting or printing the message raises, a fixed placeholder
        line is printed instead.
        """
        try:
            print("[ERROR] " + str(message))
        except:
            print("[ERROR] <error message display error>")


class SensitiveInfoTab(IMessageEditorTab):
    """Read-only message-editor tab captioned "Sensitive Info".

    For a response message the tab shows the detection result stored by the
    extender for the request URL when one exists; otherwise it runs the
    extender's detection on the message content directly. Request messages
    are never analysed (the tab is only enabled for responses).

    The tab relies on these members of the extender object:
    ``MAX_CONTENT_SIZE``, ``lock``, ``detection_results``, ``_helpers``,
    ``_robust_decode_bytes``, ``_detect_sensitive_info``, ``_print_debug``
    and ``_print_error``.
    """

    # (result key, section heading) pairs in the order they are rendered.
    _RESULT_SECTIONS = (
        ('phones', "[PHONE NUMBERS]:"),
        ('ips', "[IP ADDRESSES]:"),
        ('emails', "[EMAIL ADDRESSES]:"),
        ('base64_decoded_phones', "[PHONE NUMBERS (from Base64)]:"),
        ('base64_decoded_ips', "[IP ADDRESSES (from Base64)]:"),
        ('base64_decoded_emails', "[EMAIL ADDRESSES (from Base64)]:"),
    )

    def __init__(self, extender, controller, editable):
        """Store the collaborators and build the Swing component.

        :param extender: plugin object providing limits, cached results and
            the detection helpers listed in the class docstring.
        :param controller: object that may expose ``getHttpService()`` and
            ``getRequest()``; used to find the URL of the shown message.
        :param editable: stored as given; not otherwise used by this class.
        """
        self._extender = extender
        self._controller = controller
        self._editable = editable
        self._current_message = None

        self._create_ui()

    def _create_ui(self):
        """Build the panel: one non-editable monospaced text area in a scroll pane."""
        self._component = JPanel(BorderLayout())

        self._text_area = JTextArea()
        self._text_area.setEditable(False)
        self._text_area.setFont(Font("Monospaced", Font.PLAIN, 12))

        scroll_pane = JScrollPane(self._text_area)
        scroll_pane.setBorder(TitledBorder("Detected Sensitive Information"))

        self._component.add(scroll_pane, BorderLayout.CENTER)

    def getTabCaption(self):
        """Return the caption shown on the tab."""
        return "Sensitive Info"

    def getUiComponent(self):
        """Return the Swing component built by ``_create_ui``."""
        return self._component

    def isEnabled(self, content, isRequest):
        """Return True only for response messages."""
        return not isRequest

    def setMessage(self, content, isRequest):
        """Show the analysis for *content*.

        Clears the text area for ``None`` content or request messages.
        Otherwise, in order: refuses content larger than the extender's
        ``MAX_CONTENT_SIZE``, shows a stored result for the request URL if
        there is one, and finally falls back to real-time detection.

        :param content: raw message bytes, or ``None``.
        :param isRequest: True when *content* is a request.
        """
        # Remember what is being displayed so getMessage() can return it;
        # previously this attribute was never updated and stayed None.
        self._current_message = content

        if content is None or isRequest:
            self._text_area.setText("")
            return

        try:
            content_size = len(content)
            if content_size > self._extender.MAX_CONTENT_SIZE:
                self._text_area.setText("Content too large (" + str(
                    content_size) + " bytes). Skipping analysis to prevent performance issues.")
                return

            if self._show_cached_result():
                return

            # No stored result for this message: analyse the content now.
            self._perform_realtime_detection(content)

        except Exception as e:
            error_msg = "Error analyzing content: " + str(e)
            self._text_area.setText(error_msg)
            self._extender._print_error(error_msg)

    def _show_cached_result(self):
        """Display the stored result for the controller's request, if any.

        :return: True when a stored result was found and displayed, False
            otherwise (including when the URL could not be determined).
        """
        controller = self._controller
        if not hasattr(controller, 'getHttpService'):
            return False

        service = controller.getHttpService()
        request = controller.getRequest()
        if not (service and request):
            return False

        try:
            request_info = self._extender._helpers.analyzeRequest(service, request)
            url = str(request_info.getUrl())

            # Look up and render while holding the extender's lock.
            with self._extender.lock:
                if url in self._extender.detection_results:
                    self._display_result(self._extender.detection_results[url], url)
                    return True
        except Exception as e:
            # A failed lookup is not fatal; the caller falls back to
            # real-time detection.
            self._extender._print_debug("URL extraction error: " + str(e))

        return False

    def _display_result(self, result, url):
        """Render *result* as plain text in the text area.

        :param result: dict-like detection result; the keys read are
            ``content_size``, ``processing_time``, ``has_sensitive_info``
            and the keys listed in ``_RESULT_SECTIONS``.
        :param url: label printed on the first line.
        """
        try:
            lines = [
                "URL: " + str(url),
                "Content Size: " + str(result.get('content_size', 0)) + " bytes",
                "Processing Time: " + str(result.get('processing_time', 0)) + "s",
                "=" * 60,
                "",
            ]

            if not result.get('has_sensitive_info', False):
                lines.append("No sensitive information detected.")
            else:
                for key, heading in self._RESULT_SECTIONS:
                    values = result.get(key)
                    if not values:
                        continue
                    lines.append(heading)
                    lines.extend("  -> " + str(value) for value in values)
                    lines.append("")

            self._text_area.setText("\n".join(lines))

        except Exception as e:
            error_msg = "Error displaying result: " + str(e)
            self._text_area.setText(error_msg)
            self._extender._print_error(error_msg)

    def _perform_realtime_detection(self, content):
        """Decode *content*, run the extender's detection and show the outcome.

        Shows an explanatory message instead of a result when the content
        exceeds ``MAX_CONTENT_SIZE``, cannot be decoded, or detection
        returns a falsy value.
        """
        try:
            # Start of the processing window handed to the detector.
            start_time = time.time()

            if content and len(content) > self._extender.MAX_CONTENT_SIZE:
                self._text_area.setText("Content too large for real-time analysis.")
                return

            # NOTE(review): the meaning of the second argument (an empty
            # list) is defined by _robust_decode_bytes; confirm there.
            content_str = self._extender._robust_decode_bytes(content, [])
            if content_str is None:
                self._text_area.setText("Failed to decode content due to encoding issues.")
                return

            result = self._extender._detect_sensitive_info(content_str, start_time)
            if result:
                self._display_result(result, "Current Message (Real-time)")
            else:
                self._text_area.setText("Analysis timed out or failed.")

        except Exception as e:
            error_msg = "Error performing detection: " + str(e)
            self._text_area.setText(error_msg)
            self._extender._print_error(error_msg)

    def getMessage(self):
        """Return the content most recently passed to ``setMessage``."""
        return self._current_message

    def isModified(self):
        """Return False: the tab never edits the message."""
        return False

    def getSelectedData(self):
        """Return None: selection is not exposed by this tab."""
        return None


class ClearButtonListener(ActionListener):
    """Action listener that asks the extender to clear its results."""

    def __init__(self, extender):
        # Object whose clear_results() is invoked when the action fires.
        self._extender = extender

    def actionPerformed(self, event):
        """Call ``clear_results()`` on the extender; *event* is not inspected."""
        owner = self._extender
        owner.clear_results()


class ExportButtonListener(ActionListener):
    """Action listener that asks the extender to export its results."""

    def __init__(self, extender):
        # Object whose export_results() is invoked when the action fires.
        self._extender = extender

    def actionPerformed(self, event):
        """Call ``export_results()`` on the extender; *event* is not inspected."""
        owner = self._extender
        owner.export_results()


class ConfigChangeListener(ActionListener):
    """Action listener that reports a configuration change to the extender."""

    def __init__(self, extender, config_type):
        # Object whose update_config() is invoked when the action fires.
        self._extender = extender
        # Value passed through unchanged to update_config().
        self._config_type = config_type

    def actionPerformed(self, event):
        """Call ``update_config(config_type)`` on the extender; *event* is not inspected."""
        owner, changed = self._extender, self._config_type
        owner.update_config(changed)

只有以诚相待,才会不虚此行。

布施恩德可便相知重

微信扫一扫打赏

支付宝扫一扫打赏

×

给我留言