diff --git a/README.md b/README.md index 8372d24..9573521 100644 --- a/README.md +++ b/README.md @@ -2,4 +2,20 @@ The repository contains tools for Apache SeaTunnel. +## Tool 1 - SeaTunnel MCP Server + +What is MCP? +- MCP (Model Context Protocol) is an open protocol for connecting LLMs to tools, data, and systems. With SeaTunnel MCP, you can operate SeaTunnel directly from an LLM-powered interface while keeping the server-side logic secure and auditable. +- Learn more: https://github.com/modelcontextprotocol + +SeaTunnel MCP Server +- Source folder: [seatunnel-mcp/](seatunnel-mcp/) +- English README: [seatunnel-mcp/README.md](seatunnel-mcp/README.md) +- Chinese: [seatunnel-mcp/README_CN.md](seatunnel-mcp/README_CN.md) +- Quick Start: [seatunnel-mcp/docs/QUICK_START.md](seatunnel-mcp/docs/QUICK_START.md) +- User Guide: [seatunnel-mcp/docs/USER_GUIDE.md](seatunnel-mcp/docs/USER_GUIDE.md) +- Developer Guide: [seatunnel-mcp/docs/DEVELOPER_GUIDE.md](seatunnel-mcp/docs/DEVELOPER_GUIDE.md) + +For screenshots, demo video, features, installation and usage instructions, please refer to the README in the seatunnel-mcp directory. + Get the main project from [Apache SeaTunnel](https://github.com/apache/seatunnel) \ No newline at end of file diff --git a/seatunnel-mcp/.env.example b/seatunnel-mcp/.env.example new file mode 100644 index 0000000..23c2441 --- /dev/null +++ b/seatunnel-mcp/.env.example @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +# SeaTunnel MCP Server Configuration + +# MCP server configuration +MCP_HOST=127.0.0.1 +MCP_PORT=8080 + +# SeaTunnel API configuration +SEATUNNEL_API_URL=http://localhost:8090 +SEATUNNEL_API_KEY=your_api_key_here \ No newline at end of file diff --git a/seatunnel-mcp/.gitignore b/seatunnel-mcp/.gitignore new file mode 100644 index 0000000..12e1866 --- /dev/null +++ b/seatunnel-mcp/.gitignore @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +venv/ +env/ +ENV/ +.venv/ +.env/ + +# Testing +.coverage +htmlcov/ +.pytest_cache/ +.tox/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Project specific +.env \ No newline at end of file diff --git a/seatunnel-mcp/.pre-commit-config.yaml b/seatunnel-mcp/.pre-commit-config.yaml new file mode 100644 index 0000000..bc9bd62 --- /dev/null +++ b/seatunnel-mcp/.pre-commit-config.yaml @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.4.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - id: check-added-large-files + - id: check-json + - id: debug-statements + - id: check-merge-conflict + +- repo: https://github.com/pycqa/isort + rev: 5.12.0 + hooks: + - id: isort + args: ["--profile", "black", "--line-length", "100"] + +- repo: https://github.com/psf/black + rev: 23.3.0 + hooks: + - id: black + args: ["--line-length", "100"] + +- repo: https://github.com/pycqa/flake8 + rev: 6.0.0 + hooks: + - id: flake8 + args: ["--max-line-length", "100", "--extend-ignore", "E203,E501"] + exclude: ^tests/ + +- repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.3.0 + hooks: + - id: mypy + additional_dependencies: [types-requests, types-setuptools, types-PyYAML] + exclude: ^tests/ \ No newline at end of file diff --git a/seatunnel-mcp/CHANGELOG.md b/seatunnel-mcp/CHANGELOG.md new file mode 100644 index 0000000..e6acfe6 --- /dev/null +++ b/seatunnel-mcp/CHANGELOG.md @@ -0,0 +1,49 @@ +# Changelog + +记录 SeaTunnel MCP 项目的所有重要变更。 + +格式基于 [Keep a Changelog](https://keepachangelog.com/zh-CN/1.0.0/), +本项目遵循 [语义化版本](https://semver.org/lang/zh-CN/spec/v2.0.0.html)。 + +## [1.1.0] - 2025-04-10 + +### 新增 + +- 添加 `submit-jobs` 和`submit-job/upload` 工具用于批量提交作业 + - 允许通过单个 API 调用同时提交多个作业 + - 用户输入直接作为请求体传递给 API,无需额外封装 + - 支持标准 JSON 格式的作业配置 + +### 改进 + +- 更新文档,增加批量提交作业的示例和说明 +- 添加全面的测试覆盖新功能 + +## [1.0.0] - 2025-04-01 + +### 新增 + +- 首个稳定正式版本 +- 完善文档和测试 +- 加强错误处理 +- 改进用户体验 + +## [0.1.0] - 2023-07-15 + +### 新增 + +- 初始版本发布 +- 实现基本的 SeaTunnel REST API 客户端 +- 实现基于 MCP 的 SeaTunnel 工具集 + - 连接管理工具 + - 作业管理工具 + - 系统监控工具 +- 创建项目文档 + - 用户指南 + - 开发者指南 +- 添加启动脚本和示例配置 +- 添加基本单元测试 + +### 已修复 + +- 无(初始版本) \ No newline at end of file diff --git a/seatunnel-mcp/Dockerfile b/seatunnel-mcp/Dockerfile new file mode 100644 index 0000000..49af152 --- /dev/null +++ b/seatunnel-mcp/Dockerfile @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +FROM python:3.12-slim + +WORKDIR /app + +# 复制项目文件 +COPY . . + +# 安装依赖 +RUN pip install --no-cache-dir -e . + +# 暴露端口 +EXPOSE 8080 + +# 设置环境变量 +ENV MCP_HOST=0.0.0.0 +ENV MCP_PORT=8080 +ENV SEATUNNEL_API_URL=http://seatunnel:8090 + +# 启动服务 +CMD ["python", "-m", "src.seatunnel_mcp"] \ No newline at end of file diff --git a/seatunnel-mcp/LICENSE b/seatunnel-mcp/LICENSE new file mode 100644 index 0000000..0ea9a6d --- /dev/null +++ b/seatunnel-mcp/LICENSE @@ -0,0 +1,190 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + Copyright 2023 SeaTunnel MCP Team + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/seatunnel-mcp/README.md b/seatunnel-mcp/README.md new file mode 100644 index 0000000..91bea0c --- /dev/null +++ b/seatunnel-mcp/README.md @@ -0,0 +1,190 @@ +# SeaTunnel MCP Server + +A Model Context Protocol (MCP) server for interacting with SeaTunnel through LLM interfaces like Claude. + +![SeaTunnel MCP Logo](./docs/img/seatunnel-mcp-logo.png) + +![SeaTunnel MCP Server](./docs/img/img.png) + +## Operation Video + +To help you better understand the features and usage of SeaTunnel MCP, we provide a video demonstration. Please refer to the link below or directly check the video file in the project documentation directory. + +https://www.youtube.com/watch?v=JaLA8EkZD7Q + +[![IMAGE ALT TEXT HERE](https://img.youtube.com/vi/JaLA8EkZD7Q/0.jpg)](https://www.youtube.com/watch?v=JaLA8EkZD7Q) + + +> **Tip**: If the video does not play directly, make sure your device supports MP4 format and try opening it with a modern browser or video player. + + +## Features + +* Job management (submit, stop, monitor) +* System monitoring and information retrieval +* REST API interaction with SeaTunnel services +* Built-in logging and monitoring tools +* Dynamic connection configuration +* Comprehensive job information and statistics + +## Installation + +```bash +# Clone repository +git clone +cd seatunnel-mcp + +# Create virtual environment and install +python -m venv .venv +source .venv/bin/activate # On Windows: .venv\Scripts\activate +pip install -e . +``` + +## Requirements + +* Python ≥ 3.12 +* Running SeaTunnel instance +* Node.js (for testing with MCP Inspector) + +## Usage + +### Environment Variables + +``` +SEATUNNEL_API_URL=http://localhost:8090 # Default SeaTunnel REST API URL +SEATUNNEL_API_KEY=your_api_key # Optional: Default SeaTunnel API key +``` + +### Dynamic Connection Configuration + +The server provides tools to view and update connection settings at runtime: + +* `get-connection-settings`: View current connection URL and API key status +* `update-connection-settings`: Update URL and/or API key to connect to a different SeaTunnel instance + +Example usage through MCP: + +```json +// Get current settings +{ + "name": "get-connection-settings" +} + +// Update connection settings +{ + "name": "update-connection-settings", + "arguments": { + "url": "http://new-host:8090", + "api_key": "new-api-key" + } +} +``` + +### Job Management + +The server provides tools to submit and manage SeaTunnel jobs: + +* `submit-job`: Submit a new job with job configuration +* `submit-jobs`: Submit multiple jobs in batch +* `stop-job`: Stop a running job +* `get-job-info`: Get detailed information about a specific job +* `get-running-jobs`: List all currently running jobs +* `get-finished-jobs`: List all finished jobs by state (FINISHED, CANCELED, FAILED, etc.) + +### Running the Server + +```bash +python -m src.seatunnel_mcp +``` + +### Usage with Claude Desktop + +To use this with Claude Desktop, add the following to your `claude_desktop_config.json`: + +```json +{ + "mcpServers": { + "seatunnel": { + "command": "python", + "args": ["-m", "src.seatunnel_mcp"], + "cwd": "Project root directory" + } + } +} +``` + +### Testing with MCP Inspector + +```bash +npx @modelcontextprotocol/inspector python -m src.seatunnel_mcp +``` + +## Available Tools + +### Connection Management + +* `get-connection-settings`: View current SeaTunnel connection URL and API key status +* `update-connection-settings`: Update URL and/or API key to connect to a different instance + +### Job Management + +* `submit-job`: Submit a new job with configuration in HOCON format +* `submit-job/upload`: submit job source upload configuration file +* `submit-jobs`: Submit multiple jobs in batch, directly passing user input as request body +* `stop-job`: Stop a running job with optional savepoint +* `get-job-info`: Get detailed information about a specific job +* `get-running-jobs`: List all currently running jobs +* `get-running-job`: Get details about a specific running job +* `get-finished-jobs`: List all finished jobs by state + +### System Monitoring + +* `get-overview`: Get an overview of the SeaTunnel cluster +* `get-system-monitoring-information`: Get detailed system monitoring information + +## Changelog + +### v1.2.0 (2025-06-10) + +**New Features in v1.2.0** +- **SSE Support**: Added `st-mcp-sse` for real-time communication with SeaTunnel MCP via Server-Sent Events (SSE). Corresponding sse branch +- **UV/Studio Mode**: Added `st-mcp-uv` (or `st-mcp-studio`) to support running the MCP server using the `uv` tool for improved performance and async support. Corresponding to uv branch + +#### Example `claude_desktop_config.json`: + +```json +{ + "mcpServers": { + "st-mcp-sse": { + "url": "http://your-server:18080/sse" + }, + "st-mcp-uv": { + "command": "uv", + "args": ["run", "seatunnel-mcp"], + "env": { + "SEATUNNEL_API_URL": "http://127.0.0.1:8080" + } + } + } +} + +``` + +### v1.1.0 (2025-04-10) + +- **New Feature**: Added `submit-jobs` and `submit-job/upload` tool for batch job submission and Document submission operations + - Allows submitting multiple jobs at once with a single API call + - Input is passed directly as the request body to the API + - Supports JSON format for job configurations + - Allow submission of jobs based on documents + +### v1.0.0 (Initial Release) + +- Initial release with basic SeaTunnel integration capabilities +- Job management tools (submit, stop, monitor) +- System monitoring tools +- Connection configuration utilities + +## License + +Apache License \ No newline at end of file diff --git a/seatunnel-mcp/README_CN.md b/seatunnel-mcp/README_CN.md new file mode 100644 index 0000000..9f345dc --- /dev/null +++ b/seatunnel-mcp/README_CN.md @@ -0,0 +1,244 @@ +# SeaTunnel MCP 服务器 + +SeaTunnel MCP(Model Context Protocol)服务器,提供与大型语言模型(如 Claude)交互的能力,使其能够操作 SeaTunnel 任务。 + +![SeaTunnel MCP Logo](./docs/img/seatunnel-mcp-logo.png) + +![SeaTunnel MCP Server](./docs/img/img.png) + +## 操作视频 + +为了帮助您更好地了解 SeaTunnel MCP 的功能和使用方法,我们提供了一段操作视频演示。请参考以下链接或直接在项目文档目录中查看视频文件。 + +https://www.bilibili.com/video/BV1UXZgY8EqS + +> **提示**:如果视频无法直接播放,请确保您的设备支持 MP4 格式,并尝试使用现代浏览器或视频播放器打开。 + + +## 功能特点 + +* **作业管理**:提交、停止、监控 SeaTunnel 作业 +* **系统监控**:获取集群概览和详细的系统监控信息 +* **REST API 交互**:与 SeaTunnel 服务进行无缝交互 +* **内置日志和监控工具**:全面的日志和监控功能 +* **动态连接配置**:能够在运行时切换不同的 SeaTunnel 实例 +* **全面的作业信息**:提供详细的作业运行状态和统计数据 + +## 安装 + +```bash +# 克隆仓库 +git clone <仓库URL> +cd seatunnel-mcp + +# 创建虚拟环境并安装 +python -m venv .venv +source .venv/bin/activate # Windows 系统: .venv\Scripts\activate +pip install -e . + +# 或者直接使用提供的脚本 +./run.sh # Linux/Mac +run.bat # Windows +``` + +## 系统要求 + +* Python ≥ 3.12 +* 运行中的 SeaTunnel 实例 +* Node.js(用于 MCP Inspector 测试) + +## 使用方法 + +### 环境变量配置 + +``` +SEATUNNEL_API_URL=http://localhost:8090 # 默认 SeaTunnel REST API URL +SEATUNNEL_API_KEY=your_api_key # 可选:默认 SeaTunnel API 密钥 +``` + +### 命令行工具 + +SeaTunnel MCP 提供了命令行工具,方便启动和配置服务器: + +```bash +# 显示帮助信息 +seatunnel-mcp --help + +# 初始化环境配置文件 +seatunnel-mcp init + +# 运行 MCP 服务器 +seatunnel-mcp run --api-url http://your-seatunnel:8090 + +# 为 Claude Desktop 配置 MCP 服务器 +seatunnel-mcp configure-claude +``` + +### 动态连接配置 + +服务器提供了工具来查看和更新运行时的连接设置: + +* `get-connection-settings`:查看当前连接 URL 和 API 密钥状态 +* `update-connection-settings`:更新 URL 和/或 API 密钥以连接到不同的 SeaTunnel 实例 + +MCP 使用示例: + +```json +// 获取当前设置 +{ + "name": "get-connection-settings" +} + +// 更新连接设置 +{ + "name": "update-connection-settings", + "arguments": { + "url": "http://new-host:8090", + "api_key": "new-api-key" + } +} +``` + +### 作业管理 + +服务器提供工具来提交和管理 SeaTunnel 作业: + +* `submit-job`:提交新作业 +* `submit-jobs`:批量提交多个作业 +* `stop-job`:停止运行中的作业 +* `get-job-info`:获取特定作业的详细信息 +* `get-running-jobs`:列出所有正在运行的作业 +* `get-running-job`:获取特定运行中作业的详情 +* `get-finished-jobs`:按状态列出已完成的作业 + +### 运行服务器 + +```bash +python -m src.seatunnel_mcp +# 或者使用命令行工具 +seatunnel-mcp run +``` + +### 与 Claude Desktop 集成 + +要在 Claude Desktop 中使用,请在 `claude_desktop_config.json` 中添加以下内容: + +```json +{ + "mcpServers": { + "seatunnel": { + "command": "python", + "args": ["-m", "src.seatunnel_mcp"], + "cwd": "Project root directory" + } + } +} +``` + +### 使用 MCP Inspector 测试 + +```bash +npx @modelcontextprotocol/inspector python -m src.seatunnel_mcp +``` + +## 可用工具 + +### 连接管理 + +* `get-connection-settings`:查看当前 SeaTunnel 连接 URL 和 API 密钥状态 +* `update-connection-settings`:更新 URL 和/或 API 密钥以连接到不同实例 + +### 作业管理 + +* `submit-job`:提交新作业 +* `submit-jobs`:批量提交多个作业,直接将用户输入作为请求体传递 +* `submit-job/upload`:提交作业来源上传配置文件 +* `stop-job`:停止运行中的作业 +* `get-job-info`:获取特定作业的详细信息 +* `get-running-jobs`:列出所有正在运行的作业 +* `get-running-job`:获取特定运行中作业的详情 +* `get-finished-jobs`:按状态列出已完成的作业 + +### 系统监控 + +* `get-overview`:获取 SeaTunnel 集群概览 +* `get-system-monitoring-information`:获取详细的系统监控信息 + + + +## 开发 + +如果您想为项目贡献代码: + +1. 克隆仓库并设置开发环境: + ```bash + python -m venv .venv + source .venv/bin/activate + pip install -e ".[dev]" + ``` + +2. 安装预提交钩子: + ```bash + pip install pre-commit + pre-commit install + ``` + +3. 运行测试: + ```bash + pytest -xvs tests/ + ``` + +详细的开发指南请参阅 [开发者指南](docs/DEVELOPER_GUIDE.md)。 + +## 贡献 + +1. Fork 仓库 +2. 创建功能分支 +3. 提交变更 +4. 创建 Pull Request + +## 更新日志 + +### v1.2.0 (2025-06-09) +**v1.2.0 新功能** + +- **SSE 实时通信**:新增 `st-mcp-sse`,支持通过 Server-Sent Events(SSE)协议与 SeaTunnel MCP 实现实时数据推送。 对应sse分支 +- **UV/Studio 模式**:新增 `st-mcp-uv`(或 `st-mcp-studio`),支持通过 `uv` 工具运行 MCP 服务器,提升异步和高性能场景下的运行效率。对应uv分支 + +#### `claude_desktop_config.json` 配置示例: + +```json +{ + "mcpServers": { + "st-mcp-sse": { + "url": "http://your-server:18080/sse" + }, + "st-mcp-uv": { + "command": "uv", + "args": ["run", "seatunnel-mcp"], + "env": { + "SEATUNNEL_API_URL": "http://127.0.0.1:8080" + } + } + } +} +``` + +### v1.1.0 (2025-04-10) + +- **新功能**:添加了 `submit-jobs` 和`submit-job/upload` 工具用于批量提交作业 和 文件提交作业 + - 允许通过单个 API 调用同时提交多个作业 + - 用户输入直接作为请求体传递给 API + - 支持 JSON 格式的作业配置 + - 允许根据文件提交作业 + +### v1.0.0 (初始版本) + +- 初始版本,具备基本的 SeaTunnel 集成能力 +- 作业管理工具(提交、停止、监控) +- 系统监控工具 +- 连接配置实用工具 + +## 许可证 + +Apache License \ No newline at end of file diff --git a/seatunnel-mcp/RESTful API V2.openapi.json b/seatunnel-mcp/RESTful API V2.openapi.json new file mode 100644 index 0000000..d81bb57 --- /dev/null +++ b/seatunnel-mcp/RESTful API V2.openapi.json @@ -0,0 +1,508 @@ +{ + "openapi": "3.0.1", + "info": { + "title": "RESTful API V2", + "description": "", + "version": "1.0.0" + }, + "tags": [], + "paths": { + "/submit-job": { + "post": { + "summary": "hive2es_qymp", + "deprecated": false, + "description": "", + "tags": [], + "parameters": [ + { + "name": "jobId", + "in": "query", + "description": "", + "required": false, + "schema": { + "type": "string" + } + }, + { + "name": "jobName", + "in": "query", + "description": "", + "required": false, + "example": "hive2es_qymp", + "schema": { + "type": "string" + } + }, + { + "name": "isStartWithSavePoint", + "in": "query", + "description": "", + "required": false, + "schema": { + "type": "string" + } + }, + { + "name": "format", + "in": "query", + "description": "", + "required": false, + "example": "hocon", + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "text/plain": { + "schema": { + "type": "string" + }, + "example": "env {\n job.mode = \"batch\"\n}\n \nsource {\n Jdbc {\n url = \"jdbc:hive2://ip:10000/default\"\n user = \"hive\"\n password = \"hive\"\n driver = \"org.apache.hive.jdbc.HiveDriver\"\n connection_check_timeout_sec = 100\n query = \"select * from tabl1 limit 100\"\n }\n}\n\ntransform {\n}\n\nsink {\n Elasticsearch {\n hosts = [\"http://ip:9200\"]\n index = \"index1\"\n username = \"\"\n password = \"\"\n schema_save_mode = \"CREATE_SCHEMA_WHEN_NOT_EXIST\"\n data_save_mode = \"DROP_DATA\"\n }\n}" + } + } + }, + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": {} + } + } + }, + "headers": {} + } + }, + "security": [] + } + }, + "/submit-jobs": { + "post": { + "summary": "批量提交作业", + "deprecated": false, + "description": "", + "tags": [], + "parameters": [], + "requestBody": { + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": {} + }, + "example": [ + { + "params": { + "jobId": "123456", + "jobName": "SeaTunnel-01" + }, + "env": { + "job.mode": "batch" + }, + "source": [ + { + "plugin_name": "FakeSource", + "plugin_output": "fake", + "row.num": 1000, + "schema": { + "fields": { + "name": "string", + "age": "int", + "card": "int" + } + } + } + ], + "transform": [], + "sink": [ + { + "plugin_name": "Console", + "plugin_input": [ + "fake" + ] + } + ] + }, + { + "params": { + "jobId": "1234567", + "jobName": "SeaTunnel-02" + }, + "env": { + "job.mode": "batch" + }, + "source": [ + { + "plugin_name": "FakeSource", + "plugin_output": "fake", + "row.num": 1000, + "schema": { + "fields": { + "name": "string", + "age": "int", + "card": "int" + } + } + } + ], + "transform": [], + "sink": [ + { + "plugin_name": "Console", + "plugin_input": [ + "fake" + ] + } + ] + } + ] + } + } + }, + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": {} + } + } + }, + "headers": {} + } + }, + "security": [] + } + }, + "/submit-job/upload": { + "post": { + "summary": "提交作业来源上传配置文件", + "deprecated": false, + "description": "", + "tags": [], + "parameters": [ + { + "name": "jobId", + "in": "query", + "description": "", + "required": false, + "example": "", + "schema": { + "type": "string" + } + }, + { + "name": "jobName", + "in": "query", + "description": "", + "required": false, + "example": "hive2es_qymp_file", + "schema": { + "type": "string" + } + }, + { + "name": "isStartWithSavePoint", + "in": "query", + "description": "", + "required": false, + "example": "", + "schema": { + "type": "string" + } + }, + { + "name": "format", + "in": "query", + "description": "", + "required": false, + "example": "hocon", + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "multipart/form-data": { + "schema": { + "type": "object", + "properties": { + "config_file": { + "format": "binary", + "type": "string", + "example": "" + } + } + }, + "examples": {} + } + } + }, + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": {} + } + } + }, + "headers": {} + } + }, + "security": [] + } + }, + "/overview": { + "get": { + "summary": "返回Zeta集群的概览", + "deprecated": false, + "description": "", + "tags": [], + "parameters": [ + { + "name": "tag1", + "in": "query", + "description": "", + "required": false, + "example": "value1", + "schema": { + "type": "string" + } + }, + { + "name": "tag2", + "in": "query", + "description": "", + "required": false, + "example": "value2", + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": {} + } + } + }, + "headers": {} + } + }, + "security": [] + } + }, + "/running-jobs": { + "get": { + "summary": "返回所有作业及其当前状态的概览", + "deprecated": false, + "description": "", + "tags": [], + "parameters": [], + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": {} + } + } + }, + "headers": {} + } + }, + "security": [] + } + }, + "/job-info/{jobId}": { + "get": { + "summary": "返回作业的详细信息", + "deprecated": false, + "description": "", + "tags": [], + "parameters": [ + { + "name": "jobId", + "in": "path", + "description": "", + "required": true, + "example": 934327465587769300, + "schema": { + "type": "number" + } + } + ], + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": {} + } + } + }, + "headers": {} + } + }, + "security": [] + } + }, + "/running-job/{jobId}": { + "get": { + "summary": "返回作业的详细信息", + "deprecated": false, + "description": "", + "tags": [], + "parameters": [ + { + "name": "jobId", + "in": "path", + "description": "", + "required": true, + "example": 1, + "schema": { + "type": "number" + } + } + ], + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": {} + } + } + }, + "headers": {} + } + }, + "security": [] + } + }, + "/finished-jobs/{state}": { + "get": { + "summary": "返回所有已完成的作业信息", + "deprecated": false, + "description": "", + "tags": [], + "parameters": [ + { + "name": "state", + "in": "path", + "description": "finished job status. FINISHED,CANCELED,FAILED,UNKNOWABLE", + "required": true, + "example": "FINISHED", + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": {} + } + } + }, + "headers": {} + } + }, + "security": [] + } + }, + "/system-monitoring-information": { + "get": { + "summary": "返回系统监控信息", + "deprecated": false, + "description": "", + "tags": [], + "parameters": [], + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": {} + } + } + }, + "headers": {} + } + }, + "security": [] + } + }, + "/stop-job": { + "post": { + "summary": "停止作业", + "deprecated": false, + "description": "", + "tags": [], + "parameters": [], + "requestBody": { + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": {} + }, + "example": { + "jobId": 944829786826473500, + "isStopWithSavePoint": false + } + } + } + }, + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": {} + } + } + }, + "headers": {} + } + }, + "security": [] + } + } + }, + "components": { + "schemas": {}, + "securitySchemes": {} + }, + "servers": [], + "security": [] +} \ No newline at end of file diff --git a/seatunnel-mcp/docker-compose.yml b/seatunnel-mcp/docker-compose.yml new file mode 100644 index 0000000..e4e8955 --- /dev/null +++ b/seatunnel-mcp/docker-compose.yml @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: '3' + +services: + seatunnel: + image: apache/seatunnel:latest + container_name: seatunnel + ports: + - "8090:8090" + volumes: + - ./examples:/opt/seatunnel/config + networks: + - seatunnel-net + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8090/overview"] + interval: 10s + timeout: 5s + retries: 3 + start_period: 30s + + seatunnel-mcp: + build: + context: . + dockerfile: Dockerfile + container_name: seatunnel-mcp + ports: + - "8080:8080" + environment: + - SEATUNNEL_API_URL=http://seatunnel:8090 + depends_on: + seatunnel: + condition: service_healthy + networks: + - seatunnel-net + +networks: + seatunnel-net: + driver: bridge \ No newline at end of file diff --git a/seatunnel-mcp/docs/DEVELOPER_GUIDE.md b/seatunnel-mcp/docs/DEVELOPER_GUIDE.md new file mode 100644 index 0000000..0c7e20e --- /dev/null +++ b/seatunnel-mcp/docs/DEVELOPER_GUIDE.md @@ -0,0 +1,186 @@ +# SeaTunnel MCP 开发者指南 + +本指南为 SeaTunnel MCP 项目的开发者提供指导,包括如何设置开发环境、代码贡献流程和测试方法。 + +## 开发环境设置 + +1. 克隆仓库并设置开发环境: + +```bash +# 克隆仓库 +git clone +cd seatunnel-mcp + +# 创建并激活虚拟环境 +python -m venv .venv +source .venv/bin/activate # Windows: .venv\Scripts\activate + +# 安装开发依赖 +pip install -e ".[dev]" +``` + +2. 安装预提交钩子(可选但推荐): + +```bash +pip install pre-commit +pre-commit install +``` + +## 项目结构 + +``` +seatunnel-mcp/ +├── docs/ # 文档 +├── examples/ # 示例配置文件 +├── src/ +│ └── seatunnel_mcp/ # 主要源代码 +│ ├── __init__.py +│ ├── __main__.py # 入口点 +│ ├── client.py # SeaTunnel API 客户端 +│ ├── tools.py # MCP 工具定义 +│ └── schema.py # 数据模型定义 +├── tests/ # 测试 +├── .env.example # 环境变量示例 +├── pyproject.toml # 项目配置 +└── README.md # 项目说明 +``` + +## 开发流程 + +### 添加新的工具 + +1. 首先在 `client.py` 中为 SeaTunnel API 添加相应的方法 +2. 在 `tools.py` 中创建相应的 MCP 工具 +3. 更新 `get_all_tools()` 函数以包含新工具 +4. 添加单元测试 + +示例:添加一个新的工具来获取日志信息 + +```python +# 在 client.py 中 +def get_job_logs(self, jobId: Union[str, int]) -> Dict[str, Any]: + """获取作业日志信息。""" + response = self._make_request("GET", f"/job-logs/{jobId}") + return response.json() + +# 在 tools.py 中 +def get_job_logs_tool(client: SeaTunnelClient) -> Tool: + """获取作业日志的工具。""" + async def get_job_logs(jobId: Union[str, int]) -> Dict[str, Any]: + return client.get_job_logs(jobId=jobId) + + return Tool( + name="get-job-logs", + description="获取指定作业的日志信息", + fn=AsyncToolFn(get_job_logs), + parameters=models.ParametersSchema( + properties={ + "jobId": models.ParameterSchema( + type="string", + description="作业 ID", + ), + }, + required=["jobId"], + ), + ) + +# 更新 get_all_tools 函数 +def get_all_tools(client: SeaTunnelClient) -> List[Tool]: + return [ + # ... 现有工具 + get_job_logs_tool(client), + ] +``` + +### 代码风格 + +- 使用 [Black](https://black.readthedocs.io/) 进行代码格式化 +- 使用 [isort](https://pycqa.github.io/isort/) 排序导入 +- 使用 [mypy](http://mypy-lang.org/) 进行类型检查 +- 使用 [flake8](https://flake8.pycqa.org/) 进行代码风格检查 + +你可以使用以下命令执行这些检查: + +```bash +# 代码格式化 +black src tests + +# 导入排序 +isort src tests + +# 类型检查 +mypy src + +# 代码风格检查 +flake8 src tests +``` + +## 测试 + +### 单元测试 + +使用 pytest 运行单元测试: + +```bash +pytest -xvs tests/ +``` + +带覆盖率报告: + +```bash +pytest --cov=src tests/ +``` + +### 手动测试 + +使用 MCP Inspector 进行手动测试: + +```bash +npx @modelcontextprotocol/inspector python -m src.seatunnel_mcp +``` + +### 集成测试 + +对于集成测试,你需要一个运行中的 SeaTunnel 实例。可以使用 Docker 启动一个测试实例: + +```bash +docker run -d --name seatunnel -p 8090:8090 apache/seatunnel:latest +``` + +然后运行集成测试: + +```bash +pytest -xvs tests/integration/ +``` + +## 文档 + +- 所有公共函数、类和方法应有清晰的 docstring +- 遵循 [Google Python 风格指南](https://google.github.io/styleguide/pyguide.html) +- 保持 README.md 和用户指南的最新状态 + +## 提交 Pull Request + +1. 确保所有测试通过 +2. 更新相关文档 +3. 如果添加了新功能,请同时添加相应的测试 +4. 提交 PR 并在描述中详细说明变更内容 + +## 发布流程 + +1. 更新版本号(在 `__init__.py` 和 `pyproject.toml` 中) +2. 更新 CHANGELOG.md +3. 创建一个新的 git tag +4. 构建并上传到 PyPI: + +```bash +python -m build +twine upload dist/* +``` + +## 问题与支持 + +如有问题或需要支持,可以: +- 提交 GitHub Issue +- 在 PR 中讨论 +- 联系项目维护者 \ No newline at end of file diff --git a/seatunnel-mcp/docs/QUICK_START.md b/seatunnel-mcp/docs/QUICK_START.md new file mode 100644 index 0000000..e92e244 --- /dev/null +++ b/seatunnel-mcp/docs/QUICK_START.md @@ -0,0 +1,176 @@ +# SeaTunnel MCP 快速入门指南 + +本指南将帮助您快速开始使用 SeaTunnel MCP 服务器,并通过 Claude 等大型语言模型与 SeaTunnel 进行交互。 + +## 准备工作 + +确保您已安装以下软件: + +- Python 3.9 或更高版本 +- 运行中的 SeaTunnel 实例(或使用 Docker Compose) +- Claude Desktop(可选,如需与 Claude 集成) + +## 安装 + +### 方法 1:直接安装 + +```bash +# 克隆仓库 +git clone <仓库URL> +cd seatunnel-mcp + +# 创建虚拟环境并安装 +python -m venv .venv +source .venv/bin/activate # Windows: .venv\Scripts\activate +pip install -e . +``` + +### 方法 2:使用 Docker Compose + +```bash +# 克隆仓库 +git clone <仓库URL> +cd seatunnel-mcp + +# 启动 SeaTunnel 和 MCP 服务器 +docker-compose up -d +``` + +## 基本用法 + +### 1. 启动 MCP 服务器 + +```bash +# 使用命令行工具 +seatunnel-mcp run + +# 或直接运行模块 +python -m src.seatunnel_mcp +``` + +### 2. 使用 MCP Inspector 测试 + +MCP Inspector 是一个用于测试 MCP 服务器的工具。通过它,您可以直接与 MCP 服务器交互,无需 Claude 或其他 LLM。 + +```bash +# 安装 Node.js 和 npm 后 +npx @modelcontextprotocol/inspector python -m src.seatunnel_mcp +``` + +这将打开一个网页界面,您可以在其中测试各种 MCP 工具。 + +### 3. 与 Claude Desktop 集成 + +```bash +# 配置 Claude Desktop +seatunnel-mcp configure-claude +``` + +然后,重启 Claude Desktop,并在与 Claude 的对话中尝试使用 SeaTunnel。 + +## 常见使用场景示例 + +### 提交一个简单的作业 + +在 Claude 中,您可以这样与 SeaTunnel 交互: + +> 请帮我提交一个从 HDFS 到 Elasticsearch 的 SeaTunnel 作业,将 users 表中的数据导入到 users-index 索引中。 + +Claude 将帮助您创建配置并提交作业,例如: + +``` +我将帮您提交一个从 HDFS 到 Elasticsearch 的作业。 + +首先,让我检查当前的连接设置。 +[Claude 获取连接设置] + +现在,我将提交作业配置。以下是我准备的配置: + +env { + job.mode = "batch" +} + +source { + Hdfs { + path = "/data/users" + format = "json" + } +} + +sink { + Elasticsearch { + hosts = ["http://elasticsearch:9200"] + index = "users-index" + } +} + +[Claude 提交作业] + +作业已成功提交!作业 ID 是 123456。您可以稍后查询这个作业的状态。 +``` + +### 查看运行中的作业 + +> 请显示当前所有正在运行的 SeaTunnel 作业。 + +Claude 将使用 `get-running-jobs` 工具获取并展示所有运行中的作业。 + +### 停止一个作业 + +> 请停止作业 ID 为 123456 的作业。 + +Claude 将使用 `stop-job` 工具停止指定的作业。 + +## 高级用法 + +### 创建作业模板 + +您可以要求 Claude 记住某些常用的作业配置作为模板: + +> 请记住这个配置作为 "hdfs-to-es" 模板: +> ``` +> env { +> job.mode = "batch" +> } +> source { +> Hdfs { +> path = "/data/${path}" +> format = "${format}" +> } +> } +> sink { +> Elasticsearch { +> hosts = ["${es_host}"] +> index = "${index}" +> } +> } +> ``` + +然后,您可以这样使用模板: + +> 使用 "hdfs-to-es" 模板创建作业,参数是:path=users, format=json, es_host=http://elasticsearch:9200, index=users-index + +### 动态切换 SeaTunnel 实例 + +如果您需要连接到不同的 SeaTunnel 实例: + +> 请将连接切换到测试环境的 SeaTunnel,URL 是 http://test-seatunnel:8090 + +Claude 将使用 `update-connection-settings` 工具更新连接设置。 + +## 故障排除 + +如果您遇到问题: + +1. 确保 SeaTunnel 实例正在运行并可访问 +2. 检查 MCP 服务器的日志输出 +3. 使用 MCP Inspector 测试各个工具是否正常工作 +4. 检查环境变量和连接设置 + +详细的故障排除指南请参阅 [用户指南](USER_GUIDE.md) 的"故障排除"部分。 + +## 下一步 + +- 阅读 [用户指南](USER_GUIDE.md) 了解更多功能和选项 +- 查看 [示例文件夹](../examples/) 获取更多示例配置 +- 如果您想贡献代码,请参阅 [开发者指南](DEVELOPER_GUIDE.md) \ No newline at end of file diff --git a/seatunnel-mcp/docs/USER_GUIDE.md b/seatunnel-mcp/docs/USER_GUIDE.md new file mode 100644 index 0000000..1a00659 --- /dev/null +++ b/seatunnel-mcp/docs/USER_GUIDE.md @@ -0,0 +1,273 @@ +# SeaTunnel MCP User Guide + +This guide explains how to use the SeaTunnel Model Context Protocol (MCP) server with Claude and other LLM interfaces. + +## Overview + +The SeaTunnel MCP server provides a way for Large Language Models (LLMs) like Claude to interact with a SeaTunnel cluster. It enables you to: + +- Submit and manage SeaTunnel jobs +- Monitor job status and system health +- Configure and manage SeaTunnel settings + +## Setting Up + +1. Install the SeaTunnel MCP server: + ```bash + git clone + cd seatunnel-mcp + python -m venv .venv + source .venv/bin/activate # On Windows: .venv\Scripts\activate + pip install -e . + ``` + +2. Configure environment variables by copying `.env.example` to `.env` and modifying as needed: + ```bash + cp .env.example .env + # Edit .env to set your SeaTunnel API URL and other settings + ``` + +3. Start the MCP server: + ```bash + python -m src.seatunnel_mcp + ``` + +## Using with Claude Desktop + +To use the SeaTunnel MCP server with Claude Desktop: + +1. Add the following to your `claude_desktop_config.json`: + ```json + { + "mcpServers": { + "seatunnel": { + "command": "python", + "args": ["-m", "src.seatunnel_mcp"] + } + } + } + ``` + +2. Restart Claude Desktop for the changes to take effect. + +## Available Tools + +When interacting with Claude, you can use these tools through natural language: + +### Connection Management + +- **Get connection settings**: Ask Claude to show the current SeaTunnel connection settings. + + Example: "What are the current SeaTunnel connection settings?" + +- **Update connection settings**: Ask Claude to update the connection to a different SeaTunnel instance. + + Example: "Change the SeaTunnel connection to http://new-server:8090 with API key 'my_new_key'" + +### Job Management + +- **Submit a job**: Ask Claude to submit a new SeaTunnel job. + + Example: + ``` + Please submit this job to SeaTunnel: + + env { + job.mode = "batch" + } + + source { + Jdbc { + url = "jdbc:hive2://host:10000/default" + query = "select * from test limit 100" + } + } + + sink { + Elasticsearch { + hosts = ["http://elastic:9200"] + index = "my-index" + } + } + ``` + +- **Submit multiple jobs in batch**: Ask Claude to submit multiple jobs at once. Your input will be sent directly as the API request body. + + Example: + ``` + Please submit these jobs in batch: + + [ + { + "params": { + "jobId": "123456", + "jobName": "Job-1" + }, + "env": { + "job.mode": "batch" + }, + "source": [ + { + "plugin_name": "FakeSource", + "plugin_output": "fake", + "row.num": 1000, + "schema": { + "fields": { + "name": "string", + "age": "int" + } + } + } + ], + "transform": [], + "sink": [ + { + "plugin_name": "Console", + "plugin_input": ["fake"] + } + ] + }, + { + "params": { + "jobId": "789012", + "jobName": "Job-2" + }, + "env": { + "job.mode": "batch" + }, + "source": [ + { + "plugin_name": "FakeSource", + "plugin_output": "fake", + "row.num": 500, + "schema": { + "fields": { + "id": "int", + "value": "string" + } + } + } + ], + "transform": [], + "sink": [ + { + "plugin_name": "Console", + "plugin_input": ["fake"] + } + ] + } + ] + ``` + +- **Stop a job**: Ask Claude to stop a running job. + + Example: "Please stop job with ID 12345" + +- **Get job information**: Ask Claude to show information about a specific job. + + Example: "Show me details about job 12345" + +- **Get running jobs**: Ask Claude to list all running jobs. + + Example: "What jobs are currently running?" + +- **Get finished jobs**: Ask Claude to list finished jobs by state. + + Example: "Show me all completed jobs" or "List all failed jobs" + +### System Monitoring + +- **Get cluster overview**: Ask Claude to show an overview of the SeaTunnel cluster. + + Example: "Give me an overview of the SeaTunnel cluster" + +- **Get system monitoring information**: Ask Claude to show detailed system monitoring information. + + Example: "Show me the system monitoring information" + +## Example Conversations + +Here are some example conversations with Claude using the SeaTunnel MCP server: + +### Submitting a Job + +**You**: I need to transfer data from a Hive table to Elasticsearch. Can you help me submit a job for this? + +**Claude**: I can help you submit a job to transfer data from Hive to Elasticsearch. Let me walk you through creating and submitting a job configuration. + +First, let me get the current connection settings to make sure we're connected to the right SeaTunnel instance. + +[Claude gets connection settings] + +We're currently connected to [SeaTunnel URL]. Now, let's create and submit a job configuration. + +Here's a job I can submit for you that will transfer data from a Hive table to Elasticsearch. I'll need a few details: + +1. The Hive connection URL +2. Which table you want to query +3. The Elasticsearch host URL +4. The target index name + +Based on the information you've provided, I'll submit this job: + +[Claude submits job with proper configuration] + +The job has been submitted successfully! The job ID is [job ID]. You can monitor its status by asking me about this job's status. + +### Monitoring Jobs + +**You**: What jobs are currently running? + +**Claude**: Let me check the currently running jobs for you. + +[Claude gets running jobs] + +Here are the currently running jobs in your SeaTunnel cluster: + +[Claude displays job information in a formatted table] + +Would you like to see more details about any specific job? + +## Troubleshooting + +If you encounter issues: + +1. Check that the SeaTunnel API is accessible at the configured URL +2. Verify that any API keys are correct +3. Check the MCP server logs for error messages +4. Ensure the job configurations follow the correct format + +## Advanced Usage + +### Custom Connection Settings + +You can configure multiple SeaTunnel connections by asking Claude to update the connection settings: + +"Connect to our production SeaTunnel at https://prod-seatunnel:8090 with API key 'prod_key'" + +"Connect to our development SeaTunnel at https://dev-seatunnel:8090" + +### Job Configuration Templates + +You can ask Claude to save and reuse job templates: + +"Remember this job configuration as 'hive-to-elastic' template: +``` +env { + job.mode = "batch" +} +source { + Jdbc { + url = "jdbc:hive2://host:10000/default" + query = "select * from ${table} limit ${limit}" + } +} +sink { + Elasticsearch { + hosts = ["http://elastic:9200"] + index = "${index}" + } +} +```" + +Then later: "Submit a job using the 'hive-to-elastic' template with table='users', limit=1000, and index='users-index'" \ No newline at end of file diff --git a/seatunnel-mcp/docs/img/img.png b/seatunnel-mcp/docs/img/img.png new file mode 100644 index 0000000..dbc6a4c Binary files /dev/null and b/seatunnel-mcp/docs/img/img.png differ diff --git a/seatunnel-mcp/docs/img/seatunnel-mcp-logo.png b/seatunnel-mcp/docs/img/seatunnel-mcp-logo.png new file mode 100644 index 0000000..ee33653 Binary files /dev/null and b/seatunnel-mcp/docs/img/seatunnel-mcp-logo.png differ diff --git a/seatunnel-mcp/examples/simple_job.conf b/seatunnel-mcp/examples/simple_job.conf new file mode 100644 index 0000000..6372c37 --- /dev/null +++ b/seatunnel-mcp/examples/simple_job.conf @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +env { + job.mode = "batch" +} + +source { + Jdbc { + url = "jdbc:hive2://localhost:10000/default" + user = "hive" + password = "password" + driver = "org.apache.hive.jdbc.HiveDriver" + connection_check_timeout_sec = 100 + query = "select * from test_table limit 100" + } +} + +transform { + # No transformations in this simple example +} + +sink { + Elasticsearch { + hosts = ["http://localhost:9200"] + index = "test-index" + username = "" + password = "" + schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" + data_save_mode = "APPEND" + } +} \ No newline at end of file diff --git a/seatunnel-mcp/pyproject.toml b/seatunnel-mcp/pyproject.toml new file mode 100644 index 0000000..6bd416f --- /dev/null +++ b/seatunnel-mcp/pyproject.toml @@ -0,0 +1,82 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[build-system] +requires = ["setuptools>=42", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "seatunnel-mcp" +version = "0.1.0" +description = "A Model Context Protocol (MCP) server for interacting with SeaTunnel through LLM interfaces" +readme = "README.md" +requires-python = ">=3.12" +license = {text = "Apache License"} +authors = [ + {name = "SeaTunnel MCP Team"} +] +dependencies = [ + "fastapi>=0.95.0", + "uvicorn>=0.21.1", + "pydantic>=2.0.0", + "httpx>=0.24.0", + "python-dotenv>=1.0.0", + "requests>=2.28.2", + "mcp>=0.1.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.0.0", + "black>=23.1.0", + "isort>=5.12.0", + "mypy>=1.0.1", + "flake8>=6.0.0", + "pytest-cov>=4.1.0", + "pytest-asyncio>=0.21.0", + "build>=1.0.3", + "twine>=4.0.2", + "pre-commit>=3.3.2", +] + +[project.scripts] +seatunnel-mcp = "src.seatunnel_mcp.cli:main" + +[tool.setuptools] +packages = ["src.seatunnel_mcp"] + +[tool.black] +line-length = 100 +target-version = ["py39"] + +[tool.isort] +profile = "black" +line_length = 100 + +[tool.mypy] +python_version = "3.12" +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true +disallow_incomplete_defs = true +check_untyped_defs = true +disallow_untyped_decorators = false +no_implicit_optional = true +strict_optional = true + +[tool.pytest.ini_options] +testpaths = ["tests"] +python_files = "test_*.py" +python_functions = "test_*" \ No newline at end of file diff --git a/seatunnel-mcp/requirements-dev.txt b/seatunnel-mcp/requirements-dev.txt new file mode 100644 index 0000000..9b32d35 --- /dev/null +++ b/seatunnel-mcp/requirements-dev.txt @@ -0,0 +1,6 @@ +pytest>=7.0.0 +black>=23.1.0 +isort>=5.12.0 +mypy>=1.0.1 +flake8>=6.0.0 +pytest-cov>=4.1.0 \ No newline at end of file diff --git a/seatunnel-mcp/run.bat b/seatunnel-mcp/run.bat new file mode 100644 index 0000000..72683f5 --- /dev/null +++ b/seatunnel-mcp/run.bat @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +@echo off +REM Run the SeaTunnel MCP server +REM This script sets up the environment and runs the server + +REM Check if virtual environment exists, if not create it +if not exist .venv ( + echo Creating virtual environment... + python -m venv .venv +) + +REM Activate virtual environment +call .venv\Scripts\activate.bat + +REM Install dependencies if needed +pip show uvicorn >nul 2>&1 +if %ERRORLEVEL% neq 0 ( + echo Installing dependencies... + pip install -e . +) + +REM Check if .env file exists, if not create from example +if not exist .env ( + if exist .env.example ( + echo Creating .env file from .env.example... + copy .env.example .env + echo Please review and update the .env file with your SeaTunnel API settings. + ) else ( + echo Warning: .env.example file not found. Please create a .env file manually. + ) +) + +REM Run the server +echo Starting SeaTunnel MCP server... +python -m src.seatunnel_mcp + +REM Deactivate virtual environment when finished +call deactivate \ No newline at end of file diff --git a/seatunnel-mcp/run.sh b/seatunnel-mcp/run.sh new file mode 100755 index 0000000..20c3fa4 --- /dev/null +++ b/seatunnel-mcp/run.sh @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#!/bin/bash + +# Run the SeaTunnel MCP server +# This script sets up the environment and runs the server + +# Check if virtual environment exists, if not create it +if [ ! -d ".venv" ]; then + echo "Creating virtual environment..." + python -m venv .venv +fi + +# Activate virtual environment +source .venv/bin/activate + +# Install dependencies if needed +if ! command -v uvicorn &> /dev/null; then + echo "Installing dependencies..." + pip install -e . +fi + +# Check if .env file exists, if not create from example +if [ ! -f ".env" ]; then + if [ -f ".env.example" ]; then + echo "Creating .env file from .env.example..." + cp .env.example .env + echo "Please review and update the .env file with your SeaTunnel API settings." + else + echo "Warning: .env.example file not found. Please create a .env file manually." + fi +fi + +# Run the server +echo "Starting SeaTunnel MCP server..." +python -m src.seatunnel_mcp \ No newline at end of file diff --git a/seatunnel-mcp/src/seatunnel_mcp/__init__.py b/seatunnel-mcp/src/seatunnel_mcp/__init__.py new file mode 100644 index 0000000..462f65e --- /dev/null +++ b/seatunnel-mcp/src/seatunnel_mcp/__init__.py @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""SeaTunnel MCP Server. + +A Model Context Protocol server for interacting with SeaTunnel through LLM interfaces. +""" + +__version__ = "0.1.0" \ No newline at end of file diff --git a/seatunnel-mcp/src/seatunnel_mcp/__main__.py b/seatunnel-mcp/src/seatunnel_mcp/__main__.py new file mode 100644 index 0000000..b5ac93d --- /dev/null +++ b/seatunnel-mcp/src/seatunnel_mcp/__main__.py @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Main entry point for the SeaTunnel MCP server.""" + +import os +import sys +import logging +from typing import Dict, Any, Optional + +from dotenv import load_dotenv +from mcp.server.fastmcp import FastMCP + +from .client import SeaTunnelClient +from .tools import get_all_tools + +# Setup logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + +# Load environment variables +load_dotenv() + +# Default values +DEFAULT_HOST = "127.0.0.1" +DEFAULT_PORT = 8080 +DEFAULT_API_URL = "http://localhost:8090" # Default SeaTunnel API URL + + +def main(): + """Run the SeaTunnel MCP server.""" + # Get configuration from environment + host = os.environ.get("MCP_HOST", DEFAULT_HOST) + port = int(os.environ.get("MCP_PORT", DEFAULT_PORT)) + api_url = os.environ.get("SEATUNNEL_API_URL", DEFAULT_API_URL) + api_key = os.environ.get("SEATUNNEL_API_KEY", None) + + # Create SeaTunnel client + client = SeaTunnelClient(base_url=api_url, api_key=api_key) + + # Create MCP server + server = FastMCP( + name="SeaTunnel MCP Server", + instructions="A Model Context Protocol server for interacting with SeaTunnel through LLM interfaces", + log_level="INFO", + host=host, + port=port, + ) + + # Register all tools + tools = get_all_tools(client) + for tool_fn in tools: + # 直接添加函数作为工具 + server.add_tool(tool_fn) + + # Run server + logger.info(f"Starting SeaTunnel MCP server at http://{host}:{port}") + server.run() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/seatunnel-mcp/src/seatunnel_mcp/cli.py b/seatunnel-mcp/src/seatunnel_mcp/cli.py new file mode 100644 index 0000000..246656d --- /dev/null +++ b/seatunnel-mcp/src/seatunnel_mcp/cli.py @@ -0,0 +1,206 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""SeaTunnel MCP 命令行界面。 + +为 SeaTunnel MCP 服务器提供命令行工具,便于启动、管理和配置服务器。 +""" + +import os +import sys +import argparse +import logging +import json +from typing import Optional, Dict, Any, List + +from dotenv import load_dotenv + +from . import __version__ +from .__main__ import main as run_server + + +def setup_logging(level: str) -> None: + """设置日志级别。 + + Args: + level: 日志级别 (debug, info, warning, error, critical) + """ + numeric_level = getattr(logging, level.upper(), None) + if not isinstance(numeric_level, int): + raise ValueError(f"无效的日志级别: {level}") + + logging.basicConfig( + level=numeric_level, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) + + +def print_version() -> None: + """打印版本信息。""" + print(f"SeaTunnel MCP 版本: {__version__}") + + +def create_env_file(env_file: str) -> None: + """创建环境变量配置文件。 + + Args: + env_file: 环境变量文件路径 + """ + if os.path.exists(env_file): + print(f"错误: 文件已存在: {env_file}") + sys.exit(1) + + example_file = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), ".env.example") + + if os.path.exists(example_file): + with open(example_file, "r") as src, open(env_file, "w") as dst: + dst.write(src.read()) + print(f"已创建环境变量文件: {env_file}") + else: + # 如果找不到示例文件,创建一个基本的配置 + with open(env_file, "w") as f: + f.write("# SeaTunnel MCP 服务器配置\n\n") + f.write("# MCP 服务器配置\n") + f.write("MCP_HOST=127.0.0.1\n") + f.write("MCP_PORT=8080\n\n") + f.write("# SeaTunnel API 配置\n") + f.write("SEATUNNEL_API_URL=http://localhost:8090\n") + f.write("SEATUNNEL_API_KEY=your_api_key_here\n") + print(f"已创建基本环境变量文件: {env_file}") + + +def configure_mcp_for_claude_desktop(config_file: Optional[str] = None) -> None: + """为 Claude Desktop 配置 MCP 服务器。 + + Args: + config_file: Claude Desktop 配置文件路径,如果未提供则使用默认路径 + """ + if config_file is None: + # 尝试找到默认配置文件 + home_dir = os.path.expanduser("~") + default_paths = [ + os.path.join(home_dir, ".claude", "claude_desktop_config.json"), + os.path.join(home_dir, "AppData", "Roaming", "claude", "claude_desktop_config.json"), + os.path.join(home_dir, "Library", "Application Support", "claude", "claude_desktop_config.json"), + ] + + for path in default_paths: + if os.path.exists(path): + config_file = path + break + + if config_file is None: + print("错误: 无法找到 Claude Desktop 配置文件") + print("请手动指定配置文件路径: seatunnel-mcp configure-claude --config-file PATH") + sys.exit(1) + + # 读取现有配置或创建新配置 + config = {} + if os.path.exists(config_file): + try: + with open(config_file, "r") as f: + config = json.load(f) + except json.JSONDecodeError: + print(f"警告: {config_file} 包含无效的 JSON,将创建新配置") + + # 添加 SeaTunnel MCP 配置 + if "mcpServers" not in config: + config["mcpServers"] = {} + + config["mcpServers"]["seatunnel"] = { + "command": "python", + "args": ["-m", "src.seatunnel_mcp"] + } + + # 保存配置 + os.makedirs(os.path.dirname(config_file), exist_ok=True) + with open(config_file, "w") as f: + json.dump(config, f, indent=2) + + print(f"已为 Claude Desktop 配置 SeaTunnel MCP 服务器: {config_file}") + + +def main() -> None: + """命令行入口点。""" + parser = argparse.ArgumentParser(description="SeaTunnel MCP 服务器命令行工具") + + # 全局选项 + parser.add_argument("-v", "--version", action="store_true", help="显示版本信息") + parser.add_argument("--log-level", choices=["debug", "info", "warning", "error", "critical"], + default="info", help="设置日志级别 (默认: info)") + + # 子命令 + subparsers = parser.add_subparsers(dest="command", help="命令") + + # 运行服务器 + run_parser = subparsers.add_parser("run", help="运行 MCP 服务器") + run_parser.add_argument("--host", help="监听主机 (默认: 从环境变量获取)") + run_parser.add_argument("--port", type=int, help="监听端口 (默认: 从环境变量获取)") + run_parser.add_argument("--api-url", help="SeaTunnel API URL (默认: 从环境变量获取)") + run_parser.add_argument("--api-key", help="SeaTunnel API 密钥 (默认: 从环境变量获取)") + run_parser.add_argument("--env-file", help="环境变量文件路径 (默认: .env)") + + # 初始化环境变量文件 + init_parser = subparsers.add_parser("init", help="初始化环境变量文件") + init_parser.add_argument("--env-file", default=".env", help="环境变量文件路径 (默认: .env)") + + # 为 Claude Desktop 配置 MCP + claude_parser = subparsers.add_parser("configure-claude", help="为 Claude Desktop 配置 MCP 服务器") + claude_parser.add_argument("--config-file", help="Claude Desktop 配置文件路径") + + args = parser.parse_args() + + # 设置日志级别 + setup_logging(args.log_level) + + # 显示版本信息 + if args.version: + print_version() + return + + # 处理命令 + if args.command == "run": + # 加载环境变量 + if args.env_file: + load_dotenv(args.env_file) + else: + load_dotenv() + + # 设置环境变量 + if args.host: + os.environ["MCP_HOST"] = args.host + if args.port: + os.environ["MCP_PORT"] = str(args.port) + if args.api_url: + os.environ["SEATUNNEL_API_URL"] = args.api_url + if args.api_key: + os.environ["SEATUNNEL_API_KEY"] = args.api_key + + # 运行服务器 + run_server() + + elif args.command == "init": + create_env_file(args.env_file) + + elif args.command == "configure-claude": + configure_mcp_for_claude_desktop(args.config_file) + + else: + parser.print_help() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/seatunnel-mcp/src/seatunnel_mcp/client.py b/seatunnel-mcp/src/seatunnel_mcp/client.py new file mode 100644 index 0000000..8cd6336 --- /dev/null +++ b/seatunnel-mcp/src/seatunnel_mcp/client.py @@ -0,0 +1,313 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""SeaTunnel API client for interacting with the REST API.""" + +import json +import logging +from typing import Dict, List, Any, Optional, Union +import httpx + +logger = logging.getLogger(__name__) + + +class SeaTunnelClient: + """Client for interacting with the SeaTunnel REST API.""" + + def __init__(self, base_url: str, api_key: Optional[str] = None): + """Initialize the client. + + Args: + base_url: Base URL of the SeaTunnel REST API. + api_key: Optional API key for authentication. + """ + self.base_url = base_url + self.api_key = api_key + self.headers = {"Content-Type": "application/json"} + if api_key: + self.headers["Authorization"] = f"Bearer {api_key}" + + def update_connection_settings(self, url: Optional[str] = None, api_key: Optional[str] = None) -> Dict[str, Any]: + """Update connection settings. + + Args: + url: New base URL for the SeaTunnel REST API. + api_key: New API key for authentication. + + Returns: + Dict with updated connection settings. + """ + if url: + self.base_url = url + if api_key: + self.api_key = api_key + self.headers["Authorization"] = f"Bearer {api_key}" if api_key else None + + return self.get_connection_settings() + + def get_connection_settings(self) -> Dict[str, Any]: + """Get current connection settings. + + Returns: + Dict with current connection settings. + """ + return { + "url": self.base_url, + "has_api_key": self.api_key is not None, + } + + def _make_request(self, method: str, endpoint: str, **kwargs) -> httpx.Response: + """Make a request to the SeaTunnel API. + + Args: + method: HTTP method. + endpoint: API endpoint. + **kwargs: Additional arguments for the request. + + Returns: + Response from the API. + + Raises: + httpx.HTTPStatusError: If the request fails. + """ + url = f"{self.base_url}{endpoint}" + headers = kwargs.pop("headers", {}) + + # Don't add the default Content-Type header if we're uploading files + if "files" not in kwargs: + # Create a new dictionary with self.headers as the base + merged_headers = dict(self.headers) + # If custom headers exist, they override the default ones + merged_headers.update(headers) + headers = merged_headers + else: + # Only add the Authorization header when using files + if "Authorization" in self.headers: + headers["Authorization"] = self.headers["Authorization"] + + try: + with httpx.Client() as client: + response = client.request(method, url, headers=headers, **kwargs) + response.raise_for_status() + return response + except httpx.HTTPStatusError as e: + logger.error(f"HTTP error: {e}") + raise + except httpx.RequestError as e: + logger.error(f"Request error: {e}") + raise + + def submit_job( + self, + job_content: str, + jobName: Optional[str] = None, + jobId: Optional[str] = None, + isStartWithSavePoint: Optional[bool] = None, + format: str = "hocon" + ) -> Dict[str, Any]: + """Submit a new job. + + Args: + job_content: Job configuration content. + jobName: Optional job name. + jobId: Optional job ID. + isStartWithSavePoint: Whether to start with savepoint. + format: Job configuration format (hocon, json, yaml). + + Returns: + Response from the API. + """ + params = {} + if jobName: + params["jobName"] = jobName + if jobId is not None: + params["jobId"] = str(jobId) # Convert jobId to string + if isStartWithSavePoint is not None: + params["isStartWithSavePoint"] = str(isStartWithSavePoint).lower() + if format: + params["format"] = format + + response = self._make_request( + "POST", + "/submit-job", + params=params, + content=job_content, + headers={"Content-Type": "text/plain"} + ) + + return response.json() + + def submit_jobs( + self, + request_body: Any + ) -> Dict[str, Any]: + """Submit multiple jobs in batch. + + Args: + request_body: The direct request body to send to the API. + It will be used as-is without modification. + + Returns: + Response from the API. + """ + response = self._make_request( + "POST", + "/submit-jobs", + json=request_body, + ) + + return response.json() + + def submit_job_upload( + self, + config_file: Union[str, Any], + jobName: Optional[str] = None, + jobId: Optional[Union[str, int]] = None, + isStartWithSavePoint: Optional[bool] = None, + format: Optional[str] = None, + ) -> Dict[str, Any]: + """Submit a new job using file upload. + + Args: + config_file: Either a file path string or a file-like object. If a path string is provided, + the file will be opened and submitted in the multipart/form-data request body. + jobName: Optional job name (sent as a query parameter). + jobId: Optional job ID (sent as a query parameter). Can be a string or integer, will be converted to string. + isStartWithSavePoint: Whether to start with savepoint (sent as a query parameter). + format: Job configuration format (hocon, json, yaml) (sent as a query parameter). + If not provided, it will be determined from the file name. + + Returns: + Response from the API. + """ + params = {} + if jobName: + params["jobName"] = jobName + if jobId is not None: + params["jobId"] = str(jobId) # Convert jobId to string + if isStartWithSavePoint is not None: + params["isStartWithSavePoint"] = str(isStartWithSavePoint).lower() + + if format: + params["format"] = format + + # If config_file is a string, assume it's a file path and open the file + file_to_close = None + try: + if isinstance(config_file, str): + file_to_close = open(config_file, 'rb') + files = {'config_file': file_to_close} + else: + # Assume it's already a file-like object + files = {'config_file': config_file} + + response = self._make_request( + "POST", + "/submit-job/upload", + params=params, + files=files + ) + + return response.json() + finally: + # Ensure we close the file if we opened it + if file_to_close: + file_to_close.close() + + def stop_job(self, jobId: Union[str, int], isStartWithSavePoint: bool = False) -> Dict[str, Any]: + """Stop a running job. + + Args: + jobId: Job ID. + isStartWithSavePoint: Whether to stop with savepoint. + + Returns: + Response from the API. + """ + data = { + "jobId": jobId, + "isStopWithSavePoint": isStartWithSavePoint + } + + response = self._make_request("POST", "/stop-job", json=data) + return response.json() + + def get_job_info(self, jobId: Union[str, int]) -> Dict[str, Any]: + """Get information about a job. + + Args: + jobId: Job ID. + + Returns: + Response from the API. + """ + response = self._make_request("GET", f"/job-info/{jobId}") + return response.json() + + def get_running_job(self, jobId: Union[str, int]) -> Dict[str, Any]: + """Get information about a running job. + + Args: + jobId: Job ID. + + Returns: + Response from the API. + """ + response = self._make_request("GET", f"/running-job/{jobId}") + return response.json() + + def get_running_jobs(self) -> Dict[str, Any]: + """Get all running jobs. + + Returns: + Response from the API. + """ + response = self._make_request("GET", "/running-jobs") + return response.json() + + def get_finished_jobs(self, state: str) -> Dict[str, Any]: + """Get all finished jobs by state. + + Args: + state: Job state (FINISHED, CANCELED, FAILED, UNKNOWABLE). + + Returns: + Response from the API. + """ + response = self._make_request("GET", f"/finished-jobs/{state}") + return response.json() + + def get_overview(self, tags: Optional[Dict[str, str]] = None) -> Dict[str, Any]: + """Get cluster overview. + + Args: + tags: Optional tags for filtering. + + Returns: + Response from the API. + """ + params = tags or {} + response = self._make_request("GET", "/overview", params=params) + return response.json() + + def get_system_monitoring_information(self) -> Dict[str, Any]: + """Get system monitoring information. + + Returns: + Response from the API. + """ + response = self._make_request("GET", "/system-monitoring-information") + return response.json() \ No newline at end of file diff --git a/seatunnel-mcp/src/seatunnel_mcp/schema.py b/seatunnel-mcp/src/seatunnel_mcp/schema.py new file mode 100644 index 0000000..1099efe --- /dev/null +++ b/seatunnel-mcp/src/seatunnel_mcp/schema.py @@ -0,0 +1,94 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""MCP schemas for the SeaTunnel MCP tools.""" + +from typing import Dict, List, Any, Optional, Union +from pydantic import BaseModel, Field + +__all__ = [ + "ConnectionSettings", + "SubmitJobRequest", + "SubmitJobUploadRequest", + "SubmitJobsRequest", + "StopJobRequest", + "JobStateType", +] + + +class ConnectionSettings(BaseModel): + """Connection settings for the SeaTunnel API.""" + + url: str = Field(..., description="Base URL of the SeaTunnel REST API") + has_api_key: bool = Field(..., description="Whether an API key is set") + + +class UpdateConnectionSettings(BaseModel): + """Update connection settings for the SeaTunnel API.""" + + url: Optional[str] = Field(None, description="New base URL for the SeaTunnel REST API") + api_key: Optional[str] = Field(None, description="New API key for authentication") + + +class SubmitJobRequest(BaseModel): + """Request for submitting a job.""" + + job_content: str = Field(..., description="Job configuration content in specified format") + jobName: Optional[str] = Field(None, description="Optional job name") + jobId: Optional[Union[str, int]] = Field(None, description="Optional job ID. Can be a string or integer.") + is_start_with_save_point: Optional[bool] = Field(None, description="Whether to start with savepoint") + format: str = Field("hocon", description="Job configuration format (hocon, json, yaml)") + + +class SubmitJobUploadRequest(BaseModel): + """Request for submitting a job via file upload.""" + + config_file: Union[str, Any] = Field(..., description="Configuration file path or file object (multipart/form-data body parameter)") + jobName: Optional[str] = Field(None, description="Optional job name (query parameter)") + jobId: Optional[Union[str, int]] = Field(None, description="Optional job ID (query parameter). Can be a string or integer.") + is_start_with_save_point: Optional[bool] = Field(None, description="Whether to start with savepoint (query parameter)") + format: Optional[str] = Field(None, description="Job configuration format (hocon, json, yaml) (query parameter). If not provided, determined from the file name") + + +class SubmitJobsRequest(BaseModel): + """Request for submitting multiple jobs in batch.""" + + request_body: Any = Field(..., description="Direct request body to send to the API") + + +class StopJobRequest(BaseModel): + """Request for stopping a job.""" + + jobId: Union[str, int] = Field(..., description="Job ID") + isStartWithSavePoint: bool = Field(False, description="Whether to stop with savepoint") + + +class JobInfoRequest(BaseModel): + """Request for getting job information.""" + + jobId: Union[str, int] = Field(..., description="Job ID") + + +class FinishedJobsRequest(BaseModel): + """Request for getting finished jobs.""" + + state: str = Field(..., description="Job state (FINISHED, CANCELED, FAILED, UNKNOWABLE)") + + +class OverviewRequest(BaseModel): + """Request for getting cluster overview.""" + + tags: Optional[Dict[str, str]] = Field(None, description="Optional tags for filtering") \ No newline at end of file diff --git a/seatunnel-mcp/src/seatunnel_mcp/tools.py b/seatunnel-mcp/src/seatunnel_mcp/tools.py new file mode 100644 index 0000000..a6b37b2 --- /dev/null +++ b/seatunnel-mcp/src/seatunnel_mcp/tools.py @@ -0,0 +1,404 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""MCP tools for interacting with the SeaTunnel REST API.""" + +import json +import logging +from typing import Dict, List, Any, Optional, Union, Callable, Iterable +from functools import wraps + +from mcp.server.fastmcp.tools import Tool +from mcp.types import TextContent, ImageContent, EmbeddedResource + +from .client import SeaTunnelClient + +logger = logging.getLogger(__name__) + + +def get_connection_settings_tool(client: SeaTunnelClient) -> Callable: + """Get a tool for retrieving connection settings. + + Args: + client: SeaTunnel client instance. + + Returns: + Function that can be registered as a tool. + """ + async def get_connection_settings() -> Dict[str, Any]: + """Get current connection settings.""" + result = client.get_connection_settings() + return result + + get_connection_settings.__name__ = "get-connection-settings" + get_connection_settings.__doc__ = "Get current SeaTunnel connection URL and API key status" + + return get_connection_settings + + +def update_connection_settings_tool(client: SeaTunnelClient) -> Callable: + """Get a tool for updating connection settings. + + Args: + client: SeaTunnel client instance. + + Returns: + Function that can be registered as a tool. + """ + async def update_connection_settings(url: Optional[str] = None, api_key: Optional[str] = None) -> Dict[str, Any]: + """Update connection settings. + + Args: + url: New base URL for the SeaTunnel REST API. + api_key: New API key for authentication. + + Returns: + Updated connection settings. + """ + result = client.update_connection_settings(url=url, api_key=api_key) + return result + + update_connection_settings.__name__ = "update-connection-settings" + update_connection_settings.__doc__ = "Update URL and/or API key to connect to a different SeaTunnel instance" + + return update_connection_settings + + +def submit_job_tool(client: SeaTunnelClient) -> Callable: + """Get a tool for submitting a job. + + Args: + client: SeaTunnel client instance. + + Returns: + Function that can be registered as a tool. + """ + async def submit_job( + job_content: str, + jobName: Optional[str] = None, + jobId: Optional[Union[str, int]] = None, + isStartWithSavePoint: Optional[bool] = None, + format: str = "hocon", + ) -> Dict[str, Any]: + """Submit a new job. + + Args: + job_content: Job configuration content. + jobName: Optional job name. + jobId: Optional job ID. Can be a string or integer, will be converted to string. + isStartWithSavePoint: Whether to start with savepoint. + format: Job configuration format (hocon, json, yaml). + + Returns: + Response from the API. + """ + result = client.submit_job( + job_content=job_content, + jobName=jobName, + jobId=jobId, + isStartWithSavePoint=isStartWithSavePoint, + format=format, + ) + return result + + submit_job.__name__ = "submit-job" + submit_job.__doc__ = "Submit a new job to the SeaTunnel cluster with configuration content" + + return submit_job + + +def submit_job_upload_tool(client: SeaTunnelClient) -> Callable: + """Get a tool for submitting a job using file upload. + + Args: + client: SeaTunnel client instance. + + Returns: + Function that can be registered as a tool. + """ + async def submit_job_upload( + config_file: Union[str, Any], + jobName: Optional[str] = None, + jobId: Optional[Union[str, int]] = None, + isStartWithSavePoint: Optional[bool] = None, + format: Optional[str] = None, + ) -> Dict[str, Any]: + """Submit a new job using file upload. + + Args: + config_file: Either a file path string or a file-like object. If a path string is provided, + the file will be opened and submitted in the multipart/form-data request body. + jobName: Optional job name (sent as a query parameter). + jobId: Optional job ID (sent as a query parameter). Can be a string or integer, will be converted to string. + isStartWithSavePoint: Whether to start with savepoint (sent as a query parameter). + format: Job configuration format (hocon, json, yaml) (sent as a query parameter). + If not provided, it will be determined from the file name. + + Returns: + Response from the API. + """ + result = client.submit_job_upload( + config_file=config_file, + jobName=jobName, + jobId=jobId, + isStartWithSavePoint=isStartWithSavePoint, + format=format, + ) + return result + + submit_job_upload.__name__ = "submit-job-upload" + submit_job_upload.__doc__ = "Submit a new job to the SeaTunnel cluster by uploading a configuration file" + + return submit_job_upload + + +def submit_jobs_tool(client: SeaTunnelClient) -> Callable: + """Get a tool for submitting multiple jobs in batch. + + Args: + client: SeaTunnel client instance. + + Returns: + Function that can be registered as a tool. + """ + async def submit_jobs( + request_body: Any + ) -> Dict[str, Any]: + """Submit multiple jobs in batch. + + Args: + request_body: The direct request body to send to the API. + It will be used as-is without modification. + + Returns: + Response from the API. + """ + result = client.submit_jobs(request_body=request_body) + return result + + submit_jobs.__name__ = "submit-jobs" + submit_jobs.__doc__ = "Submit multiple jobs in batch to the SeaTunnel cluster. The input will be sent directly as the request body." + + return submit_jobs + + +def stop_job_tool(client: SeaTunnelClient) -> Callable: + """Get a tool for stopping a running job. + + Args: + client: SeaTunnel client instance. + + Returns: + Function that can be registered as a tool. + """ + async def stop_job(jobId: Union[str, int], isStartWithSavePoint: bool = False) -> Dict[str, Any]: + """Stop a running job. + + Args: + jobId: Job ID. Can be a string or integer. + isStartWithSavePoint: Whether to stop with savepoint. + + Returns: + Response from the API. + """ + result = client.stop_job(jobId=jobId, isStartWithSavePoint=isStartWithSavePoint) + return result + + stop_job.__name__ = "stop-job" + stop_job.__doc__ = "Stop a running job by providing the jobId and optional isStartWithSavePoint flag" + + return stop_job + + +def get_job_info_tool(client: SeaTunnelClient) -> Callable: + """Get a tool for retrieving job information. + + Args: + client: SeaTunnel client instance. + + Returns: + Function that can be registered as a tool. + """ + async def get_job_info(jobId: Union[str, int]) -> Dict[str, Any]: + """Get information about a job. + + Args: + jobId: Job ID (used as path parameter in /job-info/{jobId}). Can be a string or integer. + + Returns: + Response from the API. + """ + result = client.get_job_info(jobId=jobId) + return result + + get_job_info.__name__ = "get-job-info" + get_job_info.__doc__ = "Get detailed information about a specific job by providing the jobId as a path parameter" + + return get_job_info + + +def get_running_job_tool(client: SeaTunnelClient) -> Callable: + """Get a tool for retrieving information about a running job. + + Args: + client: SeaTunnel client instance. + + Returns: + Function that can be registered as a tool. + """ + async def get_running_job(jobId: Union[str, int]) -> Dict[str, Any]: + """Get information about a running job. + + Args: + jobId: Job ID (used as path parameter in /running-job/{jobId}). Can be a string or integer. + + Returns: + Response from the API. + """ + result = client.get_running_job(jobId=jobId) + return result + + get_running_job.__name__ = "get-running-job" + get_running_job.__doc__ = "Get details about a specific running job by providing the jobId as a path parameter" + + return get_running_job + + +def get_running_jobs_tool(client: SeaTunnelClient) -> Callable: + """Get a tool for retrieving all running jobs. + + Args: + client: SeaTunnel client instance. + + Returns: + Function that can be registered as a tool. + """ + async def get_running_jobs() -> Dict[str, Any]: + """Get all running jobs. + + Returns: + Response from the API. + """ + result = client.get_running_jobs() + return result + + get_running_jobs.__name__ = "get-running-jobs" + get_running_jobs.__doc__ = "List all currently running jobs" + + return get_running_jobs + + +def get_finished_jobs_tool(client: SeaTunnelClient) -> Callable: + """Get a tool for retrieving all finished jobs by state. + + Args: + client: SeaTunnel client instance. + + Returns: + Function that can be registered as a tool. + """ + async def get_finished_jobs(state: str) -> Dict[str, Any]: + """Get all finished jobs by state. + + Args: + state: Job state (FINISHED, CANCELED, FAILED, UNKNOWABLE) (used as path parameter in /finished-jobs/{state}). + + Returns: + Response from the API. + """ + result = client.get_finished_jobs(state=state) + return result + + get_finished_jobs.__name__ = "get-finished-jobs" + get_finished_jobs.__doc__ = "List all finished jobs by state (FINISHED, CANCELED, FAILED, UNKNOWABLE) using the state as a path parameter" + + return get_finished_jobs + + +def get_overview_tool(client: SeaTunnelClient) -> Callable: + """Get a tool for retrieving cluster overview. + + Args: + client: SeaTunnel client instance. + + Returns: + Function that can be registered as a tool. + """ + async def get_overview(tags: Optional[Dict[str, str]] = None) -> Dict[str, Any]: + """Get cluster overview. + + Args: + tags: Optional tags for filtering. + + Returns: + Response from the API. + """ + result = client.get_overview(tags=tags) + return result + + get_overview.__name__ = "get-overview" + get_overview.__doc__ = "Get an overview of the SeaTunnel cluster" + + return get_overview + + +def get_system_monitoring_information_tool(client: SeaTunnelClient) -> Callable: + """Get a tool for retrieving system monitoring information. + + Args: + client: SeaTunnel client instance. + + Returns: + Function that can be registered as a tool. + """ + async def get_system_monitoring_information() -> Dict[str, Any]: + """Get system monitoring information. + + Returns: + Response from the API. + """ + result = client.get_system_monitoring_information() + return result + + get_system_monitoring_information.__name__ = "get-system-monitoring-information" + get_system_monitoring_information.__doc__ = "Get detailed system monitoring information" + + return get_system_monitoring_information + + +def get_all_tools(client: SeaTunnelClient) -> List[Callable]: + """Get all MCP tools. + + Args: + client: SeaTunnelClient instance. + + Returns: + List of all tool functions. + """ + return [ + get_connection_settings_tool(client), + update_connection_settings_tool(client), + submit_job_tool(client), + submit_job_upload_tool(client), + submit_jobs_tool(client), + stop_job_tool(client), + get_job_info_tool(client), + get_running_job_tool(client), + get_running_jobs_tool(client), + get_finished_jobs_tool(client), + get_overview_tool(client), + get_system_monitoring_information_tool(client), + ] \ No newline at end of file diff --git a/seatunnel-mcp/tests/__init__.py b/seatunnel-mcp/tests/__init__.py new file mode 100644 index 0000000..c7de8e2 --- /dev/null +++ b/seatunnel-mcp/tests/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Test package for SeaTunnel MCP.""" \ No newline at end of file diff --git a/seatunnel-mcp/tests/integration/__init__.py b/seatunnel-mcp/tests/integration/__init__.py new file mode 100644 index 0000000..37ca2f5 --- /dev/null +++ b/seatunnel-mcp/tests/integration/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""集成测试包。""" \ No newline at end of file diff --git a/seatunnel-mcp/tests/integration/test_api_client.py b/seatunnel-mcp/tests/integration/test_api_client.py new file mode 100644 index 0000000..967406e --- /dev/null +++ b/seatunnel-mcp/tests/integration/test_api_client.py @@ -0,0 +1,239 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""SeaTunnel API 客户端集成测试。 + +注意:这些测试需要一个运行中的 SeaTunnel 实例。 +您可以使用 Docker 启动一个测试实例: + +```bash +docker run -d --name seatunnel -p 8090:8090 apache/seatunnel:latest +``` + +或者,您可以在测试中使用模拟服务器。 +""" + +import os +import pytest +import httpx +from unittest.mock import patch + +from src.seatunnel_mcp.client import SeaTunnelClient + + +# 跳过集成测试,除非明确启用 +pytestmark = pytest.mark.skipif( + os.environ.get("ENABLE_INTEGRATION_TESTS") != "1", + reason="需要设置 ENABLE_INTEGRATION_TESTS=1 环境变量来运行集成测试" +) + +# SeaTunnel API URL,可以通过环境变量覆盖 +API_URL = os.environ.get("SEATUNNEL_API_URL", "http://localhost:8090") +API_KEY = os.environ.get("SEATUNNEL_API_KEY", None) + + +@pytest.fixture +def client(): + """创建一个 SeaTunnel 客户端实例用于测试。""" + return SeaTunnelClient(base_url=API_URL, api_key=API_KEY) + + +def test_connection(client): + """测试与 SeaTunnel API 的连接。""" + try: + # 获取概览信息(这是一个简单的端点,通常可用) + response = client.get_overview() + assert isinstance(response, dict) + except httpx.RequestError as e: + pytest.skip(f"无法连接到 SeaTunnel API: {e}") + except httpx.HTTPStatusError as e: + pytest.skip(f"SeaTunnel API 响应错误: {e}") + + +def test_get_running_jobs(client): + """测试获取运行中的作业。""" + try: + response = client.get_running_jobs() + assert isinstance(response, dict) + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + pytest.skip("get_running_jobs 端点不可用") + raise + + +def test_get_finished_jobs(client): + """测试获取已完成的作业。""" + try: + response = client.get_finished_jobs(state="FINISHED") + assert isinstance(response, dict) + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + pytest.skip("get_finished_jobs 端点不可用") + raise + + +def test_get_system_monitoring_information(client): + """测试获取系统监控信息。""" + try: + response = client.get_system_monitoring_information() + assert isinstance(response, dict) + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + pytest.skip("get_system_monitoring_information 端点不可用") + raise + + +def test_submit_and_stop_job(client): + """测试提交和停止作业。 + + 注意:这个测试会提交一个真实的作业,可能会消耗资源。 + """ + # 定义一个简单的测试作业配置 + job_config = """ + env { + job.mode = "batch" + } + + source { + FakeSource { + row.num = 10 + schema = { + fields { + id = int + name = string + } + } + } + } + + sink { + Console {} + } + """ + + try: + # 提交作业 + submit_response = client.submit_job( + job_content=job_config, + jobName="integration_test_job", + format="hocon" + ) + assert isinstance(submit_response, dict) + + # 检查是否获取到作业 ID + assert "jobId" in submit_response or "jobId" in submit_response + + # 获取作业 ID + jobId = submit_response.get("jobId", submit_response.get("jobId")) + + # 尝试停止作业(可能已经完成) + try: + stop_response = client.stop_job(jobId=jobId) + assert isinstance(stop_response, dict) + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + # 作业可能已经完成,这是正常的 + pass + elif e.response.status_code == 400: + # 作业可能已经停止,这也是正常的 + pass + else: + raise + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + pytest.skip("submit_job 或 stop_job 端点不可用") + raise + + +def test_submit_jobs(client): + """测试批量提交作业。 + + 注意:这个测试会提交真实的作业,可能会消耗资源。 + """ + # 定义两个简单的测试作业作为请求体 + request_body = [ + { + "params": { + "jobId": "batch_test_1", + "jobName": "batch_test_job_1" + }, + "env": { + "job.mode": "batch" + }, + "source": [ + { + "plugin_name": "FakeSource", + "plugin_output": "fake", + "row.num": 10, + "schema": { + "fields": { + "id": "int", + "name": "string" + } + } + } + ], + "transform": [], + "sink": [ + { + "plugin_name": "Console", + "plugin_input": ["fake"] + } + ] + }, + { + "params": { + "jobId": "batch_test_2", + "jobName": "batch_test_job_2" + }, + "env": { + "job.mode": "batch" + }, + "source": [ + { + "plugin_name": "FakeSource", + "plugin_output": "fake", + "row.num": 10, + "schema": { + "fields": { + "id": "int", + "name": "string" + } + } + } + ], + "transform": [], + "sink": [ + { + "plugin_name": "Console", + "plugin_input": ["fake"] + } + ] + } + ] + + try: + # 批量提交作业 + submit_response = client.submit_jobs(request_body=request_body) + assert isinstance(submit_response, dict) + + # 验证响应包含作业ID + assert "jobIds" in submit_response or "jobs" in submit_response + + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + pytest.skip("submit_jobs 端点不可用") + raise \ No newline at end of file diff --git a/seatunnel-mcp/tests/test_client.py b/seatunnel-mcp/tests/test_client.py new file mode 100644 index 0000000..c4fbe09 --- /dev/null +++ b/seatunnel-mcp/tests/test_client.py @@ -0,0 +1,249 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Tests for the SeaTunnel client.""" + +import pytest +import httpx +from unittest.mock import patch, MagicMock + +from src.seatunnel_mcp.client import SeaTunnelClient + + +@pytest.fixture +def client(): + """Create a client for testing.""" + return SeaTunnelClient(base_url="http://localhost:8090", api_key="test_key") + + +def test_init(client): + """Test client initialization.""" + assert client.base_url == "http://localhost:8090" + assert client.api_key == "test_key" + assert client.headers == { + "Content-Type": "application/json", + "Authorization": "Bearer test_key", + } + + +def test_get_connection_settings(client): + """Test get_connection_settings.""" + settings = client.get_connection_settings() + assert settings == { + "url": "http://localhost:8090", + "has_api_key": True, + } + + +def test_update_connection_settings(client): + """Test update_connection_settings.""" + settings = client.update_connection_settings( + url="http://new-host:8090", + api_key="new_key", + ) + assert client.base_url == "http://new-host:8090" + assert client.api_key == "new_key" + assert client.headers["Authorization"] == "Bearer new_key" + assert settings == { + "url": "http://new-host:8090", + "has_api_key": True, + } + + +@patch("httpx.Client") +def test_submit_job(mock_client, client): + """Test submit_job.""" + mock_response = MagicMock() + mock_response.json.return_value = {"jobId": "123"} + mock_response.raise_for_status.return_value = None + + mock_client_instance = MagicMock() + mock_client_instance.request.return_value = mock_response + mock_client.return_value.__enter__.return_value = mock_client_instance + + job_content = "env { job.mode = \"batch\" }" + result = client.submit_job( + job_content=job_content, + jobName="test_job", + format="hocon", + ) + + mock_client_instance.request.assert_called_once_with( + "POST", + "http://localhost:8090/submit-job", + headers={ + "Content-Type": "text/plain", + "Authorization": "Bearer test_key", + }, + params={"jobName": "test_job", "format": "hocon"}, + content=job_content, + ) + + assert result == {"jobId": "123"} + + +@patch("httpx.Client") +def test_submit_jobs(mock_client, client): + """Test submit_jobs.""" + mock_response = MagicMock() + mock_response.json.return_value = {"jobIds": ["123", "456"]} + mock_response.raise_for_status.return_value = None + + mock_client_instance = MagicMock() + mock_client_instance.request.return_value = mock_response + mock_client.return_value.__enter__.return_value = mock_client_instance + + # 直接作为请求体的任意数据 + request_body = [ + { + "params": {"jobId": "123", "jobName": "job-1"}, + "env": {"job.mode": "batch"}, + "source": [{"plugin_name": "FakeSource", "plugin_output": "fake"}], + "transform": [], + "sink": [{"plugin_name": "Console", "plugin_input": ["fake"]}] + }, + { + "params": {"jobId": "456", "jobName": "job-2"}, + "env": {"job.mode": "batch"}, + "source": [{"plugin_name": "FakeSource", "plugin_output": "fake"}], + "transform": [], + "sink": [{"plugin_name": "Console", "plugin_input": ["fake"]}] + } + ] + + result = client.submit_jobs(request_body=request_body) + + mock_client_instance.request.assert_called_once_with( + "POST", + "http://localhost:8090/submit-jobs", + headers={ + "Content-Type": "application/json", + "Authorization": "Bearer test_key", + }, + json=request_body, + ) + + assert result == {"jobIds": ["123", "456"]} + + +@patch("httpx.Client") +def test_submit_job_upload(mock_client, client): + """Test submit_job_upload.""" + mock_response = MagicMock() + mock_response.json.return_value = {"jobId": "123"} + mock_response.raise_for_status.return_value = None + + mock_client_instance = MagicMock() + mock_client_instance.request.return_value = mock_response + mock_client.return_value.__enter__.return_value = mock_client_instance + + # Mock file-like object + config_file = MagicMock() + config_file.name = "test_job.conf" + + result = client.submit_job_upload( + config_file=config_file, + jobName="test_job", + ) + + mock_client_instance.request.assert_called_once_with( + "POST", + "http://localhost:8090/submit-job/upload", + headers={ + "Authorization": "Bearer test_key", + }, + params={"jobName": "test_job"}, + files={'config_file': config_file}, + ) + + assert result == {"jobId": "123"} + + +@patch("httpx.Client") +def test_submit_job_upload_json(mock_client, client): + """Test submit_job_upload with JSON file.""" + mock_response = MagicMock() + mock_response.json.return_value = {"jobId": "123"} + mock_response.raise_for_status.return_value = None + + mock_client_instance = MagicMock() + mock_client_instance.request.return_value = mock_response + mock_client.return_value.__enter__.return_value = mock_client_instance + + # Mock file-like object with json extension + config_file = MagicMock() + config_file.name = "test_job.json" + + result = client.submit_job_upload( + config_file=config_file, + jobName="test_job", + format="json", # Explicitly specify format + ) + + mock_client_instance.request.assert_called_once_with( + "POST", + "http://localhost:8090/submit-job/upload", + headers={ + "Authorization": "Bearer test_key", + }, + params={"jobName": "test_job", "format": "json"}, + files={'config_file': config_file}, + ) + + assert result == {"jobId": "123"} + + +@patch("httpx.Client") +@patch("builtins.open") +def test_submit_job_upload_path(mock_open, mock_client, client): + """Test submit_job_upload with a file path.""" + mock_response = MagicMock() + mock_response.json.return_value = {"jobId": "123"} + mock_response.raise_for_status.return_value = None + + mock_client_instance = MagicMock() + mock_client_instance.request.return_value = mock_response + mock_client.return_value.__enter__.return_value = mock_client_instance + + # Mock the file object returned by open() + mock_file = MagicMock() + mock_open.return_value = mock_file + + file_path = "/path/to/test_job.conf" + result = client.submit_job_upload( + config_file=file_path, + jobName="test_job", + jobId="987654321", + ) + + # Check that open was called with the file path + mock_open.assert_called_once_with(file_path, 'rb') + + # Check that the request was made with the proper parameters + mock_client_instance.request.assert_called_once_with( + "POST", + "http://localhost:8090/submit-job/upload", + headers={ + "Authorization": "Bearer test_key", + }, + params={"jobName": "test_job", "jobId": "987654321"}, + files={'config_file': mock_file}, + ) + + # Verify the mock file was closed after the request + mock_file.close.assert_called_once() + + assert result == {"jobId": "123"} \ No newline at end of file diff --git a/seatunnel-mcp/tests/test_tools.py b/seatunnel-mcp/tests/test_tools.py new file mode 100644 index 0000000..9801cdb --- /dev/null +++ b/seatunnel-mcp/tests/test_tools.py @@ -0,0 +1,223 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Tests for the SeaTunnel MCP tools.""" + +import pytest +from unittest.mock import MagicMock + +from src.seatunnel_mcp.client import SeaTunnelClient +from src.seatunnel_mcp.tools import ( + get_connection_settings_tool, + update_connection_settings_tool, + submit_job_tool, + submit_job_upload_tool, + submit_jobs_tool, + stop_job_tool, + get_job_info_tool, + get_running_job_tool, + get_running_jobs_tool, + get_finished_jobs_tool, + get_overview_tool, + get_system_monitoring_information_tool, + get_all_tools, +) + + +@pytest.fixture +def mock_client(): + """Create a mock client for testing.""" + client = MagicMock(spec=SeaTunnelClient) + client.get_connection_settings.return_value = { + "url": "http://localhost:8090", + "has_api_key": True, + } + client.update_connection_settings.return_value = { + "url": "http://new-host:8090", + "has_api_key": True, + } + client.submit_job.return_value = {"jobId": "123"} + client.submit_job_upload.return_value = {"jobId": "123"} + client.submit_jobs.return_value = {"jobIds": ["123", "456"]} + client.stop_job.return_value = {"status": "success"} + client.get_job_info.return_value = {"jobId": "123", "status": "RUNNING"} + client.get_running_job.return_value = {"jobId": "123", "status": "RUNNING"} + client.get_running_jobs.return_value = {"jobs": [{"jobId": "123", "status": "RUNNING"}]} + client.get_finished_jobs.return_value = {"jobs": [{"jobId": "456", "status": "FINISHED"}]} + client.get_overview.return_value = {"cluster": "info"} + client.get_system_monitoring_information.return_value = {"system": "info"} + return client + + +@pytest.mark.asyncio +async def test_get_connection_settings_tool(mock_client): + """Test get_connection_settings_tool.""" + tool = get_connection_settings_tool(mock_client) + assert tool.name == "get-connection-settings" + result = await tool.fn() + mock_client.get_connection_settings.assert_called_once() + assert result == { + "url": "http://localhost:8090", + "has_api_key": True, + } + + +@pytest.mark.asyncio +async def test_update_connection_settings_tool(mock_client): + """Test update_connection_settings_tool.""" + tool = update_connection_settings_tool(mock_client) + assert tool.name == "update-connection-settings" + result = await tool.fn(url="http://new-host:8090", api_key="new_key") + mock_client.update_connection_settings.assert_called_once_with( + url="http://new-host:8090", api_key="new_key" + ) + assert result == { + "url": "http://new-host:8090", + "has_api_key": True, + } + + +@pytest.mark.asyncio +async def test_submit_job_tool(mock_client): + """Test submit_job_tool.""" + tool = submit_job_tool(mock_client) + assert tool.name == "submit-job" + job_content = "env { job.mode = \"batch\" }" + result = await tool.fn( + job_content=job_content, + jobName="test_job", + format="hocon", + ) + mock_client.submit_job.assert_called_once_with( + job_content=job_content, + jobName="test_job", + jobId=None, + isStartWithSavePoint=None, + format="hocon", + ) + assert result == {"jobId": "123"} + + +@pytest.mark.asyncio +async def test_submit_job_upload_tool(mock_client): + """Test submit_job_upload_tool.""" + tool = submit_job_upload_tool(mock_client) + assert tool.name == "submit-job-upload" + + # Mock file-like object + config_file = MagicMock() + config_file.name = "test_job.conf" + + result = await tool.fn( + config_file=config_file, + jobName="test_job", + ) + mock_client.submit_job_upload.assert_called_once_with( + config_file=config_file, + jobName="test_job", + jobId=None, + isStartWithSavePoint=None, + format=None, + ) + assert result == {"jobId": "123"} + + +@pytest.mark.asyncio +async def test_submit_job_upload_tool_path(mock_client): + """Test submit_job_upload_tool with a file path.""" + tool = submit_job_upload_tool(mock_client) + assert tool.name == "submit-job-upload" + + file_path = "/path/to/config.conf" + result = await tool.fn( + config_file=file_path, + jobName="test_job", + jobId="987654321", + ) + mock_client.submit_job_upload.assert_called_once_with( + config_file=file_path, + jobName="test_job", + jobId="987654321", + isStartWithSavePoint=None, + format=None, + ) + assert result == {"jobId": "123"} + + +@pytest.mark.asyncio +async def test_submit_jobs_tool(mock_client): + """Test submit_jobs_tool.""" + # Set up return value for submit_jobs + mock_client.submit_jobs.return_value = {"jobIds": ["123", "456"]} + + # Create the tool + tool = submit_jobs_tool(mock_client) + assert tool.name == "submit-jobs" + + # 直接作为请求体的任意数据 + request_body = [ + { + "params": {"jobId": "123", "jobName": "job-1"}, + "env": {"job.mode": "batch"}, + "source": [{"plugin_name": "FakeSource", "plugin_output": "fake"}], + "transform": [], + "sink": [{"plugin_name": "Console", "plugin_input": ["fake"]}] + }, + { + "params": {"jobId": "456", "jobName": "job-2"}, + "env": {"job.mode": "batch"}, + "source": [{"plugin_name": "FakeSource", "plugin_output": "fake"}], + "transform": [], + "sink": [{"plugin_name": "Console", "plugin_input": ["fake"]}] + } + ] + + # Call the tool + result = await tool.fn(request_body=request_body) + + # Verify the client method was called correctly + mock_client.submit_jobs.assert_called_once_with(request_body=request_body) + + # Check the result + assert result == {"jobIds": ["123", "456"]} + + +@pytest.mark.asyncio +async def test_stop_job_tool(mock_client): + """Test stop_job_tool.""" + tool = stop_job_tool(mock_client) + assert tool.name == "stop-job" + result = await tool.fn(jobId="123") + mock_client.stop_job.assert_called_once_with(jobId="123") + assert result == {"status": "success"} + + +def test_get_all_tools(mock_client): + """Test get_all_tools.""" + tools = get_all_tools(mock_client) + assert len(tools) == 12 + tool_names = [tool.__name__ for tool in tools] + assert "get-connection-settings" in tool_names + assert "update-connection-settings" in tool_names + assert "submit-job" in tool_names + assert "submit-jobs" in tool_names + assert "stop-job" in tool_names + assert "get-job-info" in tool_names + assert "get-running-job" in tool_names + assert "get-running-jobs" in tool_names + assert "get-finished-jobs" in tool_names + assert "get-overview" in tool_names + assert "get-system-monitoring-information" in tool_names \ No newline at end of file