From bad925908853ecb7c8550e29e2c061b4d410bc35 Mon Sep 17 00:00:00 2001 From: WSL Aydin Home Date: Tue, 27 Jan 2026 19:33:28 -0700 Subject: [PATCH 01/13] feat: add multi-instance subprocess spawning support Solves the psiphon.SetNoticeWriter() global singleton issue by spawning separate conduit processes instead of running instances in-process. Changes: - Add --multi-instance flag to spawn N child processes (1 per 100 clients) - Add --instances flag for explicit instance count control - New multi.go: subprocess spawner with stats aggregation - Export FormatBytes/FormatDuration for use by multi-instance aggregator - Parent captures child STATS lines and prints AGGREGATE every 10s - Stats JSON includes per-instance breakdown - Final stats write on graceful shutdown Each child process has its own data directory with separate keys, avoiding the global SetNoticeWriter conflict that caused N-counted stats. Usage: conduit start --multi-instance -m 200 # spawns 2 instances conduit start --instances=3 -m 300 # explicit 3 instances --- cli/cmd/start.go | 36 ++- cli/go.mod | 1 - cli/internal/conduit/multi.go | 454 ++++++++++++++++++++++++++++++++ cli/internal/conduit/service.go | 14 +- 4 files changed, 490 insertions(+), 15 deletions(-) create mode 100644 cli/internal/conduit/multi.go diff --git a/cli/cmd/start.go b/cli/cmd/start.go index ea68a81a..1c66d591 100644 --- a/cli/cmd/start.go +++ b/cli/cmd/start.go @@ -37,6 +37,8 @@ var ( bandwidthMbps float64 psiphonConfigPath string statsFilePath string + multiInstance bool + numInstances int ) var startCmd = &cobra.Command{ @@ -61,8 +63,10 @@ func init() { startCmd.Flags().IntVarP(&maxClients, "max-clients", "m", config.DefaultMaxClients, "maximum number of proxy clients (1-1000)") startCmd.Flags().Float64VarP(&bandwidthMbps, "bandwidth", "b", config.DefaultBandwidthMbps, "total bandwidth limit in Mbps (-1 for unlimited)") - startCmd.Flags().StringVarP(&statsFilePath, "stats-file", "s", "", "persist stats to JSON file (default: 
stats.json in data dir if flag used without value)") + startCmd.Flags().StringVarP(&statsFilePath, "stats-file", "s", "", "persist stats to JSON file (-s for default, -s=path or --stats-file=path for custom)") startCmd.Flags().Lookup("stats-file").NoOptDefVal = "stats.json" + startCmd.Flags().BoolVar(&multiInstance, "multi-instance", false, "run multiple instances (1 per 100 max-clients)") + startCmd.Flags().IntVar(&numInstances, "instances", 0, "number of instances (0 = auto-calculate based on max-clients)") // Only show --psiphon-config flag if no config is embedded if !config.HasEmbeddedConfig() { @@ -108,12 +112,6 @@ func runStart(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to load configuration: %w", err) } - // Create conduit service - service, err := conduit.New(cfg) - if err != nil { - return fmt.Errorf("failed to create conduit service: %w", err) - } - // Setup context with cancellation ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -128,6 +126,30 @@ func runStart(cmd *cobra.Command, args []string) error { cancel() }() + // Multi-instance mode: spawn subprocesses + if multiInstance || numInstances > 0 { + instances := numInstances + if instances <= 0 { + instances = conduit.CalculateInstances(maxClients) + } + multiService, err := conduit.NewMultiService(cfg, instances) + if err != nil { + return fmt.Errorf("failed to create multi-instance service: %w", err) + } + + if err := multiService.Run(ctx); err != nil && ctx.Err() == nil { + return fmt.Errorf("multi-instance service error: %w", err) + } + + return nil + } + + // Single instance mode (default) + service, err := conduit.New(cfg) + if err != nil { + return fmt.Errorf("failed to create conduit service: %w", err) + } + // Print startup message bandwidthStr := "unlimited" if bandwidthMbps != config.UnlimitedBandwidth { diff --git a/cli/go.mod b/cli/go.mod index ff276307..182aa758 100644 --- a/cli/go.mod +++ b/cli/go.mod @@ -6,7 +6,6 @@ require ( 
github.com/spf13/cobra v1.8.1 github.com/tyler-smith/go-bip39 v1.1.0 golang.org/x/crypto v0.39.0 - golang.org/x/term v0.32.0 ) require ( diff --git a/cli/internal/conduit/multi.go b/cli/internal/conduit/multi.go new file mode 100644 index 00000000..4666607b --- /dev/null +++ b/cli/internal/conduit/multi.go @@ -0,0 +1,454 @@ +/* + * Copyright (c) 2026, Psiphon Inc. + * All rights reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ * + */ + +package conduit + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "os" + "os/exec" + "path/filepath" + "regexp" + "strconv" + "strings" + "sync" + "time" + + "github.com/Psiphon-Inc/conduit/cli/internal/config" +) + +const ( + // ClientsPerInstance is the target number of clients per instance + ClientsPerInstance = 100 +) + +// InstanceStats tracks stats for a single instance +type InstanceStats struct { + ID string + IsLive bool + Connecting int + Connected int + BytesUp int64 + BytesDown int64 +} + +// MultiService manages multiple conduit subprocess instances +type MultiService struct { + config *config.Config + numInstances int + processes []*exec.Cmd + instanceStats []*InstanceStats + cancel context.CancelFunc + mu sync.Mutex + startTime time.Time + statsDone chan struct{} +} + +// AggregateStatsJSON represents the JSON structure for multi-instance stats +type AggregateStatsJSON struct { + LiveInstances int `json:"liveInstances"` + TotalInstances int `json:"totalInstances"` + ConnectingClients int `json:"connectingClients"` + ConnectedClients int `json:"connectedClients"` + TotalBytesUp int64 `json:"totalBytesUp"` + TotalBytesDown int64 `json:"totalBytesDown"` + UptimeSeconds int64 `json:"uptimeSeconds"` + Timestamp string `json:"timestamp"` + Instances []InstanceJSON `json:"instances,omitempty"` +} + +// InstanceJSON represents per-instance stats in JSON +type InstanceJSON struct { + ID string `json:"id"` + IsLive bool `json:"isLive"` + Connecting int `json:"connecting"` + Connected int `json:"connected"` + BytesUp int64 `json:"bytesUp"` + BytesDown int64 `json:"bytesDown"` +} + +// NewMultiService creates a multi-instance service that spawns subprocesses +func NewMultiService(cfg *config.Config, numInstances int) (*MultiService, error) { + instanceStats := make([]*InstanceStats, numInstances) + for i := 0; i < numInstances; i++ { + instanceStats[i] = &InstanceStats{ + ID: fmt.Sprintf("instance-%d", i), + } + } + + return 
&MultiService{ + config: cfg, + numInstances: numInstances, + processes: make([]*exec.Cmd, numInstances), + instanceStats: instanceStats, + startTime: time.Now(), + statsDone: make(chan struct{}), + }, nil +} + +// Run starts all subprocess instances and monitors them +func (m *MultiService) Run(ctx context.Context) error { + ctx, m.cancel = context.WithCancel(ctx) + + // Calculate clients per instance + clientsPerInstance := m.config.MaxClients / m.numInstances + if clientsPerInstance < 1 { + clientsPerInstance = 1 + } + + // Bandwidth per instance (if set) + var bandwidthPerInstance float64 + if m.config.BandwidthBytesPerSecond > 0 { + bandwidthPerInstance = float64(m.config.BandwidthBytesPerSecond) / float64(m.numInstances) + bandwidthPerInstance = bandwidthPerInstance / 125000 // Convert to Mbps + } else { + bandwidthPerInstance = -1 // unlimited + } + + // Print startup message + bandwidthStr := "unlimited" + if bandwidthPerInstance > 0 { + bandwidthStr = fmt.Sprintf("%.0f Mbps/instance", bandwidthPerInstance) + } + fmt.Printf("Starting %d Psiphon Conduit instances (Max Clients/instance: %d, Bandwidth: %s)\n", + m.numInstances, clientsPerInstance, bandwidthStr) + + var wg sync.WaitGroup + errChan := make(chan error, m.numInstances) + + // Start all instances + for i := 0; i < m.numInstances; i++ { + instanceDataDir := filepath.Join(m.config.DataDir, fmt.Sprintf("instance-%d", i)) + + // Create instance data directory + if err := os.MkdirAll(instanceDataDir, 0700); err != nil { + return fmt.Errorf("failed to create instance directory: %w", err) + } + + wg.Add(1) + go func(idx int, dataDir string) { + defer wg.Done() + if err := m.runInstance(ctx, idx, dataDir, clientsPerInstance, bandwidthPerInstance); err != nil { + if ctx.Err() == nil { + errChan <- fmt.Errorf("instance-%d: %w", idx, err) + } + } + }(i, instanceDataDir) + + fmt.Printf("[instance-%d] Starting with data dir: %s\n", i, instanceDataDir) + } + + // Start stats aggregation goroutine + go 
m.aggregateAndPrintStats(ctx) + + // Wait for all instances to complete + wg.Wait() + + // Cancel context to trigger final stats write + m.cancel() + + // Wait for stats goroutine to complete its final write + <-m.statsDone + + fmt.Println("All instances stopped.") + + // Check for errors + select { + case err := <-errChan: + return err + default: + return nil + } +} + +// runInstance spawns and monitors a single conduit subprocess +func (m *MultiService) runInstance(ctx context.Context, idx int, dataDir string, maxClients int, bandwidthMbps float64) error { + // Build command arguments + args := []string{"start", + "--data-dir", dataDir, + "-m", strconv.Itoa(maxClients), + } + + if bandwidthMbps > 0 { + args = append(args, "-b", fmt.Sprintf("%.2f", bandwidthMbps)) + } else { + args = append(args, "-b", "-1") + } + + // Pass through psiphon config path if set (for non-embedded config builds) + if m.config.PsiphonConfigPath != "" { + args = append(args, "-c", m.config.PsiphonConfigPath) + } + + // Pass through verbosity (children always need at least -v to output STATS) + if m.config.Verbosity >= 2 { + args = append(args, "-vv") + } else { + args = append(args, "-v") + } + + // Note: we intentionally don't pass --stats-file to children + // Parent aggregates stats and writes the combined file + + // Get the current executable path + executable, err := os.Executable() + if err != nil { + return fmt.Errorf("failed to get executable path: %w", err) + } + + cmd := exec.CommandContext(ctx, executable, args...) 
+ cmd.Env = os.Environ() + + // Capture stdout for stats parsing + stdout, err := cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("failed to create stdout pipe: %w", err) + } + + // Capture stderr too + stderr, err := cmd.StderrPipe() + if err != nil { + return fmt.Errorf("failed to create stderr pipe: %w", err) + } + + // Start the process + if err := cmd.Start(); err != nil { + return fmt.Errorf("failed to start instance: %w", err) + } + + m.mu.Lock() + m.processes[idx] = cmd + m.mu.Unlock() + + // Forward stderr with prefix + go func() { + scanner := bufio.NewScanner(stderr) + for scanner.Scan() { + fmt.Printf("[instance-%d] %s\n", idx, scanner.Text()) + } + }() + + // Parse stdout for stats and connection status + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + line := scanner.Text() + m.parseInstanceOutput(idx, line) + } + + return cmd.Wait() +} + +// parseInstanceOutput processes output from a subprocess instance +func (m *MultiService) parseInstanceOutput(idx int, line string) { + m.mu.Lock() + defer m.mu.Unlock() + + stats := m.instanceStats[idx] + + // Check for connection status + if strings.Contains(line, "[OK] Connected to Psiphon network") { + stats.IsLive = true + fmt.Printf("[instance-%d] Connected to Psiphon network\n", idx) + return + } + + // Parse STATS lines: [STATS] Connecting: N | Connected: N | Up: X | Down: Y | Uptime: Z + if strings.Contains(line, "[STATS]") { + m.parseStatsLine(stats, line) + // Don't forward individual STATS - we aggregate them + return + } + + // Forward other output with instance prefix (only in verbose mode) + if m.config.Verbosity >= 1 { + fmt.Printf("[instance-%d] %s\n", idx, line) + } +} + +// parseStatsLine extracts stats from a STATS output line +func (m *MultiService) parseStatsLine(stats *InstanceStats, line string) { + // Regex patterns for stats extraction + connectingRe := regexp.MustCompile(`Connecting:\s*(\d+)`) + connectedRe := regexp.MustCompile(`Connected:\s*(\d+)`) + upRe := 
regexp.MustCompile(`Up:\s*([\d.]+)\s*([KMGTPE]?B)`) + downRe := regexp.MustCompile(`Down:\s*([\d.]+)\s*([KMGTPE]?B)`) + + if match := connectingRe.FindStringSubmatch(line); len(match) > 1 { + if v, err := strconv.Atoi(match[1]); err == nil { + stats.Connecting = v + } + } + if match := connectedRe.FindStringSubmatch(line); len(match) > 1 { + if v, err := strconv.Atoi(match[1]); err == nil { + stats.Connected = v + } + } + if match := upRe.FindStringSubmatch(line); len(match) > 2 { + stats.BytesUp = parseByteValue(match[1], match[2]) + } + if match := downRe.FindStringSubmatch(line); len(match) > 2 { + stats.BytesDown = parseByteValue(match[1], match[2]) + } +} + +// parseByteValue converts a human-readable byte string to int64 +func parseByteValue(numStr, unit string) int64 { + val, err := strconv.ParseFloat(numStr, 64) + if err != nil { + return 0 + } + + multipliers := map[string]float64{ + "B": 1, + "KB": 1024, + "MB": 1024 * 1024, + "GB": 1024 * 1024 * 1024, + "TB": 1024 * 1024 * 1024 * 1024, + "PB": 1024 * 1024 * 1024 * 1024 * 1024, + "EB": 1024 * 1024 * 1024 * 1024 * 1024 * 1024, + } + + if mult, ok := multipliers[unit]; ok { + return int64(val * mult) + } + return int64(val) +} + +// aggregateAndPrintStats periodically prints combined stats from all instances +func (m *MultiService) aggregateAndPrintStats(ctx context.Context) { + defer close(m.statsDone) + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + // Final stats write on shutdown + m.printAndWriteStats() + return + case <-ticker.C: + m.printAndWriteStats() + } + } +} + +// printAndWriteStats aggregates, prints, and optionally writes stats to file +func (m *MultiService) printAndWriteStats() { + m.mu.Lock() + defer m.mu.Unlock() + + // Aggregate stats from all instances + var liveCount, totalConnecting, totalConnected int + var totalUp, totalDown int64 + + instances := make([]InstanceJSON, m.numInstances) + for i, stats := range 
m.instanceStats { + if stats.IsLive { + liveCount++ + } + totalConnecting += stats.Connecting + totalConnected += stats.Connected + totalUp += stats.BytesUp + totalDown += stats.BytesDown + + instances[i] = InstanceJSON{ + ID: stats.ID, + IsLive: stats.IsLive, + Connecting: stats.Connecting, + Connected: stats.Connected, + BytesUp: stats.BytesUp, + BytesDown: stats.BytesDown, + } + } + + uptime := time.Since(m.startTime).Truncate(time.Second) + + // Print aggregate stats + fmt.Printf("%s [AGGREGATE] Live: %d/%d | Connecting: %d | Connected: %d | Up: %s | Down: %s | Uptime: %s\n", + time.Now().Format("2006-01-02 15:04:05"), + liveCount, + m.numInstances, + totalConnecting, + totalConnected, + FormatBytes(totalUp), + FormatBytes(totalDown), + FormatDuration(uptime), + ) + + // Write stats file if configured + if m.config.StatsFile != "" { + statsJSON := AggregateStatsJSON{ + LiveInstances: liveCount, + TotalInstances: m.numInstances, + ConnectingClients: totalConnecting, + ConnectedClients: totalConnected, + TotalBytesUp: totalUp, + TotalBytesDown: totalDown, + UptimeSeconds: int64(uptime.Seconds()), + Timestamp: time.Now().Format(time.RFC3339), + Instances: instances, + } + + data, err := json.MarshalIndent(statsJSON, "", " ") + if err != nil { + fmt.Printf("[ERROR] Failed to marshal stats: %v\n", err) + return + } + + if err := os.WriteFile(m.config.StatsFile, data, 0644); err != nil { + fmt.Printf("[ERROR] Failed to write stats file %s: %v\n", m.config.StatsFile, err) + } else if m.config.Verbosity >= 2 { + fmt.Printf("[DEBUG] Wrote stats to %s\n", m.config.StatsFile) + } + } +} + +// Stop gracefully shuts down all instances +func (m *MultiService) Stop() { + if m.cancel != nil { + m.cancel() + } + + m.mu.Lock() + defer m.mu.Unlock() + + for i, cmd := range m.processes { + if cmd != nil && cmd.Process != nil { + cmd.Process.Signal(os.Interrupt) + m.processes[i] = nil + } + } +} + +// CalculateInstances determines how many instances to run based on max clients 
+func CalculateInstances(maxClients int) int { + instances := maxClients / ClientsPerInstance + if instances < 1 { + instances = 1 + } + // Cap at reasonable maximum + if instances > 10 { + instances = 10 + } + return instances +} diff --git a/cli/internal/conduit/service.go b/cli/internal/conduit/service.go index 8e158d1c..3c96e83a 100644 --- a/cli/internal/conduit/service.go +++ b/cli/internal/conduit/service.go @@ -322,9 +322,9 @@ func (s *Service) logStats() { time.Now().Format("2006-01-02 15:04:05"), s.stats.ConnectingClients, s.stats.ConnectedClients, - formatBytes(s.stats.TotalBytesUp), - formatBytes(s.stats.TotalBytesDown), - formatDuration(uptime), + FormatBytes(s.stats.TotalBytesUp), + FormatBytes(s.stats.TotalBytesDown), + FormatDuration(uptime), ) // Write stats to file if configured (copy data while locked, write async) @@ -359,8 +359,8 @@ func (s *Service) writeStatsToFile(statsJSON StatsJSON) { } } -// formatDuration formats duration in a human-readable way -func formatDuration(d time.Duration) string { +// FormatDuration formats duration in a human-readable way +func FormatDuration(d time.Duration) string { h := d / time.Hour m := (d % time.Hour) / time.Minute s := (d % time.Minute) / time.Second @@ -380,8 +380,8 @@ func (s *Service) GetStats() Stats { return *s.stats } -// formatBytes formats bytes as a human-readable string -func formatBytes(bytes int64) string { +// FormatBytes formats bytes as a human-readable string +func FormatBytes(bytes int64) string { const unit = 1024 if bytes < unit { return fmt.Sprintf("%d B", bytes) From efb5541798941b72a1a26ad968bc6a70c3d68137 Mon Sep 17 00:00:00 2001 From: WSL Aydin Home Date: Tue, 27 Jan 2026 19:37:46 -0700 Subject: [PATCH 02/13] chore: clean up inline comments --- cli/internal/conduit/multi.go | 29 ++--------------------------- 1 file changed, 2 insertions(+), 27 deletions(-) diff --git a/cli/internal/conduit/multi.go b/cli/internal/conduit/multi.go index 4666607b..0b9f942b 100644 --- 
a/cli/internal/conduit/multi.go +++ b/cli/internal/conduit/multi.go @@ -109,22 +109,19 @@ func NewMultiService(cfg *config.Config, numInstances int) (*MultiService, error func (m *MultiService) Run(ctx context.Context) error { ctx, m.cancel = context.WithCancel(ctx) - // Calculate clients per instance clientsPerInstance := m.config.MaxClients / m.numInstances if clientsPerInstance < 1 { clientsPerInstance = 1 } - // Bandwidth per instance (if set) var bandwidthPerInstance float64 if m.config.BandwidthBytesPerSecond > 0 { bandwidthPerInstance = float64(m.config.BandwidthBytesPerSecond) / float64(m.numInstances) bandwidthPerInstance = bandwidthPerInstance / 125000 // Convert to Mbps } else { - bandwidthPerInstance = -1 // unlimited + bandwidthPerInstance = -1 } - // Print startup message bandwidthStr := "unlimited" if bandwidthPerInstance > 0 { bandwidthStr = fmt.Sprintf("%.0f Mbps/instance", bandwidthPerInstance) @@ -135,11 +132,9 @@ func (m *MultiService) Run(ctx context.Context) error { var wg sync.WaitGroup errChan := make(chan error, m.numInstances) - // Start all instances for i := 0; i < m.numInstances; i++ { instanceDataDir := filepath.Join(m.config.DataDir, fmt.Sprintf("instance-%d", i)) - // Create instance data directory if err := os.MkdirAll(instanceDataDir, 0700); err != nil { return fmt.Errorf("failed to create instance directory: %w", err) } @@ -157,10 +152,8 @@ func (m *MultiService) Run(ctx context.Context) error { fmt.Printf("[instance-%d] Starting with data dir: %s\n", i, instanceDataDir) } - // Start stats aggregation goroutine go m.aggregateAndPrintStats(ctx) - // Wait for all instances to complete wg.Wait() // Cancel context to trigger final stats write @@ -171,7 +164,6 @@ func (m *MultiService) Run(ctx context.Context) error { fmt.Println("All instances stopped.") - // Check for errors select { case err := <-errChan: return err @@ -182,7 +174,6 @@ func (m *MultiService) Run(ctx context.Context) error { // runInstance spawns and monitors a 
single conduit subprocess func (m *MultiService) runInstance(ctx context.Context, idx int, dataDir string, maxClients int, bandwidthMbps float64) error { - // Build command arguments args := []string{"start", "--data-dir", dataDir, "-m", strconv.Itoa(maxClients), @@ -206,10 +197,8 @@ func (m *MultiService) runInstance(ctx context.Context, idx int, dataDir string, args = append(args, "-v") } - // Note: we intentionally don't pass --stats-file to children - // Parent aggregates stats and writes the combined file + // Don't pass --stats-file to children; parent aggregates and writes combined file - // Get the current executable path executable, err := os.Executable() if err != nil { return fmt.Errorf("failed to get executable path: %w", err) @@ -218,19 +207,16 @@ func (m *MultiService) runInstance(ctx context.Context, idx int, dataDir string, cmd := exec.CommandContext(ctx, executable, args...) cmd.Env = os.Environ() - // Capture stdout for stats parsing stdout, err := cmd.StdoutPipe() if err != nil { return fmt.Errorf("failed to create stdout pipe: %w", err) } - // Capture stderr too stderr, err := cmd.StderrPipe() if err != nil { return fmt.Errorf("failed to create stderr pipe: %w", err) } - // Start the process if err := cmd.Start(); err != nil { return fmt.Errorf("failed to start instance: %w", err) } @@ -239,7 +225,6 @@ func (m *MultiService) runInstance(ctx context.Context, idx int, dataDir string, m.processes[idx] = cmd m.mu.Unlock() - // Forward stderr with prefix go func() { scanner := bufio.NewScanner(stderr) for scanner.Scan() { @@ -247,7 +232,6 @@ func (m *MultiService) runInstance(ctx context.Context, idx int, dataDir string, } }() - // Parse stdout for stats and connection status scanner := bufio.NewScanner(stdout) for scanner.Scan() { line := scanner.Text() @@ -264,29 +248,23 @@ func (m *MultiService) parseInstanceOutput(idx int, line string) { stats := m.instanceStats[idx] - // Check for connection status if strings.Contains(line, "[OK] Connected to 
Psiphon network") { stats.IsLive = true fmt.Printf("[instance-%d] Connected to Psiphon network\n", idx) return } - // Parse STATS lines: [STATS] Connecting: N | Connected: N | Up: X | Down: Y | Uptime: Z if strings.Contains(line, "[STATS]") { m.parseStatsLine(stats, line) - // Don't forward individual STATS - we aggregate them return } - // Forward other output with instance prefix (only in verbose mode) if m.config.Verbosity >= 1 { fmt.Printf("[instance-%d] %s\n", idx, line) } } -// parseStatsLine extracts stats from a STATS output line func (m *MultiService) parseStatsLine(stats *InstanceStats, line string) { - // Regex patterns for stats extraction connectingRe := regexp.MustCompile(`Connecting:\s*(\d+)`) connectedRe := regexp.MustCompile(`Connected:\s*(\d+)`) upRe := regexp.MustCompile(`Up:\s*([\d.]+)\s*([KMGTPE]?B)`) @@ -357,7 +335,6 @@ func (m *MultiService) printAndWriteStats() { m.mu.Lock() defer m.mu.Unlock() - // Aggregate stats from all instances var liveCount, totalConnecting, totalConnected int var totalUp, totalDown int64 @@ -383,7 +360,6 @@ func (m *MultiService) printAndWriteStats() { uptime := time.Since(m.startTime).Truncate(time.Second) - // Print aggregate stats fmt.Printf("%s [AGGREGATE] Live: %d/%d | Connecting: %d | Connected: %d | Up: %s | Down: %s | Uptime: %s\n", time.Now().Format("2006-01-02 15:04:05"), liveCount, @@ -395,7 +371,6 @@ func (m *MultiService) printAndWriteStats() { FormatDuration(uptime), ) - // Write stats file if configured if m.config.StatsFile != "" { statsJSON := AggregateStatsJSON{ LiveInstances: liveCount, From c9b1559b9072eeccbc6ee9fa6f973973b82f7156 Mon Sep 17 00:00:00 2001 From: WSL Aydin Home Date: Tue, 27 Jan 2026 19:44:47 -0700 Subject: [PATCH 03/13] fix: keep formatBytes/formatDuration unexported, restore x/term dep --- cli/go.mod | 1 + cli/internal/conduit/multi.go | 6 +++--- cli/internal/conduit/service.go | 12 +++++------- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/cli/go.mod b/cli/go.mod 
index 182aa758..ff276307 100644 --- a/cli/go.mod +++ b/cli/go.mod @@ -6,6 +6,7 @@ require ( github.com/spf13/cobra v1.8.1 github.com/tyler-smith/go-bip39 v1.1.0 golang.org/x/crypto v0.39.0 + golang.org/x/term v0.32.0 ) require ( diff --git a/cli/internal/conduit/multi.go b/cli/internal/conduit/multi.go index 0b9f942b..61c62c28 100644 --- a/cli/internal/conduit/multi.go +++ b/cli/internal/conduit/multi.go @@ -366,9 +366,9 @@ func (m *MultiService) printAndWriteStats() { m.numInstances, totalConnecting, totalConnected, - FormatBytes(totalUp), - FormatBytes(totalDown), - FormatDuration(uptime), + formatBytes(totalUp), + formatBytes(totalDown), + formatDuration(uptime), ) if m.config.StatsFile != "" { diff --git a/cli/internal/conduit/service.go b/cli/internal/conduit/service.go index 3c96e83a..51035a63 100644 --- a/cli/internal/conduit/service.go +++ b/cli/internal/conduit/service.go @@ -322,9 +322,9 @@ func (s *Service) logStats() { time.Now().Format("2006-01-02 15:04:05"), s.stats.ConnectingClients, s.stats.ConnectedClients, - FormatBytes(s.stats.TotalBytesUp), - FormatBytes(s.stats.TotalBytesDown), - FormatDuration(uptime), + formatBytes(s.stats.TotalBytesUp), + formatBytes(s.stats.TotalBytesDown), + formatDuration(uptime), ) // Write stats to file if configured (copy data while locked, write async) @@ -359,8 +359,7 @@ func (s *Service) writeStatsToFile(statsJSON StatsJSON) { } } -// FormatDuration formats duration in a human-readable way -func FormatDuration(d time.Duration) string { +func formatDuration(d time.Duration) string { h := d / time.Hour m := (d % time.Hour) / time.Minute s := (d % time.Minute) / time.Second @@ -380,8 +379,7 @@ func (s *Service) GetStats() Stats { return *s.stats } -// FormatBytes formats bytes as a human-readable string -func FormatBytes(bytes int64) string { +func formatBytes(bytes int64) string { const unit = 1024 if bytes < unit { return fmt.Sprintf("%d B", bytes) From 898ac8577c50ee7c6376d36b3a087e2a66aebb67 Mon Sep 17 00:00:00 
2001 From: Samim Mirhosseini Date: Wed, 28 Jan 2026 01:52:25 -0500 Subject: [PATCH 04/13] refactor(multi): improve output verbosity and fix regex performance - Move regex compilation to package level (fix N-counting perf issue) - Show per-instance stats only with -v flag - Properly propagate parent verbosity to child processes - Always show connection events and errors regardless of verbosity --- cli/internal/conduit/multi.go | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/cli/internal/conduit/multi.go b/cli/internal/conduit/multi.go index 61c62c28..339c2d3a 100644 --- a/cli/internal/conduit/multi.go +++ b/cli/internal/conduit/multi.go @@ -41,6 +41,14 @@ const ( ClientsPerInstance = 100 ) +// Compile regexes once at package initialization for performance +var ( + connectingRe = regexp.MustCompile(`Connecting:\s*(\d+)`) + connectedRe = regexp.MustCompile(`Connected:\s*(\d+)`) + upRe = regexp.MustCompile(`Up:\s*([\d.]+)\s*([KMGTPE]?B)`) + downRe = regexp.MustCompile(`Down:\s*([\d.]+)\s*([KMGTPE]?B)`) +) + // InstanceStats tracks stats for a single instance type InstanceStats struct { ID string @@ -190,10 +198,13 @@ func (m *MultiService) runInstance(ctx context.Context, idx int, dataDir string, args = append(args, "-c", m.config.PsiphonConfigPath) } - // Pass through verbosity (children always need at least -v to output STATS) - if m.config.Verbosity >= 2 { - args = append(args, "-vv") - } else { + // Pass through verbosity from parent to children + // Children need at least -v to output [STATS] lines for parsing + childVerbosity := m.config.Verbosity + if childVerbosity < 1 { + childVerbosity = 1 // Minimum -v required for stats output + } + for i := 0; i < childVerbosity; i++ { args = append(args, "-v") } @@ -248,28 +259,30 @@ func (m *MultiService) parseInstanceOutput(idx int, line string) { stats := m.instanceStats[idx] + // Always show "Connected to Psiphon network" events (important milestone) if 
strings.Contains(line, "[OK] Connected to Psiphon network") { stats.IsLive = true fmt.Printf("[instance-%d] Connected to Psiphon network\n", idx) return } + // Parse stats lines for aggregation, but only print per-instance stats in verbose mode if strings.Contains(line, "[STATS]") { m.parseStatsLine(stats, line) + // Only show individual instance stats if verbose + if m.config.Verbosity >= 1 { + fmt.Printf("[instance-%d] %s\n", idx, line) + } return } + // All other output only shown in verbose mode if m.config.Verbosity >= 1 { fmt.Printf("[instance-%d] %s\n", idx, line) } } func (m *MultiService) parseStatsLine(stats *InstanceStats, line string) { - connectingRe := regexp.MustCompile(`Connecting:\s*(\d+)`) - connectedRe := regexp.MustCompile(`Connected:\s*(\d+)`) - upRe := regexp.MustCompile(`Up:\s*([\d.]+)\s*([KMGTPE]?B)`) - downRe := regexp.MustCompile(`Down:\s*([\d.]+)\s*([KMGTPE]?B)`) - if match := connectingRe.FindStringSubmatch(line); len(match) > 1 { if v, err := strconv.Atoi(match[1]); err == nil { stats.Connecting = v From 60eefb4a9062e3cc3bb73c8c0bb681ded40a8eff Mon Sep 17 00:00:00 2001 From: Samim Mirhosseini Date: Wed, 28 Jan 2026 02:03:39 -0500 Subject: [PATCH 05/13] refactor(multi): replace magic number and optimize byte parsing - Add BytesPerSecondToMbps constant (replaces hardcoded 125000) - Move byteMultipliers map to package level to avoid repeated allocation --- cli/internal/conduit/multi.go | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/cli/internal/conduit/multi.go b/cli/internal/conduit/multi.go index 339c2d3a..5ebb2b9e 100644 --- a/cli/internal/conduit/multi.go +++ b/cli/internal/conduit/multi.go @@ -39,6 +39,8 @@ import ( const ( // ClientsPerInstance is the target number of clients per instance ClientsPerInstance = 100 + // BytesPerSecondToMbps converts bytes per second to megabits per second + BytesPerSecondToMbps = 1000 * 1000 / 8 ) // Compile regexes once at package initialization for 
performance @@ -49,6 +51,17 @@ var ( downRe = regexp.MustCompile(`Down:\s*([\d.]+)\s*([KMGTPE]?B)`) ) +// Byte unit multipliers for parsing human-readable byte values +var byteMultipliers = map[string]float64{ + "B": 1, + "KB": 1024, + "MB": 1024 * 1024, + "GB": 1024 * 1024 * 1024, + "TB": 1024 * 1024 * 1024 * 1024, + "PB": 1024 * 1024 * 1024 * 1024 * 1024, + "EB": 1024 * 1024 * 1024 * 1024 * 1024 * 1024, +} + // InstanceStats tracks stats for a single instance type InstanceStats struct { ID string @@ -125,7 +138,7 @@ func (m *MultiService) Run(ctx context.Context) error { var bandwidthPerInstance float64 if m.config.BandwidthBytesPerSecond > 0 { bandwidthPerInstance = float64(m.config.BandwidthBytesPerSecond) / float64(m.numInstances) - bandwidthPerInstance = bandwidthPerInstance / 125000 // Convert to Mbps + bandwidthPerInstance = bandwidthPerInstance / BytesPerSecondToMbps // Convert to Mbps } else { bandwidthPerInstance = -1 } @@ -308,17 +321,7 @@ func parseByteValue(numStr, unit string) int64 { return 0 } - multipliers := map[string]float64{ - "B": 1, - "KB": 1024, - "MB": 1024 * 1024, - "GB": 1024 * 1024 * 1024, - "TB": 1024 * 1024 * 1024 * 1024, - "PB": 1024 * 1024 * 1024 * 1024 * 1024, - "EB": 1024 * 1024 * 1024 * 1024 * 1024 * 1024, - } - - if mult, ok := multipliers[unit]; ok { + if mult, ok := byteMultipliers[unit]; ok { return int64(val * mult) } return int64(val) From 5ea038fc89154852d7cd420637104230823da8aa Mon Sep 17 00:00:00 2001 From: Samim Mirhosseini Date: Wed, 28 Jan 2026 02:10:46 -0500 Subject: [PATCH 06/13] refactor(multi): release mutex before I/O to reduce contention --- cli/internal/conduit/multi.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/cli/internal/conduit/multi.go b/cli/internal/conduit/multi.go index 5ebb2b9e..d3b7b03f 100644 --- a/cli/internal/conduit/multi.go +++ b/cli/internal/conduit/multi.go @@ -348,8 +348,8 @@ func (m *MultiService) aggregateAndPrintStats(ctx context.Context) { // 
printAndWriteStats aggregates, prints, and optionally writes stats to file func (m *MultiService) printAndWriteStats() { + // Copy data under lock, then release before I/O m.mu.Lock() - defer m.mu.Unlock() var liveCount, totalConnecting, totalConnected int var totalUp, totalDown int64 @@ -375,7 +375,13 @@ func (m *MultiService) printAndWriteStats() { } uptime := time.Since(m.startTime).Truncate(time.Second) + statsFile := m.config.StatsFile + verbosity := m.config.Verbosity + + m.mu.Unlock() + // Lock released - safe to do I/O operations now + // Print aggregate stats to console fmt.Printf("%s [AGGREGATE] Live: %d/%d | Connecting: %d | Connected: %d | Up: %s | Down: %s | Uptime: %s\n", time.Now().Format("2006-01-02 15:04:05"), liveCount, @@ -387,7 +393,8 @@ func (m *MultiService) printAndWriteStats() { formatDuration(uptime), ) - if m.config.StatsFile != "" { + // Write stats to file if configured + if statsFile != "" { statsJSON := AggregateStatsJSON{ LiveInstances: liveCount, TotalInstances: m.numInstances, @@ -406,10 +413,10 @@ func (m *MultiService) printAndWriteStats() { return } - if err := os.WriteFile(m.config.StatsFile, data, 0644); err != nil { - fmt.Printf("[ERROR] Failed to write stats file %s: %v\n", m.config.StatsFile, err) - } else if m.config.Verbosity >= 2 { - fmt.Printf("[DEBUG] Wrote stats to %s\n", m.config.StatsFile) + if err := os.WriteFile(statsFile, data, 0644); err != nil { + fmt.Printf("[ERROR] Failed to write stats file %s: %v\n", statsFile, err) + } else if verbosity >= 2 { + fmt.Printf("[DEBUG] Wrote stats to %s\n", statsFile) } } } From bb5a6213a4ed8a1c7fcdd7c94c2985157fec600d Mon Sep 17 00:00:00 2001 From: Samim Mirhosseini Date: Wed, 28 Jan 2026 02:25:26 -0500 Subject: [PATCH 07/13] refactor(multi): simplify multi-instance flag and add validation --- cli/cmd/start.go | 13 ++++++++----- cli/internal/conduit/multi.go | 9 +++++---- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/cli/cmd/start.go b/cli/cmd/start.go index 
1c66d591..0141d1c1 100644 --- a/cli/cmd/start.go +++ b/cli/cmd/start.go @@ -37,7 +37,6 @@ var ( bandwidthMbps float64 psiphonConfigPath string statsFilePath string - multiInstance bool numInstances int ) @@ -65,8 +64,8 @@ func init() { startCmd.Flags().Float64VarP(&bandwidthMbps, "bandwidth", "b", config.DefaultBandwidthMbps, "total bandwidth limit in Mbps (-1 for unlimited)") startCmd.Flags().StringVarP(&statsFilePath, "stats-file", "s", "", "persist stats to JSON file (-s for default, -s=path or --stats-file=path for custom)") startCmd.Flags().Lookup("stats-file").NoOptDefVal = "stats.json" - startCmd.Flags().BoolVar(&multiInstance, "multi-instance", false, "run multiple instances (1 per 100 max-clients)") - startCmd.Flags().IntVar(&numInstances, "instances", 0, "number of instances (0 = auto-calculate based on max-clients)") + startCmd.Flags().IntVar(&numInstances, "multi-instance", 0, "run multiple instances: 0=single (default), N=N instances, or no value=auto (1 per 50 clients, max 32)") + startCmd.Flags().Lookup("multi-instance").NoOptDefVal = "-1" // -1 means auto-calculate // Only show --psiphon-config flag if no config is embedded if !config.HasEmbeddedConfig() { @@ -127,11 +126,15 @@ func runStart(cmd *cobra.Command, args []string) error { }() // Multi-instance mode: spawn subprocesses - if multiInstance || numInstances > 0 { + if numInstances != 0 { instances := numInstances - if instances <= 0 { + if instances < 0 { instances = conduit.CalculateInstances(maxClients) } + if instances > conduit.MaxInstances { + return fmt.Errorf("too many instances: %d (maximum: %d)", instances, conduit.MaxInstances) + } + multiService, err := conduit.NewMultiService(cfg, instances) if err != nil { return fmt.Errorf("failed to create multi-instance service: %w", err) diff --git a/cli/internal/conduit/multi.go b/cli/internal/conduit/multi.go index d3b7b03f..7597e67c 100644 --- a/cli/internal/conduit/multi.go +++ b/cli/internal/conduit/multi.go @@ -38,7 +38,9 @@ import ( 
const ( // ClientsPerInstance is the target number of clients per instance - ClientsPerInstance = 100 + ClientsPerInstance = 50 + // MaxInstances is the maximum number of instances allowed + MaxInstances = 32 // BytesPerSecondToMbps converts bytes per second to megabits per second BytesPerSecondToMbps = 1000 * 1000 / 8 ) @@ -444,9 +446,8 @@ func CalculateInstances(maxClients int) int { if instances < 1 { instances = 1 } - // Cap at reasonable maximum - if instances > 10 { - instances = 10 + if instances > MaxInstances { + instances = MaxInstances } return instances } From b3d7fed292292242f66dcd7076e7ec84ee7fdb5b Mon Sep 17 00:00:00 2001 From: Samim Mirhosseini Date: Wed, 28 Jan 2026 02:41:50 -0500 Subject: [PATCH 08/13] fix(multi): remove forced minimum verbosity for child processes --- cli/internal/conduit/multi.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/cli/internal/conduit/multi.go b/cli/internal/conduit/multi.go index 7597e67c..bb9fa4c3 100644 --- a/cli/internal/conduit/multi.go +++ b/cli/internal/conduit/multi.go @@ -214,12 +214,7 @@ func (m *MultiService) runInstance(ctx context.Context, idx int, dataDir string, } // Pass through verbosity from parent to children - // Children need at least -v to output [STATS] lines for parsing - childVerbosity := m.config.Verbosity - if childVerbosity < 1 { - childVerbosity = 1 // Minimum -v required for stats output - } - for i := 0; i < childVerbosity; i++ { + for i := 0; i < m.config.Verbosity; i++ { args = append(args, "-v") } From f063cc7c65a561a2c232a38749eb42204103e6c7 Mon Sep 17 00:00:00 2001 From: Samim Mirhosseini Date: Wed, 28 Jan 2026 03:00:27 -0500 Subject: [PATCH 09/13] feat(multi): add process monitoring with restart and idle timeout --- cli/internal/conduit/multi.go | 107 ++++++++++++++++++++++++++-------- 1 file changed, 84 insertions(+), 23 deletions(-) diff --git a/cli/internal/conduit/multi.go b/cli/internal/conduit/multi.go index bb9fa4c3..982c8956 100644 --- 
a/cli/internal/conduit/multi.go +++ b/cli/internal/conduit/multi.go @@ -43,6 +43,12 @@ const ( MaxInstances = 32 // BytesPerSecondToMbps converts bytes per second to megabits per second BytesPerSecondToMbps = 1000 * 1000 / 8 + // MaxRestarts is the maximum number of times an instance can restart + MaxRestarts = 5 + // RestartBackoff is the delay between restart attempts + RestartBackoff = 5 * time.Second + // IdleTimeout is how long an instance can be idle before automatic restart + IdleTimeout = 1 * time.Hour ) // Compile regexes once at package initialization for performance @@ -66,12 +72,14 @@ var byteMultipliers = map[string]float64{ // InstanceStats tracks stats for a single instance type InstanceStats struct { - ID string - IsLive bool - Connecting int - Connected int - BytesUp int64 - BytesDown int64 + ID string + IsLive bool + Connecting int + Connected int + BytesUp int64 + BytesDown int64 + RestartCount int // Number of times this instance has been restarted + LastZeroTime time.Time // Last time Connected was 0 (for idle timeout detection) } // MultiService manages multiple conduit subprocess instances @@ -94,6 +102,7 @@ type AggregateStatsJSON struct { ConnectedClients int `json:"connectedClients"` TotalBytesUp int64 `json:"totalBytesUp"` TotalBytesDown int64 `json:"totalBytesDown"` + TotalRestarts int `json:"totalRestarts"` UptimeSeconds int64 `json:"uptimeSeconds"` Timestamp string `json:"timestamp"` Instances []InstanceJSON `json:"instances,omitempty"` @@ -101,12 +110,13 @@ type AggregateStatsJSON struct { // InstanceJSON represents per-instance stats in JSON type InstanceJSON struct { - ID string `json:"id"` - IsLive bool `json:"isLive"` - Connecting int `json:"connecting"` - Connected int `json:"connected"` - BytesUp int64 `json:"bytesUp"` - BytesDown int64 `json:"bytesDown"` + ID string `json:"id"` + IsLive bool `json:"isLive"` + Connecting int `json:"connecting"` + Connected int `json:"connected"` + BytesUp int64 `json:"bytesUp"` + BytesDown int64 
`json:"bytesDown"` + RestartCount int `json:"restartCount"` } // NewMultiService creates a multi-instance service that spawns subprocesses @@ -165,10 +175,37 @@ func (m *MultiService) Run(ctx context.Context) error { wg.Add(1) go func(idx int, dataDir string) { defer wg.Done() - if err := m.runInstance(ctx, idx, dataDir, clientsPerInstance, bandwidthPerInstance); err != nil { - if ctx.Err() == nil { - errChan <- fmt.Errorf("instance-%d: %w", idx, err) + restartCount := 0 + + for { + err := m.runInstance(ctx, idx, dataDir, clientsPerInstance, bandwidthPerInstance) + + // Check if this was a clean shutdown (context cancelled) + if ctx.Err() != nil { + return + } + + // Instance crashed unexpectedly + restartCount++ + + // Update restart count in stats + m.mu.Lock() + m.instanceStats[idx].RestartCount = restartCount + m.instanceStats[idx].IsLive = false + m.mu.Unlock() + + if restartCount >= MaxRestarts { + fmt.Printf("[instance-%d] Reached max restarts (%d), giving up\n", idx, MaxRestarts) + if err != nil { + errChan <- fmt.Errorf("instance-%d exceeded max restarts: %w", idx, err) + } + return } + + fmt.Printf("[instance-%d] Crashed (restart %d/%d), restarting in %v...\n", + idx, restartCount, MaxRestarts, RestartBackoff) + + time.Sleep(RestartBackoff) } }(i, instanceDataDir) @@ -348,7 +385,7 @@ func (m *MultiService) printAndWriteStats() { // Copy data under lock, then release before I/O m.mu.Lock() - var liveCount, totalConnecting, totalConnected int + var liveCount, totalConnecting, totalConnected, totalRestarts int var totalUp, totalDown int64 instances := make([]InstanceJSON, m.numInstances) @@ -360,14 +397,32 @@ func (m *MultiService) printAndWriteStats() { totalConnected += stats.Connected totalUp += stats.BytesUp totalDown += stats.BytesDown + totalRestarts += stats.RestartCount instances[i] = InstanceJSON{ - ID: stats.ID, - IsLive: stats.IsLive, - Connecting: stats.Connecting, - Connected: stats.Connected, - BytesUp: stats.BytesUp, - BytesDown: 
stats.BytesDown, + ID: stats.ID, + IsLive: stats.IsLive, + Connecting: stats.Connecting, + Connected: stats.Connected, + BytesUp: stats.BytesUp, + BytesDown: stats.BytesDown, + RestartCount: stats.RestartCount, + } + + // Check for idle timeout: if instance has been at 0 connections for > 1 hour, restart it + if stats.IsLive && stats.Connected == 0 { + if stats.LastZeroTime.IsZero() { + stats.LastZeroTime = time.Now() + } else if time.Since(stats.LastZeroTime) > IdleTimeout { + fmt.Printf("[instance-%d] Idle for %v with no connections, restarting...\n", + i, time.Since(stats.LastZeroTime).Truncate(time.Second)) + if m.processes[i] != nil { + m.processes[i].Process.Kill() + } + stats.LastZeroTime = time.Time{} // Reset timer + } + } else if stats.Connected > 0 { + stats.LastZeroTime = time.Time{} } } @@ -379,7 +434,11 @@ func (m *MultiService) printAndWriteStats() { // Lock released - safe to do I/O operations now // Print aggregate stats to console - fmt.Printf("%s [AGGREGATE] Live: %d/%d | Connecting: %d | Connected: %d | Up: %s | Down: %s | Uptime: %s\n", + restartInfo := "" + if totalRestarts > 0 { + restartInfo = fmt.Sprintf(" | Restarts: %d", totalRestarts) + } + fmt.Printf("[AGGREGATE] %s Live: %d/%d | Connecting: %d | Connected: %d | Up: %s | Down: %s | Uptime: %s%s\n", time.Now().Format("2006-01-02 15:04:05"), liveCount, m.numInstances, @@ -388,6 +447,7 @@ func (m *MultiService) printAndWriteStats() { formatBytes(totalUp), formatBytes(totalDown), formatDuration(uptime), + restartInfo, ) // Write stats to file if configured @@ -399,6 +459,7 @@ func (m *MultiService) printAndWriteStats() { ConnectedClients: totalConnected, TotalBytesUp: totalUp, TotalBytesDown: totalDown, + TotalRestarts: totalRestarts, UptimeSeconds: int64(uptime.Seconds()), Timestamp: time.Now().Format(time.RFC3339), Instances: instances, From 2a7163e0e6dccb01e9c0cbc452086b5088de0203 Mon Sep 17 00:00:00 2001 From: Samim Mirhosseini Date: Wed, 28 Jan 2026 03:33:25 -0500 Subject: [PATCH 
10/13] refactor(multi): improve process lifecycle management and I/O handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Improves child process management with graceful shutdown, proper I/O handling, and better concurrency patterns. Changes: 1. Graceful Shutdown with Timeout (Two-Phase Pattern): - Use CommandContext to send SIGTERM when parent context cancelled - Child process receives signal and can cleanup gracefully - Force-kill with SIGKILL after 2s timeout if child still running - Prevents abrupt termination and allows connection cleanup 2. WaitGroup for I/O Goroutines: - Promote WaitGroup from local variable to struct field - Track stderr/stdout reader goroutines with Add/Done - Parent waits for all I/O to complete before exiting - Prevents truncated output during shutdown 3. Increased Scanner Buffer Size: - Create newLargeBufferScanner() helper function - 64KB initial buffer, 1MB maximum (vs 64KB default limit) - Prevents scanner failure on long lines (stack traces, verbose JSON) - Eliminates duplicate buffer configuration code (DRY) 4. Proper Stream Separation: - Write child stderr to os.Stderr (was os.Stdout) - Write child stdout to os.Stdout (unchanged) - Follows Unix convention: stderr=diagnostics, stdout=data - Enables separate redirection: conduit 2>errors.log 1>output.log 5. 
Concurrent I/O Reading: - Move stdout reading to goroutine (was blocking in main flow) - Both stderr and stdout now read concurrently - Prevents potential deadlock if child writes to both streams - Symmetric design improves code clarity Shutdown Flow: User signals shutdown (Ctrl+C) ↓ Context cancelled → CommandContext sends SIGTERM to child ↓ Child receives signal → starts graceful cleanup ↓ Monitor goroutine waits 2 seconds ↓ If child still alive → Force kill (SIGKILL) ↓ cmd.Wait() returns → I/O goroutines finish reading ↓ m.wg.Wait() blocks until all I/O complete ↓ Parent exits cleanly Constants Added: - ShutdownTimeout = 2s (grace period before force-kill) --- cli/internal/conduit/multi.go | 58 ++++++++++++++++++++++++++++------- 1 file changed, 47 insertions(+), 11 deletions(-) diff --git a/cli/internal/conduit/multi.go b/cli/internal/conduit/multi.go index 982c8956..07c5abb5 100644 --- a/cli/internal/conduit/multi.go +++ b/cli/internal/conduit/multi.go @@ -24,6 +24,7 @@ import ( "context" "encoding/json" "fmt" + "io" "os" "os/exec" "path/filepath" @@ -49,6 +50,8 @@ const ( RestartBackoff = 5 * time.Second // IdleTimeout is how long an instance can be idle before automatic restart IdleTimeout = 1 * time.Hour + // ShutdownTimeout is the grace period before force-killing child processes + ShutdownTimeout = 2 * time.Second ) // Compile regexes once at package initialization for performance @@ -90,6 +93,7 @@ type MultiService struct { instanceStats []*InstanceStats cancel context.CancelFunc mu sync.Mutex + wg sync.WaitGroup // Tracks all goroutines (instance restarts + I/O readers) startTime time.Time statsDone chan struct{} } @@ -162,7 +166,6 @@ func (m *MultiService) Run(ctx context.Context) error { fmt.Printf("Starting %d Psiphon Conduit instances (Max Clients/instance: %d, Bandwidth: %s)\n", m.numInstances, clientsPerInstance, bandwidthStr) - var wg sync.WaitGroup errChan := make(chan error, m.numInstances) for i := 0; i < m.numInstances; i++ { @@ -172,9 +175,9 
@@ func (m *MultiService) Run(ctx context.Context) error { return fmt.Errorf("failed to create instance directory: %w", err) } - wg.Add(1) + m.wg.Add(1) go func(idx int, dataDir string) { - defer wg.Done() + defer m.wg.Done() restartCount := 0 for { @@ -214,7 +217,7 @@ func (m *MultiService) Run(ctx context.Context) error { go m.aggregateAndPrintStats(ctx) - wg.Wait() + m.wg.Wait() // Cancel context to trigger final stats write m.cancel() @@ -283,19 +286,43 @@ func (m *MultiService) runInstance(ctx context.Context, idx int, dataDir string, m.processes[idx] = cmd m.mu.Unlock() + // Monitor context cancellation for graceful shutdown with timeout + // CommandContext will signal the child when ctx is cancelled, but we also + // force-kill after ShutdownTimeout if it hasn't exited yet go func() { - scanner := bufio.NewScanner(stderr) + <-ctx.Done() + // Give process time to exit gracefully after receiving signal + time.Sleep(ShutdownTimeout) + if cmd.ProcessState == nil || !cmd.ProcessState.Exited() { + // Force kill if still running after grace period + if cmd.Process != nil { + cmd.Process.Kill() + } + } + }() + + // Stream stderr with prefix in background + m.wg.Add(1) + go func() { + defer m.wg.Done() + scanner := newLargeBufferScanner(stderr) for scanner.Scan() { - fmt.Printf("[instance-%d] %s\n", idx, scanner.Text()) + fmt.Fprintf(os.Stderr, "[instance-%d] %s\n", idx, scanner.Text()) } }() - scanner := bufio.NewScanner(stdout) - for scanner.Scan() { - line := scanner.Text() - m.parseInstanceOutput(idx, line) - } + // Stream stdout and parse for stats + m.wg.Add(1) + go func() { + defer m.wg.Done() + scanner := newLargeBufferScanner(stdout) + for scanner.Scan() { + line := scanner.Text() + m.parseInstanceOutput(idx, line) + } + }() + // Wait for process to exit return cmd.Wait() } @@ -496,6 +523,15 @@ func (m *MultiService) Stop() { } } +// newLargeBufferScanner creates a scanner with increased buffer size to handle long lines +func newLargeBufferScanner(r 
io.Reader) *bufio.Scanner { + scanner := bufio.NewScanner(r) + // Increase buffer size to handle long lines (up to 1MB) + buf := make([]byte, 64*1024) + scanner.Buffer(buf, 1024*1024) + return scanner +} + // CalculateInstances determines how many instances to run based on max clients func CalculateInstances(maxClients int) int { instances := maxClients / ClientsPerInstance From 453e1e021c1a0feedf4d656c9674b60be7f3e3ec Mon Sep 17 00:00:00 2001 From: Samim Mirhosseini Date: Wed, 28 Jan 2026 04:02:12 -0500 Subject: [PATCH 11/13] refactor(multi): change stats aggregation from periodic to event-driven Replace time-based periodic printing (every 10s) with event-driven printing that only outputs when stats actually change. Eliminates spam during idle periods and provides immediate feedback when stats update. --- cli/internal/conduit/multi.go | 41 +++++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/cli/internal/conduit/multi.go b/cli/internal/conduit/multi.go index 07c5abb5..8270f507 100644 --- a/cli/internal/conduit/multi.go +++ b/cli/internal/conduit/multi.go @@ -96,6 +96,7 @@ type MultiService struct { wg sync.WaitGroup // Tracks all goroutines (instance restarts + I/O readers) startTime time.Time statsDone chan struct{} + statsChanged chan struct{} // Signals when stats have changed } // AggregateStatsJSON represents the JSON structure for multi-instance stats @@ -139,6 +140,7 @@ func NewMultiService(cfg *config.Config, numInstances int) (*MultiService, error instanceStats: instanceStats, startTime: time.Now(), statsDone: make(chan struct{}), + statsChanged: make(chan struct{}, 100), // Buffered to avoid blocking }, nil } @@ -357,21 +359,45 @@ func (m *MultiService) parseInstanceOutput(idx int, line string) { } func (m *MultiService) parseStatsLine(stats *InstanceStats, line string) { + changed := false + if match := connectingRe.FindStringSubmatch(line); len(match) > 1 { if v, err := strconv.Atoi(match[1]); err == nil { - 
stats.Connecting = v + if stats.Connecting != v { + stats.Connecting = v + changed = true + } } } if match := connectedRe.FindStringSubmatch(line); len(match) > 1 { if v, err := strconv.Atoi(match[1]); err == nil { - stats.Connected = v + if stats.Connected != v { + stats.Connected = v + changed = true + } } } if match := upRe.FindStringSubmatch(line); len(match) > 2 { - stats.BytesUp = parseByteValue(match[1], match[2]) + newVal := parseByteValue(match[1], match[2]) + if stats.BytesUp != newVal { + stats.BytesUp = newVal + changed = true + } } if match := downRe.FindStringSubmatch(line); len(match) > 2 { - stats.BytesDown = parseByteValue(match[1], match[2]) + newVal := parseByteValue(match[1], match[2]) + if stats.BytesDown != newVal { + stats.BytesDown = newVal + changed = true + } + } + + if changed { + select { + case m.statsChanged <- struct{}{}: + default: + // Channel full, skip this signal (next change will trigger anyway) + } } } @@ -388,20 +414,17 @@ func parseByteValue(numStr, unit string) int64 { return int64(val) } -// aggregateAndPrintStats periodically prints combined stats from all instances +// aggregateAndPrintStats prints combined stats when changes occur func (m *MultiService) aggregateAndPrintStats(ctx context.Context) { defer close(m.statsDone) - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - for { select { case <-ctx.Done(): // Final stats write on shutdown m.printAndWriteStats() return - case <-ticker.C: + case <-m.statsChanged: m.printAndWriteStats() } } From 5dd3e89f2fdeba6417d0d11fded13893b8dbe76f Mon Sep 17 00:00:00 2001 From: Samim Mirhosseini Date: Wed, 28 Jan 2026 04:11:09 -0500 Subject: [PATCH 12/13] refactor(multi): simplify instance directory naming to numeric only --- cli/internal/conduit/multi.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/internal/conduit/multi.go b/cli/internal/conduit/multi.go index 8270f507..ef44c419 100644 --- a/cli/internal/conduit/multi.go +++ 
b/cli/internal/conduit/multi.go @@ -171,7 +171,7 @@ func (m *MultiService) Run(ctx context.Context) error { errChan := make(chan error, m.numInstances) for i := 0; i < m.numInstances; i++ { - instanceDataDir := filepath.Join(m.config.DataDir, fmt.Sprintf("instance-%d", i)) + instanceDataDir := filepath.Join(m.config.DataDir, fmt.Sprintf("%d", i)) if err := os.MkdirAll(instanceDataDir, 0700); err != nil { return fmt.Errorf("failed to create instance directory: %w", err) From 61ba7d9b2f032072892be6dc94f5883703498d80 Mon Sep 17 00:00:00 2001 From: Samim Mirhosseini Date: Wed, 28 Jan 2026 15:42:58 -0500 Subject: [PATCH 13/13] fix(multi): address code review feedback - race condition and code quality MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit addresses critical feedback from code review: 1. Simplify clientsPerInstance calculation using max() - Replace 4-line if-statement with idiomatic max() builtin - Ensures minimum of 1 client per instance more concisely 2. Add missing scanner error handling - Check scanner.Err() after both stdout and stderr scan loops - Prevents silent failures on buffer overflow or I/O errors - Follows Go scanner best practices 3. 
Fix mutex/channel race condition in stats processing - CRITICAL: Eliminates lock contention under heavy load - Root cause: parseInstanceOutput() held m.mu while signaling m.statsChanged, causing aggregateAndPrintStats() to block waiting for the same mutex - Solution: Release mutex BEFORE signaling channel - Changed parseStatsLine() to return bool instead of signaling - Moved channel signaling to parseInstanceOutput() after unlock Race condition flow: Before: parseInstanceOutput() [HOLDS m.mu] → parseStatsLine() signals m.statsChanged → aggregateAndPrintStats() wakes, blocks on m.mu → Lock contention cascade under load After: parseInstanceOutput() [HOLDS m.mu] → parseStatsLine() returns changed flag → m.mu.Unlock() [RELEASES LOCK] → Signal m.statsChanged (no lock held) → aggregateAndPrintStats() acquires m.mu immediately → No contention This follows the critical concurrency pattern: separate data access (under mutex) from coordination (channel signaling) to prevent cascading lock contention in high-throughput scenarios. 
--- cli/internal/conduit/multi.go | 48 ++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/cli/internal/conduit/multi.go b/cli/internal/conduit/multi.go index ef44c419..0b78c77b 100644 --- a/cli/internal/conduit/multi.go +++ b/cli/internal/conduit/multi.go @@ -148,10 +148,7 @@ func NewMultiService(cfg *config.Config, numInstances int) (*MultiService, error func (m *MultiService) Run(ctx context.Context) error { ctx, m.cancel = context.WithCancel(ctx) - clientsPerInstance := m.config.MaxClients / m.numInstances - if clientsPerInstance < 1 { - clientsPerInstance = 1 - } + clientsPerInstance := max(m.config.MaxClients/m.numInstances, 1) var bandwidthPerInstance float64 if m.config.BandwidthBytesPerSecond > 0 { @@ -311,6 +308,9 @@ func (m *MultiService) runInstance(ctx context.Context, idx int, dataDir string, for scanner.Scan() { fmt.Fprintf(os.Stderr, "[instance-%d] %s\n", idx, scanner.Text()) } + if err := scanner.Err(); err != nil { + fmt.Fprintf(os.Stderr, "[instance-%d] %v\n", idx, err) + } }() // Stream stdout and parse for stats @@ -322,6 +322,9 @@ func (m *MultiService) runInstance(ctx context.Context, idx int, dataDir string, line := scanner.Text() m.parseInstanceOutput(idx, line) } + if err := scanner.Err(); err != nil { + fmt.Fprintf(os.Stderr, "[instance-%d] %v\n", idx, err) + } }() // Wait for process to exit @@ -330,35 +333,46 @@ func (m *MultiService) runInstance(ctx context.Context, idx int, dataDir string, // parseInstanceOutput processes output from a subprocess instance func (m *MultiService) parseInstanceOutput(idx int, line string) { - m.mu.Lock() - defer m.mu.Unlock() + var changed bool + m.mu.Lock() stats := m.instanceStats[idx] // Always show "Connected to Psiphon network" events (important milestone) if strings.Contains(line, "[OK] Connected to Psiphon network") { stats.IsLive = true fmt.Printf("[instance-%d] Connected to Psiphon network\n", idx) + m.mu.Unlock() return } // Parse stats lines for 
aggregation, but only print per-instance stats in verbose mode if strings.Contains(line, "[STATS]") { - m.parseStatsLine(stats, line) + changed = m.parseStatsLine(stats, line) // Only show individual instance stats if verbose if m.config.Verbosity >= 1 { fmt.Printf("[instance-%d] %s\n", idx, line) } - return - } - // All other output only shown in verbose mode - if m.config.Verbosity >= 1 { - fmt.Printf("[instance-%d] %s\n", idx, line) + m.mu.Unlock() // unlock before sending the signal to statsChanged + + if changed { + select { + case m.statsChanged <- struct{}{}: + default: + } + } + } else { + // All other output only shown in verbose mode + if m.config.Verbosity >= 1 { + fmt.Printf("[instance-%d] %s\n", idx, line) + } + + m.mu.Unlock() } } -func (m *MultiService) parseStatsLine(stats *InstanceStats, line string) { +func (m *MultiService) parseStatsLine(stats *InstanceStats, line string) bool { changed := false if match := connectingRe.FindStringSubmatch(line); len(match) > 1 { @@ -392,13 +406,7 @@ func (m *MultiService) parseStatsLine(stats *InstanceStats, line string) { } } - if changed { - select { - case m.statsChanged <- struct{}{}: - default: - // Channel full, skip this signal (next change will trigger anyway) - } - } + return changed } // parseByteValue converts a human-readable byte string to int64