From 02858bb40690bb93189b926d6851a3997a801893 Mon Sep 17 00:00:00 2001
From: Micheal X
Date: Tue, 22 Aug 2023 10:55:37 +1200
Subject: [PATCH] 5.6.0 merge connection send/recv threads into one.

---
 code/default/launcher/start.py                |   4 +-
 code/default/launcher/update.py               |   1 +
 code/default/lib/noarch/selectors2.py         |  37 ++
 code/default/lib/noarch/simple_http_server.py |   1 +
 code/default/version.txt                      |   2 +-
 code/default/x_tunnel/local/base_container.py | 348 ++++++++++--------
 code/default/x_tunnel/local/client.py         |  92 +----
 code/default/x_tunnel/local/config.py         | 104 ++++++
 code/default/x_tunnel/local/direct_front.py   |   3 +
 code/default/x_tunnel/local/proxy_session.py  |  19 +-
 10 files changed, 362 insertions(+), 249 deletions(-)
 create mode 100644 code/default/x_tunnel/local/config.py

diff --git a/code/default/launcher/start.py b/code/default/launcher/start.py
index 6e61ea8e62..572980bc88 100644
--- a/code/default/launcher/start.py
+++ b/code/default/launcher/start.py
@@ -156,6 +156,9 @@ def main():
     else:
         restart_from_except = False
 
+    update.start()
+    # put update before start all modules, generate uuid for x-tunnel.
+
     module_init.start_all_auto()
     web_control.start(allow_remote)
 
@@ -164,7 +167,6 @@ def main():
         import webbrowser
         webbrowser.open("http://localhost:%s/" % host_port)
 
-    update.start()
     if has_desktop:
         download_modules.start_download()
         update_from_github.cleanup()
diff --git a/code/default/launcher/update.py b/code/default/launcher/update.py
index b718dbb756..0190fe795d 100644
--- a/code/default/launcher/update.py
+++ b/code/default/launcher/update.py
@@ -388,6 +388,7 @@ def update_check_loop():
 
 
 def start():
+    get_uuid()
     p = threading.Thread(target=update_check_loop, name="check_update")
     p.daemon = True
     p.start()
diff --git a/code/default/lib/noarch/selectors2.py b/code/default/lib/noarch/selectors2.py
index 8ba01c36d1..370b4476cf 100644
--- a/code/default/lib/noarch/selectors2.py
+++ b/code/default/lib/noarch/selectors2.py
@@ -198,6 +198,43 @@ def modify(self, fileobj, events, data=None):
 
         return key
 
+    def register_event(self, fileobj, event, data):
+        try:
+            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
+        except KeyError:
+            # not registered any event before, register
+            return self.register(fileobj, event, data)
+
+        if key.events & event:
+            # event already registered
+            return
+        else:
+            events = (key.events | event)
+            self.unregister(fileobj)
+            self.register(fileobj, events, data)
+
+    def unregister_event(self, fileobj, event):
+        # Return None if fileobj is removed or not exists
+
+        try:
+            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
+        except KeyError:
+            # not exists
+            return
+
+        if not key.events & event:
+            # this event is not registered
+            return key
+
+        if key.events == event:
+            self.unregister(fileobj)
+            return
+        else:
+            events = (key.events & ~(event))
+            data = key.data
+            self.unregister(fileobj)
+            return self.register(fileobj, events, data)
+
     def select(self, timeout=None):
         """ Perform the actual selection until some monitored file objects are ready or the timeout expires.
         """
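
The two selector helpers added above let a caller add or remove a single interest bit on an already-registered file object instead of hand-rolling unregister/register pairs. A minimal standalone sketch of the same logic written against the stock selectors module (the patch adds the methods to the bundled selectors2; the demo names and socketpair usage below are illustrative only):

import selectors
import socket

def register_event(sel, fileobj, event, data):
    # Add one interest bit; fall back to a plain register() for unknown fds.
    try:
        key = sel.get_key(fileobj)
    except KeyError:
        return sel.register(fileobj, event, data)
    if key.events & event:
        return  # already watching this event
    sel.unregister(fileobj)
    return sel.register(fileobj, key.events | event, data)

def unregister_event(sel, fileobj, event):
    # Drop one interest bit; unregister the fd once no bits remain.
    try:
        key = sel.get_key(fileobj)
    except KeyError:
        return
    if not key.events & event:
        return key
    if key.events == event:
        sel.unregister(fileobj)
        return
    data = key.data
    sel.unregister(fileobj)
    return sel.register(fileobj, key.events & ~event, data)

if __name__ == "__main__":
    sel = selectors.DefaultSelector()
    a, b = socket.socketpair()
    register_event(sel, a, selectors.EVENT_READ, "conn-1")
    register_event(sel, a, selectors.EVENT_WRITE, "conn-1")   # now READ | WRITE
    unregister_event(sel, a, selectors.EVENT_WRITE)           # back to READ only
    unregister_event(sel, a, selectors.EVENT_READ)            # fully unregistered
    a.close(); b.close(); sel.close()
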
""" diff --git a/code/default/lib/noarch/simple_http_server.py b/code/default/lib/noarch/simple_http_server.py index 97914a2397..a6108686d2 100644 --- a/code/default/lib/noarch/simple_http_server.py +++ b/code/default/lib/noarch/simple_http_server.py @@ -660,6 +660,7 @@ def check_listen_port_or_reset(self): self.logger.warn("Listen %s:%d check failed", ip, port) self.shutdown() + time.sleep(3) self.start() return diff --git a/code/default/version.txt b/code/default/version.txt index 3b4176428c..4cc0e35cb3 100644 --- a/code/default/version.txt +++ b/code/default/version.txt @@ -1 +1 @@ -5.5.13 \ No newline at end of file +5.6.0 \ No newline at end of file diff --git a/code/default/x_tunnel/local/base_container.py b/code/default/x_tunnel/local/base_container.py index 04471886ee..2fb7688ee6 100644 --- a/code/default/x_tunnel/local/base_container.py +++ b/code/default/x_tunnel/local/base_container.py @@ -331,7 +331,7 @@ def status(self): return out_string -class ConnectionReceiving(object): +class ConnectionPipe(object): def __init__(self, session, xlog): self.session = session self.xlog = xlog @@ -339,6 +339,7 @@ def __init__(self, session, xlog): self.th = None self.select2 = selectors.DefaultSelector() self.sock_conn_map = {} + self._lock = threading.RLock() def start(self): self.running = True @@ -346,56 +347,89 @@ def start(self): def stop(self): self.running = False - self.xlog.debug("ConnectionReceiving stop") + self.xlog.debug("ConnectionPipe stop") - def add_sock(self, sock, conn): - if sock in self.sock_conn_map: + def _debug_log(self, fmt, *args, **kwargs): + if not self.session.config.show_debug: return + self.xlog.debug(fmt, *args, **kwargs) - # self.xlog.debug("add sock %s conn:%d", sock, conn.conn_id) - try: - self.select2.register(sock, selectors.EVENT_READ, conn) - except Exception as e: - if "is already registered" not in str(e): - self.xlog.exception("add_sock %s conn:%d e:%r", sock, conn.conn_id, e) + def add_sock_event(self, sock, conn, event): + # this function can repeat without through an error. + + if not sock: + return + + with self._lock: + self._debug_log("add_sock_event conn:%d event:%s", conn.conn_id, event) + self.sock_conn_map[sock] = conn + try: + self.select2.register_event(sock, event, conn) + except Exception as e: + self.xlog.warn("add_sock_event %s conn:%d e:%r", sock, conn.conn_id, e) + self.close_sock(sock, str(e) + "_when_add_sock_event") return - self.sock_conn_map[sock] = conn - if not self.th: - self.th = threading.Thread(target=self.recv_worker, name="x_tunnel_recv_worker") - self.th.start() - self.xlog.debug("ConnectionReceiving start") + if not self.th: + self.th = threading.Thread(target=self.pipe_worker, name="x_tunnel_pipe_worker") + self.th.start() + self.xlog.debug("ConnectionPipe start") + + def remove_sock_event(self, sock, event): + # this function can repeat without through an error. 
+    def remove_sock_event(self, sock, event):
+        # this function can be called repeatedly without raising an error.
+
+        with self._lock:
+            if sock not in self.sock_conn_map:
+                return
+
+            try:
+                conn = self.sock_conn_map[sock]
+                self._debug_log("remove_sock_event conn:%d event:%s", conn.conn_id, event)
+                res = self.select2.unregister_event(sock, event)
+                if not res:
+                    # self.xlog.debug("remove_sock_event %s conn:%d event:%s removed all", sock, conn.conn_id, event)
+                    del self.sock_conn_map[sock]
+            except Exception as e:
+                self.xlog.exception("remove_sock_event %s event:%s e:%r", sock, event, e)
 
     def remove_sock(self, sock):
+        with self._lock:
+            if sock not in self.sock_conn_map:
+                return
+
+            try:
+                conn = self.sock_conn_map[sock]
+                self._debug_log("remove_sock all events conn:%d", conn.conn_id)
+                del self.sock_conn_map[sock]
+                self.select2.unregister(sock)
+            except Exception as e:
+                # error will happen when sock closed
+                self.xlog.warn("ConnectionPipe remove sock e:%r", e)
+
+    def close_sock(self, sock, reason):
         if sock not in self.sock_conn_map:
             return
 
         try:
-            self.select2.unregister(sock)
-            # conn = self.sock_conn_map[sock]
-            # self.xlog.debug("remove sock conn:%d", conn.conn_id)
-            del self.sock_conn_map[sock]
+            conn = self.sock_conn_map[sock]
+            self.xlog.info("close conn:%d", conn.conn_id)
+            self.remove_sock(sock)
+
+            conn.transfer_peer_close(reason)
+            conn.do_stop(reason=reason)
         except Exception as e:
-            self.xlog.warn("ConnectionReceiving remove sock e:%r", e)
+            self.xlog.exception("close_sock %s e:%r", sock, e)
 
     def reset_all_connections(self):
         for sock, conn in dict(self.sock_conn_map).items():
-            try:
-                self.select2.unregister(sock)
-            except Exception as e:
-                xlog.warn("unregister %s e:%r", sock, e)
+            self.close_sock(sock, "reset_all")
 
-            conn.transfer_peer_close("recv closed")
-            sock.close()
-            conn.do_stop(reason="recv closed.")
         self.sock_conn_map = {}
         self.select2 = selectors.DefaultSelector()
 
-    def recv_worker(self):
-        # random_id = utils.to_str(utils.generate_random_lowercase(6))
+    def pipe_worker(self):
         timeout = 0.001
         while self.running:
-
             if not self.sock_conn_map:
                 break
@@ -411,7 +445,6 @@ def recv_worker(self):
 
                 timeout = 3.0
                 # self.xlog.debug("%s recv select timeout switch to %f", random_id, timeout)
-
                 continue
             else:
                 # self.xlog.debug("%s recv select timeout switch to 0.001", random_id)
@@ -430,27 +463,28 @@ def recv_worker(self):
                 if event & selectors.EVENT_READ:
                     try:
                         data = sock.recv(65535)
-                    except:
+                    except Exception as e:
+                        self._debug_log("conn:%d recv e:%r", conn.conn_id, e)
                         data = ""
+
+                    data_len = len(data)
+                    if data_len == 0:
+                        self.xlog.debug("Conn session:%s conn:%d recv socket closed", self.session.session_id,
+                                        conn.conn_id)
+                        self.close_sock(sock, "receive")
+                        continue
+                    else:
+                        conn.last_active = now
+                        self._debug_log("Conn session:%s conn:%d local recv len:%d pos:%d",
+                                        self.session.session_id, conn.conn_id, data_len, conn.received_position)
+
+                        conn.transfer_received_data(data)
+                elif event & selectors.EVENT_WRITE:
+                    conn.blocked = False
+                    conn.process_cmd()
                 else:
                     self.xlog.debug("no event for conn:%d", conn.conn_id)
-                    data = ""
-
-                data_len = len(data)
-                if data_len == 0:
-                    self.xlog.debug("Conn session:%s conn:%d recv socket closed", self.session.session_id, conn.conn_id)
-                    self.remove_sock(sock)
-                    conn.transfer_peer_close("recv closed")
-                    sock.close()
-                    conn.do_stop(reason="recv closed.")
-                    continue
-
-                conn.last_active = now
-                # if self.session.config.show_debug:
-                #     self.xlog.debug("Conn session:%s conn:%d local recv len:%d pos:%d",
-                #                     self.session.session_id, conn.conn_id, data_len, conn.received_position)
-
-                conn.transfer_received_data(data)
+                    self.close_sock(sock, "no_event")
 
             except Exception as e:
                 xlog.exception("recv_worker e:%r", e)
@@ -461,7 +495,7 @@ def recv_worker(self):
             except Exception as e:
                 xlog.warn("unregister %s e:%r", sock, e)
         self.sock_conn_map = {}
-        self.xlog.debug("ConnectionReceiving stop")
+        self.xlog.debug("ConnectionPipe stop")
         self.th = None
 
         self.session.check_upload()
@@ -472,28 +506,28 @@ def __init__(self, session, conn_id, sock, host, port, windows_size, windows_ack
         self.host = host
         self.port = port
         self.session = session
-        self.connection_receiver = session.connection_receiver
         self.conn_id = conn_id
         self.sock = sock
         self.windows_size = windows_size
         self.windows_ack = windows_ack
         self.is_client = is_client
+        self.connection_pipe = session.connection_pipe
+        self.xlog = xlog
+
         self.cmd_queue = {}
-        self.cmd_notice = threading.Condition()
-        self.recv_notice = threading.Condition()
         self.running = True
+        self.blocked = False
+        self.send_buffer = b""
         self.received_position = 0
         self.remote_acked_position = 0
         self.sended_position = 0
-        self.sended_window_position = 0
-        self.recv_thread = None
-        self.cmd_thread = None
-        self.xlog = xlog
+        self.sent_window_position = 0
         self.create_time = time.time()
         self.last_active = time.time()
+        self._lock = threading.Lock()
 
-        self.transfered_close_to_peer = False
+        self.transferred_close_to_peer = False
         if sock:
             self.next_cmd_seq = 1
         else:
@@ -503,26 +537,19 @@ def __init__(self, session, conn_id, sock, host, port, windows_size, windows_ack
 
     def start(self, block):
         if self.sock:
-            self.connection_receiver.add_sock(self.sock, self)
-
-        if block:
-            self.cmd_thread = None
-            self.cmd_processor()
-        else:
-            self.cmd_thread = threading.Thread(target=self.cmd_processor,
-                                               name="cmd_processor_%s:%d" % (self.host, self.port))
-            self.cmd_thread.start()
+            self.connection_pipe.add_sock_event(self.sock, self, selectors.EVENT_READ)
 
\r\n" % (self.conn_id, self.host, self.port) out_string += " received_position:%d/ Ack:%d
\n" % (self.received_position, self.remote_acked_position) - out_string += " sended_position:%d/ win:%d
\n" % (self.sended_position, self.sended_window_position) + out_string += " sended_position:%d/ win:%d
\n" % (self.sended_position, self.sent_window_position) out_string += " next_cmd_seq:%d
\n" % self.next_cmd_seq out_string += " next_recv_seq:%d
\n" % self.next_recv_seq out_string += " status: running:%r
\n" % self.running - out_string += " cmd_thread:%r
\n" % self.cmd_thread - out_string += " recv_thread:%r
\n" % self.recv_thread - out_string += " transfered_close_to_peer:%r
\n" % self.transfered_close_to_peer + out_string += " blocked: %s
\n" % self.blocked + if self.send_buffer: + out_string += " send_buffer: %d
\n" % len(self.send_buffer) + out_string += " transferred_close_to_peer:%r
\n" % self.transferred_close_to_peer out_string += " sock:%r
\n" % (self.sock is not None) out_string += " cmd_queue.len:%d
" % len(self.cmd_queue) out_string += " create time: %s
" % datetime.fromtimestamp(self.create_time).strftime('%Y-%m-%d %H:%M:%S.%f') @@ -541,20 +568,16 @@ def do_stop(self, reason="unknown"): self.xlog.debug("Conn session:%s conn:%d stop:%s", utils.to_str(self.session.session_id), self.conn_id, reason) self.running = False - self.cmd_notice.acquire() - self.cmd_notice.notify() - self.cmd_notice.release() - - self.connection_receiver.remove_sock(self.sock) - - if self.cmd_thread: - self.cmd_thread.join() - self.cmd_thread = None + self.connection_pipe.remove_sock(self.sock) self.cmd_queue = {} if self.sock is not None: - self.sock.close() + if self.sock.fileno() != -1: + try: + self.sock.close() + except: + pass self.sock = None # self.xlog.debug("Conn session:%s conn:%d stopped", self.session.session_id, self.conn_id) @@ -602,53 +625,64 @@ def do_connect(self, host, port): return e, False def put_cmd_data(self, data): - with self.cmd_notice: - seq = struct.unpack(" self.remote_acked_position: self.remote_acked_position = position - self.connection_receiver.add_sock(self.sock, self) + self.connection_pipe.add_sock_event(self.sock, self, selectors.EVENT_READ) elif cmd_id == 2: # Closed dat = data.get() @@ -689,76 +723,84 @@ def cmd_processor(self): self.xlog.info("Conn session:%s conn:%d %s:%d", self.session.session_id, self.conn_id, self.host, self.port) self.sock = sock - self.connection_receiver.add_sock(self.sock, self) + self.connection_pipe.add_sock_event(self.sock, self, selectors.EVENT_READ) else: self.xlog.error("Conn session:%s conn:%d unknown cmd_id:%d", self.session.session_id, self.conn_id, cmd_id) raise Exception("put_send_data unknown cmd_id:%d" % cmd_id) def send_to_sock(self, data): - # self.xlog.debug("Conn send_to_sock conn:%d len:%d", self.conn_id, len(data)) + # return True when not blocked, can send more data + + self._debug_log("Conn send_to_sock conn:%d len:%d", self.conn_id, len(data)) sock = self.sock if not sock: - return + return False payload_len = len(data) - buf = data.buf - start = data.begin - end = data.begin + payload_len + start = 0 + end = payload_len while start < end: send_size = min(end - start, 65535) try: - sended = sock.send(buf[start:start + send_size]) + sended = sock.send(data[start:start + send_size]) except Exception as e: self.xlog.info("%s conn:%d send closed: %r", self.session.session_id, self.conn_id, e) - sock.close() - self.sock = None if self.is_client: self.do_stop(reason="send fail.") - return + return False start += sended - self.sended_position += payload_len - if self.sended_position - self.sended_window_position > self.windows_ack: - self.sended_window_position = self.sended_position + if sended == 0: + self.connection_pipe.add_sock_event(sock, self, selectors.EVENT_WRITE) + self.send_buffer = data[start:] + self.blocked = True + break + + if start == end: + self.send_buffer = None + + self.sended_position += start + if self.sended_position - self.sent_window_position > self.windows_ack: + self.sent_window_position = self.sended_position self.transfer_ack(self.sended_position) - # self.xlog.debug("Conn:%d ack:%d", self.conn_id, self.sended_window_position) + self._debug_log("Conn:%d ack:%d", self.conn_id, self.sent_window_position) + + return not self.blocked def transfer_peer_close(self, reason=""): - with self.recv_notice: - if self.transfered_close_to_peer: - return - self.transfered_close_to_peer = True + if self.transferred_close_to_peer: + return + + self.transferred_close_to_peer = True - cmd = struct.pack(" self.remote_acked_position + self.windows_size: - self.xlog.debug("Conn 
-            self.xlog.debug("Conn session:%s conn:%d recv blocked, rcv:%d, ack:%d", self.session.session_id,
-                            self.conn_id, self.received_position, self.remote_acked_position)
-            self.connection_receiver.remove_sock(self.sock)
+        if self.received_position > self.remote_acked_position + self.windows_size:
+            self.xlog.debug("Conn session:%s conn:%d recv blocked, rcv:%d, ack:%d", self.session.session_id,
+                            self.conn_id, self.received_position, self.remote_acked_position)
+            self.connection_pipe.remove_sock_event(self.sock, selectors.EVENT_READ)
 
     def transfer_ack(self, position):
-        with self.recv_notice:
-            if self.transfered_close_to_peer:
-                return
+        if self.transferred_close_to_peer:
+            return
 
-        cmd_position = struct.pack(" 2 and sys.argv[1] == "-f":
-        config_path = sys.argv[2]
-    else:
-        config_path = os.path.join(data_xtunnel_path, 'client.json')
-
-    xlog.info("use config_path:%s", config_path)
-
-    config = xconfig.Config(config_path)
-
-    config.set_var("log_level", "DEBUG")
-    config.set_var("upload_logs", True)
-    config.set_var("write_log_file", 0)
-    config.set_var("save_start_log", 1500)
-    config.set_var("show_debug", 0)
-
-    config.set_var("encrypt_data", 0)
-    config.set_var("encrypt_password", "encrypt_pass")
-    config.set_var("encrypt_method", "aes-256-cfb")
-
-    config.set_var("api_server", "center.xx-net.org")
-    config.set_var("scan_servers", ["scan1"])
-    config.set_var("server_host", "")
-    config.set_var("server_port", 443)
-    config.set_var("use_https", 1)
-    config.set_var("port_range", 1)
-
-    config.set_var("login_account", "")
-    config.set_var("login_password", "")
-
-    config.set_var("conn_life", 30)
-
-    config.set_var("socks_host", "127.0.0.1")
-    config.set_var("socks_port", 1080)
-    config.set_var("update_cloudflare_domains", True)
-
-    # performance parameters
-    # range 2 - 100
-    config.set_var("concurent_thread_num", 20)
-
-    # min roundtrip on road if connectoin exist
-    config.set_var("min_on_road", 3)
-
-    # range 1 - 1000, ms
-    config.set_var("send_delay", 10)
-
-    # range 1 - 20000, ms
-    config.set_var("resend_timeout", 5000)
-
-    # range 1 - resend_timeout, ms
-    config.set_var("ack_delay", 300)
-
-    # max 10M
-    config.set_var("max_payload", 256 * 1024)
-
-    # range 1 - 30
-    config.set_var("roundtrip_timeout", 25)
-
-    config.set_var("network_timeout", 10)
-
-    config.set_var("windows_size", 10 * 1024 * 1024)  # will recalulate based on: max_payload * concurent_thread_num *2
-
-    # reporter
-    config.set_var("timeout_threshold", 2)
-    config.set_var("report_interval", 5 * 60)
-
-    config.set_var("enable_gae_proxy", 0)
-    config.set_var("enable_cloudflare", 1)
-    config.set_var("enable_cloudfront", 0)
-    config.set_var("enable_heroku", 0)
-    config.set_var("enable_tls_relay", 1)
-    config.set_var("enable_direct", 0)
-
-    config.load()
-
-    config.windows_ack = 0.05 * config.windows_size
-    config.windows_size = config.max_payload * config.concurent_thread_num * 2
-    xlog.info("X-Tunnel window:%d", config.windows_size)
-
-    if config.write_log_file:
-        xlog.log_to_file(os.path.join(data_path, "client.log"))
-
-    xlog.setLevel(config.log_level)
-    xlog.set_buffer(200)
-    xlog.save_start_log = config.save_start_log
-    g.config = config
-
-
 def start(args):
     global ready
@@ -165,7 +77,7 @@ def start(args):
     g.client_uuid = get_launcher_uuid()
 
     g.system = os_platform.platform + "|" + platform.version() + "|" + str(platform.architecture()) + "|" + sys.version
 
-    load_config()
+    g.config = config.load_config()
     front_dispatcher.init()
 
     g.data_path = data_path
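
The window parameters derived in load_config() (removed from client.py above and re-added in the new config.py below) work out to fixed numbers with the shipped defaults; the arithmetic, spelled out:

# Flow-control values computed from the default settings shown in this patch.
max_payload = 256 * 1024                  # 262144 bytes per payload block
concurent_thread_num = 20                 # concurrent roundtrip workers
configured_windows_size = 10 * 1024 * 1024

windows_ack = 0.05 * configured_windows_size             # 524288.0 - ack roughly every 512 KB sent
windows_size = max_payload * concurent_thread_num * 2    # 10485760 - 10 MB receive window

# base_container.Conn pauses reading a local socket once
#   received_position - remote_acked_position > windows_size
# and emits an ACK whenever more than windows_ack bytes have been written out
# since the last acknowledged position.
print(windows_ack, windows_size)
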
diff --git a/code/default/x_tunnel/local/config.py b/code/default/x_tunnel/local/config.py
new file mode 100644
index 0000000000..0ed9fde264
--- /dev/null
+++ b/code/default/x_tunnel/local/config.py
@@ -0,0 +1,104 @@
+import sys
+import os
+
+import env_info
+data_path = env_info.data_path
+data_xtunnel_path = os.path.join(data_path, 'x_tunnel')
+
+import xconfig
+from xlog import getLogger
+xlog = getLogger("x_tunnel")
+
+
+def load_config():
+    if len(sys.argv) > 2 and sys.argv[1] == "-f":
+        config_path = sys.argv[2]
+    else:
+        config_path = os.path.join(data_xtunnel_path, 'client.json')
+
+    xlog.info("use config_path:%s", config_path)
+
+    config = xconfig.Config(config_path)
+
+    config.set_var("log_level", "DEBUG")
+    config.set_var("upload_logs", True)
+    config.set_var("write_log_file", 0)
+    config.set_var("save_start_log", 1500)
+    config.set_var("show_debug", 0)
+
+    config.set_var("encrypt_data", 0)
+    config.set_var("encrypt_password", "encrypt_pass")
+    config.set_var("encrypt_method", "aes-256-cfb")
+
+    config.set_var("api_server", "center.xx-net.org")
+    config.set_var("scan_servers", ["scan1"])
+    config.set_var("server_host", "")
+    config.set_var("server_port", 443)
+    config.set_var("use_https", 1)
+    config.set_var("port_range", 1)
+
+    config.set_var("login_account", "")
+    config.set_var("login_password", "")
+
+    config.set_var("conn_life", 30)
+
+    config.set_var("socks_host", "127.0.0.1")
+    config.set_var("socks_port", 1080)
+    config.set_var("update_cloudflare_domains", True)
+
+    # performance parameters
+    # range 2 - 100
+    config.set_var("concurent_thread_num", 20)
+
+    # min roundtrip on road if connection exists
+    config.set_var("min_on_road", 3)
+
+    # range 1 - 1000, ms
+    config.set_var("send_delay", 10)
+
+    # range 1 - 20000, ms
+    config.set_var("resend_timeout", 5000)
+
+    # range 1 - resend_timeout, ms
+    config.set_var("ack_delay", 300)
+
+    # max 10M
+    config.set_var("max_payload", 256 * 1024)
+
+    # range 1 - 30
+    config.set_var("roundtrip_timeout", 25)
+
+    config.set_var("network_timeout", 10)
+
+    config.set_var("windows_size", 10 * 1024 * 1024)  # will recalculate based on: max_payload * concurent_thread_num * 2
+
+    # reporter
+    config.set_var("timeout_threshold", 2)
+    config.set_var("report_interval", 5 * 60)
+
+    config.set_var("enable_gae_proxy", 0)
+    config.set_var("enable_cloudflare", 1)
+    config.set_var("enable_cloudfront", 0)
+    config.set_var("enable_heroku", 0)
+    config.set_var("enable_tls_relay", 1)
+    config.set_var("enable_direct", 0)
+
+    config.load()
+
+    config.windows_ack = 0.05 * config.windows_size
+    config.windows_size = config.max_payload * config.concurent_thread_num * 2
+    xlog.info("X-Tunnel window:%d", config.windows_size)
+
+    if "localhost" in config.server_host:
+        config.enable_cloudflare = 0
+        config.enable_tls_relay = 0
+        config.enable_direct = 1
+        xlog.info("Only enable Direct front for localhost")
+
+    if config.write_log_file:
+        xlog.log_to_file(os.path.join(data_path, "client.log"))
+
+    xlog.setLevel(config.log_level)
+    xlog.set_buffer(200)
+    xlog.save_start_log = config.save_start_log
+    return config
diff --git a/code/default/x_tunnel/local/direct_front.py b/code/default/x_tunnel/local/direct_front.py
index 3c6fc6c58f..327c1f967f 100644
--- a/code/default/x_tunnel/local/direct_front.py
+++ b/code/default/x_tunnel/local/direct_front.py
@@ -20,6 +20,9 @@ def init():
 
 
 class FakeWorker(object):
+    def __init__(self):
+        self.ip_str = "127.0.0.1"
+
     def update_debug_data(self, rtt, send_data_len, dlen, speed):
         pass
 
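
With the loader split out of client.py into the config module above, other code can obtain the same settings object without dragging in the client; a hypothetical consumer (the import path and the printed attributes are assumptions based on the diff, not an API promise):

# Hypothetical usage sketch of the extracted loader; mirrors client.start()
# above, which now does: g.config = config.load_config()
from x_tunnel.local import config as xtunnel_config   # assumed import path

cfg = xtunnel_config.load_config()
# Precedence: set_var() defaults, then client.json overrides applied by
# config.load(), then derived values (windows_ack / windows_size) and the
# "localhost" special case that leaves only the Direct front enabled.
print(cfg.server_host, cfg.windows_size, cfg.enable_direct)
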
diff --git a/code/default/x_tunnel/local/proxy_session.py b/code/default/x_tunnel/local/proxy_session.py
index 87cc9cc731..9179be766f 100644
--- a/code/default/x_tunnel/local/proxy_session.py
+++ b/code/default/x_tunnel/local/proxy_session.py
@@ -51,7 +51,7 @@ def __init__(self):
         self.wait_queue = base_container.WaitQueue()
         self.send_buffer = base_container.SendBuffer(max_payload=g.config.max_payload)
         self.receive_process = base_container.BlockReceivePool(self.download_data_processor)
-        self.connection_receiver = base_container.ConnectionReceiving(self, xlog)
+        self.connection_pipe = base_container.ConnectionPipe(self, xlog)
 
         self.lock = threading.Lock()  # lock for conn_id, sn generation, on_road_num change,
         self.send_delay = g.config.send_delay / 1000.0
@@ -126,7 +126,7 @@ def start(self):
             self.round_trip_thread[i].daemon = True
             self.round_trip_thread[i].start()
 
-        self.connection_receiver.start()
+        self.connection_pipe.start()
 
         xlog.info("session started.")
         return True
@@ -147,7 +147,7 @@ def stop(self):
         self.send_buffer.reset()
         self.receive_process.reset()
         self.wait_queue.stop()
-        self.connection_receiver.stop()
+        self.connection_pipe.stop()
 
         xlog.debug("session stopped.")
 
@@ -314,8 +314,17 @@ def status(self):
 
         return out_string
 
+    @staticmethod
+    def get_login_extra_info():
+        data = {
+            "version": g.xxnet_version,
+            "system": g.system,
+            "device": g.client_uuid
+        }
+        return json.dumps(data)
+
     def login_session(self):
-        if len(g.server_host) == 0:
+        if not g.server_host or len(g.server_host) == 0:
             return False
 
         start_time = time.time()
@@ -329,6 +338,8 @@ def login_session(self):
             int(g.config.windows_ack), g.config.resend_timeout, g.config.ack_delay)
 
         upload_data_head += struct.pack("
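
get_login_extra_info() above packages client metadata as a small JSON blob that rides along with the login request; a standalone sketch of producing and reading that blob (the field values here are placeholders):

import json

def build_login_extra_info(version, system, device):
    # Same shape as ProxySession.get_login_extra_info() in the hunk above.
    return json.dumps({"version": version, "system": system, "device": device})

blob = build_login_extra_info("5.6.0", "Linux|...|x86_64", "launcher-uuid-placeholder")
info = json.loads(blob)
print(info["version"], info["device"])
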