-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpacket_sniffer.py
160 lines (129 loc) · 5.99 KB
/
packet_sniffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#!/usr/bin/env python3
# https://github.com/HectorTa1989/HecPy3-Packet-Sniffer
__author__ = 'Hector Ta @ https://github.com/HectorTa1989'
import abc
import argparse
import time
from itertools import count
from socket import ntohs, socket, PF_PACKET, SOCK_RAW
import protocols
i = ' ' * 4 # Basic indentation level
class PacketSniffer(object):
def __init__(self, interface: str):
self.interface = interface
self.data = None
self.protocol_queue = ['Ethernet']
self.__observers = list()
def register(self, observer):
self.__observers.append(observer)
def __notify_all(self, *args):
for observer in self.__observers:
observer.update(*args)
del self.protocol_queue[1:]
def execute(self):
with socket(PF_PACKET, SOCK_RAW, ntohs(0x0003)) as sock:
if self.interface is not None:
sock.bind((self.interface, 0))
for self.packet_num in count(1):
raw_packet = sock.recv(9000)
start: int = 0
for proto in self.protocol_queue:
proto_class = getattr(protocols, proto)
end: int = start + proto_class.header_len
protocol = proto_class(raw_packet[start:end])
setattr(self, proto.lower(), protocol)
if protocol.encapsulated_proto is None:
break
self.protocol_queue.append(protocol.encapsulated_proto)
start = end
self.data = raw_packet[end:]
self.__notify_all(self)
class OutputMethod(abc.ABC):
"""Interface for the implementation of all classes responsible for
further processing and/or output of the information gathered by
the PacketSniffer class (referenced as 'subject')."""
def __init__(self, subject):
subject.register(self)
@abc.abstractmethod
def update(self, *args, **kwargs):
pass
class SniffToScreen(OutputMethod):
def __init__(self, subject, *, displaydata: bool):
super().__init__(subject)
self.p = None
self.display_data = displaydata
def update(self, packet):
self.p = packet
self._display_output_header()
self._display_packet_info()
self._display_packet_contents()
def _display_output_header(self):
local_time = time.strftime('%H:%M:%S', time.localtime())
print('[>] Packet #{0} at {1}:'.format(self.p.packet_num, local_time))
def _display_packet_info(self):
for proto in self.p.protocol_queue:
getattr(self, '_display_{}_data'.format(proto.lower()))()
def _display_ethernet_data(self):
print('{0}[+] MAC {1:.>23} -> {2}'.format(i, self.p.ethernet.source,
self.p.ethernet.dest))
def _display_ipv4_data(self):
print('{0}[+] IPv4 {1:.>22} -> {2: <15} | '
'PROTO: {3} TTL: {4}'.format(i, self.p.ipv4.source,
self.p.ipv4.dest,
self.p.ipv4.encapsulated_proto,
self.p.ipv4.ttl))
def _display_ipv6_data(self):
print('{0}[+] IPv6 {1:.>22} -> {2: <15}'.format(i, self.p.ipv6.source,
self.p.ipv6.dest))
def _display_arp_data(self):
if self.p.arp.oper == 1: # ARP Request
print('{0}[+] ARP Who has {1: >13} ? '
'-> Tell {2}'.format(i, self.p.arp.target_proto,
self.p.arp.source_proto))
else: # ARP Reply
print('{0}[+] ARP {1:.>23} -> '
'Is at {2}'.format(i, self.p.arp.source_proto,
self.p.arp.source_hdwr))
def _display_tcp_data(self):
print('{0}[+] TCP {1:.>23} -> {2: <15} | '
'Flags: {3} > {4}'.format(i, self.p.tcp.sport,
self.p.tcp.dport,
self.p.tcp.flag_hex,
self.p.tcp.flag_txt))
def _display_udp_data(self):
print('{0}[+] UDP {1:.>23} -> {2}'.format(i, self.p.udp.sport,
self.p.udp.dport))
def _display_icmp_data(self):
print('{0}[+] ICMP {1:.>22} -> {2: <15} | '
'Type: {3}'.format(i, self.p.ipv4.source,
self.p.ipv4.dest,
self.p.icmp.type_txt))
def _display_packet_contents(self):
if self.display_data is True:
print('{0}[+] DATA:'.format(i))
data = self.p.data.decode(errors='ignore').\
replace('\n', '\n{0}'.format(i*2))
print('{0}{1}'.format(i, data))
def sniff(interface: str, displaydata: bool):
"""Control the flow of execution of the Packet Sniffer tool."""
packet_sniffer = PacketSniffer(interface)
to_screen = SniffToScreen(subject=packet_sniffer,
displaydata=displaydata)
try:
print('\n[>>>] Sniffer initialized. Waiting for incoming packets. '
'Press Ctrl-C to abort...\n')
packet_sniffer.execute()
except KeyboardInterrupt:
raise SystemExit('Aborting packet capture...')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='A pure-Python network packet '
'sniffer.')
parser.add_argument('-i', '--interface', type=str, default=None,
help='Interface from which packets will be captured '
'(captures from all available interfaces by '
'default).')
parser.add_argument('-d', '--displaydata', action='store_true',
help='Output packet data during capture.')
cli_args = parser.parse_args()
sniff(interface=cli_args.interface,
displaydata=cli_args.displaydata)