From e6e74ae0035b5f0b68956843b509e03c4ccf6ea0 Mon Sep 17 00:00:00 2001 From: Mohammad Shafiee <51268970+wikm360@users.noreply.github.com> Date: Thu, 30 May 2024 12:23:55 +0330 Subject: [PATCH] add huawei detection , add ulr list per user , add most ulr used per user , fix bug and improve --- .gitignore | 2 +- base.py | 279 ++++++++++++++++++++++++++++++++--------------------- 2 files changed, 169 insertions(+), 112 deletions(-) diff --git a/.gitignore b/.gitignore index 878cbc9..b6133c3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ -Design.txt +design.txt diff --git a/base.py b/base.py index f012cca..9de7cd9 100644 --- a/base.py +++ b/base.py @@ -8,6 +8,7 @@ import requests import shutil import psutil +from collections import Counter CPU_THRESHOLD = cpu_threshold RAM_THRESHOLD = ram_threshold @@ -63,6 +64,7 @@ def send_single_file (file_path) : print(response.text) def analize () : + url_user_list = ["default"] user_list = {"default":"0"} user_phone = {"default" : ["0" , "1"]} inbound_user = ["default"] 
@@ -76,119 +78,171 @@ def analize () : pattern = r"email: (\S+)" #if user in line : if re.findall(pattern, line) : - user = re.findall(pattern, line)[0] - user = user.split(".")[1].split("\n")[0] - line = line.split(" ") - - for pice in line : - line_str += " " + pice - - if line[2] == "DNS" : - continue - if user not in user_list : - user_list[user] = 0 - if user not in user_list : - with open (f"{path_user}{user}.txt" , "w") as user_log : - user_log.writelines(line_str) - else : - with open (f"{path_user}{user}.txt" , "a") as user_log : - user_log.writelines(line_str) - user_list[user] = line[0] + " " + line[1] - count += 1 - - - - #porn detection : - pattern_porn = r"\b\w*\s*porn\s*\w*\b" - if re.findall(pattern_porn, line_str): - with open (f"{path}porn_detection.txt" , "a" , encoding="utf-8") as file : - file.writelines(line_str) - if user not in p_user : - - p_user.append(user) - pattern_porn = r"\b\w*\s*xnxx\s*\w*\b" - if re.findall(pattern_porn, line_str): - with open (f"{path}porn_detection.txt" , "a" , encoding="utf-8") as file : - file.writelines(line_str) - if user not in p_user : - - p_user.append(user) - pattern_porn = r"\b\w*\s*xvideos\s*\w*\b" - if re.findall(pattern_porn, line_str): - with open (f"{path}porn_detection.txt" , "a" , encoding="utf-8") as file : - file.writelines(line_str) - if user not in p_user : - - p_user.append(user) - pattern_porn = r"\b\w*\s*sex\s*\w*\b" - if re.findall(pattern_porn, line_str): - with open (f"{path}porn_detection.txt" , "a" , encoding="utf-8") as file : - file.writelines(line_str) - if user not in p_user : - - p_user.append(user) - - # phone detection : - xiaomi_pattern = r"\b\w*\s*xiaomi\s*\w*\b" - samsung_pattern = r"\b\w*\s*samsung\s*\w*\b" - apple_pattern = r"\b\w*\s*gsp\s*\w*\b" - if re.findall(xiaomi_pattern, line_str): - if user not in user_phone : - user_phone[f"{user}"] = ["0"] - if "xiaomi" not in user_phone[f"{user}"] : - user_phone[f"{user}"].append("xiaomi") - - if re.findall(samsung_pattern, 
line_str): - if user not in user_phone : - user_phone[f"{user}"] = ["0"] - if "samsung" not in user_phone[f"{user}"] : - user_phone[f"{user}"].append("samsung") - - if re.findall(apple_pattern, line_str): - if user not in user_phone : - user_phone[f"{user}"] = ["0"] - if "apple" not in user_phone[f"{user}"] : - user_phone[f"{user}"].append("apple") - - # specific inbound detector : - inbound_pattern = re.search(r"VMESS\s+\+\s+TCP", line_str, flags=re.IGNORECASE) - if inbound_pattern: - if user not in inbound_user : - inbound_user.append(user) - - print(count) - - # port scan detection : - # ip_port = line[2] - # ip = ip_port.split(":")[0] - # port = ip_port.split(":")[1] - # if ip == before_ip : - # if port != before_port : - # file_path = f"{path_user}port_scan_detection.txt" - # with open(file_path , "a") as file : - # file.writelines(line_str) - - line_str = " " + if "1.1.1.1" not in line : + if "mtalk.google.com" not in line : + if "android.apis.google.com" not in line : + if "dns.google" not in line : + if "8.8.8.8" not in line : + if "gstatic" not in line : + user = re.findall(pattern, line)[0] + user = user.split(".")[1].split("\n")[0] + line = line.split(" ") + + for pice in line : + line_str += " " + pice + + if line[2] == "DNS" : + continue + if user not in user_list : + user_list[user] = 0 + if user not in user_list : + with open (f"{path_user}{user}.txt" , "w") as user_log : + user_log.writelines(line_str) + else : + with open (f"{path_user}{user}.txt" , "a") as user_log : + user_log.writelines(line_str) + user_list[user] = line[0] + " " + line[1] + count += 1 + #create url list request per user: + if "[" in line : + url = line[4].split("[")[1].split("]")[0] + else : + url = str(line[4].split(":")[1]) + #print(url) + if user not in url_user_list : + with open (f"{path_user}{user}_url.txt" , "w") as file : + file.writelines("default") + url_user_list.append(user) + + else : + with open (f"{path_user}{user}_url.txt" , "r") as file : + with open 
(f"{path_user}{user}_url.txt" , "a") as file_2 : + for line_url in file : + if url in line_url : + flag = True + else : + flag = False + if flag == False: + file_2.writelines("\n") + file_2.writelines(url) + + + #porn detection : + pattern_porn = r"\b\w*\s*porn\s*\w*\b" + if re.findall(pattern_porn, line_str): + with open (f"{path}porn_detection.txt" , "a" , encoding="utf-8") as file : + file.writelines(line_str) + if user not in p_user : + + p_user.append(user) + pattern_porn = r"\b\w*\s*xnxx\s*\w*\b" + if re.findall(pattern_porn, line_str): + with open (f"{path}porn_detection.txt" , "a" , encoding="utf-8") as file : + file.writelines(line_str) + if user not in p_user : + + p_user.append(user) + pattern_porn = r"\b\w*\s*xvideos\s*\w*\b" + if re.findall(pattern_porn, line_str): + with open (f"{path}porn_detection.txt" , "a" , encoding="utf-8") as file : + file.writelines(line_str) + if user not in p_user : + + p_user.append(user) + pattern_porn = r"\b\w*\s*sex\s*\w*\b" + if re.findall(pattern_porn, line_str): + with open (f"{path}porn_detection.txt" , "a" , encoding="utf-8") as file : + file.writelines(line_str) + if user not in p_user : + + p_user.append(user) + + # phone detection : + xiaomi_pattern = r"\b\w*\s*xiaomi\s*\w*\b" + samsung_pattern = r"\b\w*\s*samsung\s*\w*\b" + apple_pattern = r"\b\w*\s*gsp\s*\w*\b" + huawei_pattern = r"\b\w*\s*grs.dbankcloud\s*\w*\b" + if re.findall(xiaomi_pattern, line_str): + if user not in user_phone : + user_phone[f"{user}"] = ["0"] + if "xiaomi" not in user_phone[f"{user}"] : + user_phone[f"{user}"].append("xiaomi") + + if re.findall(samsung_pattern, line_str): + if user not in user_phone : + user_phone[f"{user}"] = ["0"] + if "samsung" not in user_phone[f"{user}"] : + user_phone[f"{user}"].append("samsung") + + if re.findall(apple_pattern, line_str): + if user not in user_phone : + user_phone[f"{user}"] = ["0"] + if "apple" not in user_phone[f"{user}"] : + user_phone[f"{user}"].append("apple") + + if 
re.findall(huawei_pattern, line_str): + if user not in user_phone : + user_phone[f"{user}"] = ["0"] + if "huawei" not in user_phone[f"{user}"] : + user_phone[f"{user}"].append("huawei") + + # specific inbound detector : + inbound_pattern = re.search(r"VMESS\s+\+\s+TCP", line_str, flags=re.IGNORECASE) + if inbound_pattern: + if user not in inbound_user : + inbound_user.append(user) + + print(count) + + # port scan detection : + # ip_port = line[2] + # ip = ip_port.split(":")[0] + # port = ip_port.split(":")[1] + # if ip == before_ip : + # if port != before_port : + # file_path = f"{path_user}port_scan_detection.txt" + # with open(file_path , "a") as file : + # file.writelines(line_str) + + line_str = " " - file_path = f"{path}last_online_per_user.txt" - json_data = json.dumps(user_list) - p_data = json.dumps(p_user) - phone_data = json.dumps(user_phone) - inbound_data = json.dumps(inbound_user) - with open (file_path , "w") as file : - file.writelines(json_data) - with open (f"{path}p_user.txt" , "w" , encoding="utf-8") as file : - file.writelines(p_data) - with open (f"{path}phone_user.txt" , "w" , encoding="utf-8") as file : - file.writelines(phone_data) - with open (f"{path}inbound_specific.txt" , "w" , encoding="utf-8") as file : - file.writelines(inbound_data) - - print(user_list) - - send_def() + file_path = f"{path}last_online_per_user.txt" + json_data = json.dumps(user_list) + p_data = json.dumps(p_user) + phone_data = json.dumps(user_phone) + inbound_data = json.dumps(inbound_user) + with open (file_path , "w") as file : + file.writelines(json_data) + with open (f"{path}p_user.txt" , "w" , encoding="utf-8") as file : + file.writelines(p_data) + with open (f"{path}phone_user.txt" , "w" , encoding="utf-8") as file : + file.writelines(phone_data) + with open (f"{path}inbound_specific.txt" , "w" , encoding="utf-8") as file : + file.writelines(inbound_data) + + print(user_list) + + #mos used url per user : + for u in url_user_list : + if u == "default" : + 
continue + with open(f"./user/{u}_url.txt", "r") as f: + # Read the file content + content = f.read() + # Convert text to lowercase and split into urls + urls = content.lower().split("\n") + # Create a Counter object to count urls frequency + url_count = Counter(urls) + # Find the most common url and its count + most_used_word, count = url_count.most_common(1)[0] + # Print the most used url + mess = f"The most used URL is '{most_used_word}' (found {count} times) for {u}." + print(mess) + send_telegram_message(mess) + + send_def() def send_def () : source_dir = path_user @@ -220,8 +274,10 @@ def send_def () : file_path = './p_user.txt' send_single_file(file_path) - file_path = "./access.log" + file_path = path_log send_single_file(file_path) + + time.sleep(15) clear_def() @@ -246,6 +302,7 @@ def clear_def() : # فایل اصلی لاگ کپی شده اینجا هم پاک بشه delete_file("./access.log") delete_file("./user.zip") + send_telegram_message("Done...Created by @wikm360 with ❤️ ....") def main() :
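Review bot: In the new URL-list block, `if "[" in line :` runs after `line = line.split(" ")`, so it tests whether some token equals "[" rather than whether the destination is bracketed, and an IPv6 destination falls through to `line[4].split(":")[1]`; `if "[" in line[4] :` looks like what was meant. The duplicate check overwrites `flag` on every line of `{user}_url.txt`, so only the file's last line decides whether the URL is appended, and `url in line_url` is a substring match; as far as the flattened indentation shows, the file ends up neither a full request log nor a unique list, which makes the later `Counter` totals hard to interpret (the "default" placeholder line is counted too). The most-used loop reads `f"./user/{u}_url.txt"` while the files are written under `path_user`; use `f"{path_user}{u}_url.txt"` in both places. The per-user file name is built from the `email:` value parsed out of the log line, so either a comment stating that this value is operator-assigned or a check that rejects path separators before it reaches `open()` would be worth adding. analize() starts before this hunk, so the loop header and the origin of `line` are not visible here.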
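Review bot: `file_path = path_log` now uploads the configured log, but clear_def() in the following hunk still calls `delete_file("./access.log")`, so if `path_log` is any other path the file that was sent is not the one that gets removed; pass the same `path_log` to `delete_file` or document why the two differ. The added `time.sleep(15)` before `clear_def()` carries no explanation, and the visible `send_single_file` context (`print(response.text)`) suggests the uploads already block until the response arrives, so the pause may be unnecessary. If it is a deliberate wait, give it a named constant and a one-line comment stating the reason. Both send_def() and clear_def() extend beyond their hunks.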