-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcli.py
More file actions
103 lines (90 loc) · 3.81 KB
/
cli.py
File metadata and controls
103 lines (90 loc) · 3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/env python
# coding: utf8
import socket, sys, curses, argparse
from core import Tracer
from ipip import IPData
# Running width (in characters) of the hop-info column, starting at 20.
# Kept in a one-element list so format_line_content can widen it in place
# (max_info_len[0] = ...) without a `global` statement.
max_info_len = [20]
def format_hop_info(hop_ip, geoip):
    """Return the display label for a hop: its address plus a location if known.

    Args:
        hop_ip: Address of the hop; it is rendered with ``str()`` formatting.
        geoip: Optional lookup object exposing ``find(ip)``. A falsy value
            disables the lookup entirely.

    Returns:
        ``"<ip>"`` when no location is available, otherwise
        ``"<ip> [<location>]"``.
    """
    if not geoip:
        return f"{hop_ip}"

    location = ''
    try:
        record = geoip.find(hop_ip)
        if record and record != 'N/A':
            # The record is tab-separated; the first field is skipped and the
            # remaining ones are joined with single spaces.
            location = ' '.join(record.split('\t')[1:]).strip()
    except Exception:
        # The lookup is best-effort decoration: any lookup failure just means
        # "no location". KeyboardInterrupt/SystemExit are deliberately NOT
        # caught here so the user can still abort the program.
        location = ''

    return f"{hop_ip}" + (f" [{location}]" if location else "")
def format_line_content(tracer, target, ttl, geoip, hop_ip):
    """Build the text for one hop row: padded hop label, mean RTT and deviation.

    Args:
        tracer: Object whose ``rtt_map`` is indexed by ``(target, ttl, hop_ip)``
            and yields the RTT samples recorded for that hop.
        target: Target key used in ``rtt_map``.
        ttl: TTL of the hop.
        geoip: Optional geo lookup passed through to ``format_hop_info``.
        hop_ip: Address of the hop.

    Returns:
        ``""`` when no samples exist for the hop, otherwise the label
        left-justified to the shared column width followed by the mean and
        the population standard deviation of the samples.
    """
    info = format_hop_info(hop_ip, geoip)

    # Widen the shared column first, so even a hop without samples
    # contributes to the column width used by later rows.
    if len(info) > max_info_len[0]:
        max_info_len[0] = len(info)

    samples = tracer.rtt_map[(target, ttl, hop_ip)]
    if not samples:
        return ""

    count = len(samples)
    avg = sum(samples) / count
    if count > 1:
        sd = (sum((x - avg) ** 2 for x in samples) / count) ** 0.5
    else:
        # A single sample has no spread.
        sd = 0.0

    return f"{info:<{max_info_len[0]}} {avg:6.1f}ms±{sd:3.1f}"
def redraw(stdscr, t, target, geoip):
    """Repaint the whole trace table for *target* on a curses window.

    Row 0 is a header; below it there is one group of rows per TTL from 1 up
    to ``t.max_ttl[target]``. A TTL with no recorded hops is shown as
    ``* * *``; a TTL with several hops gets one row per hop, with the TTL
    number printed only on the first of them.

    Drawing is best-effort: rows that do not fit the window are skipped, long
    rows are clipped to the window width, and a failing write never prevents
    the final ``refresh()``.

    Args:
        stdscr: curses window to draw on.
        t: Tracer-like object providing ``max_ttl``, ``result`` and ``rtt_map``.
        target: Key of the traced target in the tracer's maps.
        geoip: Optional geo lookup forwarded to ``format_line_content``.
    """
    rows = [f"Tracing {target}..."]
    for ttl in range(1, t.max_ttl[target] + 1):
        hops = t.result[target].get(ttl, [])
        if not hops:
            rows.append(f" {ttl:2d} * * *")
            continue
        head = f" {ttl:2d} "
        # Continuation rows are padded to the width of the TTL prefix so the
        # hop column stays aligned with the first row of the group.
        indent = " " * len(head)
        for i, hop_ip in enumerate(hops):
            content = format_line_content(t, target, ttl, geoip, hop_ip)
            rows.append((head if i == 0 else indent) + content)

    try:
        stdscr.erase()
        height, width = stdscr.getmaxyx()
        # Leave the last column free: writing into the bottom-right cell
        # makes curses raise even though the text was drawn.
        usable = width - 1
        if usable > 0:
            for y, text in enumerate(rows[:height]):
                try:
                    stdscr.addnstr(y, 0, text, usable)
                except curses.error:
                    # One bad row must not abort the rest of the repaint.
                    pass
        stdscr.refresh()
    except curses.error:
        pass
