基本扫描器的开发方法

一、主机存活探测器

1.1 ICMP ping 探测

  • 原理:发送 ICMP Echo 请求(ping),如果目标回复 Echo Reply,则主机存活
  • 实现方法:直接通过调用 ping 命令探测所在的主机端口即可;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import subprocess
import platform
import ipaddress

def icmp_ping(ip):
# 使用 ping 命令进行 ICMP 探测
command = ["ping", "-c", "1", ip] if platform.system() != "Windows" else ["ping", "-n", "1", ip]
response = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return response.returncode == 0

def scan_icmp(subnet_or_ip):
try:
if '/' in subnet_or_ip:
# 输入的是子网
net = ipaddress.IPv4Network(subnet_or_ip, strict=False)
for ip in net.hosts():
if icmp_ping(str(ip)):
print(f"Host {ip} is alive (ICMP)")
else:
# 输入的是单个IP
if icmp_ping(subnet_or_ip):
print(f"Host {subnet_or_ip} is alive (ICMP)")
except ValueError:
print("Invalid subnet or IP address format.")

1.2 TCP SYN 探测

  • 原理:发送 TCP SYN 包,如果目标返回 SYN-ACK,则主机存活(类似 nmap -sS
  • 实现方法:
    • 选择一个常见开放端口(如 80、443、22)
    • 发送 TCP SYN 包,等待 SYN-ACK 响应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import socket

def tcp_syn_scan(ip, port=80):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1) # 设置超时为1秒
result = sock.connect_ex((ip, port))
sock.close()
return result == 0 # 返回是否连接成功
except socket.error:
return False

def scan_tcp_syn(subnet_or_ip, port=80):
try:
if '/' in subnet_or_ip:
net = ipaddress.IPv4Network(subnet_or_ip, strict=False)
for ip in net.hosts():
if tcp_syn_scan(str(ip), port):
print(f"Host {ip} is alive (TCP SYN on port {port})")
else:
if tcp_syn_scan(subnet_or_ip, port):
print(f"Host {subnet_or_ip} is alive (TCP SYN on port {port})")
except ValueError:
print("Invalid subnet or IP address format.")

1.3 TCP ACK 探测

  • 原理:向目标发送一个 ACK 包,如果返回 RST 包,则主机存活
  • 实现方法:
    • 选择一个常见开放端口(如 80、443、22)
    • 发送 TCP ACK 包,等待 RST 包响应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def tcp_ack_scan(ip, port=80):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.connect_ex((ip, port)) # 连接尝试
sock.close()
return True
except socket.error:
return False

def scan_tcp_ack(subnet_or_ip, port=80):
try:
if '/' in subnet_or_ip:
net = ipaddress.IPv4Network(subnet_or_ip, strict=False)
for ip in net.hosts():
if tcp_ack_scan(str(ip), port):
print(f"Host {ip} is alive (TCP ACK on port {port})")
else:
if tcp_ack_scan(subnet_or_ip, port):
print(f"Host {subnet_or_ip} is alive (TCP ACK on port {port})")
except ValueError:
print("Invalid subnet or IP address format.")

1.4 ARP 探测(仅适用于局域网)

  • 原理:向目标发送 ARP 请求,如果有相应,则主机存活(适用于局域网)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from scapy.all import ARP, Ether, srp

def arp_ping(ip):
# 创建 ARP 请求包
arp_request = ARP(pdst=ip)
# 创建以太网帧
ether_frame = Ether(dst="ff:ff:ff:ff:ff:ff") / arp_request
# 发送请求并等待响应
result = srp(ether_frame, timeout=1, verbose=False)[0]

# 如果有响应,返回主机的 IP 和 MAC 地址
alive_hosts = []
for sent, received in result:
alive_hosts.append({"IP": received.psrc, "MAC": received.hwsrc})

return alive_hosts

def scan_network(network):
print(f"Scanning network: {network}")
hosts = arp_ping(network)

if hosts:
print("\nAlive hosts found:")
for host in hosts:
print(f"IP: {host['IP']} MAC: {host['MAC']}")
else:
print("No hosts found.")

1.5 UDP 探测

  • 原理:发送 UDP 包到目标端口,如果收到 ICMP 端口不可达(Type 3,Code 3),说明主机存活。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import socket

