Skip to content

Commit

Permalink
Improve DB and error handling for organizer
Browse files Browse the repository at this point in the history
  • Loading branch information
elonen committed May 14, 2024
1 parent b9c4c87 commit fdc9466
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 109 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ Clapshot now includes an extensible [Organizer Plugin system](doc/organizer-plug

Organizers use gRPC to communicate with the Clapshot Server, and can be implemented in any language.

The provided default/example organizer, called "basic_folders" (in *Python*), implements
- personal folder UI for users, and
- for admin, list of users and a way to manage their folder contents.

### Work In Progress

The [Organizer API](protobuf/proto/organizer.proto) is still evolving, so you are invited to **provide feedback** and discuss the future development, but please **do not expect backwards compatibility** for now.
Expand Down
6 changes: 5 additions & 1 deletion doc/sysadmin-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ Running the server without migrations enabled will detect that the database is o
Clapshot server itself contains no authentication code. Instead, it trusts
HTTP server (reverse proxy) to take care of that and to pass authenticated user ID
and username in request headers. This is exactly what the basic auth / htadmin demo
above does, too.
above does, too:

- `X-Remote-User-Id` / `X_Remote_User_Id` / `HTTP_X_REMOTE_USER_ID` – Authenticated user's ID (e.g. "alice.brown")
- `X-Remote-User-Name` / `X_Remote_User_Name` / `HTTP_X_REMOTE_USER_NAME` – Display name for user (e.g. "Alice Brown")
- `X-Remote-User-Is-Admin` / `X_Remote_User_Is_Admin` / `HTTP_X_REMOTE_USER_IS_ADMIN` – If set to "1" or "true", user is a Clapshot admin

Most modern real-world deployments will likely use some more advanced authentication mechanism, such as OAuth, Kerberos etc, but htadmin is a good starting point.

Expand Down
68 changes: 54 additions & 14 deletions organizer/basic_folders/organizer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
import clapshot_grpc.clapshot as clap
import clapshot_grpc.clapshot.organizer as org
import sqlalchemy
import sqlalchemy.exc

from functools import wraps

from organizer.database.connection import open_database

from .migration_methods import check_migrations, apply_migration, after_migrations
from .user_session_methods import connect_back_to_server, on_start_user_session, navigate_page, cmd_from_client
from .folder_op_methods import move_to_folder, reorder_items
from .testing_methods import list_tests, run_test
from .migration_methods import check_migrations_impl, apply_migration_impl, after_migrations_impl
from .user_session_methods import connect_back_to_server, on_start_user_session_impl, navigate_page_impl, cmd_from_client_impl
from .folder_op_methods import move_to_folder_impl, reorder_items_impl
from .testing_methods import list_tests_impl, run_test_impl

from .helpers.folders import FoldersHelper
from .helpers.pages import PagesHelper
Expand All @@ -27,6 +30,32 @@ def override(func): # type: ignore
return func




def organizer_grpc_handler(func):
@wraps(func)
async def wrapper(self, request):
try:
try:
return await func(self, request)
except sqlalchemy.exc.OperationalError as e:
raise GRPCError(GrpcStatus.RESOURCE_EXHAUSTED, f"DB error: {e}")
except GRPCError as e:
# Intercept some known session errors and show them to the user nicely
if e.status in (GrpcStatus.INVALID_ARGUMENT, GrpcStatus.PERMISSION_DENIED, GrpcStatus.ALREADY_EXISTS, GrpcStatus.RESOURCE_EXHAUSTED):
await self.srv.client_show_user_message(org.ClientShowUserMessageRequest(sid=request.ses.sid,
msg = clap.UserMessage(
message=str(e.message),
user_id=request.ses.user.id,
type=clap.UserMessageType.ERROR,
details=str(e.details) if e.details else None)))
raise GRPCError(GrpcStatus.ABORTED) # Tell Clapshot server to ignore the result (we've shown the error to the user)
else:
raise e
return wrapper



class OrganizerInbound(org.OrganizerInboundBase):
srv: org.OrganizerOutboundStub # connection back to Clapshot server
log: Logger
Expand Down Expand Up @@ -60,54 +89,65 @@ async def handshake(self, server_info: org.ServerInfo) -> clap.Empty:
# Migration methods

@override
@organizer_grpc_handler
async def check_migrations(self, request: org.CheckMigrationsRequest) -> org.CheckMigrationsResponse:
return await check_migrations(self, request)
return await check_migrations_impl(self, request)

@override
@organizer_grpc_handler
async def apply_migration(self, request: org.ApplyMigrationRequest) -> org.ApplyMigrationResponse:
return await apply_migration(self, request)
return await apply_migration_impl(self, request)

@override
@organizer_grpc_handler
async def after_migrations(self, request: org.AfterMigrationsRequest) -> clap.Empty:
return await after_migrations(self, request)
return await after_migrations_impl(self, request)


# User session methods

@override
@organizer_grpc_handler
async def on_start_user_session(self, request: org.OnStartUserSessionRequest) -> org.OnStartUserSessionResponse:
return await on_start_user_session(self, request)
return await on_start_user_session_impl(self, request)

@override
@organizer_grpc_handler
async def navigate_page(self, request: org.NavigatePageRequest) -> org.ClientShowPageRequest:
return await navigate_page(self, request)
return await navigate_page_impl(self, request)

@override
@organizer_grpc_handler
async def cmd_from_client(self, request: org.CmdFromClientRequest) -> clap.Empty:
return await cmd_from_client(self, request)
return await cmd_from_client_impl(self, request)

@override
@organizer_grpc_handler
async def authz_user_action(self, request: org.AuthzUserActionRequest) -> org.AuthzResponse:
raise GRPCError(GrpcStatus.UNIMPLEMENTED) # = let Clapshot server decide


# Folder operation methods

@override
@organizer_grpc_handler
async def move_to_folder(self, request: org.MoveToFolderRequest) -> clap.Empty:
return await move_to_folder(self, request)
return await move_to_folder_impl(self, request)

@override
@organizer_grpc_handler
async def reorder_items(self, request: org.ReorderItemsRequest) -> clap.Empty:
return await reorder_items(self, request)
return await reorder_items_impl(self, request)


# Testing methods

@override
@organizer_grpc_handler
async def list_tests(self, request: clap.Empty) -> org.ListTestsResponse:
return await list_tests(self)
return await list_tests_impl(self)

@override
@organizer_grpc_handler
async def run_test(self, request: org.RunTestRequest) -> org.RunTestResponse:
return await run_test(self, request)
return await run_test_impl(self, request)
118 changes: 61 additions & 57 deletions organizer/basic_folders/organizer/folder_op_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import organizer

async def move_to_folder(oi: organizer.OrganizerInbound, req: org.MoveToFolderRequest) -> clap.Empty:
async def move_to_folder_impl(oi: organizer.OrganizerInbound, req: org.MoveToFolderRequest) -> clap.Empty:
"""
Organizer method (gRPC/protobuf)
Expand All @@ -25,45 +25,47 @@ async def move_to_folder(oi: organizer.OrganizerInbound, req: org.MoveToFolderRe
return clap.Empty()

with oi.db_new_session() as dbs:
with dbs.begin_nested():
dst_folder = dbs.query(DbFolder).filter(DbFolder.id == int(req.dst_folder_id)).one_or_none()
max_sort_order = dbs.query(sqlalchemy.func.max(DbFolderItems.sort_order)).filter(DbFolderItems.folder_id == int(req.dst_folder_id)).scalar() or 0

if not dst_folder:
raise GRPCError(GrpcStatus.NOT_FOUND, "Destination folder not found")
if dst_folder.user_id != req.ses.user.id and not req.ses.is_admin:
raise GRPCError(GrpcStatus.PERMISSION_DENIED, "Cannot move items to another user's folder")

for it in req.ids:
# Move a folder
if it.folder_id:
fld_to_move: Optional[DbFolder] = dbs.query(DbFolder).filter(DbFolder.id == int(it.folder_id)).one_or_none()

if not fld_to_move:
raise GRPCError(GrpcStatus.NOT_FOUND, f"Folder id '{it.folder_id}' not found")
if fld_to_move.id == dst_folder.id:
raise GRPCError(GrpcStatus.INVALID_ARGUMENT, "Cannot move a folder into itself")
if fld_to_move.user_id != req.ses.user.id and not req.ses.is_admin:
raise GRPCError(GrpcStatus.PERMISSION_DENIED, f"Cannot move another user's folder")
dst_folder = dbs.query(DbFolder).filter(DbFolder.id == int(req.dst_folder_id)).one_or_none()
max_sort_order = dbs.query(sqlalchemy.func.max(DbFolderItems.sort_order)).filter(DbFolderItems.folder_id == int(req.dst_folder_id)).scalar() or 0

if not dst_folder:
raise GRPCError(GrpcStatus.NOT_FOUND, "Destination folder not found")
if dst_folder.user_id != req.ses.user.id and not req.ses.is_admin:
raise GRPCError(GrpcStatus.PERMISSION_DENIED, "Cannot move items to another user's folder")

for it in req.ids:
with oi.db_new_session() as dbs:
# Move a folder
if it.folder_id:
fld_to_move: Optional[DbFolder] = dbs.query(DbFolder).filter(DbFolder.id == int(it.folder_id)).one_or_none()

if not fld_to_move:
raise GRPCError(GrpcStatus.NOT_FOUND, f"Folder id '{it.folder_id}' not found")
if fld_to_move.id == dst_folder.id:
raise GRPCError(GrpcStatus.INVALID_ARGUMENT, "Cannot move a folder into itself")
if fld_to_move.user_id != req.ses.user.id and not req.ses.is_admin:
raise GRPCError(GrpcStatus.PERMISSION_DENIED, f"Cannot move another user's folder")

with dbs.begin_nested():
cnt = dbs.query(DbFolderItems).filter(DbFolderItems.subfolder_id == fld_to_move.id).update({"folder_id": dst_folder.id, "sort_order": max_sort_order+1})
if cnt == 0:
raise GRPCError(GrpcStatus.NOT_FOUND, f"Folder with ID '{fld_to_move.id}' is a root folder? Cannot move.")

assert dst_folder.user_id, "Destination folder has no user ID, cannot transfer ownership"

await _recursive_set_folder_owner(dbs, fld_to_move.id, dst_folder.user_id, set(), oi.log)

oi.log.debug(f"Moved folder '{fld_to_move.id}' to folder '{dst_folder.id}'")
oi.log.debug(f"Moved folder '{fld_to_move.id}' to folder '{dst_folder.id}'")

# Move a video
elif it.video_id:
vid_to_move = dbs.query(DbVideo).filter(DbVideo.id == it.video_id).one_or_none()
# Move a video
elif it.video_id:
vid_to_move = dbs.query(DbVideo).filter(DbVideo.id == it.video_id).one_or_none()

if not vid_to_move:
raise GRPCError(GrpcStatus.NOT_FOUND, f"Video '{it.video_id}' not found")
if vid_to_move.user_id != req.ses.user.id and not req.ses.is_admin:
raise GRPCError(GrpcStatus.PERMISSION_DENIED, f"Cannot move another user's video")
if not vid_to_move:
raise GRPCError(GrpcStatus.NOT_FOUND, f"Video '{it.video_id}' not found")
if vid_to_move.user_id != req.ses.user.id and not req.ses.is_admin:
raise GRPCError(GrpcStatus.PERMISSION_DENIED, f"Cannot move another user's video")

with dbs.begin_nested():
vid_to_move.user_id = dst_folder.user_id # transfer ownership
cnt = dbs.query(DbFolderItems).filter(DbFolderItems.video_id == vid_to_move.id).update({"folder_id": dst_folder.id, "sort_order": max_sort_order+1})
if cnt == 0: # not in any folder yet => insert it
Expand All @@ -77,7 +79,35 @@ async def move_to_folder(oi: organizer.OrganizerInbound, req: org.MoveToFolderRe
return clap.Empty()


async def reorder_items(oi: organizer.OrganizerInbound, req: org.ReorderItemsRequest) -> clap.Empty:
async def _recursive_set_folder_owner(dbs: sqlalchemy.orm.Session, folder_id: int, new_owner_id: str, seen: set[int], log: Logger) -> None:
"""
Set the owner of a folder and all its subfolders + videos recursively.
"""
assert isinstance(folder_id, int), f"Unexpected subfolder ID type on: {folder_id} ({type(folder_id)})"

if folder_id in seen:
log.warning(f"Folder loop detected! THIS SHOULD NOT HAPPEN. Skipping folder '{folder_id}'")
return
seen.add(folder_id)

# Update folder itself
log.debug(f"Setting owner of folder '{folder_id}' to '{new_owner_id}'")
dbs.query(DbFolder).filter(DbFolder.id == folder_id).update({"user_id": new_owner_id})

# Update videos in this folder
log.debug(f"Setting owner of folder '{folder_id}' videos to '{new_owner_id}'")
videos_subq = dbs.query(DbFolderItems.video_id).filter(DbFolderItems.folder_id == folder_id, DbFolderItems.video_id != None).subquery()
dbs.query(DbVideo).filter(DbVideo.id.in_(sqlalchemy.select(videos_subq))).update({"user_id": new_owner_id})

# Update subfolders
sub_ids = dbs.query(DbFolderItems.subfolder_id).filter(DbFolderItems.folder_id == folder_id, DbFolderItems.subfolder_id != None).all()
for subi in sub_ids:
log.debug(f"Recursing to subfolder '{subi[0]}'")
await _recursive_set_folder_owner(dbs, subi[0], new_owner_id, seen, log)



async def reorder_items_impl(oi: organizer.OrganizerInbound, req: org.ReorderItemsRequest) -> clap.Empty:
"""
Organizer (gRPC/protobuf)
Called when user reorders items in a folder in the client UI.
Expand Down Expand Up @@ -113,29 +143,3 @@ async def reorder_items(oi: organizer.OrganizerInbound, req: org.ReorderItemsReq
else:
raise GRPCError(GrpcStatus.INVALID_ARGUMENT, "No folder ID in UI listing, cannot reorder")


async def _recursive_set_folder_owner(dbs: sqlalchemy.orm.Session, folder_id: int, new_owner_id: str, seen: set[int], log: Logger) -> None:
"""
Set the owner of a folder and all its subfolders + videos recursively.
"""
assert isinstance(folder_id, int), f"Unexpected subfolder ID type on: {folder_id} ({type(folder_id)})"

if folder_id in seen:
log.warning(f"Folder loop detected! THIS SHOULD NOT HAPPEN. Skipping folder '{folder_id}'")
return
seen.add(folder_id)

# Update folder itself
log.debug(f"Setting owner of folder '{folder_id}' to '{new_owner_id}'")
dbs.query(DbFolder).filter(DbFolder.id == folder_id).update({"user_id": new_owner_id})

# Update videos in this folder
log.debug(f"Setting owner of folder '{folder_id}' videos to '{new_owner_id}'")
videos_subq = dbs.query(DbFolderItems.video_id).filter(DbFolderItems.folder_id == folder_id, DbFolderItems.video_id != None).subquery()
dbs.query(DbVideo).filter(DbVideo.id.in_(sqlalchemy.select(videos_subq))).update({"user_id": new_owner_id})

# Update subfolders
sub_ids = dbs.query(DbFolderItems.subfolder_id).filter(DbFolderItems.folder_id == folder_id, DbFolderItems.subfolder_id != None).all()
for subi in sub_ids:
log.debug(f"Recursing to subfolder '{subi[0]}'")
await _recursive_set_folder_owner(dbs, subi[0], new_owner_id, seen, log)
6 changes: 3 additions & 3 deletions organizer/basic_folders/organizer/migration_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import organizer


async def check_migrations(oi, req: org.CheckMigrationsRequest) -> org.CheckMigrationsResponse:
async def check_migrations_impl(oi, req: org.CheckMigrationsRequest) -> org.CheckMigrationsResponse:
"""
Organizer method (gRPC/protobuf)
Expand All @@ -22,7 +22,7 @@ async def check_migrations(oi, req: org.CheckMigrationsRequest) -> org.CheckMigr
return org.CheckMigrationsResponse(current_schema_ver=cur_ver,pending_migrations=pending)


async def apply_migration(oi: organizer.OrganizerInbound, req: org.ApplyMigrationRequest) -> org.ApplyMigrationResponse:
async def apply_migration_impl(oi: organizer.OrganizerInbound, req: org.ApplyMigrationRequest) -> org.ApplyMigrationResponse:
"""
Organizer method (gRPC/protobuf)
Expand All @@ -35,7 +35,7 @@ async def apply_migration(oi: organizer.OrganizerInbound, req: org.ApplyMigratio
return org.ApplyMigrationResponse()


async def after_migrations(oi: organizer.OrganizerInbound, _: org.AfterMigrationsRequest) -> clap.Empty:
async def after_migrations_impl(oi: organizer.OrganizerInbound, _: org.AfterMigrationsRequest) -> clap.Empty:
"""
Organizer method (gRPC/protobuf)
Expand Down
10 changes: 5 additions & 5 deletions organizer/basic_folders/organizer/testing_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from organizer.helpers.folders import FoldersHelper


async def list_tests(oi: organizer.OrganizerInbound) -> org.ListTestsResponse:
async def list_tests_impl(oi: organizer.OrganizerInbound) -> org.ListTestsResponse:
"""
Organizer method (gRPC/protobuf)
Expand All @@ -37,7 +37,7 @@ async def list_tests(oi: organizer.OrganizerInbound) -> org.ListTestsResponse:
return org.ListTestsResponse(test_names=test_names)


async def run_test(oi, request: org.RunTestRequest) -> org.RunTestResponse:
async def run_test_impl(oi, request: org.RunTestRequest) -> org.RunTestResponse:
"""
Organizer method (gRPC/protobuf)
Expand Down Expand Up @@ -91,7 +91,7 @@ async def org_test__start_user_session(oi: organizer.OrganizerInbound):
on_start_user_session() -- Just a simple test to check if the method doesn't crash.
"""
user = oi.db_new_session().query(DbUser).first()
res = await oi.on_start_user_session(org.OnStartUserSessionRequest(
res = await organizer.on_start_user_session_impl(oi, org.OnStartUserSessionRequest(
org.UserSessionData(
sid="test_sid",user=clap.UserInfo(id=user.id, name=user.name),
is_admin=False, cookies={})
Expand All @@ -104,7 +104,7 @@ async def org_test__navigate_page(oi: organizer.OrganizerInbound):
navigate_page() -- Test that it returns a valid ClientShowPageRequest.
"""
user = oi.db_new_session().query(DbUser).first()
res = await oi.navigate_page(org.NavigatePageRequest(
res = await organizer.navigate_page_impl(oi, org.NavigatePageRequest(
ses=org.UserSessionData(sid="test_sid",user=clap.UserInfo(id=user.id, name=user.name), cookies={})
))
assert isinstance(res, org.ClientShowPageRequest)
Expand Down Expand Up @@ -148,7 +148,7 @@ async def org_test__move_to_folder(oi: organizer.OrganizerInbound):
assert someone_elses_video.user_id
oi.log.info(f"Trying to move someone else's ({someone_elses_video.user_id}) video ({someone_elses_video.id}) to the root folder ({root_fld.id}) of current user ({user_id})")
try:
await oi.move_to_folder(org.MoveToFolderRequest(
await organizer.move_to_folder_impl(oi, org.MoveToFolderRequest(
ses,
ids=[clap.FolderItemId(video_id=someone_elses_video.id)],
dst_folder_id=str(root_fld.id),
Expand Down
Loading

0 comments on commit fdc9466

Please sign in to comment.