Skip to content

Commit

Permalink
feat: add basic port scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
Recogerous committed Jun 3, 2024
1 parent 8b7ede1 commit 352ffb2
Show file tree
Hide file tree
Showing 9 changed files with 551 additions and 0 deletions.
1 change: 1 addition & 0 deletions package.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.\venv\Scripts\activate.bat && pyinstaller -F src/main.py -p src/
5 changes: 5 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
art~=6.2
colorama~=0.4.6
qlogging~=1.3.1
setuptools~=58.1.0
pyinstaller~=6.7.0
106 changes: 106 additions & 0 deletions scanport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import argparse
import socket
import sys
import threading
import csv
import concurrent
from concurrent.futures import ThreadPoolExecutor


def scanport(host, port, results, lock):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(8)
result = sock.connect_ex((host, port))
if result == 0:
with lock:
results[port] = {"status": "OPEN"}
except socket.error as e:
with lock:
results[port] = {"status": f"错误: {str(e)}"}


def showreport(results):
print("\n扫描报告:")
if not results.items():
print("无开放端口")
return
for port, status in sorted(results.items()):
print(f"端口 {port}: {status['status']}")


def writecsv(results, filepath="scan_results.csv"):
if not results.items():
print("无开放端口")
return
with open(filepath, 'w', newline='') as csvfile:
fieldnames = ['port', 'status']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for port, status in sorted(results.items()):
writer.writerow({'port': port, 'status': status['status']})
print(f"\n扫描结果已经存到 {filepath}")


def scanall(host, max_workers=800):
results = {}
lock = threading.Lock()

def worker(port):
print(f"扫描 {port}...")
scanport(host, port, results, lock)

with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(worker, port) for port in range(1, 65536)}
for future in concurrent.futures.as_completed(futures):
pass

return results


def scanports(host, ports, max_workers=100):
results = {}
lock = threading.Lock()

def worker(port):
print(f"扫描 {port}...")
scanport(host, port, results, lock)

with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(worker, port) for port in ports]
for future in concurrent.futures.as_completed(futures):
pass

return results


def main():
parser = argparse.ArgumentParser(description="简单的端口扫描器")
parser.add_argument("host", help="需要扫描的ip")
parser.add_argument("--all", action="store_true", help="扫描从1-65535的所有端口")
parser.add_argument("-p", "--port", metavar="PORT", nargs="+", type=int, help="指定待扫描的端口号")
parser.add_argument("--report", choices=["text", "csv"], help="指定扫描报告的格式:直接打印或保存为csv文件")
args = parser.parse_args()

host = args.host
if args.all:
print(f"扫描 {host}上所有端口...")
results = scanall(host)
elif args.port:
print(f"扫描 {host}上的指定端口...")
results = scanports(host, args.port)
else:
parser.print_help()
sys.exit(1)

# 根据用户选择输出报告
if args.report == "text" or not args.report:
showreport(results)
elif args.report == "csv":
writecsv(results)
if args.report:
print("报告已经生成")


if __name__ == "__main__":
main()
3 changes: 3 additions & 0 deletions src/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!usr/bin/env python3
# -*- coding: utf-8 -*-
# Path: src/__init__.py
31 changes: 31 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!usr/bin/env python3
# -*- coding: utf-8 -*-
# Path: src/config.py
# https://thwiki.cc/%E5%A8%9C%E5%85%B9%E7%8E%B2

import qlogging
from colorama import Fore, Back, Style


TITLE = "PortNazrin"
VERSION = "0.1.0"

# log_level = "debug"
log_level = "info"
log_file = "PortNazrin.log"
log_file_mode = "w"
LOGGER = qlogging.get_logger(
level=log_level,
logfile=log_file,
logfilemode=log_file_mode,
# loggingmode='long',
loggingmode='manual',
format_str='%(color)s%(message)s%(reset)s',
colors={
"DEBUG": Fore.CYAN + Style.BRIGHT,
"INFO": Fore.GREEN + Style.BRIGHT,
"WARNING": Fore.YELLOW + Style.BRIGHT,
"ERROR": Fore.RED + Style.BRIGHT,
"CRITICAL": Fore.RED + Back.WHITE + Style.BRIGHT,
}
)
36 changes: 36 additions & 0 deletions src/i18n.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!usr/bin/env python3
# -*- coding: utf-8 -*-
# Path: src/i18n.py

# 警告与错误信息的前缀
WARN = f"[Warning] "
ERROR = f"[Error] "


