From 3509efd4528ef7e4e01b25466fd30d9b704e5a98 Mon Sep 17 00:00:00 2001 From: Changkun Ou Date: Mon, 22 Dec 2025 16:33:37 +0100 Subject: [PATCH] feat: add mcp resources --- go.mod | 5 +- go.sum | 6 ++- main.go | 23 ++++++++- resources.go | 126 +++++++++++++++++++++++++++++++++++++++++++++++ tools.go | 136 +++++++++++++++++++++++++++++++++++++++++++-------- 5 files changed, 269 insertions(+), 27 deletions(-) create mode 100644 resources.go diff --git a/go.mod b/go.mod index 43e034f..e17908e 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,13 @@ module github.com/sixt/tensorlake-mcp -go 1.25 +go 1.25.1 require ( + github.com/go4org/hashtriemap v0.0.0-20251130024219-545ba229f689 github.com/google/jsonschema-go v0.4.2 github.com/google/uuid v1.6.0 github.com/modelcontextprotocol/go-sdk v1.2.0-pre.2 - github.com/sixt/tensorlake-go v0.1.0 + github.com/sixt/tensorlake-go v0.1.1 ) require ( diff --git a/go.sum b/go.sum index acfe076..70d8616 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/go4org/hashtriemap v0.0.0-20251130024219-545ba229f689 h1:0psnKZ+N2IP43/SZC8SKx6OpFJwLmQb9m9QyV9BC2f8= +github.com/go4org/hashtriemap v0.0.0-20251130024219-545ba229f689/go.mod h1:OGmRfY/9QEK2P5zCRtmqfbCF283xPkU2dvVA4MvbvpI= github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8= github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= @@ -8,8 +10,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/modelcontextprotocol/go-sdk v1.2.0-pre.2 h1:U2JzHO7jPPKyo1Uu34xuR0QOgeLrWmsIG6orPJE//Bc= github.com/modelcontextprotocol/go-sdk v1.2.0-pre.2/go.mod h1:6fM3LCm3yV7pAs8isnKLn07oKtB0MP9LHd3DfAcKw10= -github.com/sixt/tensorlake-go v0.1.0 h1:k0Xo45Qy789KvUEHSD8vKRwytgfL4zvmAsizkW47ZF8= -github.com/sixt/tensorlake-go v0.1.0/go.mod 
h1:qPF3a5Z2gAPdz4SoKE4NRnb72+c5D3/c9F21+BKvbFc= +github.com/sixt/tensorlake-go v0.1.1 h1:LA6UzOk5Iv2fKYqqzgAXOK0zHBwXjxTTkFIbXjmBxLw= +github.com/sixt/tensorlake-go v0.1.1/go.mod h1:qPF3a5Z2gAPdz4SoKE4NRnb72+c5D3/c9F21+BKvbFc= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= diff --git a/main.go b/main.go index 21f2b97..8ebfa9d 100644 --- a/main.go +++ b/main.go @@ -39,7 +39,10 @@ func init() { logLevel = cmp.Or(logLevel, "debug") // default to debug // Setup the default logger be a json logger. - slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + // + // Note that MCP requires stdout to be used exclusively for JSON-RPC messages. + // All logging must go to stderr. + slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ Level: func() slog.Level { switch logLevel { case "debug": @@ -75,11 +78,19 @@ func main() { }, }) + ctx := context.Background() s := newServer() + go s.initializeDocumentResources(ctx) + defer s.CleanupSession(ctx) // Cleanup session on exit. // Notes: We word the tool names using "document" instead of "file" to avoid confusion with the file tool which // is already spreaded everywhere in LLM host applications. For instance, Claude or Cursor both have their own file tool. + mcp.AddTool(impl, &mcp.Tool{ + Name: "list_documents", + Description: "List all documents in the session.", + }, s.ListDocuments) + mcp.AddTool(impl, &mcp.Tool{ Name: "upload_document", Description: "Upload a document from a URL, local path, or data URI to Tensorlake and obtain a document_id to be used later in other processing/parsing steps.", @@ -118,7 +129,7 @@ func main() { Properties: map[string]*jsonschema.Schema{ "document_id": { Type: "string", - Description: "The document Id to start parsing. 
Example: 'file_1234567890'. This is the document_id returned by the upload_document tool.", + Description: "The document Id to start parsing. Example: 'file_1234567890'. This is the document_id returned by the upload_document tool. A document ID must be provided.", }, "parse_id": { Type: "string", @@ -130,9 +141,17 @@ func main() { }, // TODO: extend parsing options. }, + Required: []string{"document_id"}, }, }, s.ParseDocument) + impl.AddResource(&mcp.Resource{ + Name: "documents", + Description: "Access all documents and their metadata", + URI: "tensorlake://documents", + MIMEType: "application/json", + }, s.DocumentResources) + if err := impl.Run(context.Background(), &mcp.StdioTransport{}); err != nil { slog.Error("failed to run tensorlake-mcp", "error", err) } diff --git a/resources.go b/resources.go new file mode 100644 index 0000000..811f53f --- /dev/null +++ b/resources.go @@ -0,0 +1,126 @@ +// Copyright 2025 SIXT SE +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "net/url" + + "github.com/go4org/hashtriemap" + "github.com/modelcontextprotocol/go-sdk/mcp" + "github.com/sixt/tensorlake-go" +) + +type FileInfo struct { + FileId string `json:"file_id"` + FileName string `json:"file_name"` + MimeType tensorlake.MimeType `json:"mime_type"` + FileSize int64 `json:"file_size"` + ChecksumSHA256 string `json:"checksum_sha256,omitempty"` + CreatedAt string `json:"created_at,omitempty"` + Labels map[string]string `json:"labels,omitempty"` + ParseJobs []*tensorlake.ParseResult `json:"parse_jobs,omitempty"` +} + +var ( + files hashtriemap.HashTrieMap[string, *FileInfo] +) + +func (s *server) initializeDocumentResources(ctx context.Context) { + // Iterate all parse jobs. This way we get all parsed results and their files if any. + for parseJob, err := range s.tl.IterParseJobs(ctx, 100) { + if err != nil { + slog.Error("failed to iterate parse jobs", "error", err) + break + } + + // Correlate parse jobs and their documents. + r, err := s.tl.GetParseResult(ctx, parseJob.ParseId, tensorlake.WithOptions(true)) + if err != nil { + continue + } + if r.Options == nil { + continue + } + + fileId := r.Options.FileId + if fileId == "" { + continue + } + + m, err := s.tl.GetFileMetadata(ctx, fileId) + if err != nil { + continue + } + + finfo := &FileInfo{ + FileId: fileId, + FileName: m.FileName, + MimeType: m.MimeType, + FileSize: m.FileSize, + ChecksumSHA256: m.ChecksumSHA256, + CreatedAt: m.CreatedAt, + Labels: m.Labels, + ParseJobs: []*tensorlake.ParseResult{r}, + } + + info, ok := files.Load(fileId) + if !ok { + files.Store(fileId, finfo) + } else { + info.ParseJobs = append(info.ParseJobs, r) + files.Store(fileId, info) + } + } +} + +// DocumentResources handles resource requests for document metadata and parse results. +// The URI is of the form "tensorlake://documents". 
+func (s *server) DocumentResources(ctx context.Context, req *mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) { + u, err := url.Parse(req.Params.URI) + if err != nil { + return nil, fmt.Errorf("invalid tensorlake resource URI: %s", req.Params.URI) + } + if u.Scheme != "tensorlake" { + return nil, fmt.Errorf("invalid tensorlake resource URI scheme: %s", u.Scheme) + } + if u.Host != "documents" { + return nil, fmt.Errorf("invalid tensorlake resource URI host: %s", u.Host) + } + + // List all documents + + ff := make([]*FileInfo, 0) + files.Range(func(key string, value *FileInfo) bool { + ff = append(ff, value) + return true + }) + data, err := json.MarshalIndent(ff, "", " ") + if err != nil { + return nil, fmt.Errorf("failed to marshal documents: %w", err) + } + return &mcp.ReadResourceResult{ + Contents: []*mcp.ResourceContents{ + { + URI: req.Params.URI, + MIMEType: "application/json", + Text: string(data), + }, + }, + }, nil +} diff --git a/tools.go b/tools.go index c1d3b8c..e3746c9 100644 --- a/tools.go +++ b/tools.go @@ -41,6 +41,15 @@ func newServer() *server { return &server{tl: tensorlake.NewClient(tensorlake.WithBaseURL(tlAPIBaseURL), tensorlake.WithAPIKey(tlAPIKey))} } +func (s *server) ListDocuments(ctx context.Context, req *mcp.CallToolRequest, _ any) (*mcp.CallToolResult, any, error) { + documents := make([]*FileInfo, 0) + files.Range(func(key string, value *FileInfo) bool { + documents = append(documents, value) + return true + }) + return newToolResultJSON(documents) +} + type UploadDocumentInput struct { URL string `json:"url"` } @@ -109,6 +118,14 @@ func (s *server) uploadDocumentFromURL(ctx context.Context, _ *mcp.CallToolReque return newToolResultError(fmt.Errorf("failed to get document metadata: %w", err)) } + files.Store(r.FileId, &FileInfo{ + FileId: r.FileId, + FileName: m.FileName, + MimeType: m.MimeType, + FileSize: m.FileSize, + ChecksumSHA256: m.ChecksumSHA256, + CreatedAt: r.CreatedAt.Format(time.RFC3339), + }) return 
newToolResultJSON(&UploadDocumentOutput{ DocumentId: r.FileId, DocumentName: m.FileName, @@ -203,6 +220,14 @@ func (s *server) uploadDocumentFromDataURI(ctx context.Context, _ *mcp.CallToolR if err != nil { return newToolResultError(fmt.Errorf("failed to get document metadata: %w", err)) } + files.Store(r.FileId, &FileInfo{ + FileId: r.FileId, + FileName: m.FileName, + MimeType: m.MimeType, + FileSize: m.FileSize, + ChecksumSHA256: m.ChecksumSHA256, + CreatedAt: r.CreatedAt.Format(time.RFC3339), + }) return newToolResultJSON(&UploadDocumentOutput{ DocumentId: r.FileId, @@ -239,6 +264,14 @@ func (s *server) uploadDocumentFromLocalPath(ctx context.Context, _ *mcp.CallToo return newToolResultError(fmt.Errorf("failed to get document metadata: %w", err)) } + files.Store(r.FileId, &FileInfo{ + FileId: r.FileId, + FileName: m.FileName, + MimeType: m.MimeType, + FileSize: m.FileSize, + ChecksumSHA256: m.ChecksumSHA256, + CreatedAt: r.CreatedAt.Format(time.RFC3339), + }) return newToolResultJSON(&UploadDocumentOutput{ DocumentId: r.FileId, DocumentName: m.FileName, @@ -259,6 +292,18 @@ func (s *server) DeleteDocument(ctx context.Context, _ *mcp.CallToolRequest, in if err != nil { return newToolResultError(fmt.Errorf("failed to delete document: %w", err)) } + + // Delete all relevant parse jobs. + if info, ok := files.Load(in.DocumentId); ok { + for _, parseJob := range info.ParseJobs { + err := s.tl.DeleteParseJob(ctx, parseJob.ParseId) + if err != nil { + slog.Error("failed to delete parse job", "parse_id", parseJob.ParseId, "error", err) + } + } + } + + files.Delete(in.DocumentId) return newToolResultJSON(fmt.Sprintf("Document (%s) deleted successfully", in.DocumentId)) } @@ -271,18 +316,19 @@ type ParseDocumentInput struct { // ParseDocumentOutput represents the output from the extract_text tool. 
type ParseDocumentOutput struct { - ParseID string `json:"parse_id"` - Status tensorlake.ParseStatus `json:"status"` - Message string `json:"message"` - Result any `json:"result,omitempty"` - CreatedAt string `json:"created_at"` // RFC3339 timestamp + DocumentId string `json:"document_id"` + ParseID string `json:"parse_id"` + Status tensorlake.ParseStatus `json:"status"` + Message string `json:"message"` + Result string `json:"result,omitempty"` + CreatedAt string `json:"created_at"` // RFC3339 timestamp } // ParseDocument handles the parse_document tool call. func (s *server) ParseDocument(ctx context.Context, req *mcp.CallToolRequest, in *ParseDocumentInput) (*mcp.CallToolResult, any, error) { // Fast path: if parse ID is provided, check the status and return the results. if in.ParseId != "" { - return s.fetchParseResult(ctx, req, in.ParseId, in.Sync) + return s.fetchParseResult(ctx, req, in.DocumentId, in.ParseId, in.Sync) } // If both document ID and parse ID are empty, throw an error. @@ -307,19 +353,35 @@ func (s *server) ParseDocument(ctx context.Context, req *mcp.CallToolRequest, in // If sync is true, poll for completion with exponential backoff if in.Sync { - return s.fetchParseResult(ctx, req, pr.ParseId, true) + return s.fetchParseResult(ctx, req, in.DocumentId, pr.ParseId, true) } return newToolResultJSON(&ParseDocumentOutput{ - ParseID: pr.ParseId, - Status: tensorlake.ParseStatusPending, - Message: fmt.Sprintf("Parse job started (ID: %s)", pr.ParseId), - CreatedAt: time.Now().Format(time.RFC3339), + DocumentId: in.DocumentId, + ParseID: pr.ParseId, + Status: tensorlake.ParseStatusPending, + Message: fmt.Sprintf("Parse job started (ID: %s)", pr.ParseId), + CreatedAt: time.Now().Format(time.RFC3339), }) } // fetchParseResult retrieves and formats the parse result for a given parse Id. 
-func (s *server) fetchParseResult(ctx context.Context, req *mcp.CallToolRequest, parseId string, sync bool) (*mcp.CallToolResult, any, error) { +func (s *server) fetchParseResult(ctx context.Context, req *mcp.CallToolRequest, documentId, parseId string, sync bool) (*mcp.CallToolResult, any, error) { + // Fast path: If a completed result for this exact parse ID is cached, return it. + // Match on ParseId (a document can have several parse jobs) and require a + // non-empty Chunks slice to avoid an index-out-of-range panic. + if pr, ok := files.Load(documentId); ok { + for _, job := range pr.ParseJobs { + if job.ParseId == parseId && len(job.Chunks) > 0 { + return newToolResultJSON(&ParseDocumentOutput{ + DocumentId: documentId, + ParseID: parseId, + Status: tensorlake.ParseStatusSuccessful, + Result: job.Chunks[0].Content, + Message: fmt.Sprintf("Parse job done (ID: %s)", parseId), + CreatedAt: pr.CreatedAt, + }) + } + } + } + + // Slow path: Poll for the parse result. r, err := s.tl.GetParseResult(ctx, parseId, tensorlake.WithSSE(sync), tensorlake.WithOnUpdate(func(name tensorlake.ParseEventName, result *tensorlake.ParseResult) { switch name { case tensorlake.SSEEventParseQueued: @@ -337,18 +399,33 @@ func (s *server) fetchParseResult(ctx context.Context, req *mcp.CallToolRequest, } slog.Info("parse result fetched", "parse_id", r.ParseId, "status", r.Status, "results", r) - - s.sendProgress(ctx, req, 4, 4, fmt.Sprintf("Parse job done (ID: %s)", r.ParseId)) o := ParseDocumentOutput{ ParseID: r.ParseId, Status: r.Status, Message: fmt.Sprintf("Parse job done (ID: %s)", r.ParseId), - CreatedAt: time.Now().Format(time.RFC3339), + CreatedAt: r.CreatedAt, } // TODO: allow structured data output. if len(r.Chunks) > 0 { o.Result = r.Chunks[0].Content } + + // Store the parse result in the parses map.
+ info, ok := files.Load(documentId) + if !ok { + // No cached metadata for this document yet. info is nil in this + // branch and must not be dereferenced; record only what is known + // here, and let upload/initialization fill in the file metadata. + files.Store(documentId, &FileInfo{ + FileId: documentId, + CreatedAt: r.CreatedAt, + ParseJobs: []*tensorlake.ParseResult{r}, + }) + } else { + info.ParseJobs = append(info.ParseJobs, r) + files.Store(documentId, info) + } return newToolResultJSON(&o) } @@ -358,13 +435,8 @@ func (s *server) sendProgress(ctx context.Context, req *mcp.CallToolRequest, pro return } - token := req.Params.GetProgressToken() - if token == nil { - return - } - _ = req.Session.NotifyProgress(ctx, &mcp.ProgressNotificationParams{ - ProgressToken: token, + ProgressToken: req.Params.GetProgressToken(), Progress: progress, Total: total, Message: message, @@ -395,3 +467,25 @@ func newToolResultError(err error) (*mcp.CallToolResult, any, error) { }, }, nil, nil } + +func (s *server) CleanupSession(ctx context.Context) { + files.Range(func(key string, value *FileInfo) bool { + // Best-effort cleanup: log failures and keep iterating so one + // error does not leave the remaining session files behind. + err := s.tl.DeleteFile(ctx, key) + if err != nil { + slog.Error("failed to delete document", "document_id", key, "error", err) + } else { + slog.Info("document deleted", "document_id", key) + } + + for _, parseJob := range value.ParseJobs { + err := s.tl.DeleteParseJob(ctx, parseJob.ParseId) + if err != nil { + slog.Error("failed to delete parse job", "parse_id", parseJob.ParseId, "error", err) + continue + } + slog.Info("parse job deleted", "parse_id", parseJob.ParseId) + } + + return true + }) }