From fc2f760c34711a1b9b6424d2414c103f3d8ac7d1 Mon Sep 17 00:00:00 2001 From: XNefertar Date: Mon, 20 Oct 2025 23:14:15 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0python=E8=84=9A=E6=9C=AC?= =?UTF-8?q?=E5=8F=8A=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../port_risk_checker.md" | 149 ++++++ .../port_risk_checker/port_risk_checker.py | 435 ++++++++++++++++++ .../test_port_risk_checker.sh | 27 ++ 3 files changed, 611 insertions(+) create mode 100644 "doc/security_test/python\350\204\232\346\234\254/port_risk_checker.md" create mode 100644 testcases/security_test/port_risk_checker/port_risk_checker.py create mode 100755 testcases/security_test/port_risk_checker/test_port_risk_checker.sh diff --git "a/doc/security_test/python\350\204\232\346\234\254/port_risk_checker.md" "b/doc/security_test/python\350\204\232\346\234\254/port_risk_checker.md" new file mode 100644 index 00000000000..56ca5207b2f --- /dev/null +++ "b/doc/security_test/python\350\204\232\346\234\254/port_risk_checker.md" @@ -0,0 +1,149 @@ +port_risk_checker — 端口与服务风险检测工具 + +概述 + +port_risk_checker.py 是一个用于枚举本机(或指定目标)监听 TCP 端口、可选使用 nmap 进行服务/版本探测,并基于启发式规则对端口和服务给出风险评估的 Python 脚本。它会输出一份 JSON 报告并在终端打印彩色摘要,适合用于快速审计和集成到测试框架(例如 mugen)。 + +文件位置 + +- 脚本: testcases/security_test/port_risk_checker.py +- Wrapper: testcases/security_test/port_risk_checker.sh +- Mugen runner: testcases/security_test/port_risk/test_port_risk.sh +- 文档: doc/python脚本/port_risk_checker.md + +主要功能 + +- 枚举本机正在监听的 TCP 端口及绑定地址(使用 `ss` 或 `netstat`)。 +- 可选地调用 `nmap -sV` 并解析 XML 输出以获取服务名、产品与版本信息。 +- 基于静态端口风险库和服务签名的启发式规则对每个端口生成评估(等级:低/中/高)并提供建议文本。 +- 检测常见防火墙工具(`ufw`/`firewall-cmd`/`iptables`)并把输出包含在报告里。 +- 生成 JSON 报告(默认 `port_risk_report.json`)并打印终端摘要;提供 `--auto-install-nmap` 支持在多个常见包管理器上尝试自动安装 nmap(非交互式,需要权限)。 + +依赖 + +- Python 3.6+ +- 可选:`nmap`(若要使用服务/版本探测) +- 推荐(runner/集成):`jq`(`common_framework.sh` 优先使用 jq,但在该仓库已实现对 Python 的回退) + +安全提示 + +- 对于远程目标请确保你有合法授权。 +- nmap 扫描可能影响目标服务或网络流量;在生产环境启用 nmap 前请谨慎评估。 +- `--auto-install-nmap` 会尝试通过系统包管理器安装 nmap,需要 root 或 sudo 权限。请在受控环境中使用。 + +用法 + +基本(跳过 nmap,快速检测本机): + +```bash +python3 testcases/security_test/port_risk_checker.py --no-nmap -o results/port_risk_report.json +``` + +启用 nmap(本机已安装 nmap): + +```bash +python3 testcases/security_test/port_risk_checker.py -o results/port_risk_report.json +``` + +尝试自动安装 nmap(如果未安装且你拥有权限): + +```bash +python3 testcases/security_test/port_risk_checker.py --auto-install-nmap -o results/port_risk_report.json +``` + +更完整参数说明 + +- `--target, -t`:扫描目标,默认 `127.0.0.1`。 +- `--no-nmap`:跳过 nmap 扫描,仅使用本地监听信息。 +- `--full-scan`:nmap 使用 `-p-` 全端口扫描(较慢);默认使用 `-F` 快速端口集。 +- `--output, -o`:JSON 报告输出路径,默认 `port_risk_report.json`。 +- `--nmap-path`:nmap 可执行文件路径或命令(默认 `nmap`)。 +- `--auto-install-nmap`:若本机未安装 nmap,尝试自动安装(非交互式,需权限)。 + +工作原理与实现细节 + +1. 枚举监听端口 + - 脚本优先尝试使用 `ss -ltnp`(Linux/macOS),若不可用回退到 `netstat -ltnp`。 + - 解析输出并标准化地址:`*` -> `0.0.0.0`,IPv6 方括号去除。 + - 返回结构:`{ port: set([bind_addresses]) }`。 + +2. 可选 nmap 扫描 + - 若启用,使用 `nmap -sV -oX - -Pn` 读取 XML 输出并使用 `xml.etree.ElementTree` 解析。 + - 解析每个 `port` 节点的 `service` 属性(name/product/version/extrainfo)并构建 `port -> info` 映射。 + +3. 风险评估 + - 静态端口风险库 `PORT_RISKS`(映射端口到 (service, level, advice))用于匹配已知高风险端口(如 23/telnet、3306/mysql 等)。 + - `SERVICE_SIGNATURE_RISKS` 包含一组正则签名(apache/nginx/openssh/telnet 等),匹配到则返回建议与等级。 + - `version_risk_checks` 对 OpenSSH 与 OpenSSL 字符串执行简单版本启发式判断(例如 OpenSSH <7 标记为高风险,OpenSSL 1.0.x 为高风险),并返回等级与消息。 + - 如果绑定到 `0.0.0.0` 或 `::`,会增加“暴露”高风险评估。 + +4. 报告生成 + - `analyze_and_report` 组合本地监听、nmap 结果与防火墙信息,生成最终 JSON 报告(字段见下)。 + - `pretty_print_report` 打印人类可读的彩色摘要到终端。 + +JSON 报告结构(示例) + +``` +{ + "generated_at": "", + "host": "", + "listening": [ + { + "port": 22, + "bind_addresses": ["0.0.0.0","::"], + "port_risk": ["SSH","中","使用密钥认证…"], + "nmap": {"state":"open","service":"ssh","product":"OpenSSH","version":"8.7","extrainfo":"..."} | null, + "assessments": [ {"type":"port_known_risk","level":"中","message":"..."}, ... ] + }, + ... + ], + "nmap": { "22": {...}, "23": {...} }, + "firewall": { "ufw":"...", "firewalld":"...", "iptables":"..." }, + "issues": [ {"port":22,"level":"中","message":"..."}, ... ] +} +``` + +集成建议(Mugen / CI) + +- Mugen runner `test_port_risk.sh` 已存在:它会调用脚本并把生成的 JSON 合并到 mugen JSON。`common_framework.sh` 提供 `init_mugen_json` 与 `write_result_blob`(现在会在没有 jq 时使用 Python 回退)。 +- 在 CI(GitHub Actions)示例 workflow `/.github/workflows/port_check.yml` 中,已设置为每周运行并使用 `--no-nmap`(更安全)。如果你想使用 jq 进行一致合并,在 CI 的安装步骤加: + ```yaml + - name: Install jq + run: sudo apt-get update && sudo apt-get install -y jq + ``` + +常见问题(FAQ) + +Q: 在 CI 里运行会失败吗? +A: 默认 workflow 里使用 `--no-nmap`,不会进行网络扫描,且脚本在缺乏 nmap 时退回到仅本地监听数据。确保 runner 上有 Python3 即可运行。若需要 jq,可在 CI 安装。 + +Q: 我看到脚本有 `--auto-install-nmap`,CI 应该启用吗? +A: 不建议在共享 CI runner 中启用自动安装;该选项适合受控环境或你具备权限的主机。 + +Q: 如何把结果排除在 git 外? +A: 在 `.gitignore` 中加入: +``` +results/ +testcases/security_test/results/ +``` +并移除已误 commit 的文件: +``` +git rm --cached testcases/security_test/results/port_risk_nmap.json +``` + +扩展与下一步建议 + +- 把服务/版本签名库外置为 JSON/YAML,以便更容易扩展和由 CI 定期更新。 +- 对 nmap 解析增加更鲁棒的字段匹配(处理 script 输出、服务额外信息等)。 +- 对 `version_risk_checks` 增强为查询 OSV/CVE 数据源(需要更精确的包名映射与网络权限)。 +- 增加单元测试:对 `ss/netstat` 输出、对 nmap XML 的多种样例进行模拟解析并在 CI 中运行。 + +版权与贡献 + +- 你可以把该脚本作为仓库的一部分提交到版本控制。对脚本进行更改时,请附带执行说明与测试步骤。 + +--- + +文档已生成并保存在: `/root/os-autotest/doc/python脚本/port_risk_checker.md` + +我现在会把 todo #6 标为 completed。若你希望我把文档添加到 README 索引或生成 HTML 版本,我可以继续处理。 \ No newline at end of file diff --git a/testcases/security_test/port_risk_checker/port_risk_checker.py b/testcases/security_test/port_risk_checker/port_risk_checker.py new file mode 100644 index 00000000000..e38124bcd88 --- /dev/null +++ b/testcases/security_test/port_risk_checker/port_risk_checker.py @@ -0,0 +1,435 @@ +#!/usr/bin/env python3 + +"""Port and service risk checker. + +Enumerates local listening TCP ports, optionally runs nmap -sV to collect +service/product/version info, applies heuristic risk rules and produces a +JSON report and a colored terminal summary. + +Note: nmap is optional. Some nmap scans may need elevated privileges. +""" + +import argparse +import json +import platform +import re +import shutil +import subprocess +import xml.etree.ElementTree as ET +from datetime import datetime + + +# ANSI color codes +class Colors: + HEADER = '\033[95m' + BLUE = '\033[94m' + GREEN = '\033[92m' + YELLOW = '\033[93m' + RED = '\033[91m' + ENDC = '\033[0m' + BOLD = '\033[1m' + + +# Known port risk database +PORT_RISKS = { + 21: ("FTP", "高", "FTP 传输未加密,建议禁用或使用 SFTP/FTPS。"), + 22: ("SSH", "中", "使用密钥认证、禁用密码登录、限制登录来源、使用 Fail2Ban/防护策略。"), + 23: ("Telnet", "高", "Telnet 未加密,立即禁用并使用 SSH。"), + 25: ("SMTP", "中", "确认是否为中继,限制访问并启用认证与加密。"), + 53: ("DNS", "中", "避免配置为开放解析器,可被用于放大攻击。"), + 80: ("HTTP", "中", "尽量启用 HTTPS 并使用现代 TLS 配置。"), + 110: ("POP3", "高", "建议使用加密版本 POP3S。"), + 143: ("IMAP", "高", "建议使用加密版本 IMAPS。"), + 443: ("HTTPS", "低", "检查 TLS 配置与已知库版本。"), + 445: ("SMB", "高", "文件共享不应暴露到公网,注意历史漏洞(如永恒之蓝)。"), + 1433: ("MSSQL", "高", "数据库服务不应暴露到公网。"), + 3306: ("MySQL", "高", "数据库服务不应暴露到公网。"), + 3389: ("RDP", "高", "远程桌面不应直接暴露到公网,建议 VPN。"), + 5432: ("PostgreSQL", "高", "数据库服务不应暴露到公网。"), + 5900: ("VNC", "高", "注意加密与认证,建议通过 SSH 隧道使用。"), + 6379: ("Redis", "高", "请确保启用认证并限制绑定地址,不应暴露公网。"), + 8080: ("HTTP-alt", "中", "确认运行的应用是否安全。"), + # 非常见/高风险端口 + 2049: ("NFS", "高", "网络文件系统,建议仅限内网访问并加固认证。"), + 6000: ("X11", "高", "X11 图形服务,极易被远程劫持,建议关闭或加密。"), + 8081: ("HTTP-alt2", "中", "确认运行的 Web 服务是否安全。"), + 8443: ("HTTPS-alt", "中", "确认 TLS 配置与服务安全性。"), + 11211: ("Memcached", "高", "Memcached 默认无认证,极易被利用为 DDoS 放大器,建议仅绑定本地。"), + 27017: ("MongoDB", "高", "MongoDB 默认无认证,建议开启认证并仅限内网。"), + 5000: ("Flask/DevApp", "中", "常见于开发环境,生产环境应关闭或加固。"), + 7001: ("WebLogic", "高", "WebLogic 管理端口,历史漏洞多,建议加固并限制访问。"), + 9000: ("SonarQube/PHP-FPM", "中", "确认服务安全配置,避免暴露敏感接口。"), + 10000: ("Webmin", "高", "Webmin 管理端口,建议加固并限制访问。"), + 25565: ("Minecraft", "中", "游戏服务器,注意 DDoS 与未授权访问。"), + 50070: ("Hadoop NameNode", "高", "Hadoop 管理端口,建议加固并限制访问。"), + 50075: ("Hadoop DataNode", "高", "Hadoop 数据节点端口,建议加固并限制访问。"), + 9200: ("Elasticsearch", "高", "默认无认证,建议加固并仅限内网。"), + 9300: ("Elasticsearch Transport", "高", "集群通信端口,建议加固并仅限内网。"), +} + + +# Service signature risk mapping +SERVICE_SIGNATURE_RISKS = [ + (re.compile(r'(?i)apache'), '中', '检查是否存在已知的 Apache 漏洞并及时更新。'), + (re.compile(r'(?i)nginx'), '中', '检查 nginx 配置(如目录遍历、默认页面等)并更新版本。'), + (re.compile(r'(?i)openssh'), '中', '建议使用 OpenSSH 最新稳定版本,禁用弱加密与登录方式。'), + (re.compile(r'(?i)telnet'), '高', 'Telnet 服务不安全,应禁用并使用 SSH。'), + (re.compile(r'(?i)ftp'), '高', 'FTP 不加密,应禁用或使用 FTPS/SFTP。'), + (re.compile(r'(?i)redis'), '高', '确认是否设置了访问密码以及仅绑定本地。'), + (re.compile(r'(?i)mongodb'), '高', '数据库服务不应暴露公网,检查认证与绑定地址。'), +] + + +def which(cmd): + return shutil.which(cmd) or None + + +def get_listening_tcp_ports_with_addrs(): + """Return dict port -> set(bind_address). + + Prefer `ss -ltnp` on Linux/macOS, fallback to `netstat`. + Normalize '*' to '0.0.0.0' and strip IPv6 brackets. + """ + system = platform.system() + ports = {} + try: + if system in ('Linux', 'Darwin'): + ss = which('ss') + if ss: + cmd = [ss, '-ltnp'] + res = subprocess.run(cmd, capture_output=True, text=True, check=True) + pattern = r'LISTEN\s+\d+\s+\d+\s+(\S+):(\d+)' + for line in res.stdout.splitlines(): + m = re.search(pattern, line) + if not m: + continue + addr, port = m.group(1), int(m.group(2)) + if addr == '*': + addr = '0.0.0.0' + if addr.startswith('[') and addr.endswith(']'): + addr = addr[1:-1] + ports.setdefault(port, set()).add(addr) + return ports + else: + cmd = ['netstat', '-ltnp'] + res = subprocess.run(cmd, capture_output=True, text=True, check=True) + pattern = r'\S+\s+\d+\s+\d+\s+(\S+):(\d+)\s+\S+\s+LISTEN' + for line in res.stdout.splitlines(): + m = re.search(pattern, line) + if not m: + continue + addr, port = m.group(1), int(m.group(2)) + if addr == '*': + addr = '0.0.0.0' + if addr.startswith('[') and addr.endswith(']'): + addr = addr[1:-1] + ports.setdefault(port, set()).add(addr) + return ports + elif system == 'Windows': + cmd = ['netstat', '-an'] + res = subprocess.run(cmd, capture_output=True, text=True, check=True) + pattern = r'TCP\s+(\S+):(\d+)\s+.*LISTENING' + for line in res.stdout.splitlines(): + m = re.search(pattern, line) + if not m: + continue + addr, port = m.group(1), int(m.group(2)) + if addr == '*': + addr = '0.0.0.0' + if addr.startswith('[') and addr.endswith(']'): + addr = addr[1:-1] + ports.setdefault(port, set()).add(addr) + return ports + else: + print(f"{Colors.RED}Unsupported OS: {system}{Colors.ENDC}") + return {} + except (subprocess.CalledProcessError, FileNotFoundError) as e: + print(f"{Colors.RED}Failed to enumerate listening ports: {e}{Colors.ENDC}") + return {} + + +def run_nmap_scan(target='127.0.0.1', full=False, nmap_path='nmap'): + """Run nmap and parse XML output into a dict port -> info.""" + if which(nmap_path) is None: + print(f"{Colors.YELLOW}nmap not found ({nmap_path}), skipping nmap scan.{Colors.ENDC}") + return None + + ports_arg = '-p-' if full else '-F' + cmd = [nmap_path, '-sV', '-oX', '-', '-Pn', ports_arg, target] + try: + proc = subprocess.run(cmd, capture_output=True, text=True, check=True) + except subprocess.CalledProcessError as e: + print(f"{Colors.RED}nmap failed: {e}{Colors.ENDC}") + return None + + try: + root = ET.fromstring(proc.stdout) + except ET.ParseError: + print(f"{Colors.RED}Failed to parse nmap XML output.{Colors.ENDC}") + return None + + result = {} + for host in root.findall('host'): + for ports in host.findall('ports'): + for port in ports.findall('port'): + portid = int(port.attrib.get('portid')) + state = port.find('state').attrib.get('state') if port.find('state') is not None else 'unknown' + svc = port.find('service') + service_name = svc.attrib.get('name') if svc is not None and 'name' in svc.attrib else '' + product = svc.attrib.get('product') if svc is not None and 'product' in svc.attrib else '' + version = svc.attrib.get('version') if svc is not None and 'version' in svc.attrib else '' + extrainfo = svc.attrib.get('extrainfo') if svc is not None and 'extrainfo' in svc.attrib else '' + + result[portid] = { + 'state': state, + 'service': service_name, + 'product': product, + 'version': version, + 'extrainfo': extrainfo, + } + + return result + + +def auto_install_nmap(): + """Attempt to install nmap using common package managers. Non-interactive. + + Returns True if installation appears successful (nmap now on PATH), False otherwise. + """ + if which('nmap'): + return True + + # Try common package managers + pkg_cmds = [ + (['apt-get', 'update', '-y'], ['apt-get', 'install', '-y', 'nmap']), + (None, ['yum', 'install', '-y', 'nmap']), + (None, ['dnf', 'install', '-y', 'nmap']), + (None, ['zypper', '--non-interactive', 'install', 'nmap']), + (None, ['pacman', '-Sy', 'nmap']), + (None, ['apk', 'add', 'nmap']), + ] + + for prep, install in pkg_cmds: + try: + if prep: + subprocess.run(prep, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + res = subprocess.run(install, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + if res.returncode == 0 and which('nmap'): + return True + except FileNotFoundError: + # Package manager not present, try next + continue + except Exception: + continue + + return False + + +def check_firewall_status(): + """Check common firewall tools and return a summary dict.""" + info = {} + ufw = which('ufw') + if ufw: + try: + res = subprocess.run([ufw, 'status'], capture_output=True, text=True) + info['ufw'] = res.stdout.strip() + except Exception as e: + info['ufw_error'] = str(e) + + firewalld = which('firewall-cmd') + if firewalld: + try: + res = subprocess.run([firewalld, '--state'], capture_output=True, text=True) + info['firewalld'] = res.stdout.strip() + except Exception as e: + info['firewalld_error'] = str(e) + + iptables = which('iptables') + if iptables: + try: + res = subprocess.run([iptables, '-L', '-n'], capture_output=True, text=True) + info['iptables'] = res.stdout[:4000] + except Exception as e: + info['iptables_error'] = str(e) + + return info + + +def version_risk_checks(product, version, service): + combined = ' '.join(filter(None, [product, version, service])) + if not combined: + return None + + m = re.search(r'OpenSSH[_ ]?(\d+)\.(\d+)', combined, flags=re.I) + if m: + major = int(m.group(1)) + if major < 7: + return ('高', f'Old OpenSSH detected ({combined}), consider upgrading.') + else: + return ('中', f'OpenSSH detected ({combined}), ensure weak ciphers and password login are disabled.') + + m = re.search(r'OpenSSL\s*(\d+\.\d+\.\d+)', combined, flags=re.I) + if m: + ver = m.group(1) + if ver.startswith('1.0') or ver.startswith('0.'): + return ('高', f'Old OpenSSL detected ({ver}), consider upgrading.') + else: + return ('中', f'OpenSSL detected ({ver}), keep updated.') + + for sig, level, advice in SERVICE_SIGNATURE_RISKS: + if sig.search(combined): + return (level, f'{advice} (fingerprint: {sig.pattern})') + + return None + + +def analyze_and_report(listening_map, nmap_results, firewall_info, target='127.0.0.1'): + report = { + 'generated_at': datetime.utcnow().isoformat() + 'Z', + 'host': target, + 'listening': [], + 'nmap': nmap_results or {}, + 'firewall': firewall_info, + 'issues': [], + } + + for port, addrs in sorted(listening_map.items()): + item = { + 'port': port, + 'bind_addresses': sorted(list(addrs)), + 'port_risk': PORT_RISKS.get(port, None), + 'nmap': nmap_results.get(port) if nmap_results else None, + 'assessments': [], + } + + if port in PORT_RISKS: + svc, level, advice = PORT_RISKS[port] + item['assessments'].append({'type': 'port_known_risk', 'level': level, 'message': advice}) + + external_bind = any(a == '0.0.0.0' or a == '::' for a in addrs) + if external_bind: + item['assessments'].append({'type': 'bind_exposed', 'level': '高', 'message': '绑定到 0.0.0.0/::,可能对外网可达。'}) + + if nmap_results and port in nmap_results: + nr = nmap_results[port] + ver_check = version_risk_checks(nr.get('product', ''), nr.get('version', ''), nr.get('service', '')) + if ver_check: + item['assessments'].append({'type': 'version_issue', 'level': ver_check[0], 'message': ver_check[1]}) + + if not item['assessments']: + item['assessments'].append({'type': 'info', 'level': '低', 'message': '未在简单规则库中发现明显风险,建议进一步人工或自动化审计。'}) + + for a in item['assessments']: + if a['level'] in ('高', '中'): + report['issues'].append({'port': port, 'level': a['level'], 'message': a['message']}) + + report['listening'].append(item) + + return report + + +def pretty_print_report(report): + print(f"{Colors.BOLD}{Colors.HEADER}--- 本机开放端口与服务风险评估报告 ---{Colors.ENDC}") + + if not report['listening']: + print(f"{Colors.GREEN}未检测到监听的 TCP 端口。{Colors.ENDC}") + else: + for item in report['listening']: + port = item['port'] + binds = ','.join(item['bind_addresses']) + print(f"\n{Colors.BOLD}端口 {port} 绑定: {binds}{Colors.ENDC}") + if item['nmap']: + svc = item['nmap'].get('service') or '' + prod = item['nmap'].get('product') or '' + ver = item['nmap'].get('version') or '' + if svc or prod or ver: + print(f" - 服务: {svc} {prod} {ver}") + + for a in item['assessments']: + col = Colors.GREEN + if a['level'] == '中': + col = Colors.YELLOW + elif a['level'] == '高': + col = Colors.RED + print(f" - {col}等级: {a['level']} - {a['message']}{Colors.ENDC}") + + print(f"\n{Colors.BOLD}{Colors.HEADER}防火墙信息:{Colors.ENDC}") + if report['firewall']: + for k, v in report['firewall'].items(): + print(f" - {k}: {str(v).splitlines()[0] if v else ''}") + else: + print(f" - {Colors.YELLOW}未检测到已知防火墙工具(ufw/firewalld/iptables)。{Colors.ENDC}") + + # 打印全部发现(包含低/中/高所有级别),便于用户看到完整检测结果 + print(f"\n{Colors.BOLD}{Colors.HEADER}合计发现问题(全部级别):{Colors.ENDC}") + all_findings = [] + for item in report['listening']: + port = item['port'] + for a in item.get('assessments', []): + all_findings.append({'port': port, 'level': a.get('level', '未知'), 'message': a.get('message', '')}) + + if not all_findings: + print(f" {Colors.GREEN}未检测到任何问题(基于本工具的启发式规则)。{Colors.ENDC}") + else: + for iss in all_findings: + lvl = iss.get('level', '') + col = Colors.GREEN + if lvl == '中': + col = Colors.YELLOW + elif lvl == '高': + col = Colors.RED + print(f" - {col}{lvl}{Colors.ENDC} 端口 {iss.get('port')}: {iss.get('message')}") + + print(f"\n{Colors.YELLOW}注意:此工具仅做初步启发式检测。对于高风险或合规需求,请使用专业安全评估和渗透测试。{Colors.ENDC}") + + +def save_report(report, path): + try: + with open(path, 'w', encoding='utf-8') as f: + json.dump(report, f, ensure_ascii=False, indent=2) + return True + except Exception as e: + print(f"{Colors.RED}保存报告失败: {e}{Colors.ENDC}") + return False + + +def parse_args(): + p = argparse.ArgumentParser(description='本机端口/服务风险自动化检测') + p.add_argument('--target', '-t', default='127.0.0.1', help='扫描目标,默认本机') + p.add_argument('--no-nmap', action='store_true', help='跳过 nmap 扫描,仅使用本地监听信息') + p.add_argument('--full-scan', action='store_true', help='nmap 使用 -p- 全端口扫描(较慢)') + p.add_argument('--output', '-o', default='port_risk_report.json', help='JSON 报告输出路径') + p.add_argument('--nmap-path', default='nmap', help='nmap 可执行文件路径或命令(默认为 nmap)') + p.add_argument('--auto-install-nmap', action='store_true', help='若本机未安装 nmap,尝试自动安装(需要 root 或 sudo 权限)') + return p.parse_args() + + +def main(): + args = parse_args() + + listening = get_listening_tcp_ports_with_addrs() + + nmap_results = None + if not args.no_nmap: + # optionally attempt auto-install + if args.auto_install_nmap and not which(args.nmap_path): + print(f"{Colors.YELLOW}nmap 未检测到,尝试自动安装...{Colors.ENDC}") + ok = auto_install_nmap() + if not ok: + print(f"{Colors.RED}自动安装 nmap 失败或不可用,继续跳过 nmap 扫描。{Colors.ENDC}") + nmap_results = run_nmap_scan(args.target, full=args.full_scan, nmap_path=args.nmap_path) + + firewall_info = check_firewall_status() + + report = analyze_and_report(listening, nmap_results or {}, firewall_info, target=args.target) + + pretty_print_report(report) + + out = args.output + saved = save_report(report, out) + if saved: + print(f"\n报告已保存到: {out}") + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/testcases/security_test/port_risk_checker/test_port_risk_checker.sh b/testcases/security_test/port_risk_checker/test_port_risk_checker.sh new file mode 100755 index 00000000000..2e4e2b17a3f --- /dev/null +++ b/testcases/security_test/port_risk_checker/test_port_risk_checker.sh @@ -0,0 +1,27 @@ +#!/usr/bin/env bash +# 简单的 wrapper,便于在 CI 或脚本中调用 +# 用法: +# ./port_risk_checker.sh # 跳过 nmap,输出到 results/port_risk_report.json +# ./port_risk_checker.sh --nmap # 启用 nmap(若系统安装了 nmap) + +set -euo pipefail +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +PY=${PY:-python3} +OUT_DIR=${OUT_DIR:-$SCRIPT_DIR/../data} +mkdir -p "$OUT_DIR" +OUT_FILE="$OUT_DIR/port_risk_report.json" + +# parse args +USE_NMAP=0 +for a in "$@"; do + case "$a" in + --nmap) USE_NMAP=1 ;; + --help|-h) echo "Usage: $0 [--nmap]"; exit 0 ;; + esac +done + +if [ "$USE_NMAP" -eq 1 ]; then + exec "$PY" "$SCRIPT_DIR/port_risk_checker.py" -o "$OUT_FILE" +else + exec "$PY" "$SCRIPT_DIR/port_risk_checker.py" --no-nmap -o "$OUT_FILE" +fi -- Gitee