diff --git a/jac-cloud/jac_cloud/core/architype.py b/jac-cloud/jac_cloud/core/architype.py index c893c7cbb..36066268b 100644 --- a/jac-cloud/jac_cloud/core/architype.py +++ b/jac-cloud/jac_cloud/core/architype.py @@ -234,33 +234,33 @@ def has_operations(self) -> bool: def commit(session: ClientSession) -> None: """Commit current session.""" commit_retry = 0 - commit_max_retry = BulkWrite.SESSION_MAX_COMMIT_RETRY - while commit_retry <= commit_max_retry: + while True: try: session.commit_transaction() break except (ConnectionFailure, OperationFailure) as ex: - if ex.has_error_label("UnknownTransactionCommitResult"): + if ( + ex.has_error_label("UnknownTransactionCommitResult") + and commit_retry <= BulkWrite.SESSION_MAX_COMMIT_RETRY + ): commit_retry += 1 - logger.error( - "Error commiting bulk write! " - f"Retrying [{commit_retry}/{commit_max_retry}] ..." + logger.exception( + "Error commiting session! " + f"Retrying [{commit_retry}/{BulkWrite.SESSION_MAX_COMMIT_RETRY}] ..." ) continue - logger.error( - f"Error commiting bulk write after max retry [{commit_max_retry}] !" + logger.exception( + f"Error commiting session after max retry [{BulkWrite.SESSION_MAX_COMMIT_RETRY}] !" ) raise except Exception: - session.abort_transaction() - logger.error("Error commiting bulk write!") + logger.exception("Error commiting session!") raise def execute(self, session: ClientSession) -> None: """Execute all operations.""" transaction_retry = 0 - transaction_max_retry = self.SESSION_MAX_TRANSACTION_RETRY - while transaction_retry <= transaction_max_retry: + while True: try: if node_operation := self.operations[NodeAnchor]: NodeAnchor.Collection.bulk_write(node_operation, False, session) @@ -273,19 +273,22 @@ def execute(self, session: ClientSession) -> None: self.commit(session) break except (ConnectionFailure, OperationFailure) as ex: - if ex.has_error_label("TransientTransactionError"): + if ( + ex.has_error_label("TransientTransactionError") + and transaction_retry <= self.SESSION_MAX_TRANSACTION_RETRY + ): transaction_retry += 1 - logger.error( + logger.exception( "Error executing bulk write! " - f"Retrying [{transaction_retry}/{transaction_max_retry}] ..." + f"Retrying [{transaction_retry}/{self.SESSION_MAX_TRANSACTION_RETRY}] ..." ) continue - logger.error( - f"Error executing bulk write after max retry [{transaction_max_retry}] !" + logger.exception( + f"Error executing bulk write after max retry [{self.SESSION_MAX_TRANSACTION_RETRY}] !" ) raise except Exception: - logger.error("Error executing bulk write!") + logger.exception("Error executing bulk write!") raise diff --git a/jac-cloud/jac_cloud/jaseci/routers/sso.py b/jac-cloud/jac_cloud/jaseci/routers/sso.py index 1da68aaee..045eab2c9 100644 --- a/jac-cloud/jac_cloud/jaseci/routers/sso.py +++ b/jac-cloud/jac_cloud/jaseci/routers/sso.py @@ -25,7 +25,7 @@ from fastapi_sso.sso.twitter import TwitterSSO from fastapi_sso.sso.yandex import YandexSSO -from pymongo.errors import ConnectionFailure, OperationFailure +from pymongo.errors import ConnectionFailure, DuplicateKeyError, OperationFailure from ..dtos import AttachSSO, DetachSSO from ..models import NO_PASSWORD, User as BaseUser @@ -214,8 +214,7 @@ def register(platform: str, open_id: OpenID) -> Response: with User.Collection.get_session() as session, session.start_transaction(): retry = 0 - max_retry = BulkWrite.SESSION_MAX_TRANSACTION_RETRY - while retry <= max_retry: + while True: try: if not User.Collection.update_one( {"email": open_id.email}, @@ -243,21 +242,26 @@ def register(platform: str, open_id: OpenID) -> Response: User.Collection.insert_one(ureq, session=session) BulkWrite.commit(session) return login(platform, open_id) + except DuplicateKeyError: + raise HTTPException(409, "Already Exists!") except (ConnectionFailure, OperationFailure) as ex: - if ex.has_error_label("TransientTransactionError"): + if ( + ex.has_error_label("TransientTransactionError") + and retry <= BulkWrite.SESSION_MAX_TRANSACTION_RETRY + ): retry += 1 - logger.error( + logger.exception( "Error executing bulk write! " - f"Retrying [{retry}/{max_retry}] ..." + f"Retrying [{retry}/{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] ..." ) continue - logger.exception("Error executing bulk write!") - session.abort_transaction() - break + logger.exception( + f"Error executing bulk write after max retry [{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] !" + ) + raise except Exception: logger.exception("Error executing bulk write!") - session.abort_transaction() - break + raise return ORJSONResponse({"message": "Registration Failed!"}, 409) diff --git a/jac-cloud/jac_cloud/jaseci/routers/user.py b/jac-cloud/jac_cloud/jaseci/routers/user.py index 6893747ad..6f1292187 100644 --- a/jac-cloud/jac_cloud/jaseci/routers/user.py +++ b/jac-cloud/jac_cloud/jaseci/routers/user.py @@ -8,7 +8,7 @@ from passlib.hash import pbkdf2_sha512 -from pymongo.errors import ConnectionFailure, OperationFailure +from pymongo.errors import ConnectionFailure, DuplicateKeyError, OperationFailure from ..dtos import ( UserChangePassword, @@ -49,8 +49,7 @@ def register(req: User.register_type()) -> ORJSONResponse: # type: ignore is_activated = req_obf["is_activated"] = not Emailer.has_client() retry = 0 - max_retry = BulkWrite.SESSION_MAX_TRANSACTION_RETRY - while retry <= max_retry: + while True: try: NodeAnchor.Collection.insert_one(root.serialize(), session) if id := ( @@ -62,21 +61,27 @@ def register(req: User.register_type()) -> ORJSONResponse: # type: ignore resp = {"message": "Successfully Registered!"} log_exit(resp, log) return ORJSONResponse(resp, 201) + raise SystemError("Can't create System Admin!") + except DuplicateKeyError: + raise HTTPException(409, "Already Exists!") except (ConnectionFailure, OperationFailure) as ex: - if ex.has_error_label("TransientTransactionError"): + if ( + ex.has_error_label("TransientTransactionError") + and retry <= BulkWrite.SESSION_MAX_TRANSACTION_RETRY + ): retry += 1 logger.error( "Error executing bulk write! " - f"Retrying [{retry}/{max_retry}] ..." + f"Retrying [{retry}/{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] ..." ) continue - logger.exception("Error executing bulk write!") - session.abort_transaction() - break + logger.exception( + f"Error executing bulk write after max retry [{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] !" + ) + raise except Exception: logger.exception("Error executing bulk write!") - session.abort_transaction() - break + raise resp = {"message": "Registration Failed!"} log_exit(resp, log) diff --git a/jac-cloud/jac_cloud/plugin/cli.py b/jac-cloud/jac_cloud/plugin/cli.py index b78efccc7..c562a6f38 100644 --- a/jac-cloud/jac_cloud/plugin/cli.py +++ b/jac-cloud/jac_cloud/plugin/cli.py @@ -135,8 +135,7 @@ def create_system_admin( ) retry = 0 - max_retry = BulkWrite.SESSION_MAX_TRANSACTION_RETRY - while retry <= max_retry: + while True: try: if not NodeAnchor.Collection.find_by_id( SUPER_ROOT_ID, session=session @@ -160,21 +159,24 @@ def create_system_admin( ).inserted_id: BulkWrite.commit(session) return f"System Admin created with id: {id}" - session.abort_transaction() + raise SystemError("Can't create System Admin!") except (ConnectionFailure, OperationFailure) as ex: - if ex.has_error_label("TransientTransactionError"): + if ( + ex.has_error_label("TransientTransactionError") + and retry <= BulkWrite.SESSION_MAX_TRANSACTION_RETRY + ): retry += 1 logger.error( "Error executing bulk write! " - f"Retrying [{retry}/{max_retry}] ..." + f"Retrying [{retry}/{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] ..." ) continue - logger.exception("Error executing bulk write!") - session.abort_transaction() + logger.exception( + f"Error executing bulk write after max retry [{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] !" + ) raise except Exception: logger.exception("Error executing bulk write!") - session.abort_transaction() raise raise Exception("Can't process registration. Please try again!")