Skip to content

Commit 77912cc

Browse files
committed
Add alias and data stream to list tool
Signed-off-by: Sun Ro Lee <sunro.lee@linecorp.com>
1 parent c8377dd commit 77912cc

File tree

4 files changed

+284
-25
lines changed

4 files changed

+284
-25
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
1111
- Enhance tool filtering ([#101](https://github.com/opensearch-project/opensearch-mcp-server-py/pull/101))
1212
- Add core tools as a category ([#103](https://github.com/opensearch-project/opensearch-mcp-server-py/pull/103))
1313
- Set stateless=True for streaming server by default ([#104](https://github.com/opensearch-project/opensearch-mcp-server-py/pull/104))
14+
- Expand ListIndexTool to return aliases and data streams while hiding backing indices from general listings ([#108](https://github.com/opensearch-project/opensearch-mcp-server-py/pull/108))
1415

1516
### Fixed
1617

src/opensearch/helper.py

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33

44
import json
55
import logging
6+
from copy import deepcopy
67
from semver import Version
8+
from opensearchpy.exceptions import NotFoundError
79
from tools.tool_params import *
810

911
# Configure logging
@@ -12,12 +14,104 @@
1214

1315
# List all the helper functions, these functions perform a single rest call to opensearch
1416
# these functions will be used in tools folder to eventually write more complex tools
15-
def list_indices(args: ListIndicesArgs) -> json:
17+
def list_indices(args: ListIndicesArgs) -> list[dict]:
    """Return the ``cat.indices`` rows that carry a string index name.

    Args:
        args: Connection arguments forwarded to ``initialize_client``.

    Returns:
        The dict rows of the JSON ``cat.indices`` response whose ``'index'``
        value is a string, in response order. When the response is not a
        list, a warning is logged and an empty list is returned.
    """
    from .client import initialize_client

    client = initialize_client(args)
    rows = client.cat.indices(format='json')
    if not isinstance(rows, list):
        logger.warning('Unexpected response type for cat.indices: %s', type(rows))
        return []

    # Keep only well-formed rows; anything else is dropped silently.
    return [
        row
        for row in rows
        if isinstance(row, dict) and isinstance(row.get('index'), str)
    ]
36+
37+
38+
def list_aliases(args: ListIndicesArgs) -> dict[str, list[str]]:
    """Map each alias name to the sorted names of the indices that declare it.

    Args:
        args: Connection arguments forwarded to ``initialize_client``.

    Returns:
        A dict keyed by alias name whose values are sorted lists of index
        names. An empty dict is returned when ``indices.get_alias`` raises
        ``NotFoundError`` or (after logging a warning) when the response is
        not a dict.
    """
    from .client import initialize_client

    client = initialize_client(args)

    try:
        response = client.indices.get_alias(index='*')
    except NotFoundError:
        return {}

    if not isinstance(response, dict):
        logger.warning('Unexpected response type for indices.get_alias: %s', type(response))
        return {}

    # The response is keyed by index; invert it so it is keyed by alias.
    indices_by_alias: dict[str, set[str]] = {}
    for index_name, entry in response.items():
        if not (isinstance(index_name, str) and isinstance(entry, dict)):
            continue
        alias_section = entry.get('aliases', {})
        if not isinstance(alias_section, dict):
            continue
        for alias_name in alias_section.keys():
            if isinstance(alias_name, str):
                indices_by_alias.setdefault(alias_name, set()).add(index_name)

    return {alias: sorted(names) for alias, names in indices_by_alias.items()}
65+
66+
67+
def list_data_streams(args: ListIndicesArgs) -> list[dict]:
    """Return sanitised data stream descriptions from ``indices.get_data_stream``.

    Args:
        args: Connection arguments forwarded to ``initialize_client``.

    Returns:
        One dict per stream that is a dict with a string ``'name'``. Every key
        other than ``'indices'`` is deep-copied from the response. ``'indices'``
        is rebuilt as a list of ``{'index_name': ...}`` dicts, with
        ``'index_uuid'`` added only when it is a string. An empty list is
        returned when the call raises ``NotFoundError`` or (after logging a
        warning) when the response or its ``'data_streams'`` value has an
        unexpected type.
    """
    from .client import initialize_client

    client = initialize_client(args)

    try:
        response = client.indices.get_data_stream(name='*')
    except NotFoundError:
        return []

    if not isinstance(response, dict):
        logger.warning('Unexpected response type for indices.get_data_stream: %s', type(response))
        return []

    data_streams = response.get('data_streams', [])
    if not isinstance(data_streams, list):
        logger.warning('Unexpected data_streams payload type: %s', type(data_streams))
        return []

    cleaned_streams: list[dict] = []
    for stream in data_streams:
        if not isinstance(stream, dict) or not isinstance(stream.get('name'), str):
            continue

        # Copy everything except the backing-index list, which is rebuilt below.
        cleaned: dict = {}
        for key, value in stream.items():
            if key != 'indices':
                cleaned[key] = deepcopy(value)

        backing: list[dict] = []
        raw_backing = stream.get('indices', [])
        if isinstance(raw_backing, list):
            for entry in raw_backing:
                if not isinstance(entry, dict):
                    continue
                backing_name = entry.get('index_name')
                if not isinstance(backing_name, str):
                    continue
                backing_uuid = entry.get('index_uuid')
                if isinstance(backing_uuid, str):
                    backing.append({'index_name': backing_name, 'index_uuid': backing_uuid})
                else:
                    backing.append({'index_name': backing_name})

        cleaned['indices'] = backing
        cleaned_streams.append(cleaned)

    return cleaned_streams
21115

22116

23117
def get_index(args: ListIndicesArgs) -> json:

src/tools/tools.py

Lines changed: 92 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
# SPDX-License-Identifier: Apache-2.0
33

44
import json
5+
from typing import Any
6+
57
from .tool_params import (
68
GetAllocationArgs,
79
GetClusterStateArgs,
@@ -35,6 +37,8 @@
3537
get_query_insights,
3638
get_segments,
3739
get_shards,
40+
list_aliases,
41+
list_data_streams,
3842
list_indices,
3943
search_index,
4044
)
@@ -76,26 +80,99 @@ async def list_indices_tool(args: ListIndicesArgs) -> list[dict]:
7680
{'type': 'text', 'text': f'Index information for {args.index}:\n{formatted_info}'}
7781
]
7882

79-
# Otherwise, list all indices
83+
# Otherwise, list all indices, aliases, and data streams
8084
indices = list_indices(args)
81-
82-
# If include_detail is False, return only pure list of index names
83-
if not args.include_detail:
84-
index_names = [
85-
item.get('index')
86-
for item in indices
87-
if isinstance(item, dict) and 'index' in item
88-
]
89-
formatted_names = json.dumps(index_names, indent=2)
90-
return [{'type': 'text', 'text': f'Indices:\n{formatted_names}'}]
91-
92-
# include_detail is True: return full information
93-
formatted_indices = json.dumps(indices, indent=2)
94-
return [{'type': 'text', 'text': f'All indices information:\n{formatted_indices}'}]
85+
aliases = list_aliases(args)
86+
data_streams = list_data_streams(args)
87+
88+
indices_payload = _format_indices(indices, aliases, data_streams, args.include_detail)
89+
aliases_payload = _format_aliases(aliases, args.include_detail)
90+
data_streams_payload = _format_data_streams(data_streams, args.include_detail)
91+
92+
combined_payload = {
93+
'indices': indices_payload,
94+
'aliases': aliases_payload,
95+
'data_streams': data_streams_payload,
96+
}
97+
98+
formatted_payload = json.dumps(combined_payload, indent=2)
99+
response_text = 'Indices, aliases, and data streams information:\n' + formatted_payload
100+
return [{'type': 'text', 'text': response_text}]
95101
except Exception as e:
96102
return [{'type': 'text', 'text': f'Error listing indices: {str(e)}'}]
97103

98104

105+
def _format_indices(
106+
indices: list[dict],
107+
aliases: dict[str, list[str]],
108+
data_streams: list[dict],
109+
include_detail: bool,
110+
) -> list[Any]:
111+
excluded_indices: set[str] = set()
112+
113+
for index_list in aliases.values():
114+
excluded_indices.update(index_list)
115+
116+
for stream in data_streams:
117+
indices_payload = stream.get('indices', [])
118+
if isinstance(indices_payload, list):
119+
for entry in indices_payload:
120+
if not isinstance(entry, dict):
121+
continue
122+
index_name = entry.get('index_name')
123+
if isinstance(index_name, str):
124+
excluded_indices.add(index_name)
125+
126+
filtered = [
127+
item for item in indices if item.get('index') not in excluded_indices
128+
]
129+
if include_detail:
130+
return filtered
131+
return [item['index'] for item in filtered if 'index' in item]
132+
133+
134+
def _format_aliases(
135+
aliases: dict[str, list[str]], include_detail: bool
136+
) -> list[dict[str, Any]]:
137+
sorted_alias_items = sorted(aliases.items())
138+
139+
if include_detail:
140+
payload = [
141+
{'alias': alias_name, 'indices': index_list}
142+
for alias_name, index_list in sorted_alias_items
143+
]
144+
else:
145+
payload = [
146+
{'alias': alias_name, 'index_count': len(index_list)}
147+
for alias_name, index_list in sorted_alias_items
148+
]
149+
150+
return payload
151+
152+
153+
def _format_data_streams(
154+
data_streams: list[dict], include_detail: bool
155+
) -> list[dict[str, Any]]:
156+
if include_detail:
157+
return data_streams
158+
159+
payload = []
160+
for stream in data_streams:
161+
simplified = {k: v for k, v in stream.items() if k != 'indices'}
162+
indices_payload = stream.get('indices', [])
163+
index_count = 0
164+
if isinstance(indices_payload, list):
165+
index_count = sum(
166+
1
167+
for entry in indices_payload
168+
if isinstance(entry, dict) and isinstance(entry.get('index_name'), str)
169+
)
170+
simplified['index_count'] = index_count
171+
payload.append(simplified)
172+
173+
return payload
174+
175+
99176
async def get_index_mapping_tool(args: GetIndexMappingArgs) -> list[dict]:
100177
try:
101178
check_tool_compatibility('IndexMappingTool', args)

0 commit comments

Comments
 (0)