Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .idea/inspectionProfiles/profiles_settings.xml
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:(

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 71 additions & 0 deletions .idea/workspace.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 13 additions & 13 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import argparse
import click
from tcpping import TcpPing


def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="TCP Ping")
parser.add_argument("host", help="Host")
parser.add_argument("port", type=int, default=80, help="Port")
parser.add_argument("-c", "--count", type=int, default=0, help="Number of pings to send (default infinity)")
parser.add_argument("-t", "--timeout", type=float, default=2.0, help="Timeout for each ping in seconds (default 2)")
parser.add_argument("-i", "--interval", type=float, default=1.0, help="Interval between pings in seconds (default 1)")
return parser.parse_args()
@click.command()
@click.argument('hosts', type=str, nargs=-1)
@click.argument('port', type=int)
@click.option('-c', '--count', default=0, type=int, help='Number of pings to send (default: 0, infinite).')
@click.option('-t', '--timeout', default=2.0, type=float, help='Timeout for each ping in seconds (default: 2).')
@click.option('-i', '--interval', default=1.0, type=float, help='Interval between pings in seconds (default: 1).')
@click.option('--http', is_flag=True, help='Use HTTP.')
def tcping(hosts: str, port: int, count: int, timeout: float, interval: float, http: bool):
tcp_ping = TcpPing(hosts, port, count, timeout, interval, http)
tcp_ping.run()


if __name__ == "__main__":
args = parse_args()
tcp_ping = TcpPing(args.host, args.port, args.count, args.timeout, args.interval)
tcp_ping.run()
if __name__ == '__main__':
tcping()
102 changes: 67 additions & 35 deletions tcpping.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,90 @@
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

from scapy.all import IP, TCP, sr1

class TcpPing:
def __init__(self, host: str, port: int, count: int, timeout: float, interval: float):
self.host = host
self.port = port
self.count = count
self.timeout = timeout
self.interval = interval
self.sent_packets = 0
self.received_packets = 0
self.response_times = []

def send_ping(self) -> float | None:
packet = IP(dst=self.host) / TCP(dport=self.port, flags="S")
def __init__(self, hosts: str, port: int, count: int, timeout: float, interval: float, http: bool):
self._hosts = hosts
self._port = port
self._count = count
self._timeout = timeout
self._interval = interval
self._http = http
self._sent_packets = 0
self._received_packets = 0
self._response_times = []
self._stop_event = threading.Event()

def _send_ping(self, host: str) -> float | None:
packet = (IPv6(dst=host) if ":" in host else IP(dst=host)) / TCP(dport=self._port, flags="S")
if self._http:
packet = packet / ("GET / HTTP/1.1\r\nHost: " + host + "\r\n\r\n")
start_time = time.time()
response = sr1(packet, timeout=self.timeout, verbose=0)
response = sr1(packet, timeout=self._timeout, verbose=0)
elapsed_time = (time.time() - start_time) * 1000

self.sent_packets += 1
self._sent_packets += 1
if response and response.haslayer(TCP) and response.getlayer(TCP).flags == 0x12:
self.received_packets += 1
self.response_times.append(elapsed_time)
rst_packet = IP(dst=self.host) / TCP(dport=self.port, flags="R")
sr1(rst_packet, timeout=self.timeout, verbose=0)
self._received_packets += 1
self._response_times.append(elapsed_time)
rst_packet = (IPv6(dst=host) if ":" in host else IP(dst=host)) / TCP(dport=self._port, flags="R")
if self._http:
rst_packet = rst_packet / ("GET / HTTP/1.1\r\nHost: " + host + "\r\n\r\n")
sr1(rst_packet, timeout=self._timeout, verbose=0)
return elapsed_time
else:
return None

def print_statistics(self) -> None:
def _print_statistics(self) -> None:
print("\n--- TCPing statistics ---")
print(f"Sent packets: {self.sent_packets}")
print(f"Received packets: {self.received_packets}")
loss = ((self.sent_packets - self.received_packets) / self.sent_packets) * 100 if self.sent_packets > 0 else 0
print(f"Sent packets: {self._sent_packets}")
print(f"Received packets: {self._received_packets}")
loss = ((self._sent_packets - self._received_packets) / self._sent_packets) * 100 if self._sent_packets > 0 else 0
print(f"Packet loss: {loss:.2f}%")
if self.received_packets > 0:
print(f"Min response time: {min(self.response_times):.2f} ms")
print(f"Max response time: {max(self.response_times):.2f} ms")
print(f"Average response time: {sum(self.response_times) / len(self.response_times):.2f} ms")
if self._received_packets > 0:
print(f"Min response time: {min(self._response_times):.2f} ms")
print(f"Max response time: {max(self._response_times):.2f} ms")
print(f"Average response time: {sum(self._response_times) / len(self._response_times):.2f} ms")
else:
print("No responses received.")

def run(self) -> None:
def _ping(self, host) -> None:
try:
count = 0
while self.count == 0 or count < self.count:
response_time = self.send_ping()
while (self._count == 0 or count < self._count) and not self._stop_event.is_set():
response_time = self._send_ping(host)
if response_time is not None:
print(f"Reply from {self.host}:{self.port}, time={response_time:.2f} ms")
print(f"Reply from {host}:{self._port}, time={response_time:.2f} ms")
else:
print(f"Request to {self.host}:{self.port} timed out.")
print(f"Request to {host}:{self._port} timed out.")
count += 1
time.sleep(self.interval)
finally:
self.print_statistics()
sys.exit(0)
time.sleep(self._interval)

except KeyboardInterrupt:
pass


def run(self) -> None:
with ThreadPoolExecutor(max_workers=8) as executor:
try:
futures = {
executor.submit(self._ping, host): host
for host in self._hosts
}

while not self._stop_event.is_set() and self._sent_packets <= self._count * 2:
for future in futures:
try:
future.result(timeout=0.5)
except TimeoutError:
pass

finally:
self._stop_event.set()
executor.shutdown(wait=True)
self._print_statistics()
sys.exit(0)