Skip to content

Commit

Permalink
[MINOR]: Transaction Retry Refactor (#1494)
Browse files Browse the repository at this point in the history
* [MINOR]: Transaction Retry Refactor

* [MINOR]: Duplicate Error Handler
  • Loading branch information
amadolid authored Jan 3, 2025
1 parent 1942ca1 commit ac26183
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 47 deletions.
39 changes: 21 additions & 18 deletions jac-cloud/jac_cloud/core/architype.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,33 +234,33 @@ def has_operations(self) -> bool:
def commit(session: ClientSession) -> None:
"""Commit current session."""
commit_retry = 0
commit_max_retry = BulkWrite.SESSION_MAX_COMMIT_RETRY
while commit_retry <= commit_max_retry:
while True:
try:
session.commit_transaction()
break
except (ConnectionFailure, OperationFailure) as ex:
if ex.has_error_label("UnknownTransactionCommitResult"):
if (
ex.has_error_label("UnknownTransactionCommitResult")
and commit_retry <= BulkWrite.SESSION_MAX_COMMIT_RETRY
):
commit_retry += 1
logger.error(
"Error commiting bulk write! "
f"Retrying [{commit_retry}/{commit_max_retry}] ..."
logger.exception(
"Error commiting session! "
f"Retrying [{commit_retry}/{BulkWrite.SESSION_MAX_COMMIT_RETRY}] ..."
)
continue
logger.error(
f"Error commiting bulk write after max retry [{commit_max_retry}] !"
logger.exception(
f"Error commiting session after max retry [{BulkWrite.SESSION_MAX_COMMIT_RETRY}] !"
)
raise
except Exception:
session.abort_transaction()
logger.error("Error commiting bulk write!")
logger.exception("Error commiting session!")
raise

def execute(self, session: ClientSession) -> None:
"""Execute all operations."""
transaction_retry = 0
transaction_max_retry = self.SESSION_MAX_TRANSACTION_RETRY
while transaction_retry <= transaction_max_retry:
while True:
try:
if node_operation := self.operations[NodeAnchor]:
NodeAnchor.Collection.bulk_write(node_operation, False, session)
Expand All @@ -273,19 +273,22 @@ def execute(self, session: ClientSession) -> None:
self.commit(session)
break
except (ConnectionFailure, OperationFailure) as ex:
if ex.has_error_label("TransientTransactionError"):
if (
ex.has_error_label("TransientTransactionError")
and transaction_retry <= self.SESSION_MAX_TRANSACTION_RETRY
):
transaction_retry += 1
logger.error(
logger.exception(
"Error executing bulk write! "
f"Retrying [{transaction_retry}/{transaction_max_retry}] ..."
f"Retrying [{transaction_retry}/{self.SESSION_MAX_TRANSACTION_RETRY}] ..."
)
continue
logger.error(
f"Error executing bulk write after max retry [{transaction_max_retry}] !"
logger.exception(
f"Error executing bulk write after max retry [{self.SESSION_MAX_TRANSACTION_RETRY}] !"
)
raise
except Exception:
logger.error("Error executing bulk write!")
logger.exception("Error executing bulk write!")
raise


Expand Down
26 changes: 15 additions & 11 deletions jac-cloud/jac_cloud/jaseci/routers/sso.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from fastapi_sso.sso.twitter import TwitterSSO
from fastapi_sso.sso.yandex import YandexSSO

from pymongo.errors import ConnectionFailure, OperationFailure
from pymongo.errors import ConnectionFailure, DuplicateKeyError, OperationFailure

from ..dtos import AttachSSO, DetachSSO
from ..models import NO_PASSWORD, User as BaseUser
Expand Down Expand Up @@ -214,8 +214,7 @@ def register(platform: str, open_id: OpenID) -> Response:

with User.Collection.get_session() as session, session.start_transaction():
retry = 0
max_retry = BulkWrite.SESSION_MAX_TRANSACTION_RETRY
while retry <= max_retry:
while True:
try:
if not User.Collection.update_one(
{"email": open_id.email},
Expand Down Expand Up @@ -243,21 +242,26 @@ def register(platform: str, open_id: OpenID) -> Response:
User.Collection.insert_one(ureq, session=session)
BulkWrite.commit(session)
return login(platform, open_id)
except DuplicateKeyError:
raise HTTPException(409, "Already Exists!")
except (ConnectionFailure, OperationFailure) as ex:
if ex.has_error_label("TransientTransactionError"):
if (
ex.has_error_label("TransientTransactionError")
and retry <= BulkWrite.SESSION_MAX_TRANSACTION_RETRY
):
retry += 1
logger.error(
logger.exception(
"Error executing bulk write! "
f"Retrying [{retry}/{max_retry}] ..."
f"Retrying [{retry}/{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] ..."
)
continue
logger.exception("Error executing bulk write!")
session.abort_transaction()
break
logger.exception(
f"Error executing bulk write after max retry [{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] !"
)
raise
except Exception:
logger.exception("Error executing bulk write!")
session.abort_transaction()
break
raise
return ORJSONResponse({"message": "Registration Failed!"}, 409)


Expand Down
25 changes: 15 additions & 10 deletions jac-cloud/jac_cloud/jaseci/routers/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from passlib.hash import pbkdf2_sha512

from pymongo.errors import ConnectionFailure, OperationFailure
from pymongo.errors import ConnectionFailure, DuplicateKeyError, OperationFailure

from ..dtos import (
UserChangePassword,
Expand Down Expand Up @@ -49,8 +49,7 @@ def register(req: User.register_type()) -> ORJSONResponse: # type: ignore
is_activated = req_obf["is_activated"] = not Emailer.has_client()

retry = 0
max_retry = BulkWrite.SESSION_MAX_TRANSACTION_RETRY
while retry <= max_retry:
while True:
try:
NodeAnchor.Collection.insert_one(root.serialize(), session)
if id := (
Expand All @@ -62,21 +61,27 @@ def register(req: User.register_type()) -> ORJSONResponse: # type: ignore
resp = {"message": "Successfully Registered!"}
log_exit(resp, log)
return ORJSONResponse(resp, 201)
raise SystemError("Can't create System Admin!")
except DuplicateKeyError:
raise HTTPException(409, "Already Exists!")
except (ConnectionFailure, OperationFailure) as ex:
if ex.has_error_label("TransientTransactionError"):
if (
ex.has_error_label("TransientTransactionError")
and retry <= BulkWrite.SESSION_MAX_TRANSACTION_RETRY
):
retry += 1
logger.error(
"Error executing bulk write! "
f"Retrying [{retry}/{max_retry}] ..."
f"Retrying [{retry}/{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] ..."
)
continue
logger.exception("Error executing bulk write!")
session.abort_transaction()
break
logger.exception(
f"Error executing bulk write after max retry [{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] !"
)
raise
except Exception:
logger.exception("Error executing bulk write!")
session.abort_transaction()
break
raise

resp = {"message": "Registration Failed!"}
log_exit(resp, log)
Expand Down
18 changes: 10 additions & 8 deletions jac-cloud/jac_cloud/plugin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ def create_system_admin(
)

retry = 0
max_retry = BulkWrite.SESSION_MAX_TRANSACTION_RETRY
while retry <= max_retry:
while True:
try:
if not NodeAnchor.Collection.find_by_id(
SUPER_ROOT_ID, session=session
Expand All @@ -160,21 +159,24 @@ def create_system_admin(
).inserted_id:
BulkWrite.commit(session)
return f"System Admin created with id: {id}"
session.abort_transaction()
raise SystemError("Can't create System Admin!")
except (ConnectionFailure, OperationFailure) as ex:
if ex.has_error_label("TransientTransactionError"):
if (
ex.has_error_label("TransientTransactionError")
and retry <= BulkWrite.SESSION_MAX_TRANSACTION_RETRY
):
retry += 1
logger.error(
"Error executing bulk write! "
f"Retrying [{retry}/{max_retry}] ..."
f"Retrying [{retry}/{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] ..."
)
continue
logger.exception("Error executing bulk write!")
session.abort_transaction()
logger.exception(
f"Error executing bulk write after max retry [{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] !"
)
raise
except Exception:
logger.exception("Error executing bulk write!")
session.abort_transaction()
raise

raise Exception("Can't process registration. Please try again!")

0 comments on commit ac26183

Please sign in to comment.