From d8dbe13cc5095a4c2385add15c320c1f9148f76d Mon Sep 17 00:00:00 2001 From: KubrickCode Date: Thu, 4 Dec 2025 03:08:01 +0000 Subject: [PATCH] feat: add parallel test file scanner with worker pool Add Scanner API with parallel processing for large-scale project scanning - Concurrency control using errgroup + semaphore.Weighted pattern - Context-based timeout/cancellation support - Phase field in ScanError to distinguish detection/parsing errors - Functional options: WithWorkers, WithTimeout, WithExclude fix #9 --- src/go.work | 4 +- src/pkg/go.mod | 6 +- src/pkg/go.sum | 2 + src/pkg/parser/options.go | 52 ++++ src/pkg/parser/scanner.go | 202 +++++++++++++ src/pkg/parser/scanner_test.go | 310 ++++++++++++++++++++ src/pkg/parser/strategies/jest/jest_test.go | 2 +- 7 files changed, 575 insertions(+), 3 deletions(-) create mode 100644 src/pkg/parser/options.go create mode 100644 src/pkg/parser/scanner.go create mode 100644 src/pkg/parser/scanner_test.go diff --git a/src/go.work b/src/go.work index 0b33ecb..a3006c1 100644 --- a/src/go.work +++ b/src/go.work @@ -1,3 +1,5 @@ -go 1.24 +go 1.24.0 + +toolchain go1.24.11 use ./pkg diff --git a/src/pkg/go.mod b/src/pkg/go.mod index b27844f..a1ffe75 100644 --- a/src/pkg/go.mod +++ b/src/pkg/go.mod @@ -1,7 +1,11 @@ module github.com/specvital/core -go 1.24 +go 1.24.0 + +toolchain go1.24.11 require github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82 require github.com/bmatcuk/doublestar/v4 v4.8.1 + +require golang.org/x/sync v0.18.0 diff --git a/src/pkg/go.sum b/src/pkg/go.sum index 522b353..a8a578c 100644 --- a/src/pkg/go.sum +++ b/src/pkg/go.sum @@ -8,5 +8,7 @@ github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82 h1:6C8qej6f github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82/go.mod h1:xe4pgH49k4SsmkQq5OT8abwhWmnzkhpgnXeekbx2efw= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/src/pkg/parser/options.go b/src/pkg/parser/options.go new file mode 100644 index 0000000..4f58ad9 --- /dev/null +++ b/src/pkg/parser/options.go @@ -0,0 +1,52 @@ +package parser + +import "time" + +type ScanOptions struct { + ExcludePatterns []string + MaxFileSize int64 + Patterns []string + Timeout time.Duration + Workers int +} + +type ScanOption func(*ScanOptions) + +func WithExclude(patterns []string) ScanOption { + return func(o *ScanOptions) { + o.ExcludePatterns = patterns + } +} + +func WithScanMaxFileSize(size int64) ScanOption { + return func(o *ScanOptions) { + if size < 0 { + return + } + o.MaxFileSize = size + } +} + +func WithScanPatterns(patterns []string) ScanOption { + return func(o *ScanOptions) { + o.Patterns = patterns + } +} + +func WithTimeout(d time.Duration) ScanOption { + return func(o *ScanOptions) { + if d < 0 { + return + } + o.Timeout = d + } +} + +func WithWorkers(n int) ScanOption { + return func(o *ScanOptions) { + if n < 0 { + return + } + o.Workers = n + } +} diff --git a/src/pkg/parser/scanner.go b/src/pkg/parser/scanner.go new file mode 100644 index 0000000..8417dae --- /dev/null +++ b/src/pkg/parser/scanner.go @@ -0,0 +1,202 @@ +package parser + +import ( + "context" + "errors" + "fmt" + "os" + "runtime" + "sync" + "time" + + "github.com/specvital/core/domain" + "github.com/specvital/core/parser/strategies" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" +) + +const ( + DefaultWorkers = 0 + DefaultTimeout = 5 * time.Minute + MaxWorkers = 1024 +) + +var ( + ErrScanCancelled = errors.New("scanner: scan cancelled") + ErrScanTimeout = errors.New("scanner: scan timeout") +) + +type ScanResult struct { + Errors []ScanError + Inventory *domain.Inventory +} + +type ScanError struct { + Err error + Path string + Phase string +} + +func (e ScanError) Error() string { + if e.Path == "" { + return fmt.Sprintf("[%s] %v", e.Phase, e.Err) + } + return fmt.Sprintf("[%s] %s: %v", e.Phase, e.Path, e.Err) +} + +func Scan(ctx context.Context, rootPath string, opts ...ScanOption) (*ScanResult, error) { + options := &ScanOptions{ + ExcludePatterns: nil, + MaxFileSize: DefaultMaxFileSize, + Patterns: nil, + Timeout: DefaultTimeout, + Workers: DefaultWorkers, + } + + for _, opt := range opts { + opt(options) + } + + workers := options.Workers + if workers <= 0 { + workers = runtime.GOMAXPROCS(0) + } + if workers > MaxWorkers { + workers = MaxWorkers + } + + timeout := options.Timeout + if timeout <= 0 { + timeout = DefaultTimeout + } + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + detectorOpts := buildDetectorOpts(options) + detectionResult, err := DetectTestFiles(ctx, rootPath, detectorOpts...) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return nil, ErrScanTimeout + } + if errors.Is(err, context.Canceled) { + return nil, ErrScanCancelled + } + return nil, fmt.Errorf("scanner: detection failed: %w", err) + } + + scanResult := &ScanResult{ + Errors: make([]ScanError, 0), + Inventory: &domain.Inventory{ + Files: make([]domain.TestFile, 0), + RootPath: rootPath, + }, + } + + for _, detErr := range detectionResult.Errors { + scanResult.Errors = append(scanResult.Errors, ScanError{ + Err: detErr, + Path: "", + Phase: "detection", + }) + } + + if len(detectionResult.Files) == 0 { + return scanResult, nil + } + + files, errs := parseFilesParallel(ctx, detectionResult.Files, workers) + + scanResult.Inventory.Files = files + scanResult.Errors = append(scanResult.Errors, errs...) + + return scanResult, nil +} + +func buildDetectorOpts(options *ScanOptions) []DetectorOption { + var detectorOpts []DetectorOption + + if len(options.ExcludePatterns) > 0 { + merged := make([]string, 0, len(DefaultSkipPatterns)+len(options.ExcludePatterns)) + merged = append(merged, DefaultSkipPatterns...) + merged = append(merged, options.ExcludePatterns...) + detectorOpts = append(detectorOpts, WithSkipPatterns(merged)) + } + + if len(options.Patterns) > 0 { + detectorOpts = append(detectorOpts, WithPatterns(options.Patterns)) + } + + if options.MaxFileSize > 0 { + detectorOpts = append(detectorOpts, WithMaxFileSize(options.MaxFileSize)) + } + + return detectorOpts +} + +func parseFilesParallel(ctx context.Context, files []string, workers int) ([]domain.TestFile, []ScanError) { + sem := semaphore.NewWeighted(int64(workers)) + g, gCtx := errgroup.WithContext(ctx) + + var ( + mu sync.Mutex + results = make([]domain.TestFile, 0, len(files)) + scanErrors = make([]ScanError, 0) + ) + + for _, file := range files { + g.Go(func() error { + if err := sem.Acquire(gCtx, 1); err != nil { + return nil // Context cancelled + } + defer sem.Release(1) + + testFile, err := parseFile(gCtx, file) + + mu.Lock() + defer mu.Unlock() + + if err != nil { + scanErrors = append(scanErrors, ScanError{ + Err: err, + Path: file, + Phase: "parsing", + }) + return nil // Continue with other files + } + + if testFile != nil { + results = append(results, *testFile) + } + + return nil + }) + } + + _ = g.Wait() // Errors are collected in scanErrors + + return results, scanErrors +} + +func parseFile(ctx context.Context, path string) (*domain.TestFile, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + + content, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("read file %s: %w", path, err) + } + + strategy := strategies.FindStrategy(path, content) + if strategy == nil { + return nil, nil // No matching strategy + } + + testFile, err := strategy.Parse(ctx, content, path) + if err != nil { + return nil, fmt.Errorf("parse %s: %w", path, err) + } + + return testFile, nil +} diff --git a/src/pkg/parser/scanner_test.go b/src/pkg/parser/scanner_test.go new file mode 100644 index 0000000..02c46e5 --- /dev/null +++ b/src/pkg/parser/scanner_test.go @@ -0,0 +1,310 @@ +package parser_test + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/specvital/core/parser" + "github.com/specvital/core/parser/strategies" + "github.com/specvital/core/parser/strategies/jest" +) + +func TestMain(m *testing.M) { + strategies.DefaultRegistry().Clear() + jest.RegisterDefault() + os.Exit(m.Run()) +} + +func TestScan(t *testing.T) { + t.Run("should return empty inventory for empty directory", func(t *testing.T) { + tmpDir := t.TempDir() + + result, err := parser.Scan(context.Background(), tmpDir) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if result.Inventory == nil { + t.Fatal("inventory should not be nil") + } + if len(result.Inventory.Files) != 0 { + t.Errorf("expected 0 files, got %d", len(result.Inventory.Files)) + } + }) + + t.Run("should scan test files in directory", func(t *testing.T) { + tmpDir := t.TempDir() + + testContent := []byte(` +describe('UserService', () => { + it('should create user', () => {}); + it('should delete user', () => {}); +}); +`) + testFile := filepath.Join(tmpDir, "user.test.ts") + if err := os.WriteFile(testFile, testContent, 0644); err != nil { + t.Fatalf("failed to write test file: %v", err) + } + + result, err := parser.Scan(context.Background(), tmpDir) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.Inventory.Files) != 1 { + t.Errorf("expected 1 file, got %d", len(result.Inventory.Files)) + } + if result.Inventory.RootPath != tmpDir { + t.Errorf("expected rootPath %s, got %s", tmpDir, result.Inventory.RootPath) + } + }) + + t.Run("should respect exclude patterns", func(t *testing.T) { + tmpDir := t.TempDir() + + customDir := filepath.Join(tmpDir, "custom_exclude") + if err := os.MkdirAll(customDir, 0755); err != nil { + t.Fatalf("failed to create dir: %v", err) + } + + testContent := []byte(`it('test', () => {});`) + if err := os.WriteFile(filepath.Join(customDir, "excluded.test.ts"), testContent, 0644); err != nil { + t.Fatalf("failed to write: %v", err) + } + + result, err := parser.Scan(context.Background(), tmpDir, parser.WithExclude([]string{"custom_exclude"})) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.Inventory.Files) != 0 { + t.Errorf("expected 0 files, got %d", len(result.Inventory.Files)) + } + }) + + t.Run("should respect worker count option", func(t *testing.T) { + tmpDir := t.TempDir() + + result, err := parser.Scan(context.Background(), tmpDir, parser.WithWorkers(2)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if result == nil { + t.Fatal("result should not be nil") + } + }) + + t.Run("should respect timeout option", func(t *testing.T) { + tmpDir := t.TempDir() + + result, err := parser.Scan(context.Background(), tmpDir, parser.WithTimeout(30*time.Second)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if result == nil { + t.Fatal("result should not be nil") + } + }) + + t.Run("should return error for non-existent path", func(t *testing.T) { + _, err := parser.Scan(context.Background(), "/non/existent/path") + if err == nil { + t.Error("expected error for non-existent path") + } + }) + + t.Run("should return ErrScanCancelled on context cancellation", func(t *testing.T) { + tmpDir := t.TempDir() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err := parser.Scan(ctx, tmpDir) + if !errors.Is(err, parser.ErrScanCancelled) { + t.Errorf("expected ErrScanCancelled, got %v", err) + } + }) + + t.Run("should aggregate errors from multiple files", func(t *testing.T) { + tmpDir := t.TempDir() + + validContent := []byte(`it('test', () => {});`) + if err := os.WriteFile(filepath.Join(tmpDir, "valid.test.ts"), validContent, 0644); err != nil { + t.Fatalf("failed to write: %v", err) + } + + result, err := parser.Scan(context.Background(), tmpDir) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.Inventory.Files) < 1 { + t.Errorf("expected at least 1 parsed file") + } + }) +} + +func TestScan_Concurrency(t *testing.T) { + t.Run("should safely handle concurrent access", func(t *testing.T) { + tmpDir := t.TempDir() + + for i := 0; i < 10; i++ { + content := []byte(`it('test', () => {});`) + filename := filepath.Join(tmpDir, fmt.Sprintf("test%d.test.ts", i)) + if err := os.WriteFile(filename, content, 0644); err != nil { + t.Fatalf("failed to write: %v", err) + } + } + + var wg sync.WaitGroup + var errCount atomic.Int32 + + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := parser.Scan(context.Background(), tmpDir, parser.WithWorkers(4)) + if err != nil { + errCount.Add(1) + } + }() + } + + wg.Wait() + + if errCount.Load() > 0 { + t.Errorf("concurrent scans had %d errors", errCount.Load()) + } + }) + + t.Run("should complete with race detector", func(t *testing.T) { + tmpDir := t.TempDir() + + for i := 0; i < 20; i++ { + content := []byte(` +describe('Suite', () => { + it('test 1', () => {}); + it('test 2', () => {}); +}); +`) + filename := filepath.Join(tmpDir, fmt.Sprintf("test%d.test.ts", i)) + if err := os.WriteFile(filename, content, 0644); err != nil { + t.Fatalf("failed to write: %v", err) + } + } + + result, err := parser.Scan(context.Background(), tmpDir, parser.WithWorkers(8)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.Inventory.Files) != 20 { + t.Errorf("expected 20 files, got %d", len(result.Inventory.Files)) + } + }) +} + +func TestScanOptions(t *testing.T) { + t.Run("WithWorkers sets worker count", func(t *testing.T) { + opts := &parser.ScanOptions{} + parser.WithWorkers(4)(opts) + if opts.Workers != 4 { + t.Errorf("expected 4 workers, got %d", opts.Workers) + } + }) + + t.Run("WithTimeout sets timeout", func(t *testing.T) { + opts := &parser.ScanOptions{} + parser.WithTimeout(30 * time.Second)(opts) + if opts.Timeout != 30*time.Second { + t.Errorf("expected 30s timeout, got %v", opts.Timeout) + } + }) + + t.Run("WithExclude sets patterns", func(t *testing.T) { + opts := &parser.ScanOptions{} + patterns := []string{"vendor", "dist"} + parser.WithExclude(patterns)(opts) + if len(opts.ExcludePatterns) != 2 { + t.Errorf("expected 2 patterns, got %d", len(opts.ExcludePatterns)) + } + }) + + t.Run("WithScanMaxFileSize sets max size", func(t *testing.T) { + opts := &parser.ScanOptions{} + parser.WithScanMaxFileSize(1024)(opts) + if opts.MaxFileSize != 1024 { + t.Errorf("expected 1024, got %d", opts.MaxFileSize) + } + }) + + t.Run("WithScanMaxFileSize ignores negative values", func(t *testing.T) { + opts := &parser.ScanOptions{MaxFileSize: 100} + parser.WithScanMaxFileSize(-1)(opts) + if opts.MaxFileSize != 100 { + t.Errorf("expected 100 (unchanged), got %d", opts.MaxFileSize) + } + }) + + t.Run("WithScanPatterns sets patterns", func(t *testing.T) { + opts := &parser.ScanOptions{} + patterns := []string{"**/*.test.ts"} + parser.WithScanPatterns(patterns)(opts) + if len(opts.Patterns) != 1 { + t.Errorf("expected 1 pattern, got %d", len(opts.Patterns)) + } + }) + + t.Run("WithTimeout ignores negative values", func(t *testing.T) { + opts := &parser.ScanOptions{Timeout: time.Minute} + parser.WithTimeout(-1)(opts) + if opts.Timeout != time.Minute { + t.Errorf("expected 1m (unchanged), got %v", opts.Timeout) + } + }) + + t.Run("WithWorkers ignores negative values", func(t *testing.T) { + opts := &parser.ScanOptions{Workers: 4} + parser.WithWorkers(-1)(opts) + if opts.Workers != 4 { + t.Errorf("expected 4 (unchanged), got %d", opts.Workers) + } + }) +} + +func TestScanError(t *testing.T) { + t.Run("Error with path returns formatted string", func(t *testing.T) { + err := parser.ScanError{ + Err: os.ErrNotExist, + Path: "/path/to/file.ts", + Phase: "parsing", + } + + expected := "[parsing] /path/to/file.ts: file does not exist" + if err.Error() != expected { + t.Errorf("expected %q, got %q", expected, err.Error()) + } + }) + + t.Run("Error without path returns phase only", func(t *testing.T) { + err := parser.ScanError{ + Err: os.ErrPermission, + Path: "", + Phase: "detection", + } + + expected := "[detection] permission denied" + if err.Error() != expected { + t.Errorf("expected %q, got %q", expected, err.Error()) + } + }) +} diff --git a/src/pkg/parser/strategies/jest/jest_test.go b/src/pkg/parser/strategies/jest/jest_test.go index ec2d824..4b1a622 100644 --- a/src/pkg/parser/strategies/jest/jest_test.go +++ b/src/pkg/parser/strategies/jest/jest_test.go @@ -409,7 +409,7 @@ func TestStrategy_Parse_Location(t *testing.T) { } func TestRegisterDefault(t *testing.T) { - // NOTE: This test modifies global state, so it cannot run in parallel. + // Given strategies.DefaultRegistry().Clear() defer strategies.DefaultRegistry().Clear()