MSG = {
# main.py
# 帮助内容
'description': " - 简单的端口扫描器",
'ip_help': "目标主机的IP地址。\n如“192.168.1.1”、“10.1.1.2,10.1.1.3“或“10.1.2.1-254”",
'port_help': "待扫描的端口。\n如“80”、“135,445”或“1-65535”",
'all_help': "扫描所有端口",
'workers_help': "并发扫描的最大工作线程数",
'output_help': "输出到文件",
'version_help': "显示版本信息",
'import-ip_help': "导入IP地址列表文件",
'import-port_help': "导入端口列表文件",
# main
'error_ip': f"{ERROR}请指定目标主机的IP地址,或导入IP地址列表文件",
'error_port': f"{ERROR}请指定待扫描的端口,或导入端口列表文件",
# utils
'error_ip_format': f"{ERROR}错误的IP地址:",
'error_ip_exist': f"{ERROR}存在错误的IP地址或列表文件地址,请检查输入参数。是否继续使用其他正确的IP地址?[Y/n]: ",
'error_iplist_nofound': f"{ERROR}未找到导入的IP地址列表文件:",
'error_no_ip': f"{ERROR}未找到IP地址,请检查输入的IP地址与导入的文件。程序即将退出...",
'error_port_format': f"{ERROR}错误的端口:",
'error_port_exist': f"{ERROR}存在错误的端口或列表文件地址,请检查输入参数。是否继续使用其他正确的端口?[Y/n]: ",
'error_portlist_nofound': f"{ERROR}未找到导入的端口列表文件:",
'error_no_port': f"{ERROR}未找到端口,请检查输入的端口与导入的文件。程序即将退出...",
# scanner
'finish': f"扫描完成。",
}
89 changes: 89 additions & 0 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#!usr/bin/env python3
# -*- coding: utf-8 -*-
# Path: src/main.py

import argparse
import os
import signal
import sys

from config import VERSION, TITLE, LOGGER
from i18n import MSG
from scanner import PortScanner
from utils import ascii_banner, extract_ip, extract_port


def argparser():
"""
命令行参数解析
:return: 命令行参数
"""
parser = argparse.ArgumentParser(description=f"{TITLE}" + MSG['description'], prog=f"{TITLE}")
parser.add_argument("-i", "--ip", help=MSG['ip_help'], action="extend", nargs="+", type=str)
parser.add_argument("-p", "--port", help=MSG['port_help'], action="extend", nargs="+", type=str)
# 导入ip地址列表文件、导入端口列表文件
parser.add_argument("-I", "--import-ip", help=MSG['import-ip_help'])
parser.add_argument("-P", "--import-port", help=MSG['import-port_help'])
parser.add_argument("--all", help=MSG['all_help'], action="store_true")
parser.add_argument(
"-w", "--workers", help=MSG['workers_help'],
type=int, default=1, choices=range(1, 1000), metavar="[1-1000]"
)
# TODO: 导出结果
# parser.add_argument("-o", "--output", help=MSG['output_help'])
# TODO: 支持协议
# parser.add_argument("-t", "--tcp", help="TCP协议", action="store_true")
# parser.add_argument("-u", "--udp", help="UDP协议", action="store_true")
# TODO: 扫描速度控制
# parser.add_argument("-s", "--speed", help="扫描速度", type=int, default=1, choices=range(1, 6))
parser.add_argument("-v", "--version", help=MSG['version_help'], action="store_true")
args = parser.parse_args()
LOGGER.debug(f"\n{args}\n")
return args


def main():
ascii_banner()
args = argparser()

# 版本信息
if args.version:
print(f"{TITLE} - {VERSION}")
sys.exit(0)

# 检查IP地址的参数值是否为空
if not args.ip and not args.import_ip:
LOGGER.error(MSG['error_ip'])
sys.exit(1)
# 提取IP地址
ip_list = extract_ip(args.ip, args.import_ip)
# 去重,排序
ip_list = list(set(ip_list))

if not args.all:
# 检查端口的参数值是否为空
if not args.port and not args.import_port:
LOGGER.error(MSG['error_port'])
sys.exit(1)
port_list = extract_port(args.port, args.import_port)
port_list = list(set(port_list))
port_list = sorted(port_list, key=lambda x: int(x))
else:
port_list = [str(i) for i in range(1, 65536)]

LOGGER.debug(f"\nip_list: {ip_list}\nport_list: {port_list}")

# 创建端口扫描器并运行
scanner = PortScanner(ip_list, port_list, args.workers)
scanner.run()


if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nInterrupted")
try:
sys.exit(0)
except SystemExit:
os.kill(os.getpid(), signal.SIGTERM)
65 changes: 65 additions & 0 deletions src/scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!usr/bin/env python3
# -*- coding: utf-8 -*-
# Path: src/scanner.py

import socket
import threading
from queue import Queue

from config import LOGGER
from i18n import MSG


class PortScanner:
def __init__(self, ip_list, port_list, workers):
self.ip_list = ip_list
self.port_list = port_list
self.workers = workers
self.queue = Queue()
self.lock = threading.Lock()

def scan(self, ip, port):
"""
扫描指定IP和端口
:param ip: 目标IP地址
:param port: 目标端口
"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((ip, int(port)))
with self.lock:
if result == 0:
LOGGER.info(f"IP:{ip},端口:{port} 开放")
else:
LOGGER.info(f"IP:{ip},端口:{port} 关闭")
sock.close()
except Exception as e:
LOGGER.error(f"扫描IP:{ip}、端口:{port} 时出错:\n{e}")

def worker(self):
"""扫描工作线程,从队列中获取IP和端口并扫描"""
while not self.queue.empty():
ip, port = self.queue.get()
self.scan(ip, port)
self.queue.task_done()

def run(self):
"""运行端口扫描"""
# 将所有IP和端口组合添加到队列中
for ip in self.ip_list:
for port in self.port_list:
self.queue.put((ip, port))

# 开启工作线程
threads = []
for _ in range(self.workers):
thread = threading.Thread(target=self.worker)
thread.start()
threads.append(thread)

self.queue.join()
for thread in threads:
thread.join()

LOGGER.info(MSG['finish'])
Loading

0 comments on commit 352ffb2

Please sign in to comment.