def udp_ping(ip, port=53):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(1)
sock.sendto(b"UDP test", (ip, port))
response = sock.recvfrom(1024)
sock.close()
return True # 收到响应
except socket.timeout:
return False
except socket.error:
return False

def scan_udp(subnet_or_ip, port=53):
try:
if '/' in subnet_or_ip:
net = ipaddress.IPv4Network(subnet_or_ip, strict=False)
for ip in net.hosts():
if udp_ping(str(ip), port):
print(f"Host {ip} is alive (UDP on port {port})")
else:
if udp_ping(subnet_or_ip, port):
print(f"Host {subnet_or_ip} is alive (UDP on port {port})")
except ValueError:
print("Invalid subnet or IP address format.")

1.6 主机探活实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import socket
import subprocess
import platform
import ipaddress
import threading
from scapy.all import ARP, Ether, srp, IP, UDP, ICMP, sr1

# 常见的 TCP 端口列表
COMMON_TCP_PORTS = [22, 53, 80, 443, 3389, 21, 25, 110, 143, 8080]

# ICMP Ping 探测
def icmp_ping(ip):
command = ["ping", "-c", "1", ip] if platform.system() != "Windows" else ["ping", "-n", "1", ip]
response = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return response.returncode == 0

# TCP SYN 探测
def tcp_syn_scan(ip, ports=COMMON_TCP_PORTS):
for port in ports:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1) # 设置超时为1秒
result = sock.connect_ex((ip, port))
sock.close()
if result == 0:
return True # 端口开放,认为主机存活
except socket.error:
continue
return False

# TCP ACK 探测
def tcp_ack_scan(ip, ports=COMMON_TCP_PORTS):
for port in ports:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.connect_ex((ip, port)) # 连接尝试
sock.close()
return True # 目标端口存在,认为主机存活
except socket.error:
continue
return False

# ARP 探测
def arp_ping(ip):
arp_request = ARP(pdst=ip)
ether_frame = Ether(dst="ff:ff:ff:ff:ff:ff") / arp_request
result = srp(ether_frame, timeout=1, verbose=False)[0]
return len(result) > 0 # 如果有响应,主机存活

# UDP 探测
def udp_ping(ip, port=53):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(1) # 设置超时
sock.sendto(b"UDP test", (ip, port))
# 等待 ICMP 端口不可达消息
response = sr1(IP(dst=ip)/ICMP(), timeout=1, verbose=False)
return response is None # 如果没有返回 ICMP 端口不可达消息,说明目标存在
except socket.timeout:
return False
except socket.error:
return False

# 综合扫描方法
def scan_host(ip, methods=["icmp", "arp", "udp", "syn", "ack"]):
# 如果任意一个探测方法发现主机存活,则返回 True
if "icmp" in methods and icmp_ping(ip):
return True

if "arp" in methods and ipaddress.ip_address(ip).is_private: # 仅局域网内使用 ARP
if arp_ping(ip):
return True

if "udp" in methods and udp_ping(ip):
return True

if "syn" in methods and tcp_syn_scan(ip):
return True

if "ack" in methods and tcp_ack_scan(ip):
return True

return False

# 扫描子网
def scan_subnet(network, methods=["icmp", "syn", "ack", "arp", "udp"]):
net = ipaddress.IPv4Network(network, strict=False)
for ip in net.hosts():
threading.Thread(target=scan_and_print, args=(str(ip), methods)).start()

# 扫描单个 IP
def scan_single_ip(ip, methods=["icmp", "syn", "ack", "arp", "udp"]):
if scan_host(ip, methods):
print(f"{ip} is Alive")
else:
print(f"{ip} is Dead")

# 打印扫描结果
def print_scan_results(ip, is_alive):
if is_alive:
print(f"{ip} is Alive")

# 处理扫描并打印结果
def scan_and_print(ip, methods):
is_alive = scan_host(ip, methods)
print_scan_results(ip, is_alive)

# 主函数
def main():
target = input("Enter a subnet (e.g., 192.168.1.0/24) or IP (e.g., 192.168.1.1): ")

# 如果是单个IP扫描
if '/' not in target:
scan_single_ip(target)

# 如果是子网扫描
else:
scan_subnet(target)

if __name__ == "__main__":
main()

二、端口开放探测器

2.1 TCP 全连接扫描

  • 原理: 尝试与目标的指定端口建立 TCP 连接,成功则端口开放。
  • 实现方法:
    • 通过socket.connect_ex()创建 sock
    • 通过 sock 与指定端口建立 TCP 链接,成功则端口开放。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import socket

