Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Enhance tool filtering ([#101](https://github.com/opensearch-project/opensearch-mcp-server-py/pull/101))
- Add core tools as a category ([#103](https://github.com/opensearch-project/opensearch-mcp-server-py/pull/103))
- set stateless=True for streaming server by default ([#104](https://github.com/opensearch-project/opensearch-mcp-server-py/pull/104))
- Expand ListIndexTool to return aliases and data streams while hiding backing indices from general listings ([#108](https://github.com/opensearch-project/opensearch-mcp-server-py/pull/108))

### Fixed

Expand Down
98 changes: 96 additions & 2 deletions src/opensearch/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

import json
import logging
from copy import deepcopy
from semver import Version
from opensearchpy.exceptions import NotFoundError
from tools.tool_params import *

# Configure logging
Expand All @@ -12,12 +14,104 @@

# List all the helper functions, these functions perform a single rest call to opensearch
# these functions will be used in tools folder to eventually write more complex tools
def list_indices(args: ListIndicesArgs) -> json:
def list_indices(args: ListIndicesArgs) -> list[dict]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:
Would not block this PR, but something to perhaps think of

On a high level for list_indices, list_aliases, and list_data_streams

Would it make sense to create a class for each response and parse the response into it? We could raise an error if parsing fails. That way we could skip checks like `isinstance`.

from .client import initialize_client

client = initialize_client(args)
response = client.cat.indices(format='json')
return response
if not isinstance(response, list):
logger.warning('Unexpected response type for cat.indices: %s', type(response))
return []
Comment on lines +22 to +24
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the user is not authorized to perform the request, we might get an error response. That error should be propagated, in some form, to the client making the request, but this code would return an empty list instead.


sanitized: list[dict] = []
for item in response:
if not isinstance(item, dict):
continue
index_name = item.get('index')
if not isinstance(index_name, str):
continue
sanitized.append(item)

return sanitized


def list_aliases(args: ListIndicesArgs) -> dict[str, list[str]]:
from .client import initialize_client

client = initialize_client(args)

try:
response = client.indices.get_alias(index='*')
except NotFoundError:
return {}

if not isinstance(response, dict):
logger.warning('Unexpected response type for indices.get_alias: %s', type(response))
return {}
Comment on lines +48 to +50
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, should we handle it differently?


alias_map: dict[str, set[str]] = {}
for index_name, alias_entry in response.items():
if not isinstance(index_name, str) or not isinstance(alias_entry, dict):
continue
aliases = alias_entry.get('aliases', {})
if not isinstance(aliases, dict):
continue
for alias_name in aliases.keys():
if not isinstance(alias_name, str):
continue
alias_map.setdefault(alias_name, set()).add(index_name)

return {alias: sorted(index_names) for alias, index_names in alias_map.items()}


def list_data_streams(args: ListIndicesArgs) -> list[dict]:
from .client import initialize_client

client = initialize_client(args)

try:
response = client.indices.get_data_stream(name='*')
except NotFoundError:
return []

if not isinstance(response, dict):
logger.warning('Unexpected response type for indices.get_data_stream: %s', type(response))
return []
Comment on lines +77 to +79
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above


data_streams = response.get('data_streams', [])
if not isinstance(data_streams, list):
logger.warning('Unexpected data_streams payload type: %s', type(data_streams))
return []

sanitized: list[dict] = []
for stream in data_streams:
if not isinstance(stream, dict):
continue
name = stream.get('name')
if not isinstance(name, str):
continue

sanitized_stream = {key: deepcopy(value) for key, value in stream.items() if key != 'indices'}

sanitized_indices: list[dict] = []
indices_payload = stream.get('indices', [])
if isinstance(indices_payload, list):
for entry in indices_payload:
if not isinstance(entry, dict):
continue
index_name = entry.get('index_name')
if not isinstance(index_name, str):
continue
sanitized_entry = {'index_name': index_name}
index_uuid = entry.get('index_uuid')
if isinstance(index_uuid, str):
sanitized_entry['index_uuid'] = index_uuid
sanitized_indices.append(sanitized_entry)

sanitized_stream['indices'] = sanitized_indices
sanitized.append(sanitized_stream)

return sanitized


def get_index(args: ListIndicesArgs) -> json:
Expand Down
107 changes: 92 additions & 15 deletions src/tools/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# SPDX-License-Identifier: Apache-2.0

import json
from typing import Any

from .tool_params import (
GetAllocationArgs,
GetClusterStateArgs,
Expand Down Expand Up @@ -35,6 +37,8 @@
get_query_insights,
get_segments,
get_shards,
list_aliases,
list_data_streams,
list_indices,
search_index,
)
Expand Down Expand Up @@ -76,26 +80,99 @@ async def list_indices_tool(args: ListIndicesArgs) -> list[dict]:
{'type': 'text', 'text': f'Index information for {args.index}:\n{formatted_info}'}
]

# Otherwise, list all indices
# Otherwise, list all indices, aliases, and data streams
indices = list_indices(args)

# If include_detail is False, return only pure list of index names
if not args.include_detail:
index_names = [
item.get('index')
for item in indices
if isinstance(item, dict) and 'index' in item
]
formatted_names = json.dumps(index_names, indent=2)
return [{'type': 'text', 'text': f'Indices:\n{formatted_names}'}]

# include_detail is True: return full information
formatted_indices = json.dumps(indices, indent=2)
return [{'type': 'text', 'text': f'All indices information:\n{formatted_indices}'}]
aliases = list_aliases(args)
data_streams = list_data_streams(args)

indices_payload = _format_indices(indices, aliases, data_streams, args.include_detail)
aliases_payload = _format_aliases(aliases, args.include_detail)
data_streams_payload = _format_data_streams(data_streams, args.include_detail)

combined_payload = {
'indices': indices_payload,
'aliases': aliases_payload,
'data_streams': data_streams_payload,
}

formatted_payload = json.dumps(combined_payload, indent=2)
response_text = 'Indices, aliases, and data streams information:\n' + formatted_payload
return [{'type': 'text', 'text': response_text}]
except Exception as e:
return [{'type': 'text', 'text': f'Error listing indices: {str(e)}'}]


def _format_indices(
indices: list[dict],
aliases: dict[str, list[str]],
data_streams: list[dict],
include_detail: bool,
) -> list[Any]:
excluded_indices: set[str] = set()

for index_list in aliases.values():
excluded_indices.update(index_list)

for stream in data_streams:
indices_payload = stream.get('indices', [])
if isinstance(indices_payload, list):
for entry in indices_payload:
if not isinstance(entry, dict):
continue
index_name = entry.get('index_name')
if isinstance(index_name, str):
excluded_indices.add(index_name)

filtered = [
item for item in indices if item.get('index') not in excluded_indices
]
if include_detail:
return filtered
return [item['index'] for item in filtered if 'index' in item]


def _format_aliases(
aliases: dict[str, list[str]], include_detail: bool
) -> list[dict[str, Any]]:
sorted_alias_items = sorted(aliases.items())

if include_detail:
payload = [
{'alias': alias_name, 'indices': index_list}
for alias_name, index_list in sorted_alias_items
]
else:
payload = [
{'alias': alias_name, 'index_count': len(index_list)}
for alias_name, index_list in sorted_alias_items
]

return payload


def _format_data_streams(
data_streams: list[dict], include_detail: bool
) -> list[dict[str, Any]]:
if include_detail:
return data_streams

payload = []
for stream in data_streams:
simplified = {k: v for k, v in stream.items() if k != 'indices'}
indices_payload = stream.get('indices', [])
index_count = 0
if isinstance(indices_payload, list):
index_count = sum(
1
for entry in indices_payload
if isinstance(entry, dict) and isinstance(entry.get('index_name'), str)
)
simplified['index_count'] = index_count
payload.append(simplified)

return payload


async def get_index_mapping_tool(args: GetIndexMappingArgs) -> list[dict]:
try:
check_tool_compatibility('IndexMappingTool', args)
Expand Down
Loading