From fdc94666c578b530323d6e8635ac8db220176dfb Mon Sep 17 00:00:00 2001 From: elonen Date: Tue, 14 May 2024 21:42:58 +0300 Subject: [PATCH] Improve DB and error handling for organizer --- README.md | 4 + doc/sysadmin-guide.md | 6 +- organizer/basic_folders/organizer/__init__.py | 68 +++++++--- .../organizer/folder_op_methods.py | 118 +++++++++--------- .../organizer/migration_methods.py | 6 +- .../organizer/testing_methods.py | 10 +- .../organizer/user_session_methods.py | 26 ++-- protobuf/proto/organizer.proto | 14 ++- server/Cargo.lock | 1 + server/Cargo.toml | 1 + server/src/api_server/user_session.rs | 3 + server/src/api_server/ws_handers.rs | 40 ++++-- 12 files changed, 188 insertions(+), 109 deletions(-) diff --git a/README.md b/README.md index c9a9d3e..c7252d4 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,10 @@ Clapshot now includes an extensible [Organizer Plugin system](doc/organizer-plug Organizers use gRPC to communicate with the Clapshot Server, and can be implemented in any language. +The provided default/example organizer, called "basic_folders" (in *Python*), implements + - personal folder UI for users, and + - for admin, list of users and a way to manage their folder contents. + ### Work In Progress The [Organizer API](protobuf/proto/organizer.proto) is still evolving, so you are invited to **provide feedback** and discuss the future development, but please **do not expect backwards compatibility** for now. diff --git a/doc/sysadmin-guide.md b/doc/sysadmin-guide.md index 85704fb..116a242 100644 --- a/doc/sysadmin-guide.md +++ b/doc/sysadmin-guide.md @@ -43,7 +43,11 @@ Running the server without migrations enabled will detect that the database is o Clapshot server itself contains no authentication code. Instead, it trusts HTTP server (reverse proxy) to take care of that and to pass authenticated user ID and username in request headers. This is exactly what the basic auth / htadmin demo -above does, too. +above does, too: + + - `X-Remote-User-Id` / `X_Remote_User_Id` / `HTTP_X_REMOTE_USER_ID` – Authenticated user's ID (e.g. "alice.brown") + - `X-Remote-User-Name` / `X_Remote_User_Name` / `HTTP_X_REMOTE_USER_NAME` – Display name for user (e.g. "Alice Brown") + - `X-Remote-User-Is-Admin` / `X_Remote_User_Is_Admin` / `HTTP_X_REMOTE_USER_IS_ADMIN` – If set to "1" or "true", user is a Clapshot admin Most modern real-world deployments will likely use some more advanced authentication mechanism, such as OAuth, Kerberos etc, but htadmin is a good starting point. diff --git a/organizer/basic_folders/organizer/__init__.py b/organizer/basic_folders/organizer/__init__.py index 941b012..bbe7c19 100644 --- a/organizer/basic_folders/organizer/__init__.py +++ b/organizer/basic_folders/organizer/__init__.py @@ -7,13 +7,16 @@ import clapshot_grpc.clapshot as clap import clapshot_grpc.clapshot.organizer as org import sqlalchemy +import sqlalchemy.exc + +from functools import wraps from organizer.database.connection import open_database -from .migration_methods import check_migrations, apply_migration, after_migrations -from .user_session_methods import connect_back_to_server, on_start_user_session, navigate_page, cmd_from_client -from .folder_op_methods import move_to_folder, reorder_items -from .testing_methods import list_tests, run_test +from .migration_methods import check_migrations_impl, apply_migration_impl, after_migrations_impl +from .user_session_methods import connect_back_to_server, on_start_user_session_impl, navigate_page_impl, cmd_from_client_impl +from .folder_op_methods import move_to_folder_impl, reorder_items_impl +from .testing_methods import list_tests_impl, run_test_impl from .helpers.folders import FoldersHelper from .helpers.pages import PagesHelper @@ -27,6 +30,32 @@ def override(func): # type: ignore return func + + +def organizer_grpc_handler(func): + @wraps(func) + async def wrapper(self, request): + try: + try: + return await func(self, request) + except sqlalchemy.exc.OperationalError as e: + raise GRPCError(GrpcStatus.RESOURCE_EXHAUSTED, f"DB error: {e}") + except GRPCError as e: + # Intercept some known session errors and show them to the user nicely + if e.status in (GrpcStatus.INVALID_ARGUMENT, GrpcStatus.PERMISSION_DENIED, GrpcStatus.ALREADY_EXISTS, GrpcStatus.RESOURCE_EXHAUSTED): + await self.srv.client_show_user_message(org.ClientShowUserMessageRequest(sid=request.ses.sid, + msg = clap.UserMessage( + message=str(e.message), + user_id=request.ses.user.id, + type=clap.UserMessageType.ERROR, + details=str(e.details) if e.details else None))) + raise GRPCError(GrpcStatus.ABORTED) # Tell Clapshot server to ignore the result (we've shown the error to the user) + else: + raise e + return wrapper + + + class OrganizerInbound(org.OrganizerInboundBase): srv: org.OrganizerOutboundStub # connection back to Clapshot server log: Logger @@ -60,33 +89,40 @@ async def handshake(self, server_info: org.ServerInfo) -> clap.Empty: # Migration methods @override + @organizer_grpc_handler async def check_migrations(self, request: org.CheckMigrationsRequest) -> org.CheckMigrationsResponse: - return await check_migrations(self, request) + return await check_migrations_impl(self, request) @override + @organizer_grpc_handler async def apply_migration(self, request: org.ApplyMigrationRequest) -> org.ApplyMigrationResponse: - return await apply_migration(self, request) + return await apply_migration_impl(self, request) @override + @organizer_grpc_handler async def after_migrations(self, request: org.AfterMigrationsRequest) -> clap.Empty: - return await after_migrations(self, request) + return await after_migrations_impl(self, request) # User session methods @override + @organizer_grpc_handler async def on_start_user_session(self, request: org.OnStartUserSessionRequest) -> org.OnStartUserSessionResponse: - return await on_start_user_session(self, request) + return await on_start_user_session_impl(self, request) @override + @organizer_grpc_handler async def navigate_page(self, request: org.NavigatePageRequest) -> org.ClientShowPageRequest: - return await navigate_page(self, request) + return await navigate_page_impl(self, request) @override + @organizer_grpc_handler async def cmd_from_client(self, request: org.CmdFromClientRequest) -> clap.Empty: - return await cmd_from_client(self, request) + return await cmd_from_client_impl(self, request) @override + @organizer_grpc_handler async def authz_user_action(self, request: org.AuthzUserActionRequest) -> org.AuthzResponse: raise GRPCError(GrpcStatus.UNIMPLEMENTED) # = let Clapshot server decide @@ -94,20 +130,24 @@ async def authz_user_action(self, request: org.AuthzUserActionRequest) -> org.Au # Folder operation methods @override + @organizer_grpc_handler async def move_to_folder(self, request: org.MoveToFolderRequest) -> clap.Empty: - return await move_to_folder(self, request) + return await move_to_folder_impl(self, request) @override + @organizer_grpc_handler async def reorder_items(self, request: org.ReorderItemsRequest) -> clap.Empty: - return await reorder_items(self, request) + return await reorder_items_impl(self, request) # Testing methods @override + @organizer_grpc_handler async def list_tests(self, request: clap.Empty) -> org.ListTestsResponse: - return await list_tests(self) + return await list_tests_impl(self) @override + @organizer_grpc_handler async def run_test(self, request: org.RunTestRequest) -> org.RunTestResponse: - return await run_test(self, request) + return await run_test_impl(self, request) diff --git a/organizer/basic_folders/organizer/folder_op_methods.py b/organizer/basic_folders/organizer/folder_op_methods.py index 0199c16..d02af47 100644 --- a/organizer/basic_folders/organizer/folder_op_methods.py +++ b/organizer/basic_folders/organizer/folder_op_methods.py @@ -13,7 +13,7 @@ import organizer -async def move_to_folder(oi: organizer.OrganizerInbound, req: org.MoveToFolderRequest) -> clap.Empty: +async def move_to_folder_impl(oi: organizer.OrganizerInbound, req: org.MoveToFolderRequest) -> clap.Empty: """ Organizer method (gRPC/protobuf) @@ -25,45 +25,47 @@ async def move_to_folder(oi: organizer.OrganizerInbound, req: org.MoveToFolderRe return clap.Empty() with oi.db_new_session() as dbs: - with dbs.begin_nested(): - dst_folder = dbs.query(DbFolder).filter(DbFolder.id == int(req.dst_folder_id)).one_or_none() - max_sort_order = dbs.query(sqlalchemy.func.max(DbFolderItems.sort_order)).filter(DbFolderItems.folder_id == int(req.dst_folder_id)).scalar() or 0 - - if not dst_folder: - raise GRPCError(GrpcStatus.NOT_FOUND, "Destination folder not found") - if dst_folder.user_id != req.ses.user.id and not req.ses.is_admin: - raise GRPCError(GrpcStatus.PERMISSION_DENIED, "Cannot move items to another user's folder") - - for it in req.ids: - # Move a folder - if it.folder_id: - fld_to_move: Optional[DbFolder] = dbs.query(DbFolder).filter(DbFolder.id == int(it.folder_id)).one_or_none() - - if not fld_to_move: - raise GRPCError(GrpcStatus.NOT_FOUND, f"Folder id '{it.folder_id}' not found") - if fld_to_move.id == dst_folder.id: - raise GRPCError(GrpcStatus.INVALID_ARGUMENT, "Cannot move a folder into itself") - if fld_to_move.user_id != req.ses.user.id and not req.ses.is_admin: - raise GRPCError(GrpcStatus.PERMISSION_DENIED, f"Cannot move another user's folder") + dst_folder = dbs.query(DbFolder).filter(DbFolder.id == int(req.dst_folder_id)).one_or_none() + max_sort_order = dbs.query(sqlalchemy.func.max(DbFolderItems.sort_order)).filter(DbFolderItems.folder_id == int(req.dst_folder_id)).scalar() or 0 + if not dst_folder: + raise GRPCError(GrpcStatus.NOT_FOUND, "Destination folder not found") + if dst_folder.user_id != req.ses.user.id and not req.ses.is_admin: + raise GRPCError(GrpcStatus.PERMISSION_DENIED, "Cannot move items to another user's folder") + + for it in req.ids: + with oi.db_new_session() as dbs: + # Move a folder + if it.folder_id: + fld_to_move: Optional[DbFolder] = dbs.query(DbFolder).filter(DbFolder.id == int(it.folder_id)).one_or_none() + + if not fld_to_move: + raise GRPCError(GrpcStatus.NOT_FOUND, f"Folder id '{it.folder_id}' not found") + if fld_to_move.id == dst_folder.id: + raise GRPCError(GrpcStatus.INVALID_ARGUMENT, "Cannot move a folder into itself") + if fld_to_move.user_id != req.ses.user.id and not req.ses.is_admin: + raise GRPCError(GrpcStatus.PERMISSION_DENIED, f"Cannot move another user's folder") + + with dbs.begin_nested(): cnt = dbs.query(DbFolderItems).filter(DbFolderItems.subfolder_id == fld_to_move.id).update({"folder_id": dst_folder.id, "sort_order": max_sort_order+1}) if cnt == 0: raise GRPCError(GrpcStatus.NOT_FOUND, f"Folder with ID '{fld_to_move.id}' is a root folder? Cannot move.") - assert dst_folder.user_id, "Destination folder has no user ID, cannot transfer ownership" + await _recursive_set_folder_owner(dbs, fld_to_move.id, dst_folder.user_id, set(), oi.log) - oi.log.debug(f"Moved folder '{fld_to_move.id}' to folder '{dst_folder.id}'") + oi.log.debug(f"Moved folder '{fld_to_move.id}' to folder '{dst_folder.id}'") - # Move a video - elif it.video_id: - vid_to_move = dbs.query(DbVideo).filter(DbVideo.id == it.video_id).one_or_none() + # Move a video + elif it.video_id: + vid_to_move = dbs.query(DbVideo).filter(DbVideo.id == it.video_id).one_or_none() - if not vid_to_move: - raise GRPCError(GrpcStatus.NOT_FOUND, f"Video '{it.video_id}' not found") - if vid_to_move.user_id != req.ses.user.id and not req.ses.is_admin: - raise GRPCError(GrpcStatus.PERMISSION_DENIED, f"Cannot move another user's video") + if not vid_to_move: + raise GRPCError(GrpcStatus.NOT_FOUND, f"Video '{it.video_id}' not found") + if vid_to_move.user_id != req.ses.user.id and not req.ses.is_admin: + raise GRPCError(GrpcStatus.PERMISSION_DENIED, f"Cannot move another user's video") + with dbs.begin_nested(): vid_to_move.user_id = dst_folder.user_id # transfer ownership cnt = dbs.query(DbFolderItems).filter(DbFolderItems.video_id == vid_to_move.id).update({"folder_id": dst_folder.id, "sort_order": max_sort_order+1}) if cnt == 0: # not in any folder yet => insert it @@ -77,7 +79,35 @@ async def move_to_folder(oi: organizer.OrganizerInbound, req: org.MoveToFolderRe return clap.Empty() -async def reorder_items(oi: organizer.OrganizerInbound, req: org.ReorderItemsRequest) -> clap.Empty: +async def _recursive_set_folder_owner(dbs: sqlalchemy.orm.Session, folder_id: int, new_owner_id: str, seen: set[int], log: Logger) -> None: + """ + Set the owner of a folder and all its subfolders + videos recursively. + """ + assert isinstance(folder_id, int), f"Unexpected subfolder ID type on: {folder_id} ({type(folder_id)})" + + if folder_id in seen: + log.warning(f"Folder loop detected! THIS SHOULD NOT HAPPEN. Skipping folder '{folder_id}'") + return + seen.add(folder_id) + + # Update folder itself + log.debug(f"Setting owner of folder '{folder_id}' to '{new_owner_id}'") + dbs.query(DbFolder).filter(DbFolder.id == folder_id).update({"user_id": new_owner_id}) + + # Update videos in this folder + log.debug(f"Setting owner of folder '{folder_id}' videos to '{new_owner_id}'") + videos_subq = dbs.query(DbFolderItems.video_id).filter(DbFolderItems.folder_id == folder_id, DbFolderItems.video_id != None).subquery() + dbs.query(DbVideo).filter(DbVideo.id.in_(sqlalchemy.select(videos_subq))).update({"user_id": new_owner_id}) + + # Update subfolders + sub_ids = dbs.query(DbFolderItems.subfolder_id).filter(DbFolderItems.folder_id == folder_id, DbFolderItems.subfolder_id != None).all() + for subi in sub_ids: + log.debug(f"Recursing to subfolder '{subi[0]}'") + await _recursive_set_folder_owner(dbs, subi[0], new_owner_id, seen, log) + + + +async def reorder_items_impl(oi: organizer.OrganizerInbound, req: org.ReorderItemsRequest) -> clap.Empty: """ Organizer (gRPC/protobuf) Called when user reorders items in a folder in the client UI. @@ -113,29 +143,3 @@ async def reorder_items(oi: organizer.OrganizerInbound, req: org.ReorderItemsReq else: raise GRPCError(GrpcStatus.INVALID_ARGUMENT, "No folder ID in UI listing, cannot reorder") - -async def _recursive_set_folder_owner(dbs: sqlalchemy.orm.Session, folder_id: int, new_owner_id: str, seen: set[int], log: Logger) -> None: - """ - Set the owner of a folder and all its subfolders + videos recursively. - """ - assert isinstance(folder_id, int), f"Unexpected subfolder ID type on: {folder_id} ({type(folder_id)})" - - if folder_id in seen: - log.warning(f"Folder loop detected! THIS SHOULD NOT HAPPEN. Skipping folder '{folder_id}'") - return - seen.add(folder_id) - - # Update folder itself - log.debug(f"Setting owner of folder '{folder_id}' to '{new_owner_id}'") - dbs.query(DbFolder).filter(DbFolder.id == folder_id).update({"user_id": new_owner_id}) - - # Update videos in this folder - log.debug(f"Setting owner of folder '{folder_id}' videos to '{new_owner_id}'") - videos_subq = dbs.query(DbFolderItems.video_id).filter(DbFolderItems.folder_id == folder_id, DbFolderItems.video_id != None).subquery() - dbs.query(DbVideo).filter(DbVideo.id.in_(sqlalchemy.select(videos_subq))).update({"user_id": new_owner_id}) - - # Update subfolders - sub_ids = dbs.query(DbFolderItems.subfolder_id).filter(DbFolderItems.folder_id == folder_id, DbFolderItems.subfolder_id != None).all() - for subi in sub_ids: - log.debug(f"Recursing to subfolder '{subi[0]}'") - await _recursive_set_folder_owner(dbs, subi[0], new_owner_id, seen, log) diff --git a/organizer/basic_folders/organizer/migration_methods.py b/organizer/basic_folders/organizer/migration_methods.py index 168c8ac..00cff40 100644 --- a/organizer/basic_folders/organizer/migration_methods.py +++ b/organizer/basic_folders/organizer/migration_methods.py @@ -8,7 +8,7 @@ import organizer -async def check_migrations(oi, req: org.CheckMigrationsRequest) -> org.CheckMigrationsResponse: +async def check_migrations_impl(oi, req: org.CheckMigrationsRequest) -> org.CheckMigrationsResponse: """ Organizer method (gRPC/protobuf) @@ -22,7 +22,7 @@ async def check_migrations(oi, req: org.CheckMigrationsRequest) -> org.CheckMigr return org.CheckMigrationsResponse(current_schema_ver=cur_ver,pending_migrations=pending) -async def apply_migration(oi: organizer.OrganizerInbound, req: org.ApplyMigrationRequest) -> org.ApplyMigrationResponse: +async def apply_migration_impl(oi: organizer.OrganizerInbound, req: org.ApplyMigrationRequest) -> org.ApplyMigrationResponse: """ Organizer method (gRPC/protobuf) @@ -35,7 +35,7 @@ async def apply_migration(oi: organizer.OrganizerInbound, req: org.ApplyMigratio return org.ApplyMigrationResponse() -async def after_migrations(oi: organizer.OrganizerInbound, _: org.AfterMigrationsRequest) -> clap.Empty: +async def after_migrations_impl(oi: organizer.OrganizerInbound, _: org.AfterMigrationsRequest) -> clap.Empty: """ Organizer method (gRPC/protobuf) diff --git a/organizer/basic_folders/organizer/testing_methods.py b/organizer/basic_folders/organizer/testing_methods.py index d2838ef..3eb5062 100644 --- a/organizer/basic_folders/organizer/testing_methods.py +++ b/organizer/basic_folders/organizer/testing_methods.py @@ -21,7 +21,7 @@ from organizer.helpers.folders import FoldersHelper -async def list_tests(oi: organizer.OrganizerInbound) -> org.ListTestsResponse: +async def list_tests_impl(oi: organizer.OrganizerInbound) -> org.ListTestsResponse: """ Organizer method (gRPC/protobuf) @@ -37,7 +37,7 @@ async def list_tests(oi: organizer.OrganizerInbound) -> org.ListTestsResponse: return org.ListTestsResponse(test_names=test_names) -async def run_test(oi, request: org.RunTestRequest) -> org.RunTestResponse: +async def run_test_impl(oi, request: org.RunTestRequest) -> org.RunTestResponse: """ Organizer method (gRPC/protobuf) @@ -91,7 +91,7 @@ async def org_test__start_user_session(oi: organizer.OrganizerInbound): on_start_user_session() -- Just a simple test to check if the method doesn't crash. """ user = oi.db_new_session().query(DbUser).first() - res = await oi.on_start_user_session(org.OnStartUserSessionRequest( + res = await organizer.on_start_user_session_impl(oi, org.OnStartUserSessionRequest( org.UserSessionData( sid="test_sid",user=clap.UserInfo(id=user.id, name=user.name), is_admin=False, cookies={}) @@ -104,7 +104,7 @@ async def org_test__navigate_page(oi: organizer.OrganizerInbound): navigate_page() -- Test that it returns a valid ClientShowPageRequest. """ user = oi.db_new_session().query(DbUser).first() - res = await oi.navigate_page(org.NavigatePageRequest( + res = await organizer.navigate_page_impl(oi, org.NavigatePageRequest( ses=org.UserSessionData(sid="test_sid",user=clap.UserInfo(id=user.id, name=user.name), cookies={}) )) assert isinstance(res, org.ClientShowPageRequest) @@ -148,7 +148,7 @@ async def org_test__move_to_folder(oi: organizer.OrganizerInbound): assert someone_elses_video.user_id oi.log.info(f"Trying to move someone else's ({someone_elses_video.user_id}) video ({someone_elses_video.id}) to the root folder ({root_fld.id}) of current user ({user_id})") try: - await oi.move_to_folder(org.MoveToFolderRequest( + await organizer.move_to_folder_impl(oi, org.MoveToFolderRequest( ses, ids=[clap.FolderItemId(video_id=someone_elses_video.id)], dst_folder_id=str(root_fld.id), diff --git a/organizer/basic_folders/organizer/user_session_methods.py b/organizer/basic_folders/organizer/user_session_methods.py index e8b5797..d3719dc 100644 --- a/organizer/basic_folders/organizer/user_session_methods.py +++ b/organizer/basic_folders/organizer/user_session_methods.py @@ -17,7 +17,7 @@ import organizer -async def on_start_user_session(oi: organizer.OrganizerInbound, req: org.OnStartUserSessionRequest) -> org.OnStartUserSessionResponse: +async def on_start_user_session_impl(oi: organizer.OrganizerInbound, req: org.OnStartUserSessionRequest) -> org.OnStartUserSessionResponse: """ Organizer method (gRPC/protobuf) @@ -32,7 +32,7 @@ async def on_start_user_session(oi: organizer.OrganizerInbound, req: org.OnStart return org.OnStartUserSessionResponse() -async def navigate_page(oi: organizer.OrganizerInbound, req: org.NavigatePageRequest) -> org.ClientShowPageRequest: +async def navigate_page_impl(oi: organizer.OrganizerInbound, req: org.NavigatePageRequest) -> org.ClientShowPageRequest: """ Organizer method (gRPC/protobuf) @@ -46,7 +46,7 @@ async def navigate_page(oi: organizer.OrganizerInbound, req: org.NavigatePageReq return await oi.pages_helper.construct_navi_page(ses, None) -async def cmd_from_client(oi: organizer.OrganizerInbound, cmd: org.CmdFromClientRequest) -> clap.Empty: +async def cmd_from_client_impl(oi: organizer.OrganizerInbound, cmd: org.CmdFromClientRequest) -> clap.Empty: """ Organizer method (gRPC/protobuf) @@ -63,17 +63,17 @@ async def cmd_from_client(oi: organizer.OrganizerInbound, cmd: org.CmdFromClient if cmd.cmd == "new_folder": args = parse_json_args(cmd.args) parent_folder = (await oi.folders_helper.get_current_folder_path(cmd.ses, None))[-1] - with oi.db_new_session() as dbs: - # Create folder & refresh user's view - args = parse_json_args(cmd.args) - if new_folder_name := args.get("name"): + # Create folder & refresh user's view + args = parse_json_args(cmd.args) + if new_folder_name := args.get("name"): + with oi.db_new_session() as dbs: new_fld = await oi.folders_helper.create_folder(dbs, cmd.ses, parent_folder, new_folder_name) - oi.log.debug(f"Folder {new_fld.id} ('{new_fld.title}') created & committed, refreshing client's page") - navi_page = await oi.pages_helper.construct_navi_page(cmd.ses, None) - await oi.srv.client_show_page(navi_page) - else: - oi.log.error("new_folder command missing 'name' argument") - raise GRPCError(GrpcStatus.INVALID_ARGUMENT, "new_folder command missing 'name' argument") + oi.log.debug(f"Folder {new_fld.id} ('{new_fld.title}') created & committed, refreshing client's page") + navi_page = await oi.pages_helper.construct_navi_page(cmd.ses, None) + await oi.srv.client_show_page(navi_page) + else: + oi.log.error("new_folder command missing 'name' argument") + raise GRPCError(GrpcStatus.INVALID_ARGUMENT, "new_folder command missing 'name' argument") elif cmd.cmd == "open_folder": # Validate & parse argument JSON diff --git a/protobuf/proto/organizer.proto b/protobuf/proto/organizer.proto index f85e325..4627b16 100644 --- a/protobuf/proto/organizer.proto +++ b/protobuf/proto/organizer.proto @@ -50,12 +50,22 @@ service OrganizerOutbound { // Calls that Clapshot server makes to Organizer service OrganizerInbound { + + // Initialization calls rpc handshake(ServerInfo) returns (Empty); rpc check_migrations(CheckMigrationsRequest) returns (CheckMigrationsResponse); // This is called on startup, after handshake rpc apply_migration(ApplyMigrationRequest) returns (ApplyMigrationResponse); // Called if check_migrations returns any pending migrations rpc after_migrations(AfterMigrationsRequest) returns (Empty); // Called after all migrations to all modules have been applied + // Organizer should respond to all calls as quickly as possible. + // Two standard gRPC error returns status codes have special meaning: + // - `UNIMPLEMENTED` = Organizer does not support this call. The server will assume the default behavior. + // - `ABORTED` = Call failed, but server should not show it to the user (e.g. Organizer handled reporting). + // + // All other gRPC errors will be considered more serious, and the server will log them in detail and possibly show to the user. + + // User session events rpc on_start_user_session(OnStartUserSessionRequest) returns (OnStartUserSessionResponse); rpc navigate_page(NavigatePageRequest) returns (ClientShowPageRequest); rpc authz_user_action(AuthzUserActionRequest) returns (AuthzResponse); @@ -65,7 +75,7 @@ service OrganizerInbound { rpc move_to_folder(MoveToFolderRequest) returns (Empty); rpc reorder_items(ReorderItemsRequest) returns (Empty); - // Unit / integration tests + // Unit / integration tests (not called in production) rpc list_tests(Empty) returns (ListTestsResponse); rpc run_test(RunTestRequest) returns (RunTestResponse); } @@ -75,8 +85,6 @@ service OrganizerInbound { message ServerInfo { message Database { - // Use direct database connection only as a last resort hack. - // gRPC API methods are more future-proof and robust. enum DatabaseType { SQLITE = 0; } diff --git a/server/Cargo.lock b/server/Cargo.lock index 171fe30..7fd9c43 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -534,6 +534,7 @@ dependencies = [ "prost", "r2d2", "rand", + "regex", "reqwest", "rust_decimal", "semver", diff --git a/server/Cargo.toml b/server/Cargo.toml index e3db1eb..bbbb6eb 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -54,6 +54,7 @@ lib-clapshot-grpc = { path = "../protobuf/libs/rust" } crossbeam-channel = "0.5.8" docopt = "1.1.1" log = "0.4.17" +regex = "1.10.4" signal-hook = "0.3.15" tracing = "0.1.38" tracing-subscriber = {version = "0.3.17", features = ["env-filter", "json", "fmt", "std", "time", "local-time"] } diff --git a/server/src/api_server/user_session.rs b/server/src/api_server/user_session.rs index 01e314d..9e0bccb 100644 --- a/server/src/api_server/user_session.rs +++ b/server/src/api_server/user_session.rs @@ -167,6 +167,9 @@ pub async fn org_authz<'a>( if e.code() == tonic::Code::Unimplemented { tracing::debug!(desc, user=user_id, "Organizer doesn't support authz"); None + } else if e.code() == tonic::Code::Aborted { + tracing::warn!(desc, user=user_id, "Organizer gRPC.ABORTED authz request. Unsupported behavior for authz_user_action. Denying by default."); + Some(false) } else { error!(desc, user=&user_id, err=?e, "Error while authorizing user action"); try_send_error(&user_id, &server, format!("Internal error in authz: {}", desc), None, &op).ok(); diff --git a/server/src/api_server/ws_handers.rs b/server/src/api_server/ws_handers.rs index 2c1762c..fca0cd8 100644 --- a/server/src/api_server/ws_handers.rs +++ b/server/src/api_server/ws_handers.rs @@ -73,9 +73,11 @@ pub async fn msg_list_my_videos(data: &ListMyVideos , ses: &mut UserSession, ser Err(e) => { if e.code() == tonic::Code::Unimplemented { tracing::debug!("Organizer doesn't implement navigate_page(). Using default."); + } else if e.code() == tonic::Code::Aborted { + tracing::debug!("Ignoring org.navigate_page() result because it GrpcStatus.ABORTED."); } else { tracing::error!(err=?e, "Error in organizer navigate_page() call"); - anyhow::bail!("Error in navigate_page() organizer call: {:?}", e); + anyhow::bail!("Organizer error: {:?}", e); } }, Ok(res) => { @@ -124,7 +126,8 @@ pub async fn msg_open_video(data: &OpenVideo, ses: &mut UserSession, server: &Se pub async fn send_open_video_cmd(server: &ServerState, session_id: &str, video_id: &str) -> Res<()> { server.link_session_to_video(session_id, video_id)?; - let v = models::Video::get(&mut server.db.conn()?, &video_id.into())?.to_proto3(&server.url_base); + let conn = &mut server.db.conn()?; + let v = models::Video::get(conn, &video_id.into())?.to_proto3(&server.url_base); if v.playback_url.is_none() { return Err(anyhow!("No video file")); } @@ -132,7 +135,7 @@ pub async fn send_open_video_cmd(server: &ServerState, session_id: &str, video_i client_cmd!(OpenVideo, {video: Some(v)}), super::SendTo::UserSession(session_id))?; let mut cmts = vec![]; - for mut c in models::Comment::get_by_video(&mut server.db.conn()?, video_id, DBPaging::default())? { + for mut c in models::Comment::get_by_video(conn, video_id, DBPaging::default())? { server.fetch_drawing_data_into_comment(&mut c).await?; cmts.push(c.to_proto3()); } @@ -332,7 +335,8 @@ pub async fn msg_edit_comment(data: &EditComment, ses: &mut UserSession, server: pub async fn msg_del_comment(data: &DelComment, ses: &mut UserSession, server: &ServerState) -> Res<()> { let id = i32::from_str(&data.comment_id)?; - match models::Comment::get(&mut server.db.conn()?, &id) { + let conn = &mut server.db.conn()?; + match models::Comment::get(conn, &id) { Ok(cmt) => { let default_perm = Some(&ses.user_id) == cmt.user_id.as_ref() || ses.is_admin; org_authz_with_default(&ses.org_session, "delete comment", true, server, &ses.organizer, @@ -343,12 +347,12 @@ pub async fn msg_del_comment(data: &DelComment, ses: &mut UserSession, server: & send_user_error!(&ses.user_id, server, Topic::Video(&vid), "Failed to delete comment.", "You can only delete your own comments", true); return Ok(()); } - let all_comm = models::Comment::get_by_video(&mut server.db.conn()?, &vid, DBPaging::default())?; + let all_comm = models::Comment::get_by_video(conn, &vid, DBPaging::default())?; if all_comm.iter().any(|c| c.parent_id.map(|i| i.to_string()) == Some(id.to_string())) { send_user_error!(&ses.user_id, server, Topic::Video(&vid), "Failed to delete comment.", "Comment has replies. Cannot delete.", true); return Ok(()); } - models::Comment::delete(&mut server.db.conn()?, &id)?; + models::Comment::delete(conn, &id)?; server.emit_cmd( client_cmd!(DelComment, {comment_id: id.to_string()}), super::SendTo::VideoId(&vid))?; @@ -461,9 +465,11 @@ pub async fn msg_move_to_folder(data: &proto::client::client_to_server_cmd::Move if let Err(e) = org.lock().await.move_to_folder(req).await { if e.code() == tonic::Code::Unimplemented { tracing::debug!("Organizer doesn't implement move_to_folder(). Ignoring."); + } else if e.code() == tonic::Code::Aborted { + tracing::debug!("Ignoring org.move_to_folder() result because it GrpcStatus.ABORTED."); } else { tracing::error!(err=?e, "Error in organizer move_to_folder() call"); - anyhow::bail!("Error in move_to_folder() organizer call: {:?}", e); + anyhow::bail!("Organizer error: {:?}", e); } } } else { send_user_error!(&ses.user_id, server, Topic::None, "No organizer session."); } @@ -480,9 +486,11 @@ pub async fn msg_reorder_items(data: &ReorderItems, ses: &mut UserSession, serve if let Err(e) = org.lock().await.reorder_items(req).await { if e.code() == tonic::Code::Unimplemented { tracing::debug!("Organizer doesn't implement reorder_items(). Ignoring."); + } else if e.code() == tonic::Code::Aborted { + tracing::debug!("Ignoring org.reorder_items() result because it GrpcStatus.ABORTED."); } else { tracing::error!(err=?e, "Error in organizer reorder_items() call"); - anyhow::bail!("Error in reorder_items() organizer call: {:?}", e); + anyhow::bail!("Organizer error: {:?}", e); } } } else { send_user_error!(&ses.user_id, server, Topic::None, "No organizer session."); } @@ -499,8 +507,12 @@ pub async fn msg_organizer_cmd(data: &proto::client::client_to_server_cmd::Organ }; match org.lock().await.cmd_from_client(req).await { Err(e) => { - tracing::error!(err=?e, "Error in organizer navigate_page() call"); - anyhow::bail!("Error in cmd_from_client() organizer call: {:?}", e); + if e.code() == tonic::Code::Aborted { + tracing::debug!("Ignoring org.cmd_from_client() result because it GrpcStatus.ABORTED."); + } else { + tracing::error!(err=?e, "Error in organizer cmd_from_client() call"); + anyhow::bail!("Organizer error: {:?}", e); + } }, Ok(res) => { return Ok(()); } } @@ -549,9 +561,11 @@ pub async fn msg_dispatch(req: &ClientToServerCmd, ses: &mut UserSession, server if let Err(e) = res { // Ignore authz errors, they are already logged if let None = e.downcast_ref::() { - let cmd_name = req.cmd.as_ref().map(|c| format!("{:?}", c)).unwrap_or_default(); - tracing::warn!("[{}] '{cmd_name}' failed: {}", ses.sid, e); - send_user_error!(&ses.user_id, server, Topic::None, format!("{cmd_name} failed: {e}")); + let cmd_str = req.cmd.as_ref().map(|c| format!("{:?}", c)).unwrap_or_default(); + tracing::warn!("[{}] '{cmd_str}' failed: {}", ses.sid, e); + // Assume name is regex '^[a-zA-Z0-9_]+' of cmd_str + let cmd_name = regex::Regex::new(r"^[a-zA-Z0-9_]+").unwrap().find(&cmd_str).map(|m| m.as_str()).unwrap_or(cmd_str.as_str()); + send_user_error!(&ses.user_id, server, Topic::None, format!("{cmd_str} failed: {e}")); } } Ok(true)