def main(stdscr, target, counts=None, force_ipv6=False):
    """Resolve *target*, trace it and render every reply as it arrives.

    Args:
        stdscr: curses window for the live full-screen view, or ``None`` for
            plain line-by-line output on stdout.
        target: Host name or IP literal to trace.
        counts: Forwarded to ``Tracer.run``. When truthy, every reply is
            printed as its own line instead of being aggregated.
        force_ipv6: Resolve IPv6 addresses only.

    Returns:
        ``None``. Resolution failures are reported to the user (on the curses
        window when one is active, otherwise on stdout) and end the call.
    """

    def report(message):
        # print() is invisible while curses owns the terminal, so in
        # full-screen mode show the message on the window and wait for a key.
        if not stdscr:
            print(message)
            return
        try:
            stdscr.erase()
            stdscr.addstr(0, 0, message)
            stdscr.refresh()
            stdscr.getch()
        except (curses.error, KeyboardInterrupt):
            pass

    family = socket.AF_INET6 if force_ipv6 else socket.AF_UNSPEC
    try:
        addr_infos = socket.getaddrinfo(target, None, family, socket.SOCK_RAW)
    except (OSError, ValueError) as e:
        # OSError covers socket.gaierror; ValueError covers host names that
        # cannot be encoded (UnicodeError) or are otherwise malformed.
        report(f"Error resolving {target}: {e}")
        return

    v4 = [i for i in addr_infos if i[0] == socket.AF_INET]
    v6 = [i for i in addr_infos if i[0] == socket.AF_INET6]
    # IPv4 is preferred unless IPv6 was requested explicitly.
    candidates = (v6 + v4) if force_ipv6 else (v4 + v6)
    if not candidates:
        report(f"Error resolving {target}: No address found")
        return
    info = candidates[0]
    actual_family, target_ip = info[0], info[4][0]

    geoip = None
    if actual_family == socket.AF_INET:
        # The geo database is only consulted for IPv4 and is optional: if it
        # cannot be loaded the trace simply runs without location labels.
        try:
            geoip = IPData('17monipdb.dat')
        except Exception:
            geoip = None

    t = Tracer(family=actual_family)
    printed_keys = set()

    def on_pong(t, target, hop, ttl, rtt, is_target):
        # Replies beyond the tracer's current max TTL for this target are ignored.
        if ttl > t.max_ttl[target]:
            return
        if counts:
            print(f" {ttl:2d} " + format_line_content(t, target, ttl, geoip, hop))
        elif stdscr:
            redraw(stdscr, t, target, geoip)
        else:
            # Plain mode without counts: print each hop once, as soon as it
            # has at least three samples or turns out to be the target.
            key = (target, ttl, hop)
            if key not in printed_keys and (len(t.rtt_map[key]) >= 3 or is_target):
                print(f" {ttl:2d} " + format_line_content(t, target, ttl, geoip, hop))
                printed_keys.add(key)

    t.on_pong = on_pong

    if stdscr:
        try:
            curses.start_color()
            curses.use_default_colors()
            stdscr.addstr(0, 0, f"Tracing {target} ({target_ip})...")
            stdscr.refresh()
        except curses.error:
            # Colour support / the initial banner are cosmetic.
            pass

    t.run([target_ip], counts=counts)

    if stdscr:
        # Keep the final table on screen until a key is pressed; Ctrl-C at
        # this point just ends the wait quietly.
        try:
            stdscr.getch()
        except (curses.error, KeyboardInterrupt):
            pass
if __name__ == '__main__':
    # Command-line entry point: parse the options, then pick the output mode.
    cli = argparse.ArgumentParser()
    cli.add_argument('target', help='Target host or IP')
    cli.add_argument('-c', '--count', type=int, help='Pings per TTL')
    cli.add_argument('-6', dest='ipv6', action='store_true', help='Force IPv6')
    opts = cli.parse_args()

    if not opts.count:
        # No (or zero) count: full-screen mode; curses.wrapper creates the
        # window, passes it to main as stdscr and restores the terminal after.
        curses.wrapper(main, opts.target, force_ipv6=opts.ipv6)
    else:
        # A count was given: plain output, no curses window.
        main(None, opts.target, counts=opts.count, force_ipv6=opts.ipv6)