diff --git a/code/default/launcher/web_control.py b/code/default/launcher/web_control.py index 34c56087a1..c1ccff5b0c 100644 --- a/code/default/launcher/web_control.py +++ b/code/default/launcher/web_control.py @@ -855,7 +855,6 @@ def req_debug_handler(self): dat += "thread num:%d\r\n" % threading.active_count() for thread in threading.enumerate(): dat += "\nThread: %s \r\n" % (thread.name) - # dat += traceback.format_exc(sys._current_frames()[thread.ident]) stack = sys._current_frames()[thread.ident] st = traceback.extract_stack(stack) stl = traceback.format_list(st) diff --git a/code/default/lib/noarch/front_base/boringssl_wrap.py b/code/default/lib/noarch/front_base/boringssl_wrap.py index 3ca6a9ecea..bfd05e9b73 100644 --- a/code/default/lib/noarch/front_base/boringssl_wrap.py +++ b/code/default/lib/noarch/front_base/boringssl_wrap.py @@ -19,6 +19,7 @@ def __init__(self, context, sock, ip_str=None, sni=None, on_close=None): self._context = context self._sock = sock self._fileno = self._sock.fileno() + # self._context.logger.debug("sock %s init fd:%d", ip_str, self._fileno) self.ip_str = utils.to_bytes(ip_str) self.sni = sni self._makefile_refs = 0 @@ -240,18 +241,49 @@ def close(self): self.running = False if not self.socket_closed: if self._connection: - bssl.BSSL_SSL_shutdown(self._connection) + res = bssl.BSSL_SSL_shutdown(self._connection) + # res == 0: close_notify sent but not recv, means you need to call SSL_shutdown again if you want a full bidirectional shutdown. + # res == 1: success, mean you previously received a close_notify alert from the other peer, and you're totally done + # res == -1: failed + # self._context.logger.debug("sock %s SSL_shutdown fd:%d res:%d", self.ip_str, self._fileno, res) + + if res < 0: + error = bssl.BSSL_SSL_get_error(self._connection, res) + # self._context.logger.debug("sock %s shutdown fd:%d error:%d", self.ip_str, self._fileno, error) + if error == 1: + p = ffi.new("char[]", + b"hello, worldhello, worldhello, worldhello, worldhello, world") # p is a 'char *' + q = ffi.new("char **", p) # q is a 'char **' + line_no = 0 + line_no_p = ffi.new("int *", line_no) + error = bssl.BSSL_ERR_get_error_line(q, line_no_p) + filename = ffi.string(q[0]) + # self._context.logger.error("error:%d file:%s, line:%s", error, filename, line_no_p[0]) + self._context.logger.debug("sock %s shutdown error: %s, file:%s, line:%d, sni:%s" % + (self.ip_str, error, filename, line_no_p[0], self.sni)) + else: + self._context.logger.debug("sock %s shutdown error:%s" % (self.ip_str, error)) + + bssl.BSSL_SSL_free(self._connection) + self._connection = None + + if self._sock: + try: + self._sock.close() + # self._context.logger.debug("sock %s sock_close fd:%d", self.ip_str, self._fileno) + except Exception as e: + # self._context.logger.debug("sock %s sock_close fd:%d e:%r", self.ip_str, self._fileno, e) + pass + self._sock = None self.socket_closed = True + if self._on_close: self._on_close(self.ip_str, self.sni) + self._on_close = None def __del__(self): self.close() - if self._connection: - bssl.BSSL_SSL_free(self._connection) - self._connection = None - self._sock = None def settimeout(self, t): if not self.running: diff --git a/code/default/lib/noarch/front_base/connect_manager.py b/code/default/lib/noarch/front_base/connect_manager.py index 232e08cc26..465f5ff3c0 100644 --- a/code/default/lib/noarch/front_base/connect_manager.py +++ b/code/default/lib/noarch/front_base/connect_manager.py @@ -312,8 +312,9 @@ def _create_ssl_connection(self, ip_str, sni, host): t = threading.Thread(target=self._connect_ssl, args=fn_args, name="connect_ssl_%s" % ip_str) t.start() try: - ssl_sock = q.get(timeout=3) + ssl_sock = q.get(timeout=30) except: + self.logger.warn("connect_ssl_timeout %s", ip_str) raise socket.error("timeout") if not isinstance(ssl_sock, SSLConnection): diff --git a/code/default/lib/noarch/front_base/http_common.py b/code/default/lib/noarch/front_base/http_common.py index 93bd5bd7a8..dd162fd68d 100644 --- a/code/default/lib/noarch/front_base/http_common.py +++ b/code/default/lib/noarch/front_base/http_common.py @@ -1,3 +1,4 @@ +import threading import time import random @@ -204,6 +205,8 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb self.retry_task_cb = retry_task_cb self.idle_cb = idle_cb self.log_debug_data = log_debug_data + + self._lock = threading.Lock() self.accept_task = True self.keep_running = True self.processed_tasks = 0 @@ -215,6 +218,7 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb self.last_send_time = self.ssl_sock.create_time self.life_end_time = self.ssl_sock.create_time + \ random.randint(self.config.connection_max_life, int(self.config.connection_max_life * 1.5)) + # self.logger.debug("worker.init %s", self.ip_str) def __str__(self): o = "" @@ -260,24 +264,30 @@ def update_debug_data(self, rtt, sent, received, speed): # else: # self.rtt = rtt - # self.log_debug_data(rtt, sent, received) + self.log_debug_data(rtt, sent, received) return def close(self, reason): - if not self.keep_running: - self.logger.warn("worker already closed %s", self.ip_str) - return - - self.accept_task = False - self.keep_running = False - self.ssl_sock.close() - if reason not in ["idle timeout", "life end"]: - now = time.time() - inactive_time = now - self.last_recv_time - if inactive_time < self.config.http2_ping_min_interval: - self.logger.debug("%s worker close:%s inactive:%d", self.ip_str, reason, inactive_time) - self.ip_manager.report_connect_closed(self.ssl_sock.ip_str, self.ssl_sock.sni, reason) - self.close_cb(self) + with self._lock: + if not self.keep_running: + # self.logger.warn("worker %s already closed %s", self.ip_str, reason) + return + + # self.logger.debug("worker.close %s reason:%s", self.ip_str, reason) + self.accept_task = False + self.keep_running = False + self.ssl_sock.close() + if reason not in ["idle timeout", "life end"]: + now = time.time() + inactive_time = now - self.last_recv_time + if inactive_time < self.config.http2_ping_min_interval: + self.logger.debug("%s worker close:%s inactive:%d", self.ip_str, reason, inactive_time) + self.ip_manager.report_connect_closed(self.ssl_sock.ip_str, self.ssl_sock.sni, reason) + self.close_cb(self) + + def __del__(self): + # self.logger.debug("__del__ %s", self.ip_str) + self.close("__del__") def get_score(self): # The smaller, the better diff --git a/code/default/lib/noarch/simple_http_client.py b/code/default/lib/noarch/simple_http_client.py index e0204bb751..b10d66e0bf 100644 --- a/code/default/lib/noarch/simple_http_client.py +++ b/code/default/lib/noarch/simple_http_client.py @@ -129,6 +129,17 @@ def __init__(self, sock): self.select2 = selectors.DefaultSelector() self.select2.register(sock, selectors.EVENT_READ) + def __del__(self): + try: + self.select2.unregister(self.sock) + except: + pass + + try: + socket.socket.close(self.sock) + except: + pass + def recv(self, to_read=8192, timeout=30.0): if timeout < 0: raise Exception("recv timeout") diff --git a/code/default/lib/noarch/xlog.py b/code/default/lib/noarch/xlog.py index f698d4cfa4..a3cf904e82 100644 --- a/code/default/lib/noarch/xlog.py +++ b/code/default/lib/noarch/xlog.py @@ -27,7 +27,7 @@ def __init__(self, name, buffer_size=0, file_name=None, roll_num=1, log_path=None, save_start_log=0, save_warning_log=False): self.name = str(name) self.file_max_size = 1024 * 1024 - self.buffer_lock = threading.Lock() + self.buffer_lock = threading.RLock() self.buffer = {} # id => line self.buffer_size = buffer_size self.last_no = 0 diff --git a/code/default/smart_router/local/gfw_black_list.txt b/code/default/smart_router/local/gfw_black_list.txt index 8979ffbcea..0901ae5099 100644 --- a/code/default/smart_router/local/gfw_black_list.txt +++ b/code/default/smart_router/local/gfw_black_list.txt @@ -1838,6 +1838,7 @@ google.ca google.calstate.edu google.cd google.ci +google.cn google.co.id google.co.jp google.co.kr diff --git a/code/default/smart_router/local/smart_route.py b/code/default/smart_router/local/smart_route.py index f6f03faf84..05282cfaad 100644 --- a/code/default/smart_router/local/smart_route.py +++ b/code/default/smart_router/local/smart_route.py @@ -580,13 +580,13 @@ def handle_domain_proxy(sock, host, port, client_address, left_buf=""): if not g.domain_cache.accept_gae(host): rule_list.remove("gae") elif g.config.country_code == "CN": - if g.gfwlist.in_white_list(host): - rule_list = ["direct", "gae", "socks", "redirect_https"] - elif g.gfwlist.in_block_list(host): + if g.gfwlist.in_block_list(host): if g.config.pac_policy == "black_X-Tunnel": rule_list = ["socks", "redirect_https", "direct", "gae"] else: rule_list = ["gae", "socks", "redirect_https", "direct"] + elif g.gfwlist.in_white_list(host): + rule_list = ["direct", "gae", "socks", "redirect_https"] else: ips = g.dns_query.query_recursively(host, 1) if g.ip_region.check_ips(ips): diff --git a/code/default/version.txt b/code/default/version.txt index 4cc0e35cb3..566ac6388b 100644 --- a/code/default/version.txt +++ b/code/default/version.txt @@ -1 +1 @@ -5.6.0 \ No newline at end of file +5.6.1 \ No newline at end of file diff --git a/code/default/x_tunnel/local/base_container.py b/code/default/x_tunnel/local/base_container.py index 2fb7688ee6..d35af5f540 100644 --- a/code/default/x_tunnel/local/base_container.py +++ b/code/default/x_tunnel/local/base_container.py @@ -136,7 +136,7 @@ def get(self): return data def status(self): - out_string = "Ack_pool:len %d
\r\n" % len(self.ack_buffer) + out_string = "Ack_pool:len %d\r\n" % len(self.ack_buffer) return out_string @@ -168,19 +168,19 @@ def notify(self): except: pass - def wait(self, end_time): + def wait(self, wait_order): with self.lock: lock = threading.Lock() lock.acquire() if len(self.waiters) == 0: - self.waiters.append((end_time, lock)) + self.waiters.append((wait_order, lock)) else: is_max = True for i in range(0, len(self.waiters)): try: - iend_time, ilock = self.waiters[i] - if iend_time > end_time: + i_wait_order, ilock = self.waiters[i] + if i_wait_order > wait_order: is_max = False break except Exception as e: @@ -190,17 +190,17 @@ def wait(self, end_time): continue if is_max: - self.waiters.append((end_time, lock)) + self.waiters.append((wait_order, lock)) else: - self.waiters.insert(i, (end_time, lock)) + self.waiters.insert(i, (wait_order, lock)) lock.acquire() def status(self): - out_string = "waiters[%d]:
\n" % len(self.waiters) + out_string = "waiters[%d]:\n" % len(self.waiters) for i in range(0, len(self.waiters)): end_time, lock = self.waiters[i] - out_string += "%d
\r\n" % (end_time) + out_string += "%d\r\n" % (end_time) return out_string @@ -268,15 +268,15 @@ def get(self): return "", 0 def status(self): - out_string = "SendBuffer:
\n" - out_string += " size:%d
\n" % self.pool_size - out_string += " last_put_time:%f
\n" % (time.time() - self.last_put_time) - out_string += " head_sn:%d
\n" % self.head_sn - out_string += " tail_sn:%d
\n" % self.tail_sn - out_string += "block_list:[%d]
\n" % len(self.block_list) + out_string = "SendBuffer:\n" + out_string += " size:%d\n" % self.pool_size + out_string += " last_put_time:%f\n" % (time.time() - self.last_put_time) + out_string += " head_sn:%d\n" % self.head_sn + out_string += " tail_sn:%d\n" % self.tail_sn + out_string += "block_list:[%d]\n" % len(self.block_list) for sn in sorted(self.block_list.keys()): data = self.block_list[sn] - out_string += "[%d] len:%d
\r\n" % (sn, len(data)) + out_string += "[%d] len:%d\r\n" % (sn, len(data)) return out_string @@ -323,10 +323,10 @@ def put(self, sn, data): self.lock.release() def status(self): - out_string = "Block_receive_pool:
\r\n" - out_string += " next_sn:%d
\r\n" % self.next_sn + out_string = "Block_receive_pool:\r\n" + out_string += " next_sn:%d\r\n" % self.next_sn for sn in sorted(self.block_list): - out_string += "[%d]
\r\n" % (sn) + out_string += "[%d] \r\n" % (sn) return out_string @@ -341,6 +341,17 @@ def __init__(self, session, xlog): self.sock_conn_map = {} self._lock = threading.RLock() + def status(self): + out_string = "ConnectionPipe:\r\n" + out_string += " running: %s\r\n" % self.running + out_string += " thread: %s\r\n" % self.th + out_string += " conn: " + for conn in self.sock_conn_map.values(): + out_string += "%d," % (conn.conn_id) + out_string += "\r\n" + + return out_string + def start(self): self.running = True self.sock_conn_map = {} @@ -469,12 +480,18 @@ def pipe_worker(self): data_len = len(data) if data_len == 0: - self.xlog.debug("Conn session:%s conn:%d recv socket closed", self.session.session_id, - conn.conn_id) - self.close_sock(sock, "receive") + self.xlog.debug("Conn conn:%d recv zero", conn.conn_id) + if conn.recv_zero: + self.xlog.debug("Conn conn:%d recv zero again", conn.conn_id) + self.close_sock(sock, "receive") + else: + conn.recv_zero = True continue else: conn.last_active = now + if conn.recv_zero: + self.xlog.error("recv_zero restored conn:%d", conn.conn_id) + conn.recv_zero = False self._debug_log("Conn session:%s conn:%d local recv len:%d pos:%d", self.session.session_id, conn.conn_id, data_len, conn.received_position) @@ -489,7 +506,7 @@ def pipe_worker(self): except Exception as e: xlog.exception("recv_worker e:%r", e) - for sock in self.sock_conn_map: + for sock in dict(self.sock_conn_map): try: self.select2.unregister(sock) except Exception as e: @@ -518,6 +535,7 @@ def __init__(self, session, conn_id, sock, host, port, windows_size, windows_ack self.cmd_queue = {} self.running = True self.blocked = False + self.recv_zero = False # will close when continue recv zero self.send_buffer = b"" self.received_position = 0 self.remote_acked_position = 0 @@ -530,6 +548,8 @@ def __init__(self, session, conn_id, sock, host, port, windows_size, windows_ack self.transferred_close_to_peer = False if sock: self.next_cmd_seq = 1 + self._fd = sock.fileno() + # self.xlog.debug("conn:%d init fd:%d", conn_id, self._fd) else: self.next_cmd_seq = 0 @@ -540,23 +560,24 @@ def start(self, block): self.connection_pipe.add_sock_event(self.sock, self, selectors.EVENT_READ) def status(self): - out_string = "Conn[%d]: %s:%d
\r\n" % (self.conn_id, self.host, self.port) - out_string += " received_position:%d/ Ack:%d
\n" % (self.received_position, self.remote_acked_position) - out_string += " sended_position:%d/ win:%d
\n" % (self.sended_position, self.sent_window_position) - out_string += " next_cmd_seq:%d
\n" % self.next_cmd_seq - out_string += " next_recv_seq:%d
\n" % self.next_recv_seq - out_string += " status: running:%r
\n" % self.running - out_string += " blocked: %s
\n" % self.blocked + out_string = "Conn[%d]: %s:%d\n" % (self.conn_id, self.host, self.port) + out_string += " received_position:%d/ Ack:%d \n" % (self.received_position, self.remote_acked_position) + out_string += " sended_position:%d/ win:%d\n" % (self.sended_position, self.sent_window_position) + out_string += " next_cmd_seq:%d\n" % self.next_cmd_seq + out_string += " next_recv_seq:%d\n" % self.next_recv_seq + out_string += " status: running:%r\n" % self.running + out_string += " blocked: %s\n" % self.blocked + out_string += " recv_zero: %s\n" % self.recv_zero if self.send_buffer: - out_string += " send_buffer: %d
\n" % len(self.send_buffer) - out_string += " transferred_close_to_peer:%r
\n" % self.transferred_close_to_peer - out_string += " sock:%r
\n" % (self.sock is not None) - out_string += " cmd_queue.len:%d
" % len(self.cmd_queue) - out_string += " create time: %s
" % datetime.fromtimestamp(self.create_time).strftime('%Y-%m-%d %H:%M:%S.%f') - out_string += " last active: %s
" % datetime.fromtimestamp(self.last_active).strftime('%Y-%m-%d %H:%M:%S.%f') + out_string += " send_buffer: %d\n" % len(self.send_buffer) + out_string += " transferred_close_to_peer:%r\n" % self.transferred_close_to_peer + out_string += " sock:%r\n" % (self.sock is not None) + out_string += " cmd_queue.len:%d\n" % len(self.cmd_queue) + out_string += " create time: %s\n" % datetime.fromtimestamp(self.create_time).strftime('%Y-%m-%d %H:%M:%S.%f') + out_string += " last active: %s\n" % datetime.fromtimestamp(self.last_active).strftime('%Y-%m-%d %H:%M:%S.%f') for seq in self.cmd_queue: out_string += "[%d]," % seq - out_string += "
\n" + out_string += "\n" return out_string @@ -565,7 +586,8 @@ def stop(self, reason=""): name="do_stop_%s:%d" % (self.host, self.port)).start() def do_stop(self, reason="unknown"): - self.xlog.debug("Conn session:%s conn:%d stop:%s", utils.to_str(self.session.session_id), self.conn_id, reason) + self.xlog.debug("Conn session:%s conn:%d fd:%d stop:%s", utils.to_str(self.session.session_id), self.conn_id, + self._fd, reason) self.running = False self.connection_pipe.remove_sock(self.sock) @@ -573,11 +595,10 @@ def do_stop(self, reason="unknown"): self.cmd_queue = {} if self.sock is not None: - if self.sock.fileno() != -1: - try: - self.sock.close() - except: - pass + try: + self.sock.close() + except: + pass self.sock = None # self.xlog.debug("Conn session:%s conn:%d stopped", self.session.session_id, self.conn_id) diff --git a/code/default/x_tunnel/local/proxy_handler.py b/code/default/x_tunnel/local/proxy_handler.py index 23e3e41680..fdcb3ce972 100644 --- a/code/default/x_tunnel/local/proxy_handler.py +++ b/code/default/x_tunnel/local/proxy_handler.py @@ -62,11 +62,10 @@ def handle(self): return except socket.error as e: - xlog.exception('socks handler read error %r', e) + xlog.exception('proxy handler read error %r', e) self.connection.close() - return except Exception as e: - xlog.exception("any err:%r", e) + xlog.exception("proxy handler err:%r", e) self.connection.close() def read_null_end_line(self): diff --git a/code/default/x_tunnel/local/proxy_session.py b/code/default/x_tunnel/local/proxy_session.py index 9179be766f..93c4f55753 100644 --- a/code/default/x_tunnel/local/proxy_session.py +++ b/code/default/x_tunnel/local/proxy_session.py @@ -34,6 +34,14 @@ def decrypt_data(data): else: return data +def traffic_readable(num, units=('B', 'KB', 'MB', 'GB')): + for unit in units: + if num >= 1024: + num /= 1024.0 + else: + break + return '{:.1f} {}'.format(num, unit) + def sleep(t): end_time = time.time() + t @@ -42,7 +50,8 @@ def sleep(t): return sleep_time = min(5, end_time - time.time()) - time.sleep(sleep_time) + if sleep_time > 0.01: + time.sleep(sleep_time) class ProxySession(object): @@ -68,10 +77,18 @@ def __init__(self): self.on_road_num = 0 self.last_receive_time = 0 self.last_send_time = 0 - self.traffic = 0 self.server_send_buf_size = 0 self.target_on_roads = 0 + # speed calculation + self.traffic_upload = 0 + self.traffic_download = 0 + self.last_traffic_upload = 0 + self.last_traffic_download = 0 + self.last_traffic_reset_time = time.time() + self.upload_speed = 0.0 + self.download_speed = 0.0 + # the receive time of the tail of the socket receive buffer # if now - oldest_received_time > delay, then send. # set only no data in receive buffer @@ -100,7 +117,13 @@ def start(self): self.transfer_list = {} self.last_send_time = time.time() self.last_receive_time = 0 - self.traffic = 0 + + # speed calculation + self.traffic_upload = 0 + self.traffic_download = 0 + self.last_traffic_upload = 0 + self.last_traffic_download = 0 + self.last_traffic_reset_time = time.time() # sn => (payload, send_time) # sn => ack @@ -130,6 +153,22 @@ def start(self): xlog.info("session started.") return True + def traffic_speed_calculation(self): + now = time.time() + time_go = now - self.last_traffic_reset_time + if time_go > 0.5: + self.upload_speed = (self.traffic_upload - self.last_traffic_upload) / time_go + self.download_speed = (self.traffic_download - self.last_traffic_download) / time_go + + self.last_traffic_reset_time = now + self.last_traffic_upload = self.traffic_upload + self.last_traffic_download = self.traffic_download + + # xlog.debug("upload speed:%s download speed:%s", + # convert_data_size_easy_read(self.upload_speed), + # convert_data_size_easy_read(self.download_speed) + # ) + def stop(self): if not self.running: # xlog.warn("session stop but not running") @@ -214,15 +253,8 @@ def check_report_status(self): data = info["data"] g.tls_relay_front.set_ips(data["ips"]) - @staticmethod - def get_stat(type="second"): - def convert(num, units=('B', 'KB', 'MB', 'GB')): - for unit in units: - if num >= 1024: - num /= 1024.0 - else: - break - return '{:.1f} {}'.format(num, unit) + def get_stat(self, type="second"): + self.traffic_speed_calculation() res = {} rtt = 0 @@ -270,7 +302,7 @@ def convert(num, units=('B', 'KB', 'MB', 'GB')): "fail_num": dispatcher.fail_num, "worker_num": dispatcher.worker_num(), "total_traffics": "Up: %s / Down: %s" % ( - convert(dispatcher.total_sent), convert(dispatcher.total_received)) + traffic_readable(dispatcher.total_sent), traffic_readable(dispatcher.total_received)) } res["global"] = { @@ -280,37 +312,47 @@ def convert(num, units=('B', 'KB', 'MB', 'GB')): "slow_roundtrip": g.stat["slow_roundtrip"], "timeout_roundtrip": g.stat["timeout_roundtrip"], "resend": g.stat["resend"], - "speed": "Up: %s/s / Down: %s/s" % (convert(recent_sent), convert(recent_received)), - "total_traffics": "Up: %s / Down: %s" % (convert(total_sent), convert(total_received)) + "speed": "Up: %s/s / Down: %s/s" % (traffic_readable(self.upload_speed), traffic_readable(self.download_speed)), + "total_traffics": "Up: %s / Down: %s" % (traffic_readable(self.traffic_upload), traffic_readable(self.traffic_download)) } return res def status(self): - out_string = "session_id:%s
\n" % self.session_id - out_string += "thread num:%d
" % threading.active_count() - out_string += "running:%d
\n" % self.running - out_string += "last_send_time:%f
\n" % (time.time() - self.last_send_time) - out_string += "last_receive_time:%f
\n" % (time.time() - self.last_receive_time) - out_string += "last_conn:%d
\n" % self.last_conn_id - out_string += "last_transfer_no:%d
\n" % self.last_transfer_no - out_string += "traffic:%d
\n" % self.traffic - - out_string += "on_road_num:%d
\n" % self.on_road_num - out_string += "transfer_list: %d
\r\n" % len(self.transfer_list) + self.traffic_speed_calculation() + + out_string = "session_id: %s\n" % self.session_id + out_string += "extra_info: %s\n" % json.dumps(json.loads(self.get_login_extra_info()), indent=2) + out_string += "thread num: %d\n" % threading.active_count() + out_string += "running: %d\n" % self.running + out_string += "last_send_time: %f\n" % (time.time() - self.last_send_time) + out_string += "last_receive_time: %f ago\n" % (time.time() - self.last_receive_time) + out_string += "last_conn: %d\n" % self.last_conn_id + out_string += "last_transfer_no: %d\n" % self.last_transfer_no + out_string += "traffic_upload: %d\n" % self.traffic_upload + out_string += "traffic_download: %d\n" % self.traffic_download + out_string += "last_traffic_upload: %d\n" % self.last_traffic_upload + out_string += "last_traffic_download: %d\n" % self.last_traffic_download + out_string += "upload_speed: %f\n" % self.upload_speed + out_string += "download_speed: %f\n" % self.download_speed + out_string += "last_traffic_reset_time %f ago\n" % (time.time() - self.last_traffic_reset_time ) + + out_string += "on_road_num:%d\n" % self.on_road_num + out_string += "transfer_list: %d\n" % len(self.transfer_list) for transfer_no in sorted(self.transfer_list.keys()): transfer = self.transfer_list[transfer_no] if "start" in self.transfer_list[transfer_no]: time_way = " t:" + str((time.time() - self.transfer_list[transfer_no]["start"])) else: time_way = "" - out_string += "[%d] %s %s
\r\n" % (transfer_no, json.dumps(transfer), time_way) + out_string += "[%d] %s %s\n" % (transfer_no, json.dumps(transfer), time_way) - out_string += "
\n" + self.wait_queue.status() - out_string += "
\n" + self.send_buffer.status() - out_string += "
\n" + self.receive_process.status() + out_string += "\n" + self.wait_queue.status() + out_string += "\n" + self.send_buffer.status() + out_string += "\n" + self.receive_process.status() + out_string += "\n" + self.connection_pipe.status() for conn_id in self.conn_list: - out_string += "
\n" + self.conn_list[conn_id].status() + out_string += "\n" + self.conn_list[conn_id].status() return out_string @@ -525,7 +567,7 @@ def get_send_data(self, work_id): while self.running: data = self.get_data(work_id) # xlog.debug("get_send_data work_id:%d len:%d", work_id, len(data)) - if data or work_id < self.target_on_roads: + if data or self.on_road_num < self.target_on_roads: # xlog.debug("got data, force get ack") force = True @@ -606,6 +648,9 @@ def trigger_more(self): if action_num <= 0: return + # xlog.debug("running_num:%d on_road:%d target:%d action:%d", + # running_num, self.on_road_num, + # self.target_on_roads, action_num) for _ in range(0, action_num): self.wait_queue.notify() @@ -631,13 +676,9 @@ def roundtrip_task(self, work_id): pack_type = 2 if self.send_buffer.pool_size > g.config.max_payload or \ - (self.send_buffer.pool_size and len(self.wait_queue.waiters) < g.config.min_on_road): + len(self.wait_queue.waiters) < g.config.min_on_road: # xlog.debug("pool_size:%s waiters:%d", self.send_buffer.pool_size, len(self.wait_queue.waiters)) server_timeout = 0 - elif work_id > g.config.concurent_thread_num * 0.9: - server_timeout = 1 - elif work_id > g.config.concurent_thread_num * 0.7: - server_timeout = 3 else: server_timeout = g.config.roundtrip_timeout @@ -665,9 +706,14 @@ def roundtrip_task(self, work_id): if lock_time > 0.1: xlog.warn("lock_time: %f", lock_time) - if g.config.show_debug: - xlog.debug("start trip transfer_no:%d send_data_len:%d ack_len:%d timeout:%d", - transfer_no, send_data_len, send_ack_len, server_timeout) + # if g.config.show_debug: + # self.traffic_speed_calculation() + # xlog.debug("start trip work_id:%d transfer_no:%d send_data_len:%d ack_len:%d timeout:%d", + # work_id, transfer_no, send_data_len, send_ack_len, server_timeout) + # xlog.debug("start trip, work_id:%d target:%d running:%d timeout:%d up:%f down:%f", + # work_id, self.target_on_roads, self.on_road_num, server_timeout, + # self.upload_speed, self.download_speed) + while self.running: try: content, status, response = g.http_client.request(method="POST", host=g.server_host, @@ -679,7 +725,8 @@ def roundtrip_task(self, work_id): timeout=server_timeout + g.config.network_timeout) traffic = len(upload_post_data) + len(content) + 645 - self.traffic += traffic + self.traffic_upload += len(upload_post_data) + 645 + self.traffic_download += len(content) g.quota -= traffic if g.quota < 0: g.quota = 0 diff --git a/code/default/x_tunnel/local/tls_relay_front/config.py b/code/default/x_tunnel/local/tls_relay_front/config.py index 2e181576f2..645200ebda 100644 --- a/code/default/x_tunnel/local/tls_relay_front/config.py +++ b/code/default/x_tunnel/local/tls_relay_front/config.py @@ -12,7 +12,7 @@ def __init__(self, fn): self.set_var("allow_set_ips", 1) # https_dispatcher - self.set_var("dispather_min_idle_workers", 1) + self.set_var("dispather_min_idle_workers", 0) self.set_var("dispather_work_max_score", 20000) self.set_var("dispather_min_workers", 1) self.set_var("dispather_max_workers", 60) diff --git a/code/default/x_tunnel/local/web_control.py b/code/default/x_tunnel/local/web_control.py index eaf9f6c8fa..358f1f4964 100644 --- a/code/default/x_tunnel/local/web_control.py +++ b/code/default/x_tunnel/local/web_control.py @@ -66,7 +66,7 @@ def do_GET(self): return self.req_log_handler() elif path == "/debug": data = g.session.status() - return self.send_response('text/html', data) + return self.send_response('text/plain', data) elif path == "/info": return self.req_info_handler() elif path == "/config": @@ -240,7 +240,7 @@ def req_info_handler(self): "openai_balance": float(g.openai_balance), "quota": "%d" % (g.quota), "quota_list": g.quota_list, - "traffic": g.session.traffic, + "traffic": g.session.traffic_upload + g.session.traffic_download, "last_fail": g.last_api_error } self.response_json(res_arr)