From 3455e76be7c63d7c47a5a05781c1ef7ea5a8bda1 Mon Sep 17 00:00:00 2001
From: Madhukar N
Date: Mon, 6 Jan 2025 07:26:00 -0800
Subject: [PATCH] feat(google-drive): add scope validation for Google Drive API credentials (#667)

---
Review notes (this region, between "---" and the first "diff --git", is
commentary and is not part of the commit):
- The module-level SCOPES constant is removed and replaced by a per-loader
  `scopes` field; the four call sites in _load_credentials shown in this
  diff now pass self.scopes instead.
- validate_scopes rejects an empty list and any entry that is not in
  VALID_SCOPES.
- NOTE(review): removing SCOPES would break any external code that imports
  it from this module; confirm there are no such importers.

 .../langchain_google_community/drive.py       | 43 ++++++++++++++++---
 libs/community/tests/unit_tests/test_drive.py | 39 +++++++++++++++++
 2 files changed, 75 insertions(+), 7 deletions(-)
 create mode 100644 libs/community/tests/unit_tests/test_drive.py

diff --git a/libs/community/langchain_google_community/drive.py b/libs/community/langchain_google_community/drive.py
index 5f5a3fd4..035356a4 100644
--- a/libs/community/langchain_google_community/drive.py
+++ b/libs/community/langchain_google_community/drive.py
@@ -9,18 +9,26 @@
 
 import os
 from pathlib import Path
-from typing import Any, Dict, List, Optional, Sequence, Union
+from typing import Any, ClassVar, Dict, List, Optional, Sequence, Tuple, Union
 
 from langchain_core.document_loaders import BaseLoader
 from langchain_core.documents import Document
 from pydantic import BaseModel, field_validator, model_validator
 
-SCOPES = ["https://www.googleapis.com/auth/drive.file"]
-
 
 class GoogleDriveLoader(BaseLoader, BaseModel):
     """Load Google Docs from `Google Drive`."""
 
+    # Generated from https://developers.google.com/drive/api/guides/api-specific-auth
+    # limiting to the scopes that are required to read the files
+    VALID_SCOPES: ClassVar[Tuple[str, ...]] = (
+        "https://www.googleapis.com/auth/drive.file",
+        "https://www.googleapis.com/auth/drive.readonly",
+        "https://www.googleapis.com/auth/drive.meet.readonly",
+        "https://www.googleapis.com/auth/drive.metadata.readonly",
+        "https://www.googleapis.com/auth/drive.metadata",
+    )
+
     service_account_key: Path = Path.home() / ".credentials" / "keys.json"
     """Path to the service account key file."""
     credentials_path: Path = Path.home() / ".credentials" / "credentials.json"
@@ -51,6 +59,9 @@ class GoogleDriveLoader(BaseLoader, BaseModel):
     """Whether to load authorization identities."""
     load_extended_metadata: bool = False
     """Whether to load extended metadata."""
+    scopes: List[str] = ["https://www.googleapis.com/auth/drive.file"]
+    """The credential scopes to use for Google Drive API access. Default is
+    drive.file scope."""
 
     def _get_file_size_from_id(self, id: str) -> str:
         """Fetch the size of the file."""
@@ -252,6 +263,22 @@ def validate_credentials_path(cls, v: Any, **kwargs: Any) -> Any:
             raise ValueError(f"credentials_path {v} does not exist")
         return v
 
+    @field_validator("scopes")
+    def validate_scopes(cls, v: List[str]) -> List[str]:
+        """Validate that the provided scopes are not empty and
+        are valid Google Drive API scopes."""
+        if not v:
+            raise ValueError("At least one scope must be provided")
+
+        invalid_scopes = [scope for scope in v if scope not in cls.VALID_SCOPES]
+        if invalid_scopes:
+            raise ValueError(
+                f"Invalid Google Drive API scope(s): {', '.join(invalid_scopes)}. "
+                f"Valid scopes are: {', '.join(cls.VALID_SCOPES)}"
+            )
+
+        return v
+
     def _load_credentials(self) -> Any:
         """Load credentials."""
         # Adapted from https://developers.google.com/drive/api/v3/quickstart/python
@@ -273,11 +300,13 @@ def _load_credentials(self) -> Any:
         creds = None
         if self.service_account_key.exists():
             return service_account.Credentials.from_service_account_file(
-                str(self.service_account_key), scopes=SCOPES
+                str(self.service_account_key), scopes=self.scopes
             )
 
         if self.token_path.exists():
-            creds = Credentials.from_authorized_user_file(str(self.token_path), SCOPES)
+            creds = Credentials.from_authorized_user_file(
+                str(self.token_path), self.scopes
+            )
 
         if self.credentials:
             # use whatever was passed to us
@@ -289,13 +318,13 @@ def _load_credentials(self) -> Any:
                 creds.refresh(Request())
             elif "GOOGLE_APPLICATION_CREDENTIALS" not in os.environ:
                 creds, project = default()
-                creds = creds.with_scopes(SCOPES)
+                creds = creds.with_scopes(self.scopes)
                 # no need to write to file
                 if creds:
                     return creds
             else:
                 flow = InstalledAppFlow.from_client_secrets_file(
-                    str(self.credentials_path), SCOPES
+                    str(self.credentials_path), self.scopes
                 )
                 creds = flow.run_local_server(port=0)
             with open(self.token_path, "w") as token:
diff --git a/libs/community/tests/unit_tests/test_drive.py b/libs/community/tests/unit_tests/test_drive.py
new file mode 100644
index 00000000..89f4b6f7
--- /dev/null
+++ b/libs/community/tests/unit_tests/test_drive.py
@@ -0,0 +1,39 @@
+import pytest
+
+from langchain_google_community.drive import GoogleDriveLoader
+
+
+def test_drive_default_scope() -> None:
+    """Test that default scope is set correctly."""
+    loader = GoogleDriveLoader(folder_id="dummy_folder")
+    assert loader.scopes == ["https://www.googleapis.com/auth/drive.file"]
+
+
+def test_drive_custom_scope() -> None:
+    """Test setting custom scope."""
+    custom_scopes = ["https://www.googleapis.com/auth/drive.readonly"]
+    loader = GoogleDriveLoader(folder_id="dummy_folder", scopes=custom_scopes)
+    assert loader.scopes == custom_scopes
+
+
+def test_drive_multiple_scopes() -> None:
+    """Test setting multiple valid scopes."""
+    custom_scopes = [
+        "https://www.googleapis.com/auth/drive.readonly",
+        "https://www.googleapis.com/auth/drive.metadata.readonly",
+    ]
+    loader = GoogleDriveLoader(folder_id="dummy_folder", scopes=custom_scopes)
+    assert loader.scopes == custom_scopes
+
+
+def test_drive_empty_scope_list() -> None:
+    """Test that empty scope list raises error."""
+    with pytest.raises(ValueError, match="At least one scope must be provided"):
+        GoogleDriveLoader(folder_id="dummy_folder", scopes=[])
+
+
+def test_drive_invalid_scope() -> None:
+    """Test that invalid scope raises error."""
+    invalid_scopes = ["https://www.googleapis.com/auth/drive.invalid"]
+    with pytest.raises(ValueError, match="Invalid Google Drive API scope"):
+        GoogleDriveLoader(folder_id="dummy_folder", scopes=invalid_scopes)