From 7c9dfa34a07e81272a723f2ed1e9dd0ad4421b95 Mon Sep 17 00:00:00 2001
From: Avinash Reddy
Date: Wed, 1 Nov 2023 18:03:51 +0530
Subject: [PATCH] add realtime device sync, exit cleanly on interrupt, drop GUI

- Realtime sync from device (https://github.com/frappe/biometric-attendance-sync-tool/pull/55)
- fix: exit on interrupt (https://github.com/frappe/biometric-attendance-sync-tool/pull/54)
- strip out GUI stuff, remove PyQt5 from dependencies
- formatted code

Co-authored-by: Dany Robert
---
 erpnext_sync.py          | 516 +++++++++++++++++++++++++++++++--------
 gui.py                   | 391 -----------------------------
 install.py               |   7 -
 local_config.py.template |  46 ++--
 requirements.txt         |   1 -
 5 files changed, 437 insertions(+), 524 deletions(-)
 delete mode 100755 gui.py
 delete mode 100755 install.py

diff --git a/erpnext_sync.py b/erpnext_sync.py
index 6201047..0d1b076 100644
--- a/erpnext_sync.py
+++ b/erpnext_sync.py
@@ -1,34 +1,46 @@
-
+#!/usr/bin/env python
 import local_config as config
 import requests
+import signal
 import datetime
 import json
 import os
 import sys
-import time
 import logging
 from logging.handlers import RotatingFileHandler
 import pickledb
 from zk import ZK, const
 
-EMPLOYEE_NOT_FOUND_ERROR_MESSAGE = "No Employee found for the given employee field value"
-EMPLOYEE_INACTIVE_ERROR_MESSAGE = "Transactions cannot be created for an Inactive Employee"
-DUPLICATE_EMPLOYEE_CHECKIN_ERROR_MESSAGE = "This employee already has a log with the same timestamp"
-allowlisted_errors = [EMPLOYEE_NOT_FOUND_ERROR_MESSAGE, EMPLOYEE_INACTIVE_ERROR_MESSAGE, DUPLICATE_EMPLOYEE_CHECKIN_ERROR_MESSAGE]
+from threading import Event, Thread
+
+exit = Event()
+
+EMPLOYEE_NOT_FOUND_ERROR_MESSAGE = (
+    "No Employee found for the given employee field value"
+)
+EMPLOYEE_INACTIVE_ERROR_MESSAGE = (
+    "Transactions cannot be created for an Inactive Employee"
+)
+DUPLICATE_EMPLOYEE_CHECKIN_ERROR_MESSAGE = (
+    "This employee already has a log with the same timestamp"
+)
+allowlisted_errors = [
+    EMPLOYEE_NOT_FOUND_ERROR_MESSAGE,
+    EMPLOYEE_INACTIVE_ERROR_MESSAGE,
+    DUPLICATE_EMPLOYEE_CHECKIN_ERROR_MESSAGE,
+]
 
-if hasattr(config,'allowed_exceptions'):
+
+if hasattr(config, "allowed_exceptions"):
     allowlisted_errors_temp = []
     for error_number in config.allowed_exceptions:
-        allowlisted_errors_temp.append(allowlisted_errors[error_number-1])
+        allowlisted_errors_temp.append(allowlisted_errors[error_number - 1])
     allowlisted_errors = allowlisted_errors_temp
 
-device_punch_values_IN = getattr(config, 'device_punch_values_IN', [0,4])
-device_punch_values_OUT = getattr(config, 'device_punch_values_OUT', [1,5])
-ERPNEXT_VERSION = getattr(config, 'ERPNEXT_VERSION', 13)
+device_punch_values_IN = getattr(config, "device_punch_values_IN", [0, 4])
+device_punch_values_OUT = getattr(config, "device_punch_values_OUT", [1, 5])
+ERPNEXT_VERSION = getattr(config, "ERPNEXT_VERSION", 13)
 
-# possible area of further developemt
-    # Real-time events - setup getting events pushed from the machine rather then polling.
-    #- this is documented as 'Real-time events' in the ZKProtocol manual.
 
 # Notes:
 # Status Keys in status.json
@@ -38,60 +50,110 @@
 #  - <device_id>_push_timestamp
 #  - <shift_type_name>_sync_timestamp
 
+
 def main():
     """Takes care of checking if it is time to pull data based on config,
     then calling the relevant functions to pull data and push to ERPNext.
     """
     try:
-        last_lift_off_timestamp = _safe_convert_date(status.get('lift_off_timestamp'), "%Y-%m-%d %H:%M:%S.%f")
-        if (last_lift_off_timestamp and last_lift_off_timestamp < datetime.datetime.now() - datetime.timedelta(minutes=config.PULL_FREQUENCY)) or not last_lift_off_timestamp:
-            status.set('lift_off_timestamp', str(datetime.datetime.now()))
+        last_lift_off_timestamp = _safe_convert_date(
+            status.get("lift_off_timestamp"), "%Y-%m-%d %H:%M:%S.%f"
+        )
+        if (
+            last_lift_off_timestamp
+            and last_lift_off_timestamp
+            < datetime.datetime.now()
+            - datetime.timedelta(minutes=config.PULL_FREQUENCY)
+        ) or not last_lift_off_timestamp:
+            status.set("lift_off_timestamp", str(datetime.datetime.now()))
             info_logger.info("Cleared for lift off!")
             for device in config.devices:
+                if device.get("live_sync"):
+                    status.set(
+                        f'{device["device_id"]}_push_timestamp',
+                        str(datetime.datetime.now()),
+                    )
+                    continue
+
                 device_attendance_logs = None
-                info_logger.info("Processing Device: "+ device['device_id'])
-                dump_file = get_dump_file_name_and_directory(device['device_id'], device['ip'])
+                info_logger.info("Processing Device: " + device["device_id"])
+                dump_file = get_dump_file_name_and_directory(
+                    device["device_id"], device["ip"]
+                )
                 if os.path.exists(dump_file):
-                    info_logger.error('Device Attendance Dump Found in Log Directory. This can mean the program crashed unexpectedly. Retrying with dumped data.')
-                    with open(dump_file, 'r') as f:
+                    info_logger.error(
+                        "Device Attendance Dump Found in Log Directory. This can mean the program crashed unexpectedly. Retrying with dumped data."
+                    )
+                    with open(dump_file, "r") as f:
                         file_contents = f.read()
                         if file_contents:
-                            device_attendance_logs = list(map(lambda x: _apply_function_to_key(x, 'timestamp', datetime.datetime.fromtimestamp), json.loads(file_contents)))
+                            device_attendance_logs = list(
+                                map(
+                                    lambda x: _apply_function_to_key(
+                                        x, "timestamp", datetime.datetime.fromtimestamp
+                                    ),
+                                    json.loads(file_contents),
+                                )
+                            )
                 try:
                     pull_process_and_push_data(device, device_attendance_logs)
-                    status.set(f'{device["device_id"]}_push_timestamp', str(datetime.datetime.now()))
+                    status.set(
+                        f'{device["device_id"]}_push_timestamp',
+                        str(datetime.datetime.now()),
+                    )
                     if os.path.exists(dump_file):
                         os.remove(dump_file)
-                    info_logger.info("Successfully processed Device: "+ device['device_id'])
+                    info_logger.info(
+                        "Successfully processed Device: " + device["device_id"]
+                    )
                 except:
-                    error_logger.exception('exception when calling pull_process_and_push_data function for device'+json.dumps(device, default=str))
+                    error_logger.exception(
+                        "exception when calling pull_process_and_push_data function for device: "
+                        + json.dumps(device, default=str)
+                    )
-            if hasattr(config,'shift_type_device_mapping'):
+            if hasattr(config, "shift_type_device_mapping"):
                 update_shift_last_sync_timestamp(config.shift_type_device_mapping)
-            status.set('mission_accomplished_timestamp', str(datetime.datetime.now()))
+            status.set("mission_accomplished_timestamp", str(datetime.datetime.now()))
             info_logger.info("Mission Accomplished!")
     except:
-        error_logger.exception('exception has occurred in the main function...')
+        error_logger.exception("exception has occurred in the main function...")
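The scheduler tick (see infinite_loop near the end of this file) fires every 15 seconds, but the gate at the top of main() is what turns that into an effective PULL_FREQUENCY-minute pull cycle: a pull proceeds only when lift_off_timestamp is missing (first run, or unparseable) or older than PULL_FREQUENCY minutes. Condensed, the check amounts to this (an illustrative sketch, not code from this patch):

    def due_for_pull(status, pull_frequency_minutes):
        # status is the pickledb handle initialized near the end of this file
        last = _safe_convert_date(
            status.get("lift_off_timestamp"), "%Y-%m-%d %H:%M:%S.%f"
        )
        if last is None:  # never ran before: pull now
            return True
        return last < datetime.datetime.now() - datetime.timedelta(
            minutes=pull_frequency_minutes
        )

Devices flagged live_sync are deliberately skipped by this polling loop after their <device_id>_push_timestamp is refreshed; the live-capture thread defined below owns them instead.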
""" try: - last_lift_off_timestamp = _safe_convert_date(status.get('lift_off_timestamp'), "%Y-%m-%d %H:%M:%S.%f") - if (last_lift_off_timestamp and last_lift_off_timestamp < datetime.datetime.now() - datetime.timedelta(minutes=config.PULL_FREQUENCY)) or not last_lift_off_timestamp: - status.set('lift_off_timestamp', str(datetime.datetime.now())) + last_lift_off_timestamp = _safe_convert_date( + status.get("lift_off_timestamp"), "%Y-%m-%d %H:%M:%S.%f" + ) + if ( + last_lift_off_timestamp + and last_lift_off_timestamp + < datetime.datetime.now() + - datetime.timedelta(minutes=config.PULL_FREQUENCY) + ) or not last_lift_off_timestamp: + status.set("lift_off_timestamp", str(datetime.datetime.now())) info_logger.info("Cleared for lift off!") for device in config.devices: + if device.get("live_sync"): + status.set( + f'{device["device_id"]}_push_timestamp', + str(datetime.datetime.now()), + ) + continue + device_attendance_logs = None - info_logger.info("Processing Device: "+ device['device_id']) - dump_file = get_dump_file_name_and_directory(device['device_id'], device['ip']) + info_logger.info("Processing Device: " + device["device_id"]) + dump_file = get_dump_file_name_and_directory( + device["device_id"], device["ip"] + ) if os.path.exists(dump_file): - info_logger.error('Device Attendance Dump Found in Log Directory. This can mean the program crashed unexpectedly. Retrying with dumped data.') - with open(dump_file, 'r') as f: + info_logger.error( + "Device Attendance Dump Found in Log Directory. This can mean the program crashed unexpectedly. Retrying with dumped data." + ) + with open(dump_file, "r") as f: file_contents = f.read() if file_contents: - device_attendance_logs = list(map(lambda x: _apply_function_to_key(x, 'timestamp', datetime.datetime.fromtimestamp), json.loads(file_contents))) + device_attendance_logs = list( + map( + lambda x: _apply_function_to_key( + x, "timestamp", datetime.datetime.fromtimestamp + ), + json.loads(file_contents), + ) + ) try: pull_process_and_push_data(device, device_attendance_logs) - status.set(f'{device["device_id"]}_push_timestamp', str(datetime.datetime.now())) + status.set( + f'{device["device_id"]}_push_timestamp', + str(datetime.datetime.now()), + ) if os.path.exists(dump_file): os.remove(dump_file) - info_logger.info("Successfully processed Device: "+ device['device_id']) + info_logger.info( + "Successfully processed Device: " + device["device_id"] + ) except: - error_logger.exception('exception when calling pull_process_and_push_data function for device'+json.dumps(device, default=str)) - if hasattr(config,'shift_type_device_mapping'): + error_logger.exception( + "exception when calling pull_process_and_push_data function for device" + + json.dumps(device, default=str) + ) + if hasattr(config, "shift_type_device_mapping"): update_shift_last_sync_timestamp(config.shift_type_device_mapping) - status.set('mission_accomplished_timestamp', str(datetime.datetime.now())) + status.set("mission_accomplished_timestamp", str(datetime.datetime.now())) info_logger.info("Mission Accomplished!") except: - error_logger.exception('exception has occurred in the main function...') + error_logger.exception("exception has occurred in the main function...") def pull_process_and_push_data(device, device_attendance_logs=None): - """ Takes a single device config as param and pulls data from that device. + """Takes a single device config as param and pulls data from that device. 
params: device: a single device config object from the local_config file device_attendance_logs: fetching from device is skipped if this param is passed. used to restart failed fetches from previous runs. """ - attendance_success_log_file = '_'.join(["attendance_success_log", device['device_id']]) - attendance_failed_log_file = '_'.join(["attendance_failed_log", device['device_id']]) - attendance_success_logger = setup_logger(attendance_success_log_file, '/'.join([config.LOGS_DIRECTORY, attendance_success_log_file])+'.log') - attendance_failed_logger = setup_logger(attendance_failed_log_file, '/'.join([config.LOGS_DIRECTORY, attendance_failed_log_file])+'.log') + attendance_success_log_file = "_".join( + ["attendance_success_log", device["device_id"]] + ) + attendance_failed_log_file = "_".join( + ["attendance_failed_log", device["device_id"]] + ) + attendance_success_logger = setup_logger( + attendance_success_log_file, + "/".join([config.LOGS_DIRECTORY, attendance_success_log_file]) + ".log", + ) + attendance_failed_logger = setup_logger( + attendance_failed_log_file, + "/".join([config.LOGS_DIRECTORY, attendance_failed_log_file]) + ".log", + ) if not device_attendance_logs: - device_attendance_logs = get_all_attendance_from_device(device['ip'], device_id=device['device_id'], clear_from_device_on_fetch=device['clear_from_device_on_fetch']) + device_attendance_logs = get_all_attendance_from_device( + device["ip"], + device_id=device["device_id"], + clear_from_device_on_fetch=device["clear_from_device_on_fetch"], + ) if not device_attendance_logs: return # for finding the last successfull push and restart from that point (or) from a set 'config.IMPORT_START_DATE' (whichever is later) index_of_last = -1 - last_line = get_last_line_from_file('/'.join([config.LOGS_DIRECTORY, attendance_success_log_file])+'.log') + last_line = get_last_line_from_file( + "/".join([config.LOGS_DIRECTORY, attendance_success_log_file]) + ".log" + ) import_start_date = _safe_convert_date(config.IMPORT_START_DATE, "%Y%m%d") if last_line or import_start_date: last_user_id = None @@ -108,39 +170,150 @@ def pull_process_and_push_data(device, device_attendance_logs=None): last_timestamp = import_start_date for i, x in enumerate(device_attendance_logs): if last_user_id and last_timestamp: - if last_user_id == str(x['user_id']) and last_timestamp == x['timestamp']: + if ( + last_user_id == str(x["user_id"]) + and last_timestamp == x["timestamp"] + ): index_of_last = i break elif last_timestamp: - if x['timestamp'] >= last_timestamp: + if x["timestamp"] >= last_timestamp: index_of_last = i break - for device_attendance_log in device_attendance_logs[index_of_last+1:]: - punch_direction = device['punch_direction'] - if punch_direction == 'AUTO': - if device_attendance_log['punch'] in device_punch_values_OUT: - punch_direction = 'OUT' - elif device_attendance_log['punch'] in device_punch_values_IN: - punch_direction = 'IN' + for device_attendance_log in device_attendance_logs[index_of_last + 1 :]: + punch_direction = device["punch_direction"] + if punch_direction == "AUTO": + if device_attendance_log["punch"] in device_punch_values_OUT: + punch_direction = "OUT" + elif device_attendance_log["punch"] in device_punch_values_IN: + punch_direction = "IN" else: punch_direction = None - erpnext_status_code, erpnext_message = send_to_erpnext(device_attendance_log['user_id'], device_attendance_log['timestamp'], device['device_id'], punch_direction) + erpnext_status_code, erpnext_message = send_to_erpnext( + 
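Note that the AUTO punch-direction mapping above is repeated verbatim inside live_sync_attendance below. Both paths translate the device's punch code through device_punch_values_IN/OUT, so a shared helper would keep them in step if the mapping ever changes. A possible extraction (hypothetical, not part of this patch):

    def resolve_punch_direction(configured_direction, punch_code):
        # 'IN'/'OUT'/None pass straight through; only 'AUTO' consults the code tables
        if configured_direction != "AUTO":
            return configured_direction
        if punch_code in device_punch_values_OUT:
            return "OUT"
        if punch_code in device_punch_values_IN:
            return "IN"
        return None  # unknown punch code: let ERPNext infer the log type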
+
+
+def live_sync_attendance(device):
+    live_zk = ZK(device["ip"], port=4370, timeout=30)
+    live_conn = None
+
+    attendance_success_log_file = "_".join(
+        ["attendance_success_log", device["device_id"]]
+    )
+    attendance_failed_log_file = "_".join(
+        ["attendance_failed_log", device["device_id"]]
+    )
+    attendance_success_logger = setup_logger(
+        attendance_success_log_file,
+        "/".join([config.LOGS_DIRECTORY, attendance_success_log_file]) + ".log",
+    )
+    attendance_failed_logger = setup_logger(
+        attendance_failed_log_file,
+        "/".join([config.LOGS_DIRECTORY, attendance_failed_log_file]) + ".log",
+    )
+
+    try:
+        print(f"[Live Sync] Connecting to {device['device_id']}")
+        live_conn = live_zk.connect()
+        print(f"[Live Sync] Connected to {device['device_id']}")
+        for attendance in live_conn.live_capture():
+            if exit.is_set():
+                break
+
+            if attendance is None:
+                continue
+
+            device_attendance_log = attendance.__dict__
+            print(device_attendance_log)
+            punch_direction = device["punch_direction"]
+            if punch_direction == "AUTO":
+                if device_attendance_log["punch"] in device_punch_values_OUT:
+                    punch_direction = "OUT"
+                elif device_attendance_log["punch"] in device_punch_values_IN:
+                    punch_direction = "IN"
+                else:
+                    punch_direction = None
+            erpnext_status_code, erpnext_message = send_to_erpnext(
+                device_attendance_log["user_id"],
+                device_attendance_log["timestamp"],
+                device["device_id"],
+                punch_direction,
+            )
+            if erpnext_status_code == 200:
+                attendance_success_logger.info(
+                    "\t".join(
+                        [
+                            erpnext_message,
+                            str(device_attendance_log["uid"]),
+                            str(device_attendance_log["user_id"]),
+                            str(device_attendance_log["timestamp"].timestamp()),
+                            str(device_attendance_log["punch"]),
+                            str(device_attendance_log["status"]),
+                            json.dumps(device_attendance_log, default=str),
+                        ]
+                    )
+                )
+            else:
+                attendance_failed_logger.error(
+                    "\t".join(
+                        [
+                            str(erpnext_status_code),
+                            str(device_attendance_log["uid"]),
+                            str(device_attendance_log["user_id"]),
+                            str(device_attendance_log["timestamp"].timestamp()),
+                            str(device_attendance_log["punch"]),
+                            str(device_attendance_log["status"]),
+                            json.dumps(device_attendance_log, default=str),
+                        ]
+                    )
+                )
+                if not (any(error in erpnext_message for error in allowlisted_errors)):
+                    raise Exception("API Call to ERPNext Failed.")
+    except Exception as e:
+        print("Live sync terminated: {}".format(e))
+    finally:
+        if live_conn:
+            live_conn.disconnect()
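One caveat with the live path: live_capture() blocks on a single device socket, so a network hiccup raises out of the for loop, lands in the except clause above, and quietly ends the thread. The device then stays unsynced until the whole service restarts. Wrapping the call in a reconnect loop would make the thread self-healing; a sketch (not in this patch):

    def live_sync_with_retry(device, retry_delay=30):
        # live_sync_attendance returns (after swallowing its error) whenever
        # the connection drops, so keep re-entering it until shutdown is requested
        while not exit.is_set():
            live_sync_attendance(device)
            exit.wait(retry_delay)  # back off, but wake immediately on exit.set()

Also worth noting: live_capture() periodically yields None when its receive call times out, and the `if attendance is None: continue` above is what keeps the loop ticking so the exit flag is actually checked between punches.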
 
 
-def get_all_attendance_from_device(ip, port=4370, timeout=30, device_id=None, clear_from_device_on_fetch=False):
+def get_all_attendance_from_device(
+    ip, port=4370, timeout=30, device_id=None, clear_from_device_on_fetch=False
+):
     # Sample Attendance Logs [{'punch': 255, 'user_id': '22', 'uid': 12349, 'status': 1, 'timestamp': datetime.datetime(2019, 2, 26, 20, 31, 29)},{'punch': 255, 'user_id': '7', 'uid': 7, 'status': 1, 'timestamp': datetime.datetime(2019, 2, 26, 20, 31, 36)}]
     zk = ZK(ip, port=port, timeout=timeout)
     conn = None
@@ -152,22 +325,29 @@ def get_all_attendance_from_device(ip, port=4370, timeout=30, device_id=None, cl
         info_logger.info("\t".join((ip, "Device Disable Attempted. Result:", str(x))))
         attendances = conn.get_attendance()
         info_logger.info("\t".join((ip, "Attendances Fetched:", str(len(attendances)))))
-        status.set(f'{device_id}_push_timestamp', None)
-        status.set(f'{device_id}_pull_timestamp', str(datetime.datetime.now()))
+        status.set(f"{device_id}_push_timestamp", None)
+        status.set(f"{device_id}_pull_timestamp", str(datetime.datetime.now()))
         if len(attendances):
            # keeping a backup before clearing data in case the program fails.
            # if everything goes well then this file is removed automatically at the end.
             dump_file_name = get_dump_file_name_and_directory(device_id, ip)
-            with open(dump_file_name, 'w+') as f:
-                f.write(json.dumps(list(map(lambda x: x.__dict__, attendances)), default=datetime.datetime.timestamp))
+            with open(dump_file_name, "w+") as f:
+                f.write(
+                    json.dumps(
+                        list(map(lambda x: x.__dict__, attendances)),
+                        default=datetime.datetime.timestamp,
+                    )
+                )
             if clear_from_device_on_fetch:
                 x = conn.clear_attendance()
-                info_logger.info("\t".join((ip, "Attendance Clear Attempted. Result:", str(x))))
+                info_logger.info(
+                    "\t".join((ip, "Attendance Clear Attempted. Result:", str(x)))
+                )
         x = conn.enable_device()
         info_logger.info("\t".join((ip, "Device Enable Attempted. Result:", str(x))))
     except:
-        error_logger.exception(str(ip)+' exception when fetching from device...')
-        raise Exception('Device fetch failed.')
+        error_logger.exception(str(ip) + " exception when fetching from device...")
+        raise Exception("Device fetch failed.")
     finally:
         if conn:
             conn.disconnect()
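The dump file written above serializes each log's datetime through datetime.datetime.timestamp (an epoch float), which is exactly why the recovery path in main() maps datetime.datetime.fromtimestamp back over the 'timestamp' key: the two defaults are a matched pair. In miniature:

    import datetime
    import json

    log = {"uid": 7, "user_id": "7", "punch": 0, "status": 1,
           "timestamp": datetime.datetime(2023, 11, 1, 9, 30)}
    wire = json.dumps([log], default=datetime.datetime.timestamp)  # datetime -> float
    restored = json.loads(wire)[0]
    restored["timestamp"] = datetime.datetime.fromtimestamp(restored["timestamp"])
    assert restored == log

Anything that round-trips cleanly here will also survive a crash-and-retry cycle through the dump file.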
Result:", str(x)))) except: - error_logger.exception(str(ip)+' exception when fetching from device...') - raise Exception('Device fetch failed.') + error_logger.exception(str(ip) + " exception when fetching from device...") + raise Exception("Device fetch failed.") finally: if conn: conn.disconnect() @@ -181,27 +361,53 @@ def send_to_erpnext(employee_field_value, timestamp, device_id=None, log_type=No endpoint_app = "hrms" if ERPNEXT_VERSION > 13 else "erpnext" url = f"{config.ERPNEXT_URL}/api/method/{endpoint_app}.hr.doctype.employee_checkin.employee_checkin.add_log_based_on_employee_field" headers = { - 'Authorization': "token "+ config.ERPNEXT_API_KEY + ":" + config.ERPNEXT_API_SECRET, - 'Accept': 'application/json' + "Authorization": "token " + + config.ERPNEXT_API_KEY + + ":" + + config.ERPNEXT_API_SECRET, + "Accept": "application/json", } data = { - 'employee_field_value' : employee_field_value, - 'timestamp' : timestamp.__str__(), - 'device_id' : device_id, - 'log_type' : log_type + "employee_field_value": employee_field_value, + "timestamp": timestamp.__str__(), + "device_id": device_id, + "log_type": log_type, } response = requests.request("POST", url, headers=headers, data=data) if response.status_code == 200: - return 200, json.loads(response._content)['message']['name'] + return 200, json.loads(response._content)["message"]["name"] else: error_str = _safe_get_error_str(response) if EMPLOYEE_NOT_FOUND_ERROR_MESSAGE in error_str: - error_logger.error('\t'.join(['Error during ERPNext API Call.', str(employee_field_value), str(timestamp.timestamp()), str(device_id), str(log_type), error_str])) + error_logger.error( + "\t".join( + [ + "Error during ERPNext API Call.", + str(employee_field_value), + str(timestamp.timestamp()), + str(device_id), + str(log_type), + error_str, + ] + ) + ) # TODO: send email? 
         else:
-            error_logger.error('\t'.join(['Error during ERPNext API Call.', str(employee_field_value), str(timestamp.timestamp()), str(device_id), str(log_type), error_str]))
+            error_logger.error(
+                "\t".join(
+                    [
+                        "Error during ERPNext API Call.",
+                        str(employee_field_value),
+                        str(timestamp.timestamp()),
+                        str(device_id),
+                        str(log_type),
+                        error_str,
+                    ]
+                )
+            )
         return response.status_code, error_str
 
+
 def update_shift_last_sync_timestamp(shift_type_device_mapping):
     """
     ### algo for updating the sync_current_timestamp
@@ -214,69 +420,116 @@ def update_shift_last_sync_timestamp(shift_type_device_mapping):
     for shift_type_device_map in shift_type_device_mapping:
         all_devices_pushed = True
         pull_timestamp_array = []
-        for device_id in shift_type_device_map['related_device_id']:
-            if not status.get(f'{device_id}_push_timestamp'):
+        for device_id in shift_type_device_map["related_device_id"]:
+            if not status.get(f"{device_id}_push_timestamp"):
                 all_devices_pushed = False
                 break
-            pull_timestamp_array.append(_safe_convert_date(status.get(f'{device_id}_pull_timestamp'), "%Y-%m-%d %H:%M:%S.%f"))
+            pull_timestamp_array.append(
+                _safe_convert_date(
+                    status.get(f"{device_id}_pull_timestamp"), "%Y-%m-%d %H:%M:%S.%f"
+                )
+            )
         if all_devices_pushed:
             min_pull_timestamp = min(pull_timestamp_array)
-            if isinstance(shift_type_device_map['shift_type_name'], str): # for backward compatibility of config file
-                shift_type_device_map['shift_type_name'] = [shift_type_device_map['shift_type_name']]
-            for shift in shift_type_device_map['shift_type_name']:
+            if isinstance(
+                shift_type_device_map["shift_type_name"], str
+            ):  # for backward compatibility of config file
+                shift_type_device_map["shift_type_name"] = [
+                    shift_type_device_map["shift_type_name"]
+                ]
+            for shift in shift_type_device_map["shift_type_name"]:
                 try:
-                    sync_current_timestamp = _safe_convert_date(status.get(f'{shift}_sync_timestamp'), "%Y-%m-%d %H:%M:%S.%f")
-                    if (sync_current_timestamp and min_pull_timestamp > sync_current_timestamp) or (min_pull_timestamp and not sync_current_timestamp):
-                        response_code = send_shift_sync_to_erpnext(shift, min_pull_timestamp)
+                    sync_current_timestamp = _safe_convert_date(
+                        status.get(f"{shift}_sync_timestamp"), "%Y-%m-%d %H:%M:%S.%f"
+                    )
+                    if (
+                        sync_current_timestamp
+                        and min_pull_timestamp > sync_current_timestamp
+                    ) or (min_pull_timestamp and not sync_current_timestamp):
+                        response_code = send_shift_sync_to_erpnext(
+                            shift, min_pull_timestamp
+                        )
                        if response_code == 200:
-                            status.set(f'{shift}_sync_timestamp', str(min_pull_timestamp))
+                            status.set(
+                                f"{shift}_sync_timestamp", str(min_pull_timestamp)
+                            )
                 except:
-                    error_logger.exception('Exception in update_shift_last_sync_timestamp, for shift:'+shift)
+                    error_logger.exception(
+                        "Exception in update_shift_last_sync_timestamp, for shift:"
+                        + shift
+                    )
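In practice the guard above works like this: a shift's cursor only moves when every related device has a _push_timestamp (its last batch fully reached ERPNext), and the new last_sync_of_checkin becomes the minimum of those devices' _pull_timestamps, so ERPNext never auto-marks attendance for a window that some device has not reported yet. For the sample mapping in local_config.py.template (Shift1 covering test_1 and test_2), illustratively:

    # test_1 pulled at 10:00, test_2 pulled at 10:05 -> Shift1 syncs to 10:00
    # test_1 pulled, but test_2's push failed        -> no update is sent at all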
 
+
 def send_shift_sync_to_erpnext(shift_type_name, sync_timestamp):
     url = config.ERPNEXT_URL + "/api/resource/Shift Type/" + shift_type_name
     headers = {
-        'Authorization': "token "+ config.ERPNEXT_API_KEY + ":" + config.ERPNEXT_API_SECRET,
-        'Accept': 'application/json'
-    }
-    data = {
-        "last_sync_of_checkin" : str(sync_timestamp)
+        "Authorization": "token "
+        + config.ERPNEXT_API_KEY
+        + ":"
+        + config.ERPNEXT_API_SECRET,
+        "Accept": "application/json",
     }
+    data = {"last_sync_of_checkin": str(sync_timestamp)}
     try:
         response = requests.request("PUT", url, headers=headers, data=json.dumps(data))
         if response.status_code == 200:
-            info_logger.info("\t".join(['Shift Type last_sync_of_checkin Updated', str(shift_type_name), str(sync_timestamp.timestamp())]))
+            info_logger.info(
+                "\t".join(
+                    [
+                        "Shift Type last_sync_of_checkin Updated",
+                        str(shift_type_name),
+                        str(sync_timestamp.timestamp()),
+                    ]
+                )
+            )
         else:
             error_str = _safe_get_error_str(response)
-            error_logger.error('\t'.join(['Error during ERPNext Shift Type API Call.', str(shift_type_name), str(sync_timestamp.timestamp()), error_str]))
+            error_logger.error(
+                "\t".join(
+                    [
+                        "Error during ERPNext Shift Type API Call.",
+                        str(shift_type_name),
+                        str(sync_timestamp.timestamp()),
+                        error_str,
+                    ]
+                )
+            )
         return response.status_code
     except:
-        error_logger.exception("\t".join(['exception when updating last_sync_of_checkin in Shift Type', str(shift_type_name), str(sync_timestamp.timestamp())]))
+        error_logger.exception(
+            "\t".join(
+                [
+                    "exception when updating last_sync_of_checkin in Shift Type",
+                    str(shift_type_name),
+                    str(sync_timestamp.timestamp()),
+                ]
+            )
+        )
 
+
 def get_last_line_from_file(file):
     # concerns to address (may be much later):
-    #     how will last line lookup work with log rotation when a new file is created?
-    #-        will that new file be empty at any time? or will it have a partial line from the previous file?
+    #     how will last line lookup work with log rotation when a new file is created?
+    #     - will that new file be empty at any time? or will it have a partial line from the previous file?
     line = None
     if os.stat(file).st_size < 5000:
         # quick hack to handle files with one line
-        with open(file, 'r') as f:
+        with open(file, "r") as f:
             for line in f:
                 pass
     else:
         # optimized for large log files
-        with open(file, 'rb') as f:
+        with open(file, "rb") as f:
             f.seek(-2, os.SEEK_END)
-            while f.read(1) != b'\n':
+            while f.read(1) != b"\n":
                 f.seek(-2, os.SEEK_CUR)
             line = f.readline().decode()
     return line
 
 
 def setup_logger(name, log_file, level=logging.INFO, formatter=None):
     if not formatter:
-        formatter = logging.Formatter('%(asctime)s\t%(levelname)s\t%(message)s')
+        formatter = logging.Formatter("%(asctime)s\t%(levelname)s\t%(message)s")
 
     handler = RotatingFileHandler(log_file, maxBytes=10000000, backupCount=50)
     handler.setFormatter(formatter)
@@ -288,45 +541,92 @@ def setup_logger(name, log_file, level=logging.INFO, formatter=None):
     return logger
 
+
 def get_dump_file_name_and_directory(device_id, device_ip):
-    return config.LOGS_DIRECTORY + '/' + device_id + "_" + device_ip.replace('.', '_') + '_last_fetch_dump.json'
+    return (
+        config.LOGS_DIRECTORY
+        + "/"
+        + device_id
+        + "_"
+        + device_ip.replace(".", "_")
+        + "_last_fetch_dump.json"
+    )
 
+
 def _apply_function_to_key(obj, key, fn):
     obj[key] = fn(obj[key])
     return obj
 
+
 def _safe_convert_date(datestring, pattern):
     try:
         return datetime.datetime.strptime(datestring, pattern)
     except:
         return None
 
+
 def _safe_get_error_str(res):
     try:
         error_json = json.loads(res._content)
-        if 'exc' in error_json: # this means traceback is available
-            error_str = json.loads(error_json['exc'])[0]
+        if "exc" in error_json:  # this means traceback is available
+            error_str = json.loads(error_json["exc"])[0]
         else:
             error_str = json.dumps(error_json)
     except:
         error_str = str(res.__dict__)
     return error_str
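After one healthy cycle, the status.json store initialized just below holds cursors along these lines (key names come from the code above; values illustrative):

    {
        "lift_off_timestamp": "2023-11-01 18:04:00.000000",
        "test_1_pull_timestamp": "2023-11-01 18:04:01.250000",
        "test_1_push_timestamp": "2023-11-01 18:04:03.500000",
        "Shift1_sync_timestamp": "2023-11-01 18:04:01.250000",
        "mission_accomplished_timestamp": "2023-11-01 18:04:04.000000"
    }

Deleting this file (or the logs directory) resets every cursor, which is precisely the situation the duplicate-checkin allowlist entry exists for.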
 
+
 # setup logger and status
 if not os.path.exists(config.LOGS_DIRECTORY):
     os.makedirs(config.LOGS_DIRECTORY)
 
-error_logger = setup_logger('error_logger', '/'.join([config.LOGS_DIRECTORY, 'error.log']), logging.ERROR)
-info_logger = setup_logger('info_logger', '/'.join([config.LOGS_DIRECTORY, 'logs.log']))
-status = pickledb.load('/'.join([config.LOGS_DIRECTORY, 'status.json']), True)
+error_logger = setup_logger(
+    "error_logger", "/".join([config.LOGS_DIRECTORY, "error.log"]), logging.ERROR
+)
+info_logger = setup_logger("info_logger", "/".join([config.LOGS_DIRECTORY, "logs.log"]))
+status = pickledb.load("/".join([config.LOGS_DIRECTORY, "status.json"]), True)
 
-def infinite_loop(sleep_time=15):
-    print("Service Running...")
-    while True:
+
+def init_bulk_sync(sleep_time=15):
+    # polling loop: one main() pass per tick; exit.wait() doubles as an
+    # interruptible sleep that returns early once shutdown is requested
+    while not exit.is_set():
         try:
             main()
-            time.sleep(sleep_time)
+            exit.wait(sleep_time)
         except BaseException as e:
             print(e)
 
+
+def init_live_sync():
+    # one worker thread per live_sync device; the joins keep this function
+    # alive until every live-capture thread has returned
+    parallel_events = []
+    for device in config.devices:
+        if not device.get("live_sync"):
+            continue
+
+        process = Thread(target=live_sync_attendance, kwargs={"device": device})
+        process.start()
+        parallel_events.append(process)
+
+    for event in parallel_events:
+        event.join()
+
+
+def infinite_loop():
+    print("Service Running...")
+    thread1 = Thread(target=init_bulk_sync)
+    thread1.start()
+    thread2 = Thread(target=init_live_sync)
+    thread2.start()
+
+    thread1.join()
+    thread2.join()
+
+
+def quit(signo, _frame):
+    # signal handler: set the shared Event so both loops wind down cleanly
+    print("Interrupted by %d, shutting down" % signo)
+    exit.set()
+
+
 if __name__ == "__main__":
+    for sig in ("TERM", "HUP", "INT"):
+        signal.signal(getattr(signal, "SIG" + sig), quit)
     infinite_loop()
 
diff --git a/gui.py b/gui.py deleted file mode 100755 index 36ac7e8..0000000 --- a/gui.py +++ /dev/null @@ -1,391 +0,0 @@ -import datetime -import json -import os -import shlex -import sys -import subprocess -import local_config as config - -from PyQt5 import QtCore -from PyQt5 import QtWidgets -from PyQt5.QtCore import QRegExp -from PyQt5.QtGui import QIntValidator, QRegExpValidator -from PyQt5.QtWidgets import QApplication, QLabel, QLineEdit, QMainWindow, QMessageBox, QPushButton - - -config_template = '''# ERPNext related configs -ERPNEXT_API_KEY = '{0}' -ERPNEXT_API_SECRET = '{1}' -ERPNEXT_URL = '{2}' - - -# operational configs -PULL_FREQUENCY = {3} or 60 # in minutes -LOGS_DIRECTORY = 'logs' # logs of this script is stored in this directory -IMPORT_START_DATE = '{4}' or None # format: '20190501' -# Biometric device configs (all keys mandatory) - #- device_id - must be unique, strictly alphanumerical chars only. no space allowed. - #- ip - device IP Address - #- punch_direction - 'IN'/'OUT'/'AUTO'/None - #- clear_from_device_on_fetch: if set to true then attendance is deleted after fetch is successful. - #(Caution: this feature can lead to data loss if used carelessly.)
-devices = {5} - -# Configs updating sync timestamp in the Shift Type DocType -shift_type_device_mapping = {6} -''' - - -class BiometricWindow(QMainWindow): - def __init__(self): - super().__init__() - self.reg_exp_for_ip = r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?=\s*netmask)" - self.init_ui() - - def closeEvent(self, event): - can_exit = not hasattr(self, "p") - if can_exit: - event.accept() - else: - create_message_box(text="Window cannot be closed when \nservice is running!", title="Message", width=200) - event.ignore() - - def init_ui(self): - self.counter = 0 - self.setup_window() - self.setup_textboxes_and_label() - self.center() - self.show() - - def setup_window(self): - self.setFixedSize(470, 550) - self.setWindowTitle('ERPNext Biometric Service') - - def setup_textboxes_and_label(self): - - self.create_label("API Secret", "api_secret", 20, 0, 200, 30) - self.create_field("textbox_erpnext_api_secret", 20, 30, 200, 30) - - self.create_label("API Key", "api_key", 20, 60, 200, 30) - self.create_field("textbox_erpnext_api_key", 20, 90, 200, 30) - - self.create_label("ERPNext URL", "erpnext_url", 20, 120, 200, 30) - self.create_field("textbox_erpnext_url", 20, 150, 200, 30) - - self.create_label("Pull Frequency (in minutes)", - "pull_frequency", 250, 0, 200, 30) - self.create_field("textbox_pull_frequency", 250, 30, 200, 30) - - self.create_label("Import Start Date", - "import_start_date", 250, 60, 200, 30) - self.create_field("textbox_import_start_date", 250, 90, 200, 30) - self.validate_data(r"^\d{1,2}/\d{1,2}/\d{4}$", "textbox_import_start_date") - - self.create_separator(210, 470) - self.create_button('+', 'add', 390, 230, 35, 30, self.add_devices_fields) - self.create_button('-', 'remove', 420, 230, 35, 30, self.remove_devices_fields) - - self.create_label("Device ID", "device_id", 20, 260, 0, 30) - self.create_label("Device IP", "device_ip", 170, 260, 0, 30) - self.create_label("Shift", "shift", 320, 260, 0, 0) - - # First Row for table - self.create_field("device_id_0", 20, 290, 145, 30) - self.create_field("device_ip_0", 165, 290, 145, 30) - self.validate_data(self.reg_exp_for_ip, "device_ip_0") - self.create_field("shift_0", 310, 290, 145, 30) - - # Actions buttons - self.create_button('Set Configuration', 'set_conf', 20, 500, 130, 30, self.setup_local_config) - self.create_button('Start Service', 'start_or_stop_service', 320, 500, 130, 30, self.integrate_biometric, enable=False) - self.create_button('Running Status', 'running_status', 170, 500, 130, 30, self.get_running_status, enable=False) - self.set_default_value_or_placeholder_of_field() - - # validating integer - self.onlyInt = QIntValidator(10, 30) - self.textbox_pull_frequency.setValidator(self.onlyInt) - - def set_default_value_or_placeholder_of_field(self): - if os.path.exists("local_config.py"): - import local_config as config - self.textbox_erpnext_api_secret.setText(config.ERPNEXT_API_SECRET) - self.textbox_erpnext_api_key.setText(config.ERPNEXT_API_KEY) - self.textbox_erpnext_url.setText(config.ERPNEXT_URL) - self.textbox_pull_frequency.setText(str(config.PULL_FREQUENCY)) - - if len(config.devices): - self.device_id_0.setText(config.devices[0]['device_id']) - self.device_ip_0.setText(config.devices[0]['ip']) - self.shift_0.setText( - config.shift_type_device_mapping[0]['shift_type_name']) - - if len(config.devices) > 1: - for _ in range(self.counter, len(config.devices) - 1): - self.add_devices_fields() - - device = getattr(self, 'device_id_' + str(self.counter)) - 
ip = getattr(self, 'device_ip_' + str(self.counter)) - shift = getattr(self, 'shift_' + str(self.counter)) - - device.setText(config.devices[self.counter]['device_id']) - ip.setText(config.devices[self.counter]['ip']) - shift.setText(config.shift_type_device_mapping[self.counter]['shift_type_name']) - else: - self.textbox_erpnext_api_secret.setPlaceholderText("c70ee57c7b3124c") - self.textbox_erpnext_api_key.setPlaceholderText("fb37y8fd4uh8ac") - self.textbox_erpnext_url.setPlaceholderText("example.erpnext.com") - self.textbox_pull_frequency.setPlaceholderText("60") - - self.textbox_import_start_date.setPlaceholderText("DD/MM/YYYY") - - # Widgets Genrators - def create_label(self, label_text, label_name, x, y, height, width): - setattr(self, label_name, QLabel(self)) - label = getattr(self, label_name) - label.move(x, y) - label.setText(label_text) - if height and width: - label.resize(height, width) - label.show() - - def create_field(self, field_name, x, y, height, width): - setattr(self, field_name, QLineEdit(self)) - field = getattr(self, field_name) - field.move(x, y) - field.resize(height, width) - field.show() - - def create_separator(self, y, width): - setattr(self, 'separator', QLineEdit(self)) - field = getattr(self, 'separator') - field.move(0, y) - field.resize(width, 5) - field.setEnabled(False) - field.show() - - def create_button(self, button_label, button_name, x, y, height, width, callback_function, enable=True): - setattr(self, button_name, QPushButton(button_label, self)) - button = getattr(self, button_name) - button.move(x, y) - button.resize(height, width) - button.clicked.connect(callback_function) - button.setEnabled(enable) - - def center(self): - frame = self.frameGeometry() - screen = QApplication.desktop().screenNumber(QApplication.desktop().cursor().pos()) - centerPoint = QApplication.desktop().screenGeometry(screen).center() - frame.moveCenter(centerPoint) - self.move(frame.topLeft()) - - def add_devices_fields(self): - if self.counter < 5: - self.counter += 1 - self.create_field("device_id_" + str(self.counter), 20, 290+(self.counter * 30), 145, 30) - self.create_field("device_ip_" + str(self.counter), 165, 290+(self.counter * 30), 145, 30) - self.validate_data(self.reg_exp_for_ip, "device_ip_" + str(self.counter)) - self.create_field("shift_" + str(self.counter), 310, 290+(self.counter * 30), 145, 30) - - def validate_data(self, reg_exp, field_name): - field = getattr(self, field_name) - reg_ex = QRegExp(reg_exp) - input_validator = QRegExpValidator(reg_ex, field) - field.setValidator(input_validator) - - def remove_devices_fields(self): - if self.counter > 0: - b = getattr(self, "shift_" + str(self.counter)) - b.deleteLater() - b = getattr(self, "device_id_" + str(self.counter)) - b.deleteLater() - b = getattr(self, "device_ip_" + str(self.counter)) - b.deleteLater() - - self.counter -= 1 - - def integrate_biometric(self): - button = getattr(self, "start_or_stop_service") - - if not hasattr(self, 'p'): - print("Starting Service...") - command = shlex.split('python -c "from erpnext_sync import infinite_loop; infinite_loop()"') - self.p = subprocess.Popen(command, stdout=subprocess.PIPE) - print("Process running at {}".format(self.p.pid)) - button.setText("Stop Service") - create_message_box("Service status", "Service has been started") - self.create_label(str(datetime.datetime.now()), "service_start_time", 20, 60, 200, 30) - self.service_start_time.setHidden(True) - getattr(self, 'running_status').setEnabled(True) - else: - print("Stopping Service...") - 
self.p.kill() - del self.p - button.setText("Start Service") - create_message_box("Service status", "Service has been stoped") - getattr(self, 'running_status').setEnabled(False) - - def setup_local_config(self): - bio_config = self.get_local_config() - - print("Setting Local Configuration...") - - if not bio_config: - print("Local Configuration not updated...") - return 0 - - if os.path.exists("local_config.py"): - os.remove("local_config.py") - - with open("local_config.py", 'w+') as f: - f.write(bio_config) - - print("Local Configuration Updated.") - - create_message_box("Message", "Configuration Updated!\nClick on Start Service.") - - getattr(self, 'start_or_stop_service').setEnabled(True) - - def get_device_details(self): - device = {} - devices = [] - shifts = [] - - for idx in range(0, self.counter+1): - shift = getattr(self, "shift_" + str(idx)).text() - device_id = getattr(self, "device_id_" + str(idx)).text() - devices.append({ - 'device_id': device_id, - 'ip': getattr(self, "device_ip_" + str(idx)).text(), - 'punch_direction': '', - 'clear_from_device_on_fetch': '' - }) - if shift in device: - device[shift].append(device_id) - else: - device[shift]=[device_id] - - for shift_type_name in device.keys(): - shifts.append({ - 'shift_type_name': shift_type_name, - 'related_device_id': device[shift_type_name] - }) - return devices, shifts - - def get_local_config(self): - if not validate_fields(self): - return 0 - string = self.textbox_import_start_date.text() - formated_date = "".join([ele for ele in reversed(string.split("/"))]) - - devices, shifts = self.get_device_details() - return config_template.format(self.textbox_erpnext_api_key.text(), self.textbox_erpnext_api_secret.text(), self.textbox_erpnext_url.text(), self.textbox_pull_frequency.text(), formated_date, json.dumps(devices), json.dumps(shifts)) - - def get_running_status(self): - running_status = [] - with open('/'.join([config.LOGS_DIRECTORY])+'/logs.log', 'r') as f: - index = 0 - for idx, line in enumerate(f,1): - logdate = convert_into_date(line.split(',')[0], '%Y-%m-%d %H:%M:%S') - if logdate and logdate >= convert_into_date(self.service_start_time.text().split('.')[0] , '%Y-%m-%d %H:%M:%S'): - index = idx - break - if index: - running_status.extend(read_file_contents('logs',index)) - - with open('/'.join([config.LOGS_DIRECTORY])+'/error.log', 'r') as fread: - error_index = 0 - for error_idx, error_line in enumerate(fread,1): - start_date = convert_into_date(self.service_start_time.text().split('.')[0] , '%Y-%m-%d %H:%M:%S') - if start_date and start_date.strftime('%Y-%m-%d') in error_line: - error_logdate = convert_into_date(error_line.split(',')[0], '%Y-%m-%d %H:%M:%S') - if error_logdate and error_logdate >= start_date: - error_index = error_idx - break - if error_index: - running_status.extend(read_file_contents('error',error_index)) - - if running_status: - create_message_box("Running status", ''.join(running_status)) - else: - create_message_box("Running status", 'Process not yet started') - -def read_file_contents(file_name, index): - running_status = [] - with open('/'.join([config.LOGS_DIRECTORY])+f'/{file_name}.log', 'r') as file_handler: - for idx, line in enumerate(file_handler,1): - if idx>=index: - running_status.append(line) - return running_status - - -def validate_fields(self): - def message(text): - create_message_box("Missing Value", "Please Set {}".format(text), "warning") - - if not self.textbox_erpnext_api_key.text(): - return message("API Key") - - if not self.textbox_erpnext_api_secret.text(): - 
return message("API Secret") - - if not self.textbox_erpnext_url.text(): - return message("ERPNext URL") - - if not self.textbox_import_start_date.text(): - return message("Import Start Date") - - return validate_date(self.textbox_import_start_date.text()) - - -def validate_date(date): - try: - datetime.datetime.strptime(date, '%d/%m/%Y') - return True - except ValueError: - create_message_box("", "Please Enter Date in correct format", "warning", width=200) - return False - - -def convert_into_date(datestring, pattern): - try: - return datetime.datetime.strptime(datestring, pattern) - except: - return None - - -def create_message_box(title, text, icon="information", width=150): - msg = QMessageBox() - msg.setWindowTitle(title) - lineCnt = len(text.split('\n')) - if lineCnt > 15: - scroll = QtWidgets.QScrollArea() - scroll.setWidgetResizable(1) - content = QtWidgets.QWidget() - scroll.setWidget(content) - layout = QtWidgets.QVBoxLayout(content) - tmpLabel = QtWidgets.QLabel(text) - tmpLabel.setTextInteractionFlags(QtCore.Qt.TextSelectableByMouse) - layout.addWidget(tmpLabel) - msg.layout().addWidget(scroll, 12, 10, 1, msg.layout().columnCount()) - msg.setStyleSheet("QScrollArea{min-width:550 px; min-height: 400px}") - else: - msg.setText(text) - if icon == "warning": - msg.setIcon(QtWidgets.QMessageBox.Warning) - msg.setStyleSheet("QMessageBox Warning{min-width: 50 px;}") - else: - msg.setIcon(QtWidgets.QMessageBox.Information) - msg.setStyleSheet("QMessageBox Information{min-width: 50 px;}") - msg.setStyleSheet("QmessageBox QLabel{min-width: "+str(width)+"px;}") - msg.exec_() - - -def setup_window(): - biometric_app = QApplication(sys.argv) - biometric_window = BiometricWindow() - biometric_app.exec_() - - -if __name__ == "__main__": - setup_window()
diff --git a/install.py b/install.py
deleted file mode 100755
index 9a90ffe..0000000
--- a/install.py
+++ /dev/null
@@ -1,7 +0,0 @@
-import os
-
-print("Checking Dependencies...")
-os.system("python -m pip install -q -r requirements.txt")
-
-from gui import setup_window
-setup_window()
diff --git a/local_config.py.template b/local_config.py.template
index ce8d5fa..c4f5da4 100644
--- a/local_config.py.template
+++ b/local_config.py.template
@@ -1,31 +1,43 @@
-
 # ERPNext related configs
-ERPNEXT_API_KEY = ''
-ERPNEXT_API_SECRET = ''
-ERPNEXT_URL = 'http://dev.local:8000'
+ERPNEXT_API_KEY = ""
+ERPNEXT_API_SECRET = ""
+ERPNEXT_URL = "http://dev.local:8000"
 ERPNEXT_VERSION = 13
 
 # operational configs
-PULL_FREQUENCY = 60 # in minutes
-LOGS_DIRECTORY = 'logs' # logs of this script is stored in this directory
-IMPORT_START_DATE = None # format: '20190501'
+PULL_FREQUENCY = 60  # in minutes
+LOGS_DIRECTORY = "logs"  # logs of this script are stored in this directory
+IMPORT_START_DATE = None  # format: '20190501'
 
 # Biometric device configs (all keys mandatory)
-    #- device_id - must be unique, strictly alphanumerical chars only. no space allowed.
-    #- ip - device IP Address
-    #- punch_direction - 'IN'/'OUT'/'AUTO'/None
-    #- clear_from_device_on_fetch: if set to true then attendance is deleted after fetch is successful.
-    #(Caution: this feature can lead to data loss if used carelessly.)
+# - device_id - must be unique, strictly alphanumeric chars only. no spaces allowed.
+# - ip - device IP Address
+# - punch_direction - 'IN'/'OUT'/'AUTO'/None
+# - clear_from_device_on_fetch: if set to true then attendance is deleted after fetch is successful.
+#   (Caution: this feature can lead to data loss if used carelessly.)
+# - live_sync - whether check-ins should be pushed to ERPNext in real time as they happen (live capture) instead of being polled
 devices = [
-    {'device_id':'test_1','ip':'192.168.0.209', 'punch_direction': None, 'clear_from_device_on_fetch': False},
-    {'device_id':'test_2','ip':'192.168.2.209', 'punch_direction': None, 'clear_from_device_on_fetch': False}
+    {
+        "device_id": "test_1",
+        "ip": "192.168.0.209",
+        "punch_direction": None,
+        "clear_from_device_on_fetch": False,
+        "live_sync": True,
+    },
+    {
+        "device_id": "test_2",
+        "ip": "192.168.2.209",
+        "punch_direction": None,
+        "clear_from_device_on_fetch": False,
+        "live_sync": False,
+    },
 ]
 
-# Configs updating sync timestamp in the Shift Type DocType 
+# Configs updating sync timestamp in the Shift Type DocType
 # please, read this thread to know why this is necessary https://discuss.erpnext.com/t/v-12-hr-auto-attendance-purpose-of-last-sync-of-checkin-in-shift-type/52997
 shift_type_device_mapping = [
-    {'shift_type_name': ['Shift1'], 'related_device_id': ['test_1','test_2']}
+    {"shift_type_name": ["Shift1"], "related_device_id": ["test_1", "test_2"]}
 ]
@@ -35,4 +47,4 @@ shift_type_device_mapping = [
 # 2. Employee is inactive for the given employee User ID in the Biometric device.
 # 3. Duplicate Employee Checkin found. (This exception can happen if you have cleared the logs/status.json of this script)
 # Use the corresponding number to ignore the above exceptions. (Default: Ignores all the listed exceptions)
-allowed_exceptions = [1,2,3]
\ No newline at end of file
+allowed_exceptions = [1, 2, 3]
diff --git a/requirements.txt b/requirements.txt
index dc240a4..07b3c6e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,3 @@
 requests
 pickledb
 pyzk
-PyQt5
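A closing note on the allowed_exceptions knob in the template: it is positional. Each number is a 1-based index into the three messages listed above, and erpnext_sync.py narrows its allowlist via allowlisted_errors[error_number - 1]. For example:

    allowed_exceptions = [3]  # tolerate only duplicate check-ins

With that setting, a missing or inactive employee now aborts the device's sync run (to be retried on the next cycle), while duplicate-timestamp rejections are still just logged and skipped.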