def tcp_connect_scan(target_ip, port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(1) # 设置超时时间,避免长时间等待
result = sock.connect_ex((target_ip, port))
if result == 0:
return f"端口 {port} 开放"
else:
return f"端口 {port} 关闭"

target = "192.168.1.1"
for port in range(20, 25):
print(tcp_connect_scan(target, port))

2.2 TCP 半连接扫描

  • 原理:向目标的指定端口发送 TCP SYN 包,如果目标返回 SYN-ACK,则端口开放
  • 实现方法:
    • 指定一个端口发送 TCP SYN 包
    • 等待 SYN-ACK 响应,若收到响应,则端口开放
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from scapy.all import *

def syn_scan(target_ip, port):
src_port = RandShort() # 随机源端口
syn_packet = IP(dst=target_ip)/TCP(sport=src_port, dport=port, flags="S")
response = sr1(syn_packet, timeout=1, verbose=False)

if response and response.haslayer(TCP):
if response.getlayer(TCP).flags == 0x12: # SYN-ACK
return f"端口 {port} 开放"
elif response.getlayer(TCP).flags == 0x14: # RST
return f"端口 {port} 关闭"
return f"端口 {port} 过滤或无响应"

target = "192.168.1.1"
for port in range(20, 25):
print(syn_scan(target, port))

2.3 UDP 扫描

  • 原理:向目标端口发送 UDP 数据包,如果没有响应,则端口可能是开放或被过滤。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def udp_scan(target_ip, port):
udp_packet = IP(dst=target_ip)/UDP(dport=port)
response = sr1(udp_packet, timeout=2, verbose=False)

if response is None: # 没有响应,可能开放或被过滤
return f"端口 {port} 开放或被过滤"
elif response.haslayer(ICMP) and response.getlayer(ICMP).type == 3:
return f"端口 {port} 关闭"
return f"端口 {port} 开放"

target = "192.168.1.1"
for port in [53, 161, 69]:
print(udp_scan(target, port))

2.4 TCP FIN 扫描

  • 原理:向目标发送一个带有 FIN 标志的数据包,如果返回 RST 包,则相应端口关闭。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def fin_scan(target_ip, port):
fin_packet = IP(dst=target_ip)/TCP(dport=port, flags="F")
response = sr1(fin_packet, timeout=1, verbose=False)

if response is None: # 无响应,可能端口开放
return f"端口 {port} 开放或被过滤"
elif response.haslayer(TCP) and response.getlayer(TCP).flags == 0x14: # RST
return f"端口 {port} 关闭"
return f"端口 {port} 过滤"

target = "192.168.1.1"
for port in range(20, 25):
print(fin_scan(target, port))

2.5 NULL 扫描

  • 原理:发送没有任何 TCP 标志位的数据包,如果接收到 RST 包,则相应端口关闭。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def null_scan(target_ip, port):
null_packet = IP(dst=target_ip)/TCP(dport=port, flags="")
response = sr1(null_packet, timeout=1, verbose=False)

if response is None:
return f"端口 {port} 开放或被过滤"
elif response.haslayer(TCP) and response.getlayer(TCP).flags == 0x14:
return f"端口 {port} 关闭"
return f"端口 {port} 过滤"

target = "192.168.1.1"
for port in range(20, 25):
print(null_scan(target, port))

2.6 Xmas 扫描

  • 原理: 发送同时设置了 FIN、PSH、URG 标志的数据包,关闭端口返回 RST,开放端口无响应。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def xmas_scan(target_ip, port):
xmas_packet = IP(dst=target_ip)/TCP(dport=port, flags="FPU")
response = sr1(xmas_packet, timeout=1, verbose=False)

if response is None:
return f"端口 {port} 开放或被过滤"
elif response.haslayer(TCP) and response.getlayer(TCP).flags == 0x14:
return f"端口 {port} 关闭"
return f"端口 {port} 过滤"

target = "192.168.1.1"
for port in range(20, 25):
print(xmas_scan(target, port))

2.7 TCP ACK 扫描

  • 作用:用于判断防火墙的规则。
  • 原理: 发送一个带 ACK 标志的数据包,无论端口开放还是关闭,目标通常都会返回 RST;但如果数据包被过滤,则不会有响应。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def ack_scan(target_ip, port):
ack_packet = IP(dst=target_ip)/TCP(dport=port, flags="A")
response = sr1(ack_packet, timeout=1, verbose=False)

if response is None:
return f"端口 {port} 可能被防火墙过滤"
elif response.haslayer(TCP) and response.getlayer(TCP).flags == 0x14:
return f"端口 {port} 未被过滤(可达)"
return f"端口 {port} 过滤"

target = "192.168.1.1"
for port in range(20, 25):
print(ack_scan(target, port))

2.8 端口开放探测实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#!/usr/bin/env python3
import socket
import argparse
from scapy.all import IP, TCP, sr1, send

# 基础扫描方法实现

def tcp_connect_scan(target, port, timeout=1):
"""TCP 全连接扫描"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout)
try:
result = s.connect_ex((target, port))
s.close()
if result == 0:
return True
return False
except Exception:
return False

def syn_scan(target, port, timeout=1):
"""SYN 扫描"""
packet = IP(dst=target) / TCP(dport=port, flags="S")
response = sr1(packet, timeout=timeout, verbose=0)
if response is None:
return None # 无响应,可能被过滤
elif response.haslayer(TCP):
tcp_layer = response.getlayer(TCP)
# 如果收到 SYN/ACK
if tcp_layer.flags == 0x12:
# 发送 RST 断开连接
rst_pkt = IP(dst=target) / TCP(dport=port, flags="R")
send(rst_pkt, verbose=0)
return True
elif tcp_layer.flags == 0x14:
return False
return None

def fin_scan(target, port, timeout=1):
"""FIN 扫描"""
packet = IP(dst=target) / TCP(dport=port, flags="F")
response = sr1(packet, timeout=timeout, verbose=0)
if response is None:
return None # 无响应,可能开放或被过滤
elif response.haslayer(TCP) and response.getlayer(TCP).flags == 0x14:
return False
return None

# 综合扫描方法,根据多个扫描结果综合判断端口状态

def combined_scan(target, port, timeout=1):
"""
综合使用 SYN、TCP Connect、FIN 扫描来确定端口状态。
逻辑说明:
1. 首先使用 SYN 扫描:如果返回 True,说明端口开放;如果返回 False,说明关闭。
2. 如果 SYN 扫描无响应(返回 None),则尝试 TCP 全连接扫描确认。
3. 如果 TCP 全连接扫描仍然无响应,再用 FIN 扫描进一步确认。
4. 根据所有方法的结果给出最终结论。
"""
status_details = {}

# 尝试 SYN 扫描
syn_result = syn_scan(target, port, timeout)
status_details['syn'] = syn_result

if syn_result is True:
final_status = "open"
elif syn_result is False:
final_status = "closed"
else:
# SYN 无响应,尝试 TCP 全连接扫描
connect_result = tcp_connect_scan(target, port, timeout)
status_details['connect'] = connect_result
if connect_result:
final_status = "open"
else:
# 如果 TCP 全连接扫描也无响应,再尝试 FIN 扫描
fin_result = fin_scan(target, port, timeout)
status_details['fin'] = fin_result
if fin_result is False:
final_status = "closed"
else:
final_status = "open/filtered"

print(f"[Combined] 端口 {port} 最终状态: {final_status} | 细节: {status_details}")
return final_status

# 主程序:支持命令行参数,扫描指定端口范围

def main():
parser = argparse.ArgumentParser(description="综合多种扫描方法的端口探测器")
parser.add_argument("target", help="目标IP地址或域名")
parser.add_argument("-p", "--ports", help="扫描端口范围,如 20-80,默认为 20-80", default="20-80")
parser.add_argument("--timeout", type=int, help="超时时间(秒),默认为 1", default=1)
args = parser.parse_args()

# 解析端口范围
if "-" in args.ports:
start_port, end_port = map(int, args.ports.split("-"))
else:
start_port = end_port = int(args.ports)

target = args.target

print(f"开始扫描 {target} 端口 {start_port}{end_port},使用综合扫描方法")
for port in range(start_port, end_port + 1):
combined_scan(target, port, timeout=args.timeout)

if __name__ == "__main__":
main()


基本扫描器的开发方法
http://candyb0x.github.io/2025/02/20/论如何开发探测器/
作者
Candy
发布于
2025年2月20日
更新于
2025年2月24日
许可协议