-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathIcmpPing.py
136 lines (104 loc) · 4.21 KB
/
IcmpPing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from socket import *
import os
import sys
import struct
import time
import select
import binascii
# Value placed in the ICMP "type" field of every packet built by sendOnePing.
ICMP_ECHO_REQUEST = 8
# Number of echo requests that ping() sends per invocation.
NUM_PACKETS = 3
def checksum(string):
    """Return the 16-bit one's-complement checksum of a byte sequence.

    The input is summed as 16-bit words in which the first byte of each
    pair is the low-order byte; a trailing odd byte is added on its own.
    The carries are folded back into 16 bits, the sum is complemented, and
    the two bytes of the result are swapped before it is returned.

    Args:
        string: An indexable sequence of integer byte values (for example
            ``bytes``).

    Returns:
        The byte-swapped checksum as an integer in the range 0..0xffff.
    """
    size = len(string)
    paired = size - (size % 2)  # length of the part made of whole 16-bit words

    total = 0
    for index in range(0, paired, 2):
        word = string[index + 1] * 256 + string[index]
        total = (total + word) & 0xffffffff

    if size > paired:
        # Odd length: the last byte has no partner and is added as-is.
        total = (total + string[size - 1]) & 0xffffffff

    # Fold the carries above bit 15 back into the low 16 bits.
    total = (total >> 16) + (total & 0xffff)
    total += total >> 16

    complement = ~total & 0xffff
    # Swap the high and low bytes of the 16-bit result.
    return (complement >> 8) | ((complement << 8) & 0xff00)
def receiveOnePing(mySocket, ID, timeout, destAddr):
    """Wait for the ICMP echo reply carrying *ID* and describe the result.

    Packets are read from *mySocket* until one is an echo reply (ICMP type 0)
    whose identifier equals *ID*, or until *timeout* seconds have been used
    up. Anything else that arrives (other ICMP types, replies for other
    identifiers, truncated packets) is ignored.

    Args:
        mySocket: Socket delivering whole IPv4 packets (IP header included).
        ID: Identifier that was placed in the echo request.
        timeout: Maximum number of seconds to wait.
        destAddr: Address shown in the result text.

    Returns:
        ``"Reply from <destAddr>: bytes=<n> time=<ms>ms TTL=<ttl>"`` on
        success, otherwise ``"Request timed out."``.
    """
    echoReplyType = 0
    icmpHeaderLen = 8
    timestampLen = struct.calcsize("d")

    timeLeft = timeout
    while True:
        startedWaiting = time.time()
        readable = select.select([mySocket], [], [], timeLeft)[0]
        if not readable:  # Timeout
            return "Request timed out."

        timeReceived = time.time()
        recPacket, _ = mySocket.recvfrom(1024)

        # The low nibble of the first byte is the IP header length in 32-bit
        # words; it exceeds 20 bytes whenever IP options are present.
        ipHeaderLen = (recPacket[0] & 0x0F) * 4 if recPacket else 0
        payloadStart = ipHeaderLen + icmpHeaderLen

        if ipHeaderLen >= 20 and len(recPacket) >= payloadStart + timestampLen:
            # Native byte order on purpose: sendOnePing packs the identifier
            # the same way, so the value round-trips unchanged.
            icmpType, _code, _csum, packetID, _sequence = struct.unpack(
                "BBHHH", recPacket[ipHeaderLen:payloadStart])

            # Require an echo *reply*; a request with our own ID (seen when
            # pinging the local host) must not be mistaken for the answer.
            if icmpType == echoReplyType and packetID == ID:
                TTL = recPacket[8]  # TTL is the ninth byte of the IP header
                timeSent = struct.unpack(
                    "d", recPacket[payloadStart:payloadStart + timestampLen])[0]
                return "Reply from %s: bytes=%d time=%.5fms TTL=%d" % (
                    destAddr, len(recPacket),
                    (timeReceived - timeSent) * 1000, TTL)

        # Not our reply: charge the time spent on this packet and keep waiting.
        timeLeft = timeLeft - (time.time() - startedWaiting)
        if timeLeft <= 0:
            return "Request timed out."
def sendOnePing(mySocket, destAddr, ID):
    """Build one ICMP echo request and send it to *destAddr*.

    The packet is an 8-byte header -- type (8 bits), code (8), checksum (16),
    id (16), sequence (16) -- followed by the current time packed as a
    double. The sequence number is always 1.

    Args:
        mySocket: Socket used to transmit the packet.
        destAddr: Destination host address.
        ID: 16-bit identifier stored in the header.
    """
    sequence = 1
    payload = struct.pack("d", time.time())

    # The checksum is computed over the payload plus a provisional header
    # whose checksum field is zero.
    provisionalHeader = struct.pack(
        "BBHHH", ICMP_ECHO_REQUEST, 0, 0, ID, sequence)

    # Convert the 16-bit checksum from host to network byte order.
    wireChecksum = htons(checksum(provisionalHeader + payload))
    if sys.platform == 'darwin':
        wireChecksum &= 0xffff

    # Rebuild the header, this time with the real checksum in place.
    finalHeader = struct.pack(
        "bbHHh", ICMP_ECHO_REQUEST, 0, wireChecksum, ID, sequence)

    # An AF_INET address must be a (host, port) tuple, not a bare string.
    mySocket.sendto(finalHeader + payload, (destAddr, 1))
def doOnePing(destAddr, timeout):
    """Send one echo request to *destAddr* and return the outcome text.

    A fresh raw ICMP socket is opened for the exchange and is always closed
    again, even when sending or receiving raises.

    Args:
        destAddr: Destination IPv4 address.
        timeout: Seconds to wait for the reply.

    Returns:
        The string produced by receiveOnePing (a reply description or
        ``"Request timed out."``).

    Raises:
        OSError: If the socket cannot be created or used.
            NOTE(review): raw sockets usually need elevated privileges --
            confirm for the target platform.
    """
    icmp = getprotobyname("icmp")
    # The low 16 bits of the process id serve as the ICMP identifier.
    myID = os.getpid() & 0xFFFF

    # SOCK_RAW is a powerful socket type. For more details:
    # http://sock-raw.org/papers/sock_raw
    with socket(AF_INET, SOCK_RAW, icmp) as mySocket:
        sendOnePing(mySocket, destAddr, myID)
        return receiveOnePing(mySocket, myID, timeout, destAddr)
def ping(host, timeout=1):
    """Ping *host* NUM_PACKETS times, printing the outcome of each attempt.

    Args:
        host: Host name or IPv4 address to resolve and ping.
        timeout: Seconds to wait for each reply. With the default of 1, a
            request is treated as lost when no reply arrives within one
            second.

    Returns:
        The result string of the last attempt, or ``None`` when
        NUM_PACKETS is 0.
    """
    dest = gethostbyname(host)
    print("Pinging " + dest + " using Python:")
    print("")

    delay = None  # stays None when no request is sent at all
    for attempt in range(NUM_PACKETS):
        if attempt:
            # Space consecutive requests about one second apart; there is
            # nothing to wait for after the final one.
            time.sleep(1)
        delay = doOnePing(dest, timeout)
        print(delay)
    return delay
if __name__ == '__main__':
    # Usage: python IcmpPing.py <host>
    # Example hosts: google.com, www.suzunoya.com, localhost
    if len(sys.argv) < 2:
        # Exit with a readable message instead of an IndexError traceback.
        sys.exit("usage: %s <host>" % sys.argv[0])
    host = sys.argv[1]
    ping(host)