diff --git a/.claude/CLAUDE.md b/.claude/CLAUDE.md new file mode 100644 index 00000000..e2d2f93a --- /dev/null +++ b/.claude/CLAUDE.md @@ -0,0 +1,307 @@ +# Keboola MCP Server - Project Guidelines + +This document contains project-specific coding standards and best practices for the Keboola MCP Server. These guidelines complement the global CLAUDE.md standards and address patterns specific to this codebase. + +## Code Simplicity Standards + +**Avoid Unnecessary Wrapper Functions:** +- Do NOT create helper methods for simple conditional checks +- Direct inline checks are preferred for readability when the condition is simple +- Example: Use `if token_role in ['guest', 'readonly']:` instead of `requires_read_only_access()` +- Only create helper functions when they prevent significant duplication or encapsulate complex logic + +**When to Extract Helpers:** +- When the same logic is duplicated across multiple modules +- When the logic is complex enough that a descriptive name improves readability +- When the helper can be reused in multiple contexts + +**Example - Bad (Over-abstraction):** +```python +@staticmethod +def is_guest_role(token_info: JsonDict) -> bool: + """Check if the token belongs to a guest user.""" + role = ToolsFilteringMiddleware.get_token_role(token_info).lower() + return role == 'guest' + +@staticmethod +def requires_read_only_access(token_info: JsonDict) -> bool: + """Check if the token requires read-only tool access.""" + return ( + ToolsFilteringMiddleware.is_guest_role(token_info) + or ToolsFilteringMiddleware.is_read_only_role(token_info) + ) + +# Usage +if self.requires_read_only_access(token_info): + # filter tools +``` + +**Example - Good (Direct and Clear):** +```python +# Usage - direct inline check +if token_role in ['guest', 'readonly']: + # filter tools +``` + +## Test Structure Standards + +**Test Files Must Mirror Source Structure:** +- `src/keboola_mcp_server/mcp.py` → `tests/test_mcp.py` +- `src/keboola_mcp_server/authorization.py` 
→ `tests/test_authorization.py` +- `src/keboola_mcp_server/clients/client.py` → `tests/test_clients/test_client.py` + +**NO Separate Test Files for Features:** +- Do NOT create `test_role_based_authorization.py` for role authorization in mcp.py +- Do NOT create `test_feature_x.py` for a feature implemented in an existing module +- The test file structure should mirror the source code structure exactly + +**Exception:** +- Only create separate test files if the feature has its own separate source module + +## Test Quality Standards + +**Prefer Integration Tests Over Heavy Mocking:** +- Use real server instances with mocked external dependencies (like test_server.py does) +- Mock only at system boundaries (API responses, token verification, external services) +- Avoid mocking internal implementation details + +**Good Integration Test Pattern (from test_server.py:220-254):** +```python +@pytest.mark.asyncio +@pytest.mark.parametrize( + ('admin_info', 'expected_included', 'expected_excluded'), + [ + ({'role': 'admin'}, 'modify_flow', 'update_flow'), + ({'role': 'guest'}, 'get_buckets', 'create_config'), + ({'role': 'readOnly'}, 'query_data', 'update_descriptions'), + ], +) +async def test_role_based_access(mocker, admin_info, expected_included, expected_excluded): + # Mock only external boundary (token verification) + mocker.patch( + 'keboola_mcp_server.clients.client.AsyncStorageClient.verify_token', + return_value={'admin': admin_info, 'owner': {'features': []}}, + ) + + # Use real server instance + mcp = create_server(Config(), runtime_info=ServerRuntimeInfo(transport='stdio')) + + # Test real behavior + async with Client(mcp) as client: + tools = await client.list_tools() + tool_names = {tool.name for tool in tools} + assert expected_included in tool_names + assert expected_excluded not in tool_names +``` + +**Bad Test Pattern (Heavy Mocking):** +```python +# DON'T mock everything - this tests almost nothing +async def test_feature(middleware, mock_context): + 
call_next = AsyncMock(return_value=mock_tools) + with patch.object(middleware, 'method1', AsyncMock(return_value=...)): + with patch.object(middleware, 'method2', return_value=...): + # This test validates mocks, not real behavior + result = await middleware.on_list_tools(mock_context, call_next) +``` + +**What to Mock:** +- External API responses (Storage API, Scheduler API) +- Token verification calls +- File system operations +- Network requests +- Time-dependent behavior + +**What NOT to Mock:** +- Internal method calls within the module being tested +- Simple utility functions +- The middleware chain itself (use real FastMCP server) +- Tool filtering logic (test the actual behavior) + +## Logging Standards + +**Log Level Guidelines:** +- **DEBUG**: Verbose/frequent operations that would be noisy in production + - Example: Every tools listing call + - Example: Detailed parameter values during processing +- **INFO**: Significant operational decisions that should always be visible + - Example: Authorization decisions (access granted/denied) + - Example: Configuration changes + - Example: Client initialization +- **WARNING**: Unexpected situations needing attention but not stopping execution + - NOT for normal operations +- **ERROR**: Errors preventing operation completion + +**Example:** +```python +# Good - DEBUG for frequent operations +if token_role in ['guest', 'readonly']: + tools = [t for t in tools if is_read_only_tool(t)] + LOG.debug(f'Read-only access: filtered to {len(tools)} read-only tools for role={token_role}') + +# Bad - INFO would be too noisy (called on every tools listing) +LOG.info(f'Read-only access: filtered to {len(tools)} read-only tools for role={token_role}') +``` + +## Code Deduplication + +**Shared Utility Module:** +- Extract helpers duplicated across modules to `src/keboola_mcp_server/utils.py` +- Document where shared utilities live in module docstrings +- Import shared utilities instead of duplicating code + +**Example:** +```python 
+# utils.py +def is_read_only_tool(tool: Tool) -> bool: + """Check if a tool has readOnlyHint=True annotation. + + This is used by both ToolsFilteringMiddleware and ToolAuthorizationMiddleware + to determine which tools are read-only. + """ + if tool.annotations is None: + return False + return tool.annotations.readOnlyHint is True + +# mcp.py +from keboola_mcp_server.utils import is_read_only_tool + +# authorization.py +from keboola_mcp_server.utils import is_read_only_tool +``` + +## Documentation Standards + +**Docstrings Must Match Implementation:** +- Review docstrings after implementation changes +- Role matrices, feature lists, and behavior descriptions must be accurate +- Prefer clear descriptions over complex tables + +**Example - Bad (Inaccurate):** +```python +""" +Role-based access control: +- Admin: Full access to all tools +- Guest: Read-only access +- Read: Read-only access +- Other roles: Standard access +""" +# Reality: Admin has some tools blocked, other roles have specific restrictions +``` + +**Example - Good (Accurate):** +```python +""" +Role-based access control: +- Guest: Read-only access (only tools with readOnlyHint=True) +- Read: Read-only access (only tools with readOnlyHint=True) +- Other non-admin roles: Write tools available, with specific tools (e.g., `modify_flow`) explicitly restricted +- Admin: Broad access to tools, with specific write tools (e.g., `update_flow`) explicitly restricted +""" +``` + +## README Standards + +**Role Descriptions:** +- List roles in logical progression (most restrictive first) +- Use consistent terminology ("read-only tools" not "query-only operations") +- Be specific about what each role can/cannot do + +**Example Structure:** +```markdown +#### Role-Based Access Control + +- **Guest**: Read-only access limited to tools marked as read-only (no modifying operations) +- **Read**: Similar to guest, users with read role can only access tools marked as read-only +- **Other non-admin roles**: Standard write 
access with some administrative tools restricted +- **Admin**: Broad tool access for administrative operations, with specific tools restricted +``` + +## Version Management + +**ALWAYS Bump Version in pyproject.toml:** +- Every PR must increment the version number +- Use semantic versioning: + - Patch (1.42.1 → 1.42.2): Bug fixes, small improvements + - Minor (1.42.1 → 1.43.0): New features, non-breaking changes + - Major (1.42.1 → 2.0.0): Breaking changes +- Update version BEFORE creating the PR + +## Dependency Management + +**Use `uv` instead of `pip`:** +- This project uses `uv` for dependency management, NOT `pip` +- To run commands: `uv run <command>` +- To run tests: `uv run --extra tests pytest tests/` +- To install dependencies: `uv sync` +- The virtual environment is managed by `uv` and located in `.venv/` +- Never use `pip install` - always use `uv` commands + +**Examples:** +```bash +# Run tests +uv run --extra tests pytest tests/test_mcp.py -v + +# Run specific test group +uv run --extra tests pytest tests/test_server.py -k "role" + +# Run the server +uv run keboola-mcp-server + +# Sync dependencies +uv sync +``` + +## Testing & Quality Checks + +**Run Tox After Python Changes:** +- ALWAYS run `uv run tox` after modifying Python source files, tests, or integration tests +- Tox runs: pytest (932 tests), black (formatting), flake8 (linting), and check-tools-docs +- All checks must pass before committing or creating a PR +- If flake8 fails, fix the issues immediately - don't commit with linting errors +- If black reformats files, those changes will be applied automatically + +**When to Run Tox:** +- After editing any `.py` files in `src/`, `tests/`, or `integtests/` +- Before creating a git commit with Python changes +- Before creating or updating a pull request +- When requested by the user to "run tests" or "check code quality" + +**Tox Commands:** +```bash +# Run all checks (recommended) +uv run tox + +# Run specific environment only +uv run tox -e python # Run 
tests only +uv run tox -e black # Run black formatting only +uv run tox -e flake8 # Run linting only +uv run tox -e check-tools-docs # Verify TOOLS.md is up to date +``` + +**Common Issues:** +- **Flake8 F841**: Unused variable - remove the variable or prefix with `_` if needed for compatibility +- **Flake8 PT004**: Fixture without return - prefix fixture name with `_` per pytest convention +- **Black reformatting**: Commit the reformatted files - black's changes are always correct +- **Test failures**: Fix the underlying issue before proceeding + +## Review Checklist + +Before submitting a PR, verify: + +- [ ] Tests: All tox checks pass (pytest, black, flake8, check-tools-docs) +- [ ] Code simplicity: No unnecessary wrapper functions for simple conditions +- [ ] Test structure: Tests mirror source file structure (no separate feature test files) +- [ ] Test quality: Integration tests with minimal mocking (only external boundaries) +- [ ] Logging: Appropriate log levels (DEBUG for frequent operations) +- [ ] Deduplication: Shared logic extracted to utils.py +- [ ] Docstrings: Accurate and match actual implementation +- [ ] README: Clear, accurate role descriptions in logical order +- [ ] Version: Incremented in pyproject.toml + +## References + +These guidelines were established based on feedback from: +- PR #350 (Tool Authorization Middleware) +- PR #381 (Role-Based Authorization) +- Code review patterns from the Keboola MCP Server team diff --git a/.gitignore b/.gitignore index 5301134b..7528c61b 100644 --- a/.gitignore +++ b/.gitignore @@ -9,7 +9,8 @@ wheels/ # IDE related files .cursor/ .idea/ -.vscode/ +.vscode +.claude/ # Virtual environments .venv diff --git a/README.md b/README.md index bc540548..323232ce 100644 --- a/README.md +++ b/README.md @@ -124,6 +124,17 @@ X-Read-Only-Mode: true For detailed documentation, see 
[developers.keboola.com/integrate/mcp/#tool-authorization-and-access-control](https://developers.keboola.com/integrate/mcp/#tool-authorization-and-access-control). +#### Role-Based Access Control + +The MCP Server automatically restricts tool access based on your Keboola token role: + +- **Guest**: Read-only access limited to tools marked as read-only (no modifying operations) +- **Read**: Similar to guest, users with read role can only access tools marked as read-only (no modifying operations) +- **Other non-admin roles**: Standard write access with some administrative tools restricted (e.g., `modify_flow` for scheduling) +- **Admin**: Broad tool access for administrative operations, with specific tools restricted (e.g., `update_flow` is blocked, use `modify_flow` for scheduling) + +The role is determined by your Storage API token and automatically enforced. Admin role specifically allows access to the Scheduler API for managing flow scheduling. + --- ## Local MCP Server Setup (Custom or Dev Way) diff --git a/integtests/conftest.py b/integtests/conftest.py index 10b1ab88..683ce2e5 100644 --- a/integtests/conftest.py +++ b/integtests/conftest.py @@ -237,21 +237,44 @@ def keboola_project(env_init: bool, storage_api_token: str, storage_api_url: str token_info = storage_client.tokens.verify() project_id: str = token_info['owner']['id'] + # Clean up any leftover buckets from interrupted test runs current_buckets = storage_client.buckets.list() if current_buckets: - pytest.fail(f'Expecting empty Keboola project {project_id}, but found {len(current_buckets)} buckets') + LOG.warning( + f'Found {len(current_buckets)} buckets in project {project_id} (expected empty). ' + f'Cleaning up from previous interrupted test run...' 
+ ) + for bucket in current_buckets: + bucket_id = bucket['id'] + LOG.info(f'Deleting existing bucket with ID={bucket_id}') + storage_client.buckets.delete(bucket_id, force=True) buckets = _create_buckets(storage_client) + # Leftover tables are only reported here, not deleted: the forced bucket deletion above should already have removed them current_tables = storage_client.tables.list() if current_tables: - pytest.fail(f'Expecting empty Keboola project {project_id}, but found {len(current_tables)} tables') + LOG.warning( + f'Found {len(current_tables)} tables in project {project_id} (expected empty). ' + f'Tables should have been deleted with buckets.' + ) tables = _create_tables(storage_client) - current_configs = storage_client.configurations.list(component_id='ex-generic-v2') - if current_configs: - pytest.fail(f'Expecting empty Keboola project {project_id}, but found {len(current_configs)} configs') + # Clean up any leftover configs from interrupted test runs + component_ids = ['ex-generic-v2', 'keboola.snowflake-transformation'] + for component_id in component_ids: + current_configs = storage_client.configurations.list(component_id=component_id) + if current_configs: + LOG.warning( + f'Found {len(current_configs)} configs for component {component_id} in project {project_id} ' + f'(expected empty). Cleaning up from previous interrupted test run...' 
+ ) + for config in current_configs: + LOG.info(f'Deleting existing config with component ID={component_id} and config ID={config["id"]}') + storage_client.configurations.delete(component_id, config['id']) + # Double delete because the first delete moves the configuration to the trash + storage_client.configurations.delete(component_id, config['id']) configs = _create_configs(storage_client) @@ -353,3 +376,44 @@ def mcp_server(storage_api_url: str, storage_api_token: str, workspace_schema: s async def mcp_client(mcp_server: FastMCP) -> AsyncGenerator[Client, None]: async with Client(mcp_server) as client: yield client + + +@pytest.fixture(scope='session') +def hostname_suffix(storage_api_url: str) -> str: + """Extract hostname suffix from storage API URL (e.g., 'europe-west3.gcp.keboola.com')""" + from urllib.parse import urlparse + + parsed = urlparse(storage_api_url) + if not parsed.hostname or not parsed.hostname.startswith('connection.'): + raise ValueError(f'Invalid storage API URL: {storage_api_url}') + return parsed.hostname.split('connection.')[1] + + +@pytest.fixture(scope='session') +def connection_url(hostname_suffix: str) -> str: + """Base connection URL for the region (e.g., 'https://connection.europe-west3.gcp.keboola.com')""" + return f'https://connection.{hostname_suffix}' + + +@pytest.fixture(scope='session') +def queue_url(hostname_suffix: str) -> str: + """Queue API URL for the region (e.g., 'https://queue.europe-west3.gcp.keboola.com')""" + return f'https://queue.{hostname_suffix}' + + +@pytest.fixture(scope='session') +def ai_url(hostname_suffix: str) -> str: + """AI service URL for the region (e.g., 'https://ai.europe-west3.gcp.keboola.com')""" + return f'https://ai.{hostname_suffix}' + + +@pytest.fixture +def links_manager(keboola_client: KeboolaClient, keboola_project: ProjectDef): + """ProjectLinksManager instance for generating region-aware UI links""" + from keboola_mcp_server.links import ProjectLinksManager + + return 
ProjectLinksManager( + base_url=keboola_client.storage_api_url, + project_id=str(keboola_project.project_id), + branch_id=keboola_client.branch_id, + ) diff --git a/integtests/test_errors.py b/integtests/test_errors.py index b22fa19c..0b7d57be 100644 --- a/integtests/test_errors.py +++ b/integtests/test_errors.py @@ -29,10 +29,10 @@ async def test_storage_api_404_error_maintains_standard_behavior(self, mcp_conte assert 'non.existent.bucket' in result.buckets_not_found @pytest.mark.asyncio - async def test_jobs_api_404_error_(self, mcp_context: Context): + async def test_jobs_api_404_error_(self, mcp_context: Context, queue_url: str): match = re.compile( r"Client error '404 Not Found' " - r"for url 'https://queue.keboola.com/jobs/999999999'\n" + rf"for url '{queue_url}/jobs/999999999'\n" r'For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/404\n' r'API error: Job "999999999" not found\n' r'Exception ID: .+\n' @@ -49,11 +49,11 @@ async def test_jobs_api_404_error_(self, mcp_context: Context): assert match.search(str(err.exceptions[0])) is not None @pytest.mark.asyncio - async def test_docs_api_empty_query_error(self, mcp_context: Context): + async def test_docs_api_empty_query_error(self, mcp_context: Context, ai_url: str): """Test that docs_query raises 422 error for empty queries.""" match = re.compile( r"Client error '422 Unprocessable Content' " - r"for url 'https://ai.keboola.com/docs/question'\n" + rf"for url '{ai_url}/docs/question'\n" r'For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/422\n' r'API error: Request contents is not valid\n' r'Exception ID: .+\n' diff --git a/integtests/tools/components/test_tools.py b/integtests/tools/components/test_tools.py index b399a9b8..5519c741 100644 --- a/integtests/tools/components/test_tools.py +++ b/integtests/tools/components/test_tools.py @@ -156,7 +156,9 @@ async def test_get_configs_list_by_types( @pytest.mark.asyncio -async def 
test_create_config(mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef): +async def test_create_config( + mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef, links_manager +): """Tests that `create_config` creates a configuration with correct metadata.""" # Use the first component from configs for testing @@ -171,8 +173,6 @@ async def test_create_config(mcp_context: Context, configs: list[ConfigDef], keb client = KeboolaClient.from_state(mcp_context.session.state) - project_id = keboola_project.project_id - # Create the configuration created_config = await create_config( ctx=mcp_context, @@ -193,19 +193,12 @@ async def test_create_config(mcp_context: Context, configs: list[ConfigDef], keb assert created_config.version is not None assert frozenset(created_config.links) == frozenset( [ - Link( - type='ui-detail', - title=f'Configuration: {test_name}', - url=( - f'https://connection.keboola.com/admin/projects/{project_id}/components/{component_id}/' - + f'{created_config.configuration_id}' - ), - ), - Link( - type='ui-dashboard', - title=f'Component "{component_id}" Configurations Dashboard', - url=f'https://connection.keboola.com/admin/projects/{project_id}/components/{component_id}', + links_manager.get_component_config_link( + component_id=component_id, + configuration_id=created_config.configuration_id, + configuration_name=test_name, ), + links_manager.get_config_dashboard_link(component_id=component_id, component_name=None), ] ) @@ -248,26 +241,46 @@ async def test_create_config(mcp_context: Context, configs: list[ConfigDef], keb async def initial_cmpconf( mcp_client: Client, configs: list[ConfigDef], keboola_client: KeboolaClient ) -> AsyncGenerator[ConfigToolOutput, None]: - # Create the initial component configuration test data - tool_result = await mcp_client.call_tool( - name='create_config', - arguments={ - 'name': 'Initial Test Configuration', - 'description': 'Initial test configuration created by 
automated test', - 'component_id': configs[0].component_id, - 'parameters': {'initial_param': 'initial_value'}, - 'storage': {'input': {'tables': [{'source': 'in.c-bucket.table', 'destination': 'input.csv'}]}}, - }, - ) + configuration_id: str | None = None + component_id: str = configs[0].component_id + try: - yield ConfigToolOutput.model_validate(tool_result.structured_content) - finally: - # Clean up: Delete the configuration - await keboola_client.storage_client.configuration_delete( - component_id=configs[0].component_id, - configuration_id=tool_result.structured_content['configuration_id'], - skip_trash=True, + LOG.debug(f'Creating initial test configuration for component: {component_id}') + tool_result = await mcp_client.call_tool( + name='create_config', + arguments={ + 'name': 'Initial Test Configuration', + 'description': 'Initial test configuration created by automated test', + 'component_id': component_id, + 'parameters': {'initial_param': 'initial_value'}, + 'storage': {'input': {'tables': [{'source': 'in.c-bucket.table', 'destination': 'input.csv'}]}}, + }, ) + config_output = ConfigToolOutput.model_validate(tool_result.structured_content) + configuration_id = config_output.configuration_id + yield config_output + + except Exception: + # If tool creation fails but returned a configuration_id, try to extract it + if 'tool_result' in locals() and hasattr(tool_result, 'structured_content'): + try: + configuration_id = tool_result.structured_content.get('configuration_id') + except Exception: + pass + raise + + finally: + # Clean up if we have a configuration_id + if configuration_id: + try: + LOG.debug(f'Cleaning up component configuration: {configuration_id}') + await keboola_client.storage_client.configuration_delete( + component_id=component_id, + configuration_id=configuration_id, + skip_trash=True, + ) + except Exception as cleanup_error: + LOG.error(f'Failed to clean up component configuration {configuration_id}: {cleanup_error}') 
@pytest.mark.asyncio @@ -293,9 +306,9 @@ async def test_update_config( mcp_client: Client, keboola_project: ProjectDef, keboola_client: KeboolaClient, + links_manager, ): """Tests that `update_config` updates a configuration with correct metadata.""" - project_id = keboola_project.project_id component_id = initial_cmpconf.component_id configuration_id = initial_cmpconf.configuration_id param_update_dicts = updates.get('parameter_updates') @@ -337,17 +350,12 @@ async def test_update_config( assert update_result.description == expected_description assert frozenset(update_result.links) == frozenset( [ - Link( - type='ui-detail', - title=f'Configuration: {expected_name}', - url='https://connection.keboola.com/admin' - f'/projects/{project_id}/components/{component_id}/{configuration_id}', - ), - Link( - type='ui-dashboard', - title=f'Component "{component_id}" Configurations Dashboard', - url=f'https://connection.keboola.com/admin/projects/{project_id}/components/{component_id}', + links_manager.get_component_config_link( + component_id=component_id, + configuration_id=configuration_id, + configuration_name=expected_name, ), + links_manager.get_config_dashboard_link(component_id=component_id, component_name=None), ] ) @@ -391,7 +399,9 @@ async def test_update_config( @pytest.mark.asyncio -async def test_add_config_row(mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef): +async def test_add_config_row( + mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef, links_manager +): """Tests that `add_config_row` creates a row configuration with correct metadata.""" # Use the first component from configs for testing @@ -412,8 +422,6 @@ async def test_add_config_row(mcp_context: Context, configs: list[ConfigDef], ke client = KeboolaClient.from_state(mcp_context.session.state) - project_id = keboola_project.project_id - # First create a root configuration to add row to root_config = await create_config( ctx=mcp_context, @@ -446,19 
+454,12 @@ async def test_add_config_row(mcp_context: Context, configs: list[ConfigDef], ke assert created_row_config.version is not None assert frozenset(created_row_config.links) == frozenset( [ - Link( - type='ui-detail', - title=f'Configuration: {row_name}', - url=( - f'https://connection.keboola.com/admin/projects/{project_id}/components/{component_id}/' - + f'{root_config.configuration_id}' - ), - ), - Link( - type='ui-dashboard', - title=f'Component "{component_id}" Configurations Dashboard', - url=f'https://connection.keboola.com/admin/projects/{project_id}/components/{component_id}', + links_manager.get_component_config_link( + component_id=component_id, + configuration_id=root_config.configuration_id, + configuration_name=row_name, ), + links_manager.get_config_dashboard_link(component_id=component_id, component_name=None), ] ) @@ -553,9 +554,9 @@ async def test_update_config_row( mcp_client: Client, keboola_project: ProjectDef, keboola_client: KeboolaClient, + links_manager, ): """Tests that `update_config_row` updates a row configuration with correct metadata.""" - project_id = keboola_project.project_id component_id = initial_cmpconf_row.component_id configuration_id = initial_cmpconf_row.configuration_id @@ -593,17 +594,12 @@ async def test_update_config_row( assert updated_row_config.description == expected_row_description assert frozenset(updated_row_config.links) == frozenset( [ - Link( - type='ui-detail', - title=f'Configuration: {expected_row_name}', - url='https://connection.keboola.com/admin' - f'/projects/{project_id}/components/{component_id}/{configuration_id}', - ), - Link( - type='ui-dashboard', - title=f'Component "{component_id}" Configurations Dashboard', - url=f'https://connection.keboola.com/admin/projects/{project_id}/components/{component_id}', + links_manager.get_component_config_link( + component_id=component_id, + configuration_id=configuration_id, + configuration_name=expected_row_name, ), + 
links_manager.get_config_dashboard_link(component_id=component_id, component_name=None), ] ) @@ -653,7 +649,7 @@ async def test_update_config_row( @pytest.mark.asyncio -async def test_create_sql_transformation(mcp_context: Context, keboola_project: ProjectDef): +async def test_create_sql_transformation(mcp_context: Context, keboola_project: ProjectDef, links_manager): """Tests that `create_sql_transformation` creates a SQL transformation with correct configuration.""" test_name = 'Test SQL Transformation' @@ -680,7 +676,6 @@ async def test_create_sql_transformation(mcp_context: Context, keboola_project: ) sql_dialect = await WorkspaceManager.from_state(mcp_context.session.state).get_sql_dialect() expected_component_id = get_sql_transformation_id_from_sql_dialect(sql_dialect) - project_id = keboola_project.project_id try: # Verify the response structure @@ -693,19 +688,12 @@ async def test_create_sql_transformation(mcp_context: Context, keboola_project: assert created_transformation.version is not None expected_links = frozenset( [ - Link( - type='ui-detail', - title=f'Transformation: {test_name}', - url=( - f'https://connection.keboola.com/admin/projects/{project_id}/transformations-v2/' - f'{expected_component_id}/{created_transformation.configuration_id}' - ), - ), - Link( - type='ui-dashboard', - title='Transformations dashboard', - url=(f'https://connection.keboola.com/admin/projects/{project_id}/transformations-v2'), + links_manager.get_transformation_config_link( + transformation_type=expected_component_id, + transformation_id=created_transformation.configuration_id, + transformation_name=test_name, ), + links_manager.get_transformations_dashboard_link(), ] ) @@ -874,9 +862,9 @@ async def test_update_sql_transformation( mcp_client: Client, keboola_project: ProjectDef, keboola_client: KeboolaClient, + links_manager, ): """Tests that `update_sql_transformation` updates an existing SQL transformation correctly.""" - project_id = keboola_project.project_id 
component_id = initial_sqltrfm.component_id configuration_id = initial_sqltrfm.configuration_id param_update_objects = updates.get('parameter_updates') @@ -933,17 +921,12 @@ async def test_update_sql_transformation( assert updated_trfm.description == expected_description assert frozenset(updated_trfm.links) == frozenset( [ - Link( - type='ui-detail', - title=f'Transformation: {expected_name}', - url='https://connection.keboola.com/admin' - f'/projects/{project_id}/transformations-v2/{component_id}/{configuration_id}', - ), - Link( - type='ui-dashboard', - title='Transformations dashboard', - url=f'https://connection.keboola.com/admin/projects/{project_id}/transformations-v2', + links_manager.get_transformation_config_link( + transformation_type=component_id, + transformation_id=configuration_id, + transformation_name=expected_name, ), + links_manager.get_transformations_dashboard_link(), ] ) diff --git a/integtests/tools/flow/conftest.py b/integtests/tools/flow/conftest.py index af9c2aab..073a640a 100644 --- a/integtests/tools/flow/conftest.py +++ b/integtests/tools/flow/conftest.py @@ -1,11 +1,12 @@ import logging -from typing import AsyncGenerator +from typing import Any, AsyncGenerator, Generator from unittest.mock import AsyncMock import pytest import pytest_asyncio from fastmcp import Client, FastMCP from fastmcp.server.middleware import CallNext, MiddlewareContext +from kbcstorage.client import Client as SyncStorageClient from mcp import types as mt from integtests.conftest import ConfigDef @@ -21,6 +22,51 @@ LOG = logging.getLogger(__name__) +@pytest.fixture(scope='session') +def _ensure_clean_flows(storage_api_token: str, storage_api_url: str) -> Generator[None, Any, None]: + """ + Ensure the project has no flows before and after flow tests. + This prevents leftover flows from failed tests affecting new test runs. 
+ """ + client = SyncStorageClient(storage_api_url, storage_api_token) + + # Check for and clean up any leftover flows before tests + orchestrator_configs = client.configurations.list(component_id=ORCHESTRATOR_COMPONENT_ID) + if orchestrator_configs: + LOG.warning(f'Found {len(orchestrator_configs)} leftover orchestrator flows. Cleaning up...') + for config in orchestrator_configs: + LOG.info(f'Deleting leftover orchestrator flow: {config["id"]}') + # Call delete twice for permanent deletion (first to trash, second to remove from trash) + client.configurations.delete(ORCHESTRATOR_COMPONENT_ID, config['id']) + client.configurations.delete(ORCHESTRATOR_COMPONENT_ID, config['id']) + + conditional_configs = client.configurations.list(component_id=CONDITIONAL_FLOW_COMPONENT_ID) + if conditional_configs: + LOG.warning(f'Found {len(conditional_configs)} leftover conditional flows. Cleaning up...') + for config in conditional_configs: + LOG.info(f'Deleting leftover conditional flow: {config["id"]}') + # Call delete twice for permanent deletion (first to trash, second to remove from trash) + client.configurations.delete(CONDITIONAL_FLOW_COMPONENT_ID, config['id']) + client.configurations.delete(CONDITIONAL_FLOW_COMPONENT_ID, config['id']) + + yield + + # Clean up after all tests complete + orchestrator_configs = client.configurations.list(component_id=ORCHESTRATOR_COMPONENT_ID) + for config in orchestrator_configs: + LOG.info(f'Cleaning up orchestrator flow: {config["id"]}') + # Call delete twice for permanent deletion (first to trash, second to remove from trash) + client.configurations.delete(ORCHESTRATOR_COMPONENT_ID, config['id']) + client.configurations.delete(ORCHESTRATOR_COMPONENT_ID, config['id']) + + conditional_configs = client.configurations.list(component_id=CONDITIONAL_FLOW_COMPONENT_ID) + for config in conditional_configs: + LOG.info(f'Cleaning up conditional flow: {config["id"]}') + # Call delete twice for permanent deletion (first to trash, second to remove 
from trash) + client.configurations.delete(CONDITIONAL_FLOW_COMPONENT_ID, config['id']) + client.configurations.delete(CONDITIONAL_FLOW_COMPONENT_ID, config['id']) + + @pytest.fixture def mcp_server(storage_api_url: str, storage_api_token: str, workspace_schema: str, mocker) -> FastMCP: # allow all tool calls regardless the testing project features @@ -46,81 +92,125 @@ async def mcp_client(mcp_server: FastMCP) -> AsyncGenerator[Client, None]: @pytest_asyncio.fixture async def initial_lf( - mcp_client: Client, configs: list[ConfigDef], keboola_client: KeboolaClient + mcp_client: Client, + configs: list[ConfigDef], + keboola_client: KeboolaClient, + _ensure_clean_flows: None, ) -> AsyncGenerator[FlowToolOutput, None]: - # Create the initial component configuration test data - tool_result = await mcp_client.call_tool( - name='create_flow', - arguments={ - 'name': 'Initial Test Flow', - 'description': 'Initial test flow created by automated test', - 'phases': [{'name': 'Phase1', 'dependsOn': [], 'description': 'First phase'}], - 'tasks': [ - { - 'id': 20001, - 'name': 'Task1', - 'phase': 1, - 'continueOnFailure': False, - 'enabled': False, - 'task': { - 'componentId': configs[0].component_id, - 'configId': configs[0].configuration_id, - 'mode': 'run', - }, - } - ], - }, - ) + configuration_id: str | None = None + try: - yield FlowToolOutput.model_validate(tool_result.structured_content) - finally: - # Clean up: Delete the configuration - await keboola_client.storage_client.configuration_delete( - component_id=ORCHESTRATOR_COMPONENT_ID, - configuration_id=tool_result.structured_content['configuration_id'], - skip_trash=True, + LOG.debug('Creating initial test flow (orchestrator)') + tool_result = await mcp_client.call_tool( + name='create_flow', + arguments={ + 'name': 'Initial Test Flow', + 'description': 'Initial test flow created by automated test', + 'phases': [{'name': 'Phase1', 'dependsOn': [], 'description': 'First phase'}], + 'tasks': [ + { + 'id': 20001, + 
'name': 'Task1', + 'phase': 1, + 'continueOnFailure': False, + 'enabled': False, + 'task': { + 'componentId': configs[0].component_id, + 'configId': configs[0].configuration_id, + 'mode': 'run', + }, + } + ], + }, ) + flow_output = FlowToolOutput.model_validate(tool_result.structured_content) + configuration_id = flow_output.configuration_id + yield flow_output + + except Exception: + # If tool creation fails but returned a configuration_id, try to extract it + if 'tool_result' in locals() and hasattr(tool_result, 'structured_content'): + try: + configuration_id = tool_result.structured_content.get('configuration_id') + except Exception: + pass + raise + + finally: + # Clean up if we have a configuration_id + if configuration_id: + try: + LOG.debug(f'Cleaning up flow configuration: {configuration_id}') + await keboola_client.storage_client.configuration_delete( + component_id=ORCHESTRATOR_COMPONENT_ID, + configuration_id=configuration_id, + skip_trash=True, + ) + except Exception as cleanup_error: + LOG.error(f'Failed to clean up flow configuration {configuration_id}: {cleanup_error}') @pytest_asyncio.fixture async def initial_cf( - mcp_client: Client, configs: list[ConfigDef], keboola_client: KeboolaClient + mcp_client: Client, + configs: list[ConfigDef], + keboola_client: KeboolaClient, + _ensure_clean_flows: None, ) -> AsyncGenerator[FlowToolOutput, None]: - # Create the initial component configuration test data - tool_result = await mcp_client.call_tool( - name='create_conditional_flow', - arguments={ - 'name': 'Initial Test Flow', - 'description': 'Initial test flow created by automated test', - 'phases': [ - { - 'id': 'phase1', - 'name': 'Phase1', - 'description': 'First phase', - 'next': [{'id': 'phase1_end', 'name': 'End Flow', 'goto': None}], - }, - ], - 'tasks': [ - { - 'id': 'task1', - 'name': 'Task1', - 'phase': 'phase1', - 'task': { - 'type': 'job', - 'componentId': configs[0].component_id, - 'configId': configs[0].configuration_id, - 'mode': 'run', - 
}, - }, - ], - }, - ) + configuration_id: str | None = None + try: - yield FlowToolOutput.model_validate(tool_result.structured_content) - finally: - # Clean up: Delete the configuration - await keboola_client.storage_client.configuration_delete( - component_id=CONDITIONAL_FLOW_COMPONENT_ID, - configuration_id=tool_result.structured_content['configuration_id'], - skip_trash=True, + LOG.debug('Creating initial test flow (conditional)') + tool_result = await mcp_client.call_tool( + name='create_conditional_flow', + arguments={ + 'name': 'Initial Test Flow', + 'description': 'Initial test flow created by automated test', + 'phases': [ + { + 'id': 'phase1', + 'name': 'Phase1', + 'description': 'First phase', + 'next': [{'id': 'phase1_end', 'name': 'End Flow', 'goto': None}], + }, + ], + 'tasks': [ + { + 'id': 'task1', + 'name': 'Task1', + 'phase': 'phase1', + 'task': { + 'type': 'job', + 'componentId': configs[0].component_id, + 'configId': configs[0].configuration_id, + 'mode': 'run', + }, + }, + ], + }, ) + flow_output = FlowToolOutput.model_validate(tool_result.structured_content) + configuration_id = flow_output.configuration_id + yield flow_output + + except Exception: + # If tool creation fails but returned a configuration_id, try to extract it + if 'tool_result' in locals() and hasattr(tool_result, 'structured_content'): + try: + configuration_id = tool_result.structured_content.get('configuration_id') + except Exception: + pass + raise + + finally: + # Clean up if we have a configuration_id + if configuration_id: + try: + LOG.debug(f'Cleaning up conditional flow configuration: {configuration_id}') + await keboola_client.storage_client.configuration_delete( + component_id=CONDITIONAL_FLOW_COMPONENT_ID, + configuration_id=configuration_id, + skip_trash=True, + ) + except Exception as cleanup_error: + LOG.error(f'Failed to clean up conditional flow configuration {configuration_id}: {cleanup_error}') diff --git a/integtests/tools/flow/test_tools.py 
b/integtests/tools/flow/test_tools.py index 7ce2b34d..dcac07ca 100644 --- a/integtests/tools/flow/test_tools.py +++ b/integtests/tools/flow/test_tools.py @@ -18,7 +18,7 @@ ) from keboola_mcp_server.config import MetadataField from keboola_mcp_server.errors import ToolError -from keboola_mcp_server.links import Link, ProjectLinksManager +from keboola_mcp_server.links import ProjectLinksManager from keboola_mcp_server.tools.constants import MODIFY_FLOW_TOOL_NAME, UPDATE_FLOW_TOOL_NAME from keboola_mcp_server.tools.flow.model import ConditionalFlowPhase, Flow, GetFlowsDetailOutput, GetFlowsListOutput from keboola_mcp_server.tools.flow.tools import ( @@ -409,6 +409,7 @@ async def test_update_flow( mcp_client: Client, keboola_project: ProjectDef, keboola_client: KeboolaClient, + links_manager, ) -> None: """Tests that 'update_flow' tool works as expected.""" flow_id = initial_lf.configuration_id if flow_type == ORCHESTRATOR_COMPONENT_ID else initial_cf.configuration_id @@ -425,7 +426,6 @@ async def test_update_flow( else: tool_name = UPDATE_FLOW_TOOL_NAME - project_id = keboola_project.project_id tool_result = await mcp_client.call_tool( name=tool_name, arguments={ @@ -447,25 +447,11 @@ async def test_update_flow( expected_name = updates.get('name') or 'Initial Test Flow' expected_description = updates.get('description') or initial_flow.description assert updated_flow.description == expected_description - if flow_type == ORCHESTRATOR_COMPONENT_ID: - flow_path = 'flows' - flow_label = 'Flows' - else: - flow_path = 'flows-v2' - flow_label = 'Conditional Flows' assert frozenset(updated_flow.links) == frozenset( [ - Link( - type='ui-detail', - title=f'Flow: {expected_name}', - url=f'https://connection.keboola.com/admin/projects/{project_id}/{flow_path}/{flow_id}', - ), - Link( - type='ui-dashboard', - title=f'{flow_label} in the project', - url=f'https://connection.keboola.com/admin/projects/{project_id}/{flow_path}', - ), - Link(type='docs', title='Documentation for 
Keboola Flows', url='https://help.keboola.com/flows/'), + links_manager.get_flow_detail_link(flow_id, expected_name, flow_type), + links_manager.get_flows_dashboard_link(flow_type), + links_manager.get_flows_docs_link(), ] ) @@ -549,7 +535,11 @@ async def test_get_flows_empty(mcp_context: Context) -> None: @pytest.mark.asyncio async def test_get_flows_list( - keboola_project: ProjectDef, mcp_client: Client, initial_lf: FlowToolOutput, initial_cf: FlowToolOutput + keboola_project: ProjectDef, + mcp_client: Client, + initial_lf: FlowToolOutput, + initial_cf: FlowToolOutput, + links_manager, ) -> None: """Tests that `get_flows` tool works as expected when listing all flows.""" tool_call_result = await mcp_client.call_tool(name='get_flows', arguments={}) @@ -558,16 +548,8 @@ async def test_get_flows_list( assert len(flows.flows) == 2 assert frozenset(flows.links) == frozenset( [ - Link( - type='ui-dashboard', - title='Flows in the project', - url=f'https://connection.keboola.com/admin/projects/{keboola_project.project_id}/flows', - ), - Link( - type='ui-dashboard', - title='Conditional Flows in the project', - url=f'https://connection.keboola.com/admin/projects/{keboola_project.project_id}/flows-v2', - ), + links_manager.get_flows_dashboard_link(ORCHESTRATOR_COMPONENT_ID), + links_manager.get_flows_dashboard_link(CONDITIONAL_FLOW_COMPONENT_ID), ] ) assert flows.flows[0].configuration_id == initial_cf.configuration_id diff --git a/integtests/tools/test_jobs.py b/integtests/tools/test_jobs.py index 26a378bc..37ae8b41 100644 --- a/integtests/tools/test_jobs.py +++ b/integtests/tools/test_jobs.py @@ -6,7 +6,6 @@ from integtests.conftest import ConfigDef, ProjectDef from keboola_mcp_server.clients.client import KeboolaClient -from keboola_mcp_server.links import Link from keboola_mcp_server.tools.components import create_config from keboola_mcp_server.tools.jobs import ( GetJobsDetailOutput, @@ -95,11 +94,11 @@ async def 
test_get_jobs_listing_with_component_and_config_filter(mcp_context: Co @pytest.mark.asyncio -async def test_run_job_and_get_jobs(mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef): +async def test_run_job_and_get_jobs( + mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef, links_manager +): """Tests that `run_job` creates a job and `get_jobs` retrieves its details.""" - project_id = keboola_project.project_id - test_config = configs[0] component_id = test_config.component_id configuration_id = test_config.configuration_id @@ -114,16 +113,8 @@ async def test_run_job_and_get_jobs(mcp_context: Context, configs: list[ConfigDe assert started_job.status is not None assert frozenset(started_job.links) == frozenset( [ - Link( - type='ui-detail', - title=f'Job: {started_job.id}', - url=f'https://connection.keboola.com/admin/projects/{project_id}/queue/{started_job.id}', - ), - Link( - type='ui-dashboard', - title='Jobs in the project', - url=f'https://connection.keboola.com/admin/projects/{project_id}/queue', - ), + links_manager.get_job_detail_link(started_job.id), + links_manager.get_jobs_dashboard_link(), ] ) @@ -141,26 +132,18 @@ async def test_run_job_and_get_jobs(mcp_context: Context, configs: list[ConfigDe assert job_detail.url is not None assert frozenset(job_detail.links) == frozenset( [ - Link( - type='ui-detail', - title=f'Job: {job_detail.id}', - url=f'https://connection.keboola.com/admin/projects/{project_id}/queue/{job_detail.id}', - ), - Link( - type='ui-dashboard', - title='Jobs in the project', - url=f'https://connection.keboola.com/admin/projects/{project_id}/queue', - ), + links_manager.get_job_detail_link(job_detail.id), + links_manager.get_jobs_dashboard_link(), ] ) @pytest.mark.asyncio -async def test_get_jobs_detail(mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef): +async def test_get_jobs_detail( + mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef, 
links_manager +): """Tests `get_jobs` by creating a job and then retrieving its details.""" - project_id = keboola_project.project_id - # Use first config to create a specific job test_config = configs[0] component_id = test_config.component_id @@ -182,28 +165,18 @@ async def test_get_jobs_detail(mcp_context: Context, configs: list[ConfigDef], k assert job_detail.status is not None assert frozenset(job_detail.links) == frozenset( [ - Link( - type='ui-detail', - title=f'Job: {created_job.id}', - url=f'https://connection.keboola.com/admin/projects/{project_id}/queue/{created_job.id}', - ), - Link( - type='ui-dashboard', - title='Jobs in the project', - url=f'https://connection.keboola.com/admin/projects/{project_id}/queue', - ), + links_manager.get_job_detail_link(created_job.id), + links_manager.get_jobs_dashboard_link(), ] ) @pytest.mark.asyncio async def test_run_job_with_newly_created_config( - mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef + mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef, links_manager ): """Tests that `run_job` works with a newly created configuration.""" - project_id = keboola_project.project_id - test_config = configs[0] component_id = test_config.component_id @@ -231,16 +204,8 @@ async def test_run_job_with_newly_created_config( assert started_job.status is not None assert frozenset(started_job.links) == frozenset( [ - Link( - type='ui-detail', - title=f'Job: {started_job.id}', - url=f'https://connection.keboola.com/admin/projects/{project_id}/queue/{started_job.id}', - ), - Link( - type='ui-dashboard', - title='Jobs in the project', - url=f'https://connection.keboola.com/admin/projects/{project_id}/queue', - ), + links_manager.get_job_detail_link(started_job.id), + links_manager.get_jobs_dashboard_link(), ] ) diff --git a/pyproject.toml b/pyproject.toml index 9cc48065..5cc14201 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" 
[project] name = "keboola-mcp-server" -version = "1.43.2" +version = "1.44.1" description = "MCP server for interacting with Keboola Connection" readme = "README.md" requires-python = ">=3.10" diff --git a/src/keboola_mcp_server/authorization.py b/src/keboola_mcp_server/authorization.py index 7da923a4..ce8ca185 100644 --- a/src/keboola_mcp_server/authorization.py +++ b/src/keboola_mcp_server/authorization.py @@ -23,6 +23,7 @@ from mcp import types as mt from keboola_mcp_server.mcp import get_http_request_or_none +from keboola_mcp_server.utils import is_read_only_tool LOG = logging.getLogger(__name__) @@ -81,13 +82,6 @@ def _get_authorization_config() -> tuple[set[str] | None, set[str] | None, bool] return allowed_tools, disallowed_tools, read_only_mode - @staticmethod - def _is_read_only_tool(tool: Tool) -> bool: - """Check if a tool has readOnlyHint=True annotation.""" - if tool.annotations is None: - return False - return tool.annotations.readOnlyHint is True - @staticmethod def _is_tool_authorized( tool: Tool, allowed_tools: set[str] | None, disallowed_tools: set[str] | None, read_only_mode: bool @@ -97,7 +91,7 @@ def _is_tool_authorized( if disallowed_tools and tool.name in disallowed_tools: return False # Check read-only mode - only allow tools with readOnlyHint=True - if read_only_mode and not ToolAuthorizationMiddleware._is_read_only_tool(tool): + if read_only_mode and not is_read_only_tool(tool): return False # Then check if tool is in allowed list (if specified) if allowed_tools is not None and tool.name not in allowed_tools: diff --git a/src/keboola_mcp_server/errors.py b/src/keboola_mcp_server/errors.py index 1117be8d..c47bfdef 100644 --- a/src/keboola_mcp_server/errors.py +++ b/src/keboola_mcp_server/errors.py @@ -5,6 +5,7 @@ from functools import wraps from typing import Any, Callable, Mapping, Optional, Type, TypeVar, cast +import httpx import yaml from fastmcp import Context from fastmcp.exceptions import ToolError @@ -170,7 +171,14 @@ async def 
wrapped(*args, **kwargs): await _trigger_event(func, args, kwargs, exception, time.perf_counter() - start) except Exception as e: LOG.exception(f'Failed to trigger tool event for "{func.__name__}" tool: {e}') - raise + # Only swallow 403 Forbidden errors (expected for guest/read-only roles) + # Re-raise other errors as they indicate genuine problems + if isinstance(e, httpx.HTTPStatusError) and e.response.status_code == 403: + # Expected for restricted roles (guest, read-only) - don't fail the tool call + pass + else: + # Unexpected error - re-raise to alert about the problem + raise return cast(F, wrapped) diff --git a/src/keboola_mcp_server/mcp.py b/src/keboola_mcp_server/mcp.py index 5e5b0255..c37275ad 100644 --- a/src/keboola_mcp_server/mcp.py +++ b/src/keboola_mcp_server/mcp.py @@ -33,6 +33,7 @@ from keboola_mcp_server.config import Config, ServerRuntimeInfo from keboola_mcp_server.oauth import ProxyAccessToken from keboola_mcp_server.tools.constants import MODIFY_FLOW_TOOL_NAME, UPDATE_FLOW_TOOL_NAME +from keboola_mcp_server.utils import is_read_only_tool from keboola_mcp_server.workspace import WorkspaceManager LOG = logging.getLogger(__name__) @@ -258,6 +259,12 @@ class ToolsFilteringMiddleware(fmw.Middleware): The middleware also intercepts the `on_call_tool()` call and raises an exception if a call is attempted to a tool that is not available in the current project. 
+ + Role-based access control: + - Guest: Read-only access (only tools with readOnlyHint=True) + - Read: Read-only access (only tools with readOnlyHint=True) + - Other non-admin roles: Write tools available, with specific tools (e.g., `modify_flow`) explicitly restricted + - Admin: Broad access to tools, with specific write tools (e.g., `update_flow`) explicitly restricted """ @staticmethod @@ -315,6 +322,11 @@ async def on_list_tools( # Filter out data app tools when the client is not using the main/production branch tools = [t for t in tools if t.name not in {'modify_data_app', 'get_data_apps', 'deploy_data_app'}] + # Role-based filtering: read-only access for guest and read roles + if token_role in ['guest', 'readonly']: + tools = [t for t in tools if is_read_only_tool(t)] + LOG.debug(f'Read-only access: filtered to {len(tools)} read-only tools for role={token_role}') + return tools async def on_call_tool( @@ -327,6 +339,15 @@ async def on_call_tool( features = self.get_project_features(token_info) token_role = self.get_token_role(token_info).lower() + # Block non-read-only tools for guest and read-only roles + if token_role in ['guest', 'readonly']: + if not is_read_only_tool(tool): + raise ToolError( + f'Access denied: The tool "{tool.name}" requires write permissions. ' + f'Your current role ({token_role}) only allows read-only operations. ' + f'Contact your administrator to request write access.' + ) + if 'hide-conditional-flows' in features: if tool.name == 'create_conditional_flow': raise ToolError( diff --git a/src/keboola_mcp_server/utils.py b/src/keboola_mcp_server/utils.py new file mode 100644 index 00000000..80881d7e --- /dev/null +++ b/src/keboola_mcp_server/utils.py @@ -0,0 +1,21 @@ +"""Shared utility functions for the Keboola MCP Server. + +This module contains utility functions used across multiple modules to avoid code duplication. 
+""" + +from fastmcp.tools import Tool + + +def is_read_only_tool(tool: Tool) -> bool: + """Check if a tool has readOnlyHint=True annotation. + + This helper is used by both ToolsFilteringMiddleware and ToolAuthorizationMiddleware + to determine which tools are read-only. Tools with readOnlyHint=True are safe for + guest and read-only users to access. + + :param tool: The Tool to check + :return: True if the tool has readOnlyHint=True, False otherwise + """ + if tool.annotations is None: + return False + return tool.annotations.readOnlyHint is True diff --git a/tests/test_mcp.py b/tests/test_mcp.py index be90329a..10f215e6 100644 --- a/tests/test_mcp.py +++ b/tests/test_mcp.py @@ -438,3 +438,76 @@ async def call_next(_): else: result = await middleware.on_call_tool(context, call_next) assert result is expected + + @pytest.mark.asyncio + @pytest.mark.parametrize( + ('token_role', 'tool_name', 'tool_read_only', 'should_allow'), + [ + # Guest role - allow read-only, block write + ('guest', 'get_buckets', True, True), + ('guest', 'create_config', False, False), + # Read role - allow read-only, block write + ('readOnly', 'get_flows', True, True), + ('readOnly', 'update_descriptions', False, False), + # Admin role - allow all + ('admin', 'get_buckets', True, True), + ('admin', 'create_config', False, True), + # Other role - allow all + ('developer', 'query_data', True, True), + ('developer', 'delete_bucket', False, True), + ], + ids=[ + 'guest_read_allowed', + 'guest_write_blocked', + 'readonly_read_allowed', + 'readonly_write_blocked', + 'admin_read_allowed', + 'admin_write_allowed', + 'developer_read_allowed', + 'developer_write_allowed', + ], + ) + async def test_on_call_tool_role_blocking( + self, + mcp_context_client, + token_role: str, + tool_name: str, + tool_read_only: bool, + should_allow: bool, + ) -> None: + """Test on_call_tool() blocks write operations for guest/read roles.""" + # Setup token info + token_info = {'owner': {'features': []}, 'admin': {'role': 
token_role}} + + keboola_client = KeboolaClient.from_state(mcp_context_client.session.state) + keboola_client.storage_client.verify_token = AsyncMock(return_value=token_info) + keboola_client.branch_id = None # Use main branch to avoid data app filtering + + # Create mock tool with read-only annotation + tool = _tool(tool_name) + tool.annotations = MagicMock() + tool.annotations.readOnlyHint = tool_read_only + + mcp_context_client.fastmcp = SimpleNamespace(get_tool=AsyncMock(return_value=tool)) + context = SimpleNamespace(fastmcp_context=mcp_context_client, message=SimpleNamespace(name=tool_name)) + + expected = MagicMock() + + async def call_next(_): + return expected + + middleware = ToolsFilteringMiddleware() + + if should_allow: + # Should not raise an error + result = await middleware.on_call_tool(context, call_next) + assert result is expected + else: + # Should raise ToolError + with pytest.raises(ToolError) as exc_info: + await middleware.on_call_tool(context, call_next) + # Verify error message contains tool name and role info + error_msg = str(exc_info.value) + assert tool_name in error_msg + assert 'write permissions' in error_msg + assert token_role in error_msg diff --git a/tests/test_server.py b/tests/test_server.py index 949e55c4..893e65e4 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -223,6 +223,8 @@ async def assessed_function( ({'role': 'admin'}, 'modify_flow', 'update_flow'), ({'role': None}, 'update_flow', 'modify_flow'), ({}, 'update_flow', 'modify_flow'), + ({'role': 'guest'}, 'get_buckets', 'create_config'), + ({'role': 'readOnly'}, 'query_data', 'update_descriptions'), ], ) async def test_with_session_state_admin_role_tools(mocker, admin_info, expected_included, expected_excluded): @@ -457,14 +459,23 @@ async def read_stream(stream, lines_list): try: # give the server time to fully start - await asyncio.sleep(5) + await asyncio.sleep(8) # connect to the server and list prompts to force 'fastmcp' looger to get used # the 
listing of the prompts does not require SAPI connection - async with Client(SSETransport('http://localhost:8000/sse', sse_read_timeout=5)) as client: + async with Client(SSETransport('http://localhost:8000/sse', sse_read_timeout=10)) as client: prompts = await client.list_prompts() assert len(prompts) > 1 + # Also call list_resources to trigger more middleware logging from fastmcp + # This doesn't require SAPI connection either + resources = await client.list_resources() + assert len(resources) >= 0 # May be empty, that's OK + + # Call list_prompts again to generate more fastmcp logs + prompts2 = await client.list_prompts() + assert len(prompts2) > 1 + finally: # kill the server and wait for output tasks p.terminate() diff --git a/uv.lock b/uv.lock index 25322282..e6f6d739 100644 --- a/uv.lock +++ b/uv.lock @@ -1200,7 +1200,7 @@ wheels = [ [[package]] name = "keboola-mcp-server" -version = "1.43.2" +version = "1.44.1" source = { editable = "." } dependencies = [ { name = "cryptography" },