From 77912cc783d488346c677ef8314704445c94f33f Mon Sep 17 00:00:00 2001 From: Sun Ro Lee Date: Fri, 3 Oct 2025 00:05:29 +0900 Subject: [PATCH] Add alias and data stream to list tool Signed-off-by: Sun Ro Lee --- CHANGELOG.md | 1 + src/opensearch/helper.py | 98 +++++++++++++++++++++++++++++++++- src/tools/tools.py | 107 ++++++++++++++++++++++++++++++++------ tests/tools/test_tools.py | 103 +++++++++++++++++++++++++++++++++--- 4 files changed, 284 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d08fb8..bdad2f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Enhance tool filtering ([#101](https://github.com/opensearch-project/opensearch-mcp-server-py/pull/101)) - Add core tools as a category ([#103](https://github.com/opensearch-project/opensearch-mcp-server-py/pull/103)) - set stateless=True for streaming server by default ([#104](https://github.com/opensearch-project/opensearch-mcp-server-py/pull/104)) +- Expand ListIndexTool to return aliases and data streams while hiding backing indices from general listings ([#108](https://github.com/opensearch-project/opensearch-mcp-server-py/pull/108)) ### Fixed diff --git a/src/opensearch/helper.py b/src/opensearch/helper.py index 2687bee..f0c1804 100644 --- a/src/opensearch/helper.py +++ b/src/opensearch/helper.py @@ -3,7 +3,9 @@ import json import logging +from copy import deepcopy from semver import Version +from opensearchpy.exceptions import NotFoundError from tools.tool_params import * # Configure logging @@ -12,12 +14,104 @@ # List all the helper functions, these functions perform a single rest call to opensearch # these functions will be used in tools folder to eventually write more complex tools -def list_indices(args: ListIndicesArgs) -> json: +def list_indices(args: ListIndicesArgs) -> list[dict]: from .client import initialize_client client = initialize_client(args) response = client.cat.indices(format='json') - return response + if not isinstance(response, list): + logger.warning('Unexpected response type for cat.indices: %s', type(response)) + return [] + + sanitized: list[dict] = [] + for item in response: + if not isinstance(item, dict): + continue + index_name = item.get('index') + if not isinstance(index_name, str): + continue + sanitized.append(item) + + return sanitized + + +def list_aliases(args: ListIndicesArgs) -> dict[str, list[str]]: + from .client import initialize_client + + client = initialize_client(args) + + try: + response = client.indices.get_alias(index='*') + except NotFoundError: + return {} + + if not isinstance(response, dict): + logger.warning('Unexpected response type for indices.get_alias: %s', type(response)) + return {} + + alias_map: dict[str, set[str]] = {} + for index_name, alias_entry in response.items(): + if not isinstance(index_name, str) or not isinstance(alias_entry, dict): + continue + aliases = alias_entry.get('aliases', {}) + if not isinstance(aliases, dict): + continue + for alias_name in aliases.keys(): + if not isinstance(alias_name, str): + continue + alias_map.setdefault(alias_name, set()).add(index_name) + + return {alias: sorted(index_names) for alias, index_names in alias_map.items()} + + +def list_data_streams(args: ListIndicesArgs) -> list[dict]: + from .client import initialize_client + + client = initialize_client(args) + + try: + response = client.indices.get_data_stream(name='*') + except NotFoundError: + return [] + + if not isinstance(response, dict): + logger.warning('Unexpected response type for indices.get_data_stream: %s', type(response)) + return [] + + data_streams = response.get('data_streams', []) + if not isinstance(data_streams, list): + logger.warning('Unexpected data_streams payload type: %s', type(data_streams)) + return [] + + sanitized: list[dict] = [] + for stream in data_streams: + if not isinstance(stream, dict): + continue + name = stream.get('name') + if not isinstance(name, str): + continue + + sanitized_stream = {key: deepcopy(value) for key, value in stream.items() if key != 'indices'} + + sanitized_indices: list[dict] = [] + indices_payload = stream.get('indices', []) + if isinstance(indices_payload, list): + for entry in indices_payload: + if not isinstance(entry, dict): + continue + index_name = entry.get('index_name') + if not isinstance(index_name, str): + continue + sanitized_entry = {'index_name': index_name} + index_uuid = entry.get('index_uuid') + if isinstance(index_uuid, str): + sanitized_entry['index_uuid'] = index_uuid + sanitized_indices.append(sanitized_entry) + + sanitized_stream['indices'] = sanitized_indices + sanitized.append(sanitized_stream) + + return sanitized def get_index(args: ListIndicesArgs) -> json: diff --git a/src/tools/tools.py b/src/tools/tools.py index 9b71327..edcc0a4 100644 --- a/src/tools/tools.py +++ b/src/tools/tools.py @@ -2,6 +2,8 @@ # SPDX-License-Identifier: Apache-2.0 import json +from typing import Any + from .tool_params import ( GetAllocationArgs, GetClusterStateArgs, @@ -35,6 +37,8 @@ get_query_insights, get_segments, get_shards, + list_aliases, + list_data_streams, list_indices, search_index, ) @@ -76,26 +80,99 @@ async def list_indices_tool(args: ListIndicesArgs) -> list[dict]: {'type': 'text', 'text': f'Index information for {args.index}:\n{formatted_info}'} ] - # Otherwise, list all indices + # Otherwise, list all indices, aliases, and data streams indices = list_indices(args) - - # If include_detail is False, return only pure list of index names - if not args.include_detail: - index_names = [ - item.get('index') - for item in indices - if isinstance(item, dict) and 'index' in item - ] - formatted_names = json.dumps(index_names, indent=2) - return [{'type': 'text', 'text': f'Indices:\n{formatted_names}'}] - - # include_detail is True: return full information - formatted_indices = json.dumps(indices, indent=2) - return [{'type': 'text', 'text': f'All indices information:\n{formatted_indices}'}] + aliases = list_aliases(args) + data_streams = list_data_streams(args) + + indices_payload = _format_indices(indices, aliases, data_streams, args.include_detail) + aliases_payload = _format_aliases(aliases, args.include_detail) + data_streams_payload = _format_data_streams(data_streams, args.include_detail) + + combined_payload = { + 'indices': indices_payload, + 'aliases': aliases_payload, + 'data_streams': data_streams_payload, + } + + formatted_payload = json.dumps(combined_payload, indent=2) + response_text = 'Indices, aliases, and data streams information:\n' + formatted_payload + return [{'type': 'text', 'text': response_text}] except Exception as e: return [{'type': 'text', 'text': f'Error listing indices: {str(e)}'}] +def _format_indices( + indices: list[dict], + aliases: dict[str, list[str]], + data_streams: list[dict], + include_detail: bool, +) -> list[Any]: + excluded_indices: set[str] = set() + + for index_list in aliases.values(): + excluded_indices.update(index_list) + + for stream in data_streams: + indices_payload = stream.get('indices', []) + if isinstance(indices_payload, list): + for entry in indices_payload: + if not isinstance(entry, dict): + continue + index_name = entry.get('index_name') + if isinstance(index_name, str): + excluded_indices.add(index_name) + + filtered = [ + item for item in indices if item.get('index') not in excluded_indices + ] + if include_detail: + return filtered + return [item['index'] for item in filtered if 'index' in item] + + +def _format_aliases( + aliases: dict[str, list[str]], include_detail: bool +) -> list[dict[str, Any]]: + sorted_alias_items = sorted(aliases.items()) + + if include_detail: + payload = [ + {'alias': alias_name, 'indices': index_list} + for alias_name, index_list in sorted_alias_items + ] + else: + payload = [ + {'alias': alias_name, 'index_count': len(index_list)} + for alias_name, index_list in sorted_alias_items + ] + + return payload + + +def _format_data_streams( + data_streams: list[dict], include_detail: bool +) -> list[dict[str, Any]]: + if include_detail: + return data_streams + + payload = [] + for stream in data_streams: + simplified = {k: v for k, v in stream.items() if k != 'indices'} + indices_payload = stream.get('indices', []) + index_count = 0 + if isinstance(indices_payload, list): + index_count = sum( + 1 + for entry in indices_payload + if isinstance(entry, dict) and isinstance(entry.get('index_name'), str) + ) + simplified['index_count'] = index_count + payload.append(simplified) + + return payload + + async def get_index_mapping_tool(args: GetIndexMappingArgs) -> list[dict]: try: check_tool_compatibility('IndexMappingTool', args) diff --git a/tests/tools/test_tools.py b/tests/tools/test_tools.py index 621e9b2..8c7719b 100644 --- a/tests/tools/test_tools.py +++ b/tests/tools/test_tools.py @@ -24,6 +24,8 @@ def setup_method(self): self.mock_client.cat.allocation.return_value = [] self.mock_client.cluster.state.return_value = {} self.mock_client.indices.stats.return_value = {} + self.mock_client.indices.get_alias.return_value = {} + self.mock_client.indices.get_data_stream.return_value = {'data_streams': []} self.mock_client.transport.perform_request.return_value = {} self.mock_client.info.return_value = {'version': {'number': '2.19.0'}} @@ -139,18 +141,56 @@ async def test_list_indices_tool_default_full(self): 'store.size': '2mb', 'pri.store.size': '1mb', }, + { + 'health': 'green', + 'status': 'open', + 'index': '.ds-logs-ds-000001', + 'uuid': 'uuid_ds', + 'pri': '1', + 'rep': '1', + 'docs.count': '50', + 'docs.deleted': '0', + 'store.size': '500kb', + 'pri.store.size': '250kb', + }, ] + self.mock_client.indices.get_alias.return_value = { + 'index1': {'aliases': {'alias_common': {}}}, + 'index2': {'aliases': {'alias_common': {}, 'alias_unique': {}}}, + } + self.mock_client.indices.get_data_stream.return_value = { + 'data_streams': [ + { + 'name': 'logs-ds', + 'timestamp_field': {'name': '@timestamp'}, + 'indices': [ + {'index_name': '.ds-logs-ds-000001', 'index_uuid': 'uuid-1'} + ], + 'generation': 1, + 'status': 'GREEN', + 'template': 'logs-template', + } + ] + } # Execute result = await self._list_indices_tool(self.ListIndicesArgs()) # Assert assert len(result) == 1 assert result[0]['type'] == 'text' - # Should include the full JSON output by default - assert '"index": "index1"' in result[0]['text'] - assert '"docs.count": "100"' in result[0]['text'] - assert '"index": "index2"' in result[0]['text'] - assert '"docs.count": "200"' in result[0]['text'] + response_text = result[0]['text'] + assert response_text.startswith('Indices, aliases, and data streams information:') + payload = json.loads(response_text.split('\n', 1)[1]) + assert 'indices' in payload + assert 'aliases' in payload + assert 'data_streams' in payload + assert payload['indices'] == [] + assert {'alias': 'alias_common', 'indices': ['index1', 'index2']} in payload['aliases'] + assert {'alias': 'alias_unique', 'indices': ['index2']} in payload['aliases'] + assert payload['data_streams'][0]['name'] == 'logs-ds' + assert payload['data_streams'][0]['indices'][0]['index_name'] == '.ds-logs-ds-000001' self.mock_client.cat.indices.assert_called_once_with(format='json') + self.mock_client.indices.get_alias.assert_called_once_with(index='*') + self.mock_client.indices.get_data_stream.assert_called_once_with(name='*') @pytest.mark.asyncio async def test_list_indices_tool_include_detail_false(self): @@ -181,16 +221,61 @@ async def test_list_indices_tool_include_detail_false(self): 'store.size': '2mb', 'pri.store.size': '1mb', }, + { + 'health': 'green', + 'status': 'open', + 'index': '.ds-metrics-ds-000001', + 'uuid': 'uuid2a', + 'pri': '1', + 'rep': '1', + 'docs.count': '10', + 'docs.deleted': '0', + 'store.size': '100kb', + 'pri.store.size': '50kb', + }, ] + self.mock_client.indices.get_alias.return_value = { + 'index1': {'aliases': {'alias_common': {}, 'alias_one': {}}}, + 'index2': {'aliases': {'alias_common': {}}}, + } + self.mock_client.indices.get_data_stream.return_value = { + 'data_streams': [ + { + 'name': 'metrics-ds', + 'timestamp_field': {'name': '@timestamp'}, + 'indices': [ + {'index_name': '.ds-metrics-ds-000001', 'index_uuid': 'uuid-1'}, + {'index_name': '.ds-metrics-ds-000002', 'index_uuid': 'uuid-2'}, + ], + 'generation': 2, + 'status': 'GREEN', + 'template': 'metrics-template', + } + ] + } # Execute result = await self._list_indices_tool(self.ListIndicesArgs(include_detail=False)) # Assert assert len(result) == 1 assert result[0]['type'] == 'text' - payload = json.loads(result[0]['text'].split('\n', 1)[1]) - assert payload == ['index1', 'index2'] - assert 'docs.count' not in result[0]['text'] + response_text = result[0]['text'] + assert response_text.startswith('Indices, aliases, and data streams information:') + payload = json.loads(response_text.split('\n', 1)[1]) + assert payload['indices'] == [] + assert { + 'alias': 'alias_common', + 'index_count': 2, + } in payload['aliases'] + assert { + 'alias': 'alias_one', + 'index_count': 1, + } in payload['aliases'] + assert payload['data_streams'][0]['name'] == 'metrics-ds' + assert payload['data_streams'][0]['index_count'] == 2 + assert 'indices' not in payload['data_streams'][0] self.mock_client.cat.indices.assert_called_once_with(format='json') + self.mock_client.indices.get_alias.assert_called_once_with(index='*') + self.mock_client.indices.get_data_stream.assert_called_once_with(name='*') @pytest.mark.asyncio async def test_list_indices_tool_with_index(self): @@ -252,6 +337,8 @@ async def test_list_indices_tool_error(self): assert result[0]['type'] == 'text' assert 'Error listing indices: Test error' in result[0]['text'] self.mock_client.cat.indices.assert_called_once_with(format='json') + self.mock_client.indices.get_alias.assert_not_called() + self.mock_client.indices.get_data_stream.assert_not_called() @pytest.mark.asyncio async def test_get_index_mapping_tool(self):