From 7bdc066eea1750ff662e93a8fe0c8a04b06f19d8 Mon Sep 17 00:00:00 2001 From: David Dollar Date: Fri, 2 Aug 2024 02:03:18 -0400 Subject: [PATCH 1/6] spike out interactive mode --- lib/foreman/buffer.rb | 34 ++++++++++++++++++ lib/foreman/cli.rb | 13 +++---- lib/foreman/engine.rb | 74 +++++++++++++++++++++++++++++++-------- lib/foreman/engine/cli.rb | 23 ++++++++---- lib/foreman/process.rb | 3 ++ 5 files changed, 120 insertions(+), 27 deletions(-) create mode 100644 lib/foreman/buffer.rb diff --git a/lib/foreman/buffer.rb b/lib/foreman/buffer.rb new file mode 100644 index 00000000..b34f498d --- /dev/null +++ b/lib/foreman/buffer.rb @@ -0,0 +1,34 @@ +ANSI_TOKEN = /\e\[(?:\??\d{1,4}(?:;\d{0,4})*)?[A-Za-z]/ +NEWLINE_TOKEN = /\n/ +TOKENIZER = Regexp.new("(#{ANSI_TOKEN}|#{NEWLINE_TOKEN})") + +class Buffer + @buffer = '' + + def initialize(initial = '') + @buffer = initial + end + + def each_token + remainder = '' + @buffer.split(TOKENIZER).each do |token| + if token.include?("\e") && !token.match(ANSI_TOKEN) + remainder << token + else + yield token unless token.empty? + end + end + @buffer = remainder + end + + def gets + return nil unless @buffer.include?("\n") + + line, @buffer = @buffer.split("\n", 2) + line + end + + def write(data) + @buffer << data + end +end diff --git a/lib/foreman/cli.rb b/lib/foreman/cli.rb index f0affefb..91bda63c 100644 --- a/lib/foreman/cli.rb +++ b/lib/foreman/cli.rb @@ -19,12 +19,13 @@ class Foreman::CLI < Foreman::Thor desc "start [PROCESS]", "Start the application (or a specific PROCESS)" - method_option :color, :type => :boolean, :aliases => "-c", :desc => "Force color to be enabled" - method_option :env, :type => :string, :aliases => "-e", :desc => "Specify an environment file to load, defaults to .env" - method_option :formation, :type => :string, :aliases => "-m", :banner => '"alpha=5,bar=3"', :desc => 'Specify what processes will run and how many. Default: "all=1"' - method_option :port, :type => :numeric, :aliases => "-p" - method_option :timeout, :type => :numeric, :aliases => "-t", :desc => "Specify the amount of time (in seconds) processes have to shutdown gracefully before receiving a SIGKILL, defaults to 5." - method_option :timestamp, :type => :boolean, :default => true, :desc => "Include timestamp in output" + method_option :color, :type => :boolean, :aliases => "-c", :desc => "Force color to be enabled" + method_option :env, :type => :string, :aliases => "-e", :desc => "Specify an environment file to load, defaults to .env" + method_option :formation, :type => :string, :aliases => "-m", :banner => '"alpha=5,bar=3"', :desc => 'Specify what processes will run and how many. Default: "all=1"' + method_option :interactive, :type => :string, :aliases => "-i", :desc => "Run a process interactively" + method_option :port, :type => :numeric, :aliases => "-p" + method_option :timeout, :type => :numeric, :aliases => "-t", :desc => "Specify the amount of time (in seconds) processes have to shutdown gracefully before receiving a SIGKILL, defaults to 5." + method_option :timestamp, :type => :boolean, :default => false, :desc => "Include timestamp in output" class << self # Hackery. Take the run method away from Thor so that we can redefine it. diff --git a/lib/foreman/engine.rb b/lib/foreman/engine.rb index a1316593..db47da83 100644 --- a/lib/foreman/engine.rb +++ b/lib/foreman/engine.rb @@ -1,4 +1,5 @@ require "foreman" +require "foreman/buffer" require "foreman/env" require "foreman/process" require "foreman/procfile" @@ -30,6 +31,7 @@ def initialize(options={}) @options[:formation] ||= "all=1" @options[:timeout] ||= 5 + @buffers = {} @env = {} @mutex = Mutex.new @names = {} @@ -148,6 +150,7 @@ def handle_signal_forward(signal) def register(name, command, options={}) options[:env] ||= env options[:cwd] ||= File.dirname(command.split(" ").first) + options[:interactive] ||= @options[:interactive] == name process = Foreman::Process.new(command, options) @names[process] = name @processes << process @@ -320,6 +323,10 @@ def name_for_index(process, index) [ @names[process], index.to_s ].compact.join(".") end + def process_for(reader) + @running[@readers.invert[reader]].first + end + def parse_formation(formation) pairs = formation.to_s.gsub(/\s/, "").split(",") @@ -350,13 +357,6 @@ def termination_message_for(status) end end - def flush_reader(reader) - until reader.eof? - data = reader.gets - output_with_mutex name_for(@readers.key(reader)), data - end - end - ## Engine ########################################################### def spawn_processes @@ -364,14 +364,19 @@ def spawn_processes 1.upto(formation[@names[process]]) do |n| reader, writer = create_pipe begin - pid = process.run(:output => writer, :env => { - "PORT" => port_for(process, n).to_s, - "PS" => name_for_index(process, n) - }) + pid = process.run( + input: process.interactive? ? $stdin : :close, + output: writer, + env: { + 'PORT' => port_for(process, n).to_s, + 'PS' => name_for_index(process, n) + } + ) writer.puts "started with pid #{pid}" rescue Errno::ENOENT writer.puts "unknown command: #{process.command}" end + @buffers[reader] = Buffer.new @running[pid] = [process, n] @readers[pid] = reader end @@ -395,11 +400,52 @@ def handle_io(readers) next if reader == @selfpipe[:reader] if reader.eof? - @readers.delete_if { |key, value| value == reader } + @buffers.delete(reader) + @readers.delete_if { |_key, value| value == reader } + elsif process_for(reader).interactive? + handle_io_interactive reader else - data = reader.gets - output_with_mutex name_for(@readers.invert[reader]), data + handle_io_noninteractive reader + end + end + end + + def handle_io_interactive(reader) + done = false + name = name_for(@readers.invert[reader]) + + output_partial prefix(name) + + loop do + @buffers[reader].write(reader.read_nonblock(10)) + + @buffers[reader].each_token do |token| + case token + when /^\e\[(\d+)G$/ + output_partial "\e[#{::Regexp.last_match(1).to_i + prefix(name).gsub(ANSI_TOKEN, "").length}G" + when ANSI_TOKEN + output_partial token + when "\n" + output_partial token + output_partial prefix(name) + else + output_partial token + end + done = (token == "\n") end + rescue IO::WaitReadable + retry if IO.select([reader], [], [], 1) + return if done + rescue EOFError + end + ensure + output_partial "\n" + end + + def handle_io_noninteractive(reader) + @buffers[reader].write(reader.read_nonblock(10)) + while line = @buffers[reader].gets + output_with_mutex name_for(@readers.invert[reader]), line end end diff --git a/lib/foreman/engine/cli.rb b/lib/foreman/engine/cli.rb index 1cbc66f4..8a9476ce 100644 --- a/lib/foreman/engine/cli.rb +++ b/lib/foreman/engine/cli.rb @@ -55,19 +55,28 @@ def startup def output(name, data) data.to_s.lines.map(&:chomp).each do |message| - output = "" - output += $stdout.color(@colors[name.split(".").first].to_sym) - output += "#{Time.now.strftime("%H:%M:%S")} " if options[:timestamp] - output += "#{pad_process_name(name)} | " - output += $stdout.color(:reset) - output += message - $stdout.puts output + $stdout.write prefix(name) + $stdout.puts message $stdout.flush end rescue Errno::EPIPE terminate_gracefully end + def output_partial(data) + $stdout.write data + $stdout.flush + end + + def prefix(name) + output = '' + output += $stdout.color(@colors[name.split('.').first].to_sym) + output += "#{Time.now.strftime('%H:%M:%S')} " if options[:timestamp] + output += "#{pad_process_name(name)} | " + output += $stdout.color(:reset) + output + end + def shutdown end diff --git a/lib/foreman/process.rb b/lib/foreman/process.rb index ee3de948..f5d23d91 100644 --- a/lib/foreman/process.rb +++ b/lib/foreman/process.rb @@ -77,4 +77,7 @@ def cwd File.expand_path(@options[:cwd] || ".") end + def interactive? + @options[:interactive] + end end From 50ae7e8bd94f16f733875bda98c7ff25d8658318 Mon Sep 17 00:00:00 2001 From: David Dollar Date: Fri, 2 Aug 2024 15:15:44 -0400 Subject: [PATCH 2/6] use pty to handle interactive terminal --- lib/foreman/engine.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/foreman/engine.rb b/lib/foreman/engine.rb index db47da83..360a547b 100644 --- a/lib/foreman/engine.rb +++ b/lib/foreman/engine.rb @@ -3,6 +3,7 @@ require "foreman/env" require "foreman/process" require "foreman/procfile" +require "pty" require "tempfile" require "fileutils" require "thread" @@ -362,7 +363,7 @@ def termination_message_for(status) def spawn_processes @processes.each do |process| 1.upto(formation[@names[process]]) do |n| - reader, writer = create_pipe + reader, writer = process.interactive? ? PTY.open : create_pipe begin pid = process.run( input: process.interactive? ? $stdin : :close, From 311f023261ceecf3e483932bb968f4aa22e480dd Mon Sep 17 00:00:00 2001 From: Jerome Dalbert Date: Fri, 2 Aug 2024 13:03:57 -0700 Subject: [PATCH 3/6] allow specifying input stream to Process#run --- lib/foreman/process.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/foreman/process.rb b/lib/foreman/process.rb index f5d23d91..c00849f8 100644 --- a/lib/foreman/process.rb +++ b/lib/foreman/process.rb @@ -47,11 +47,12 @@ def expanded_command(custom_env={}) # def run(options={}) env = @options[:env].merge(options[:env] || {}) + input = options[:input] || $stdin output = options[:output] || $stdout runner = "#{Foreman.runner}".shellescape - + Dir.chdir(cwd) do - Process.spawn env, expanded_command(env), :out => output, :err => output + Process.spawn env, expanded_command(env), :in => input, :out => output, :err => output end end From 43d1a5103e38037dd305b3f465e9cd4c65fecd58 Mon Sep 17 00:00:00 2001 From: David Dollar Date: Sat, 3 Aug 2024 19:31:17 -0400 Subject: [PATCH 4/6] better handling of prefix injection --- lib/foreman/engine.rb | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/lib/foreman/engine.rb b/lib/foreman/engine.rb index 360a547b..9de49345 100644 --- a/lib/foreman/engine.rb +++ b/lib/foreman/engine.rb @@ -36,6 +36,7 @@ def initialize(options={}) @env = {} @mutex = Mutex.new @names = {} + @prefixed = {} @processes = [] @running = {} @readers = {} @@ -378,6 +379,7 @@ def spawn_processes writer.puts "unknown command: #{process.command}" end @buffers[reader] = Buffer.new + @prefixed[reader] = false @running[pid] = [process, n] @readers[pid] = reader end @@ -414,8 +416,7 @@ def handle_io(readers) def handle_io_interactive(reader) done = false name = name_for(@readers.invert[reader]) - - output_partial prefix(name) + indent = prefix(name).gsub(ANSI_TOKEN, "").length loop do @buffers[reader].write(reader.read_nonblock(10)) @@ -423,13 +424,18 @@ def handle_io_interactive(reader) @buffers[reader].each_token do |token| case token when /^\e\[(\d+)G$/ - output_partial "\e[#{::Regexp.last_match(1).to_i + prefix(name).gsub(ANSI_TOKEN, "").length}G" + output_partial "\e[#{Regexp.last_match(1).to_i + indent}G" when ANSI_TOKEN output_partial token when "\n" output_partial token - output_partial prefix(name) + @prefixed[reader] = false else + unless @prefixed[reader] + output_partial "\e[1G" + output_partial prefix(name) + @prefixed[reader] = true + end output_partial token end done = (token == "\n") @@ -450,6 +456,11 @@ def handle_io_noninteractive(reader) end end + def output_prefix(reader) + output_partial prefix(name_for(@readers.invert[reader])) + @prefixed[reader] = true + end + def watch_for_output Thread.new do begin From 9ee0820be0a059b6d7ad64a5a23d5b9953a234a1 Mon Sep 17 00:00:00 2001 From: David Dollar Date: Sun, 4 Aug 2024 00:02:07 -0400 Subject: [PATCH 5/6] use normal buffer sizes now that we're sure partial reads are working --- lib/foreman/engine.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/foreman/engine.rb b/lib/foreman/engine.rb index 9de49345..0019e354 100644 --- a/lib/foreman/engine.rb +++ b/lib/foreman/engine.rb @@ -419,7 +419,7 @@ def handle_io_interactive(reader) indent = prefix(name).gsub(ANSI_TOKEN, "").length loop do - @buffers[reader].write(reader.read_nonblock(10)) + @buffers[reader].write(reader.read_nonblock(4096)) @buffers[reader].each_token do |token| case token @@ -450,7 +450,7 @@ def handle_io_interactive(reader) end def handle_io_noninteractive(reader) - @buffers[reader].write(reader.read_nonblock(10)) + @buffers[reader].write(reader.read_nonblock(4096)) while line = @buffers[reader].gets output_with_mutex name_for(@readers.invert[reader]), line end From 2a0077529dc8f5dcf847bb135f7508334a451eec Mon Sep 17 00:00:00 2001 From: David Dollar Date: Tue, 6 Aug 2024 18:37:42 -0400 Subject: [PATCH 6/6] handle more raw terminal scenarios --- lib/foreman/buffer.rb | 8 ++++++-- lib/foreman/engine.rb | 25 +++++++++++-------------- lib/foreman/engine/cli.rb | 4 +++- lib/foreman/process.rb | 34 ++++++++++++++++++++++++++++------ 4 files changed, 48 insertions(+), 23 deletions(-) diff --git a/lib/foreman/buffer.rb b/lib/foreman/buffer.rb index b34f498d..00f626cb 100644 --- a/lib/foreman/buffer.rb +++ b/lib/foreman/buffer.rb @@ -1,4 +1,4 @@ -ANSI_TOKEN = /\e\[(?:\??\d{1,4}(?:;\d{0,4})*)?[A-Za-z]/ +ANSI_TOKEN = /\e\[(?:\??\d{1,4}(?:;\d{0,4})*)?[A-Za-z]|\e=|\e>/ NEWLINE_TOKEN = /\n/ TOKENIZER = Regexp.new("(#{ANSI_TOKEN}|#{NEWLINE_TOKEN})") @@ -6,11 +6,14 @@ class Buffer @buffer = '' def initialize(initial = '') - @buffer = initial + @buffer = initial.dup + @fd = File.open("/tmp/buffer.#{initial}.log", "w+") + @fd.sync = true end def each_token remainder = '' + @fd.puts @buffer.split(TOKENIZER).inspect @buffer.split(TOKENIZER).each do |token| if token.include?("\e") && !token.match(ANSI_TOKEN) remainder << token @@ -30,5 +33,6 @@ def gets def write(data) @buffer << data + @fd.puts "write: #{data.inspect}" end end diff --git a/lib/foreman/engine.rb b/lib/foreman/engine.rb index 0019e354..03771ce7 100644 --- a/lib/foreman/engine.rb +++ b/lib/foreman/engine.rb @@ -43,7 +43,7 @@ def initialize(options={}) @shutdown = false # Self-pipe for deferred signal-handling (ala djb: http://cr.yp.to/docs/selfpipe.html) - reader, writer = create_pipe + reader, writer = self.class.create_pipe reader.close_on_exec = true if reader.respond_to?(:close_on_exec) writer.close_on_exec = true if writer.respond_to?(:close_on_exec) @selfpipe = { :reader => reader, :writer => writer } @@ -312,7 +312,7 @@ def shutdown ## Helpers ########################################################## - def create_pipe + def self.create_pipe IO.method(:pipe).arity.zero? ? IO.pipe : IO.pipe("BINARY") end @@ -364,24 +364,21 @@ def termination_message_for(status) def spawn_processes @processes.each do |process| 1.upto(formation[@names[process]]) do |n| - reader, writer = process.interactive? ? PTY.open : create_pipe begin pid = process.run( - input: process.interactive? ? $stdin : :close, - output: writer, env: { 'PORT' => port_for(process, n).to_s, 'PS' => name_for_index(process, n) } ) - writer.puts "started with pid #{pid}" + # writer.puts "started with pid #{pid}" rescue Errno::ENOENT - writer.puts "unknown command: #{process.command}" + # writer.puts "unknown command: #{process.command}" end - @buffers[reader] = Buffer.new - @prefixed[reader] = false + @buffers[process.reader] = Buffer.new(@names[process]) + @prefixed[process.reader] = false @running[pid] = [process, n] - @readers[pid] = reader + @readers[pid] = process.reader end end end @@ -419,7 +416,7 @@ def handle_io_interactive(reader) indent = prefix(name).gsub(ANSI_TOKEN, "").length loop do - @buffers[reader].write(reader.read_nonblock(4096)) + @buffers[reader].write(reader.read_nonblock(10)) @buffers[reader].each_token do |token| case token @@ -427,8 +424,10 @@ def handle_io_interactive(reader) output_partial "\e[#{Regexp.last_match(1).to_i + indent}G" when ANSI_TOKEN output_partial token + when "\r" + output_partial "\e[#{indent+1}G" when "\n" - output_partial token + output_partial "\r\n" @prefixed[reader] = false else unless @prefixed[reader] @@ -445,8 +444,6 @@ def handle_io_interactive(reader) return if done rescue EOFError end - ensure - output_partial "\n" end def handle_io_noninteractive(reader) diff --git a/lib/foreman/engine/cli.rb b/lib/foreman/engine/cli.rb index 8a9476ce..353fad8a 100644 --- a/lib/foreman/engine/cli.rb +++ b/lib/foreman/engine/cli.rb @@ -1,4 +1,5 @@ require "foreman/engine" +require "io/console" class Foreman::Engine::CLI < Foreman::Engine @@ -56,7 +57,7 @@ def startup def output(name, data) data.to_s.lines.map(&:chomp).each do |message| $stdout.write prefix(name) - $stdout.puts message + $stdout.puts message + "\r" $stdout.flush end rescue Errno::EPIPE @@ -78,6 +79,7 @@ def prefix(name) end def shutdown + $stdin.cooked! end private diff --git a/lib/foreman/process.rb b/lib/foreman/process.rb index c00849f8..ea8db74e 100644 --- a/lib/foreman/process.rb +++ b/lib/foreman/process.rb @@ -1,10 +1,18 @@ require "foreman" +require "io/console" require "shellwords" class Foreman::Process + @noninteractive_stdin = $stdin + + class << self + attr_accessor :noninteractive_stdin + end + attr_reader :command attr_reader :env + attr_reader :reader # Create a Process # @@ -19,6 +27,8 @@ def initialize(command, options={}) @options = options.dup @options[:env] ||= {} + + self.class.noninteractive_stdin = :close if options[:interactive] end # Get environment-expanded command for a +Process+ @@ -40,19 +50,31 @@ def expanded_command(custom_env={}) # # @param [Hash] options # - # @option options :env ({}) Environment variables to set for this execution - # @option options :output ($stdout) The output stream + # @option options :env ({}) Environment variables to set for this execution # # @returns [Fixnum] pid The +pid+ of the process # def run(options={}) env = @options[:env].merge(options[:env] || {}) - input = options[:input] || $stdin - output = options[:output] || $stdout runner = "#{Foreman.runner}".shellescape - Dir.chdir(cwd) do - Process.spawn env, expanded_command(env), :in => input, :out => output, :err => output + if interactive? + $stdin.raw! + @reader, tty = PTY.open + Thread.new do + loop do + data = $stdin.readpartial(4096) + if data.include?("\03") + Process.kill("INT", Process.pid) + data.gsub!("\03", "") + end + @reader.write(data) + end + end + Process.spawn env, expanded_command(env), chdir: cwd, in: tty, out: tty, err: tty + else + @reader, writer = Foreman::Engine::create_pipe + Process.spawn env, expanded_command(env), chdir: cwd, in: self.class.noninteractive_stdin, out: writer, err: writer end end