1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Added
- Add `GenericOpenSearchApiTool` - A flexible, general-purpose tool that can interact with any OpenSearch API endpoint, addressing tool explosion and reducing context size. Supports all HTTP methods with write operation protection via `OPENSEARCH_SETTINGS_ALLOW_WRITE` environment variable. Closes [#109](https://github.com/opensearch-project/opensearch-mcp-server-py/issues/109)
- Add header-based authentication and code cleanup ([#117](https://github.com/opensearch-project/opensearch-mcp-server-py/pull/117))
- Add skills tools integration ([#121](https://github.com/opensearch-project/opensearch-mcp-server-py/pull/121))

### Fixed
- Fix Concurrency: Use Async OpenSearch client to improve concurrency ([#125](https://github.com/opensearch-project/opensearch-mcp-server-py/pull/125))
104 changes: 104 additions & 0 deletions src/tools/skills_tools.py
@@ -0,0 +1,104 @@
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0

import json
import logging
from typing import Dict, Any, List
from .tool_params import baseToolArgs
from pydantic import Field
from opensearch.client import initialize_client

logger = logging.getLogger(__name__)

class DataDistributionToolArgs(baseToolArgs):
    index: str = Field(description="Target OpenSearch index name")
    selectionTimeRangeStart: str = Field(description="Start time for analysis period")
    selectionTimeRangeEnd: str = Field(description="End time for analysis period")
    timeField: str = Field(description="Date/time field for filtering (required)")
    baselineTimeRangeStart: str = Field(default="", description="Start time for baseline period (optional)")
    baselineTimeRangeEnd: str = Field(default="", description="End time for baseline period (optional)")
    size: int = Field(default=1000, description="Maximum number of documents to analyze")

class LogPatternAnalysisToolArgs(baseToolArgs):
    index: str = Field(description="Target OpenSearch index name containing log data")
    logFieldName: str = Field(description="Field containing raw log messages to analyze")
    selectionTimeRangeStart: str = Field(description="Start time for analysis target period")
    selectionTimeRangeEnd: str = Field(description="End time for analysis target period")
    timeField: str = Field(description="Date/time field for time-based filtering (required)")
    traceFieldName: str = Field(default="", description="Field for trace/correlation ID (optional)")
    baseTimeRangeStart: str = Field(default="", description="Start time for baseline comparison period (optional)")
    baseTimeRangeEnd: str = Field(default="", description="End time for baseline comparison period (optional)")

async def call_opensearch_tool(tool_name: str, parameters: Dict[str, Any], args: baseToolArgs) -> List[Dict]:
    """Call OpenSearch ML tools API"""
    try:
        client = initialize_client(args)

        # Call OpenSearch ML tools execute API
        response = client.transport.perform_request(
            'POST',
            f'/_plugins/_ml/tools/_execute/{tool_name}',
            body={'parameters': parameters}
        )

        logger.info(f"Tool {tool_name} result: {json.dumps(response, indent=2)}")
        formatted_result = json.dumps(response, indent=2)
        return [{'type': 'text', 'text': f'{tool_name} result:\n{formatted_result}'}]

    except Exception as e:
        return [{'type': 'text', 'text': f'Error executing {tool_name}: {str(e)}'}]

async def data_distribution_tool(args: DataDistributionToolArgs) -> List[Dict]:
    params = {
        'index': args.index,
        'timeField': args.timeField,
        'selectionTimeRangeStart': args.selectionTimeRangeStart,
        'selectionTimeRangeEnd': args.selectionTimeRangeEnd,
        'size': args.size
    }
    # Baseline period bounds are optional; forward each one only when set
    if args.baselineTimeRangeStart:
        params['baselineTimeRangeStart'] = args.baselineTimeRangeStart
    if args.baselineTimeRangeEnd:
        params['baselineTimeRangeEnd'] = args.baselineTimeRangeEnd

    return await call_opensearch_tool('DataDistributionTool', params, args)

async def log_pattern_analysis_tool(args: LogPatternAnalysisToolArgs) -> List[Dict]:
    params = {
        'index': args.index,
        'timeField': args.timeField,
        'logFieldName': args.logFieldName,
        'selectionTimeRangeStart': args.selectionTimeRangeStart,
        'selectionTimeRangeEnd': args.selectionTimeRangeEnd
    }
    # Trace field and baseline period are optional; forward each one only when set
    if args.traceFieldName:
        params['traceFieldName'] = args.traceFieldName
    if args.baseTimeRangeStart:
        params['baseTimeRangeStart'] = args.baseTimeRangeStart
    if args.baseTimeRangeEnd:
        params['baseTimeRangeEnd'] = args.baseTimeRangeEnd

    return await call_opensearch_tool('LogPatternAnalysisTool', params, args)

SKILLS_TOOLS_REGISTRY = {
    'DataDistributionTool': {
        'display_name': 'DataDistributionTool',
        'description': 'Analyzes data distribution patterns and field value frequencies within OpenSearch indices. Supports both single dataset analysis for understanding data characteristics and comparative analysis between two time periods to identify distribution changes. Automatically detects useful fields, calculates value distributions, groups numeric data, and computes divergence metrics. Useful for anomaly detection, data quality assessment, and trend analysis. For example, use it to analyze the distribution of failures over time.',
        'input_schema': DataDistributionToolArgs.model_json_schema(),
        'function': data_distribution_tool,
        'args_model': DataDistributionToolArgs,
        'min_version': '3.3.0',
        'http_methods': 'POST',
    },
    'LogPatternAnalysisTool': {
        'display_name': 'LogPatternAnalysisTool',
        'description': 'Intelligent log pattern analysis tool for troubleshooting and anomaly detection in application logs. Use this tool when you need to: analyze error patterns in logs, identify unusual log sequences, compare log patterns between time periods, find root causes of system issues, detect anomalous behavior in application traces, or investigate performance problems. The tool automatically extracts meaningful patterns from raw log messages, groups similar patterns, identifies outliers, and provides insights for debugging. Essential for log-based troubleshooting, incident analysis, and proactive monitoring of system health.',
        'input_schema': LogPatternAnalysisToolArgs.model_json_schema(),
        'function': log_pattern_analysis_tool,
        'args_model': LogPatternAnalysisToolArgs,
        'min_version': '3.3.0',
        'http_methods': 'POST',
    },
}
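A note for reviewers: a minimal sketch of exercising the new tool function directly. The import path, index name, and time-range values are illustrative assumptions, and connection settings are assumed to come from whatever environment `baseToolArgs`/`initialize_client` already honor:

import asyncio

from tools.skills_tools import DataDistributionToolArgs, data_distribution_tool

# Hypothetical arguments; point index/timeField at a real cluster to try it.
args = DataDistributionToolArgs(
    index='logs-app',
    timeField='@timestamp',
    selectionTimeRangeStart='2025-01-01T00:00:00Z',
    selectionTimeRangeEnd='2025-01-02T00:00:00Z',
)

# The tool always returns a list of MCP-style text blocks, even on error.
result = asyncio.run(data_distribution_tool(args))
print(result[0]['text'])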
2 changes: 2 additions & 0 deletions src/tools/tool_filter.py
@@ -171,6 +171,8 @@ def process_tool_filter(
        'ExplainTool',
        'MsearchTool',
        'GenericOpenSearchApiTool',
        'DataDistributionTool',
        'LogPatternAnalysisTool',
    ]

    # Build core tools list using display names
2 changes: 2 additions & 0 deletions src/tools/tools.py
@@ -38,6 +38,7 @@
    list_indices,
    search_index,
)
from .skills_tools import SKILLS_TOOLS_REGISTRY


async def check_tool_compatibility(tool_name: str, args: baseToolArgs = None):
@@ -492,6 +493,7 @@ async def get_long_running_tasks_tool(args: GetLongRunningTasksArgs) -> list[dic

# Registry of available OpenSearch tools with their metadata
TOOL_REGISTRY = {
    **SKILLS_TOOLS_REGISTRY,
    'ListIndexTool': {
        'display_name': 'ListIndexTool',
        'description': 'Lists indices in the OpenSearch cluster. By default, returns a filtered list of index names only to minimize response size. Set include_detail=true to return full metadata from cat.indices (docs.count, store.size, etc.). If an index parameter is provided, returns detailed information for that specific index including mappings and settings.',
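One note on the `**SKILLS_TOOLS_REGISTRY` spread at the top of TOOL_REGISTRY: Python merges dict literals left to right, so a literal key written after the unpacking would silently override a skills entry of the same name. A self-contained sketch of that semantics (tool names and versions here are illustrative, not from the PR):

# Later keys win when a dict literal merges unpacked entries left to right.
skills = {'DataDistributionTool': {'min_version': '3.3.0'}}
registry = {**skills, 'DataDistributionTool': {'min_version': 'overridden'}}

assert registry['DataDistributionTool']['min_version'] == 'overridden'

Since the built-in tool names and the skills tool names are disjoint here, the merge is collision-free, but placing the spread first keeps any future override in favor of the explicitly listed built-ins.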