diff --git a/usbkill/usbkill.py b/usbkill/usbkill.py index 1b85d26..57469f2 100644 --- a/usbkill/usbkill.py +++ b/usbkill/usbkill.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# _ _ _ _ _ +# _ _ _ _ _ # | | | | (_) | | # _ _ ___| |__ | | _ _| | | # | | | |/___) _ \| |_/ ) | | | @@ -41,7 +41,13 @@ import plistlib # We compile this function beforehand for efficiency. -DEVICE_RE = [ re.compile(".+ID\s(?P\w+:\w+)"), re.compile("0x([0-9a-z]{4})") ] +# Element 0 = REGEX parsed output of lsusb +# Element 1 = REGEX parsed hex USB ID (used in both lsusb) +# Element 2 = REGEX parsed usbconfig vendor id, expressed as a tuple +# Element 3 = REGEX parsed usbconfig product id, expressed as a tuple +# There probably is a better way to compile this so it shows up as substring group, but regex makes me wanna kms +# Doing it the proper way would enable reading other attributes (like serial number) correctly +DEVICE_RE = [ re.compile(".+ID\s(?P\w+:\w+)"), re.compile("0x([0-9a-z]{4})"), re.compile("(idVendor)\s=\s0x(\w+)"), re.compile("(idProduct)\s=\s0x(\w+)") ] # Set the settings filename here SETTINGS_FILE = '/etc/usbkill.ini' @@ -92,20 +98,22 @@ def __add__(self, other): def log(settings, msg): log_file = settings['log_file'] - + contents = '\n{0} {1}\nCurrent state:\n'.format(str(datetime.now()), msg) with open(log_file, 'a+') as log: log.write(contents) - + # Log current USB state if CURRENT_PLATFORM.startswith("DARWIN"): os.system("system_profiler SPUSBDataType >> " + log_file) + elif ("freenas" in platform.uname()[3]): + os.system("usbconfig >>" + log_file) else: os.system("lsusb >> " + log_file) def shred(settings): shredder = settings['remove_file_cmd'] - + # List logs and settings to be removed if settings['melt_usbkill']: settings['folders_to_remove'].append(os.path.dirname(settings['log_file'])) @@ -116,23 +124,23 @@ def shred(settings): else: settings['files_to_remove'].append(os.path.realpath(__file__)) settings['files_to_remove'].append(usbkill_folder + "/usbkill.ini") - + # Remove files and folders for _file in settings['files_to_remove'] + settings['folders_to_remove']: os.system(shredder + _file ) - + def kill_computer(settings): # Log what is happening: if not settings['melt_usbkill']: # No need to spend time on logging if logs will be removed log(settings, "Detected a USB change. Dumping the list of connected devices and killing the computer...") - + # Shred as specified in settings shred(settings) - + # Execute kill commands in order. for command in settings['kill_commands']: os.system(command) - + if settings['do_sync']: # Sync the filesystem to save recent changes os.system("sync") @@ -140,7 +148,7 @@ def kill_computer(settings): # If syncing is risky because it might take too long, then sleep for 5ms. # This will still allow for syncing in most cases. sleep(0.05) - + # Wipe ram and/or swap if settings['do_wipe_ram'] and settings['do_wipe_swap']: os.system(settings['wipe_ram_cmd'] + " & " + settings['wipe_swap_cmd']) @@ -148,7 +156,7 @@ def kill_computer(settings): os.system(settings['wipe_ram_cmd']) elif settings['do_wipe_swap']: os.system(settings['wipe_swap_cmd']) - + if settings['shut_down']: # (Use argument --no-shut-down to prevent a shutdown.) # Finally poweroff computer immediately if CURRENT_PLATFORM.startswith("DARWIN"): @@ -160,7 +168,7 @@ def kill_computer(settings): else: # Linux-based systems - Will shutdown os.system("poweroff -f") - + # Exit the process to prevent executing twice (or more) all commands sys.exit(0) @@ -180,7 +188,7 @@ def check_inside(result, devices): try: result["Built-in_Device"] except KeyError: - + # Check if vendor_id/product_id is available for this one try: # Ensure vendor_id and product_id are present @@ -209,20 +217,32 @@ def check_inside(result, devices): for result_deep in result["_items"]: # Check what's inside the _items array check_inside(result_deep, devices) - + except KeyError: {} - + # Run the loop devices = [] for result in df[0]["_items"]: check_inside(result, devices) return devices - + +def lsusb_freenas(): + #Use the 'usbconfig' utility that is included in Freenas + vid = DEVICE_RE[2].findall(subprocess.check_output("usbconfig dump_device_desc", shell=True).decode('utf-8').strip()) + pid = DEVICE_RE[3].findall(subprocess.check_output("usbconfig dump_device_desc", shell=True).decode('utf-8').strip()) + devices = [] + for listing in range(len(vid)): + devices.append(vid[listing][1] + ":" + pid[listing][1]) + return devices + def lsusb(): # A Python version of the command 'lsusb' that returns a list of connected usbids if CURRENT_PLATFORM.startswith("DARWIN"): # Use OS X system_profiler (native, 60% faster, and doesn't need the lsusb port) return DeviceCountSet(lsusb_darwin()) + elif ("freenas" in platform.uname()[3]): + # Freenas does not ship with lsusb. Use usbconfig instead. + return DeviceCountSet(lsusb_freenas()) else: # Use lsusb on linux and bsd return DeviceCountSet(DEVICE_RE[0].findall(subprocess.check_output("lsusb", shell=True).decode('utf-8').strip())) @@ -230,9 +250,9 @@ def lsusb(): def program_present(program): if sys.version_info[0] == 3: # Python3 - from shutil import which + from shutil import which return which(program) != None - + else: """ Test if an executable exist in Python2 @@ -251,7 +271,7 @@ def is_exe(fpath): if is_exe(exe_file): return True return False - + def load_settings(filename): # Libraries that are only needed in this function: from json import loads as jsonloads @@ -287,7 +307,7 @@ def get_setting(name, gtype=''): # Read all lines of settings file config.read(filename) - + # Build settings settings = dict({ 'sleep_time' : get_setting('sleep', 'FLOAT'), @@ -300,26 +320,26 @@ def get_setting(name, gtype=''): 'do_sync' : get_setting('do_sync', 'BOOL'), 'kill_commands': jsonloads(get_setting('kill_commands').strip()) }) - + settings['do_wipe_ram'] = False if get_setting('do_wipe_ram', 'BOOL'): settings['do_wipe_ram'] = True settings['wipe_ram_cmd'] = get_setting('wipe_ram_cmd') + " " - + settings['do_wipe_swap'] = False if get_setting('do_wipe_swap', 'BOOL'): settings['do_wipe_swap'] = True settings['wipe_swap_cmd'] = get_setting('wipe_swap_cmd') + " " return settings - + def loop(settings): # Main loop that checks every 'sleep_time' seconds if computer should be killed. # Allows only whitelisted usb devices to connect! # Does not allow usb device that was present during program start to disconnect! start_devices = lsusb() acceptable_devices = start_devices + settings['whitelist'] - + # Write to logs that loop is starting: msg = "[INFO] Started patrolling the USB ports every " + str(settings['sleep_time']) + " seconds..." log(settings, msg) @@ -331,7 +351,7 @@ def loop(settings): current_devices = lsusb() # Check that all current devices are in the set of acceptable devices - # and their cardinality is less than or equal to what is allowed + # and their cardinality is less than or equal to what is allowed for device, count in current_devices.items(): if device not in acceptable_devices: # A device with unknown usbid detected @@ -341,7 +361,7 @@ def loop(settings): kill_computer(settings) # Check that all start devices are still present in current devices - # and their cardinality still the same + # and their cardinality still the same for device, count in start_devices.items(): if device not in current_devices: # A usbid has disappeared completely @@ -349,7 +369,7 @@ def loop(settings): if count > current_devices[device]: # Count of a usbid device is lower than at program start (not enough devices for given usbid) kill_computer(settings) - + sleep(settings['sleep_time']) def startup_checks(): @@ -363,26 +383,26 @@ def startup_checks(): # Check arguments args = sys.argv[1:] - - # Check for help + + # Check for help if '-h' in args or '--help' in args: sys.exit(help_message) - + if '--version' in args: print('usbkill', __version__) sys.exit(0) - + copy_settings = False if '--cs' in args: args.remove('--cs') copy_settings = True - + shut_down = True if '--no-shut-down' in args: print("[NOTICE] Ready to execute all the (potentially destructive) commands, but NOT shut down the computer.") args.remove('--no-shut-down') shut_down = False - + # Check all other args if len(args) > 0: sys.exit("\n[ERROR] Argument not understood. Can only understand -h\n") @@ -408,17 +428,17 @@ def startup_checks(): sys.exit("\n[ERROR] You have lost your settings file. Get a new copy of the usbkill.ini and place it in /etc/ or in " + sources_path + "/\n") print("[NOTICE] Copying install/setting.ini to " + SETTINGS_FILE ) os.system("cp " + sources_path + "install/usbkill.ini " + SETTINGS_FILE) - + # Load settings settings = load_settings(SETTINGS_FILE) settings['shut_down'] = shut_down - + # Make sure no spaces a present in paths to be wiped. for name in settings['folders_to_remove'] + settings['files_to_remove']: if ' ' in name: msg += "[ERROR][WARNING] '" + name + "'as specified in your usbkill.ini contains a space.\n" sys.exit(msg) - + # Make sure srm is present if it will be used. if settings['melt_usbkill'] or len(settings['folders_to_remove'] + settings['files_to_remove']) > 0: if not program_present('srm'): @@ -437,32 +457,30 @@ def startup_checks(): sys.exit("[ERROR] usbkill configured to destroy data, but srm not installed.\n") if not settings['wipe_swap_cmd'].startswith('sswap'): sys.exit("[ERROR] wipe_swap_cmd should start with `sswap'. sswap should be used for automated data overwrite.\n") - + # Make sure there is a logging folder log_folder = os.path.dirname(settings['log_file']) if not os.path.isdir(log_folder): os.mkdir(log_folder) - + return settings def go(): # Run startup checks and load settings settings = startup_checks() - + # Define exit handler now that settings are loaded... def exit_handler(signum, frame): print("\n[INFO] Exiting because exit signal was received\n") log(settings, "[INFO] Exiting because exit signal was received") sys.exit(0) - + # Register handlers for clean exit of program for sig in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT, ]: signal.signal(sig, exit_handler) - + # Start main loop loop(settings) - + if __name__=="__main__": go() - -