diff --git a/Gemfile b/Gemfile index dd099929..3dc497a7 100644 --- a/Gemfile +++ b/Gemfile @@ -1,15 +1,16 @@ source 'https://rubygems.org' # Log Courier gem dependencies -gem 'ffi-rzmq' -gem 'multi_json' +gem 'cabin', '~> 0.6' +gem 'ffi-rzmq', '~> 2.0' +gem 'multi_json', '~> 1.10' # Log Courier gem JSON parsers -gem 'oj', :platforms => :mri -gem 'jrjackson', :platforms => :jruby +gem 'oj', '~> 2.11', :platforms => :mri +gem 'jrjackson', '~> 0.2', :platforms => :jruby # Profiler for MRI -gem 'ruby-prof', :platforms => :mri +gem 'ruby-prof', '~> 0.15', :platforms => :mri # Tests -gem 'rspec' +gem 'rspec', '~> 3.1' diff --git a/Makefile b/Makefile index 3e09db90..56c2483f 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: prepare fix_version all log-courier gem gem_plugins push_gems test doc profile benchmark jrprofile jrbenchmark clean +.PHONY: prepare fix_version all log-courier gem gem_plugins push_gems test test_go test_rspec doc profile benchmark jrprofile jrbenchmark clean MAKEFILE := $(word $(words $(MAKEFILE_LIST)),$(MAKEFILE_LIST)) GOPATH := $(patsubst %/,%,$(dir $(abspath $(MAKEFILE)))) @@ -43,14 +43,18 @@ gem_plugins: | fix_version gem build logstash-input-log-courier.gemspec gem build logstash-output-log-courier.gemspec -push_gems: | gem gem_plugins fix_version vendor/bundle/.GemfileModT +push_gems: | gem gem_plugins build/push_gems -test: | all vendor/bundle/.GemfileModT +test_go: | all go get -d -tags "$(TAGS)" $(GOTESTS) go test -tags "$(TAGS)" $(GOTESTS) + +test_rspec: | all vendor/bundle/.GemfileModT bundle exec rspec $(TESTS) +test: | test_go test_rspec + selfsigned: | bin/lc-tlscert bin/lc-tlscert diff --git a/build/fix_version b/build/fix_version index 9cc6cb05..006ec5f5 100755 --- a/build/fix_version +++ b/build/fix_version @@ -11,6 +11,7 @@ else # Describe version from Git, and ensure the only "-xxx" is the git revision # This ensures that gem builds only add one ".pre" tag automatically VERSION="$(git describe | sed 's/-\([0-9][0-9]*\)-\([0-9a-z][0-9a-z]*\)$/.\1.\2/g')" + VERSION="${VERSION#v}" fi # Patch version.go @@ -19,13 +20,13 @@ sed "s/\\(const *Log_Courier_Version *string *= *\"\\)[^\"]*\\(\"\\)/\\1${VE # Patch the gemspecs for GEM in log-courier logstash-input-log-courier logstash-output-log-courier; do - sed "s/\\(gem.version *= *'\\)[^']*\\('\\)/\\1${VERSION#v}\\2/g" ${GEM}.gemspec > ${GEM}.gemspec.tmp + sed "s/\\(gem.version *= *'\\)[^']*\\('\\)/\\1${VERSION}\\2/g" ${GEM}.gemspec > ${GEM}.gemspec.tmp \mv -f ${GEM}.gemspec.tmp ${GEM}.gemspec [ ${GEM#logstash-} != $GEM ] && { - sed "s/\\(gem.add_runtime_dependency *'log-courier' *, *'= *\\)[^']*\\('\\)/\\1${VERSION#v}\\2/g" ${GEM}.gemspec > ${GEM}.gemspec.tmp + sed "s/\\(gem.add_runtime_dependency *'log-courier' *, *'= *\\)[^']*\\('\\)/\\1${VERSION}\\2/g" ${GEM}.gemspec > ${GEM}.gemspec.tmp \mv -f ${GEM}.gemspec.tmp ${GEM}.gemspec } done -echo "${VERSION#v}" > version.txt +echo "${VERSION}" > version.txt echo "Set Log Courier Version ${VERSION}" diff --git a/contrib/initscripts/log-courier.sysconfig b/contrib/initscripts/log-courier.sysconfig new file mode 100644 index 00000000..07dbd982 --- /dev/null +++ b/contrib/initscripts/log-courier.sysconfig @@ -0,0 +1,3 @@ +# log-courier config file location +CONFIG_FILE="/etc/log-courier/log-courier.conf" + diff --git a/contrib/log-courier.init b/contrib/initscripts/redhat-sysv.init old mode 100644 new mode 100755 similarity index 71% rename from contrib/log-courier.init rename to contrib/initscripts/redhat-sysv.init index 13a6ff1c..d9223132 --- 
a/contrib/log-courier.init +++ b/contrib/initscripts/redhat-sysv.init @@ -1,6 +1,6 @@ #!/bin/sh # -# log-courier Log Courier +# log-courier Log Courier, a lightweight log shipper with Logstash integration. # # chkconfig: 2345 90 10 # description: Controls the Log Courier daemon @@ -11,7 +11,7 @@ # Required-Stop: $local_fs $remote_fs $syslog # Default-Start: 2 3 4 5 # Default-Stop: 0 1 6 -# Short-Description: Log Courier +# Short-Description: Log Courier, a lightweight log shipper with Logstash integration. ### END INIT INFO # source function library @@ -19,9 +19,18 @@ DAEMON='/usr/sbin/log-courier' DATA_DIR='/var/lib/log-courier' -CONFIG_FILE='/etc/log-courier/log-courier.conf' PID_FILE='/var/run/log-courier.pid' +# source sysconfig settings if the file exists +if [ -f /etc/sysconfig/log-courier ]; then + . /etc/sysconfig/log-courier +fi + +# set the default config file if $CONFIG_FILE is not set +if [ -z "${CONFIG_FILE}" ]; then + CONFIG_FILE='/etc/log-courier/log-courier.conf' +fi + do_start() { echo -n "Starting Log Courier: " status -p $PID_FILE $DAEMON &>/dev/null @@ -74,6 +83,14 @@ case "$1" in RC=$? fi ;; + condrestart|try-restart) + status -p $PID_FILE $DAEMON + RC=$? + if [ $RC -eq 0 ]; then + $0 restart + RC=$? + fi + ;; configtest) echo -n "Configuration test: " TESTRESULT=$( ${DAEMON} -config="${CONFIG_FILE}" -config-test ) @@ -88,7 +105,7 @@ case "$1" in fi ;; *) - echo "Usage: $0 start|stop|status|reload|restart|configtest" + echo "Usage: $0 start|stop|status|reload|restart|condrestart|try-restart|configtest" exit 1 ;; esac diff --git a/contrib/initscripts/systemd.service b/contrib/initscripts/systemd.service new file mode 100644 index 00000000..ebbe2af8 --- /dev/null +++ b/contrib/initscripts/systemd.service @@ -0,0 +1,15 @@ +[Unit] +Description=Log Courier, a lightweight log shipper with Logstash integration. +After=syslog.target network.target remote-fs.target nss-lookup.target + +[Service] +Type=simple +EnvironmentFile=/etc/sysconfig/log-courier +ExecStartPre=/usr/sbin/log-courier -config-test=true -config=${CONFIG_FILE} +ExecStart=/usr/sbin/log-courier -config=${CONFIG_FILE} +ExecReload=/bin/kill -s HUP $MAINPID +ExecStop=/bin/kill -s TERM $MAINPID +PrivateTmp=true + +[Install] +WantedBy=multi-user.target diff --git a/contrib/rpm/log-courier.spec b/contrib/rpm/log-courier.spec new file mode 100644 index 00000000..0ead8b28 --- /dev/null +++ b/contrib/rpm/log-courier.spec @@ -0,0 +1,162 @@ +# Debug packaging does not work due to the go build building in extra debug sections RPM does not understand +# Maybe we patch something later to fix this, but for now just don't build a debug package +%define debug_package %{nil} + +Summary: Log Courier +Name: log-courier +Version: 1.2 +Release: 4%{dist} +License: GPL +Group: System Environment/Libraries +Packager: Jason Woods +URL: https://github.com/driskell/log-courier +Source: https://github.com/driskell/log-courier/archive/v%{version}.zip +BuildRoot: %{_tmppath}/%{name}-%{version}-root + +BuildRequires: golang >= 1.2 +BuildRequires: git +BuildRequires: zeromq3-devel + +# Maybe tests in future - mock won't build ffi gem +#BuildRequires: ruby >= 1.9.3, ruby-devel >= 1.9.3 +#BuildRequires: rubygem-bundler + +%if 0%{?rhel} >= 7 +Requires(post): systemd +Requires(preun): systemd +Requires(postun): systemd +BuildRequires: systemd +%endif + +Requires: zeromq3 +Requires: logrotate + +%description +Log Courier is a tool created to transmit log files speedily and securely to +remote Logstash instances for processing whilst using small amounts of local +resources.
The project is an enhanced fork of Logstash Forwarder 0.3.1 with many +enhancements and behavioural improvements. + +%prep +%setup -q -n %{name}-%{version} + +%build +make with=zmq3 +# See notes above for BuildRequires ruby +#make with=zmq3 test + +%install +# Install binaries +mkdir -p %{buildroot}%{_sbindir} +install -m 0755 bin/log-courier %{buildroot}%{_sbindir}/log-courier +mkdir -p %{buildroot}%{_bindir} +install -m 0755 bin/lc-admin %{buildroot}%{_bindir}/lc-admin +install -m 0755 bin/lc-tlscert %{buildroot}%{_bindir}/lc-tlscert + +# Install example configuration +mkdir -p %{buildroot}%{_sysconfdir}/log-courier %{buildroot}%{_sysconfdir}/log-courier/examples/ +install -m 0644 docs/examples/* %{buildroot}%{_sysconfdir}/log-courier/examples/ + +# Make the run dir +mkdir -p %{buildroot}%{_var}/run %{buildroot}%{_var}/run/log-courier +touch %{buildroot}%{_var}/run/log-courier/admin.socket + +# Install init script and related paraphernalia +%if 0%{?rhel} >= 7 +mkdir -p %{buildroot}%{_unitdir} +# No systemd script in log-courier release yet +install -m 0644 contrib/initscripts/systemd.service %{buildroot}%{_unitdir}/log-courier.service +%else +mkdir -p %{buildroot}%{_sysconfdir}/init.d +install -m 0755 contrib/initscripts/redhat-sysv.init %{buildroot}%{_sysconfdir}/init.d/log-courier +touch %{buildroot}%{_var}/run/log-courier.pid +%endif + +# Make the state dir +mkdir -p %{buildroot}%{_var}/lib/log-courier +touch %{buildroot}%{_var}/lib/log-courier/.log-courier + +%clean +rm -rf $RPM_BUILD_ROOT + +%post +%if 0%{?rhel} >= 7 +%systemd_post log-courier.service +%else +/sbin/chkconfig --add log-courier +%endif + +%preun +%if 0%{?rhel} >= 7 +%systemd_preun log-courier.service +%else +if [ $1 -eq 0 ]; then + /sbin/service log-courier stop >/dev/null 2>&1 + /sbin/chkconfig --del log-courier +fi +%endif + +%postun +%if 0%{?rhel} >= 7 +%systemd_postun_with_restart log-courier.service +%else +if [ $1 -ge 1 ]; then + /sbin/service log-courier restart >/dev/null 2>&1 +fi +%endif + +%files +%defattr(0755,root,root,0755) +%{_sbindir}/log-courier +%{_bindir}/lc-admin +%{_bindir}/lc-tlscert +%if 0%{?rhel} >= 7 +%{_unitdir}/log-courier.service +%else +%{_sysconfdir}/init.d/log-courier +%endif + +%defattr(0644,root,root,0755) +%{_sysconfdir}/log-courier +%if 0%{?rhel} < 7 +%ghost %{_var}/run/log-courier.pid +%endif +%dir %attr(0700,root,root) %{_var}/run/log-courier +%ghost %{_var}/run/log-courier/admin.socket +%dir %{_var}/lib/log-courier +%ghost %{_var}/lib/log-courier/.log-courier + +%changelog +* Sat Nov 8 2014 Jason Woods - 1.2-4 +- Upgrade to v1.2 +- Fix stop message on future upgrade + +* Wed Nov 5 2014 Jason Woods - 1.1-4 +- Build with ZMQ 3 support + +* Mon Nov 3 2014 Jason Woods - 1.1-3 +- Fix init/systemd registration + +* Sun Nov 2 2014 Jason Woods - 1.1-2 +- Package for EL7 +- Restart service on upgrade + +* Fri Oct 31 2014 Jason Woods 1.1-1 +- Released 1.1 +- Cleanup for EL7 build + +* Mon Oct 13 2014 Jason Woods 0.15.1-1 +- Rebuild from v0.15 develop to fix more issues +- Label as v0.15.1 + +* Thu Sep 4 2014 Jason Woods 0.14.rc2-1 +- Rebuild from develop to fix more issues and enable unix socket + for administration +- Label as v0.14.rc2 + +* Wed Sep 3 2014 Jason Woods 0.14.rc1-1 +- Rebuild from develop to fix various reconnect hang issues +- Label as v0.14.rc1 + +* Mon Sep 1 2014 Jason Woods 0.13-1 +- Initial build of v0.13 diff --git a/docs/AdministrationUtility.md b/docs/AdministrationUtility.md index dd546353..95f606e2 100644 --- a/docs/AdministrationUtility.md +++ 
b/docs/AdministrationUtility.md @@ -24,6 +24,11 @@ listen address, set the `admin listen address` entry. See [Configuration](Configuration.md) for more information on these options and the default listen address. +The `lc-admin` utility aims to be always backwards compatible whenever possible. +This means a newer version of `lc-admin` should be able to connect to any older +version of `log-courier`. The same is not true in reverse, and an older +`lc-admin` may be unable to connect or communicate with a newer `log-courier`. + ## Available Commands ### `help` diff --git a/docs/ChangeLog.md b/docs/ChangeLog.md index 9c8b737e..cc1fae36 100644 --- a/docs/ChangeLog.md +++ b/docs/ChangeLog.md @@ -4,6 +4,7 @@ **Table of Contents** *generated with [DocToc](http://doctoc.herokuapp.com/)* +- [1.2](#12) - [1.1](#11) - [1.0](#10) - [0.15](#015) @@ -17,6 +18,27 @@ +## 1.2 + +*1st December 2014* + +* Fix repeated partial Acks triggering an incorrect flush of events to registrar +* Fix a loop that could occur when using ZMQ transport (#68) +* TLS and TCP transport will now round robin the available server addresses +instead of randomising +* Implemented "Dead time in" on `lc-admin` harvester statuses +* `lc-admin` status output is now sorted and no longer in random order +* Add a workaround for logstash shutdown looping with "Context is terminated" +messages (#73) +* Implement asynchronous ZMQ receive pipeline in the Logstash gem to resolve +timeout issues with multiple clients and a busy pipeline +* Implement multithreaded SSL accept in the Logstash gem to prevent a single +hung handshake attempt from blocking new connections +* Switch to ruby-cabin logging in the gems to match Logstash logging +* Updated the RedHat/CentOS 5/6 SysV init script in contrib to follow Fedora +packaging guidelines +* Provided a RedHat/CentOS 7 systemd service configuration in contrib + ## 1.1 *30th October 2014* diff --git a/docs/Configuration.md b/docs/Configuration.md index 8657cc2a..6a1288aa 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -14,33 +14,34 @@ - [`"general"`](#general) - [`"admin enabled"`](#admin-enabled) - [`"admin listen address"`](#admin-listen-address) + - [`"log file"`](#log-file) + - [`"log level"`](#log-level) + - [`"log stdout"`](#log-stdout) + - [`"log syslog"`](#log-syslog) + - [`"max line bytes"`](#max-line-bytes) - [`"persist directory"`](#persist-directory) - [`"prospect interval"`](#prospect-interval) - - [`"spool size"`](#spool-size) - [`"spool max bytes"`](#spool-max-bytes) + - [`"spool size"`](#spool-size) - [`"spool timeout"`](#spool-timeout) - - [`"max line bytes"`](#max-line-bytes) - - [`"log level"`](#log-level) - - [`"log stdout"`](#log-stdout) - - [`"log syslog"`](#log-syslog) - - [`"log file"`](#log-file) - [`"network"`](#network) - - [`"transport"`](#transport) + - [`"curve server key"`](#curve-server-key) + - [`"curve public key"`](#curve-public-key) + - [`"curve secret key"`](#curve-secret-key) + - [`"max pending payloads"`](#max-pending-payloads) + - [`"peer send queue"`](#peer-send-queue) + - [`"reconnect"`](#reconnect) - [`"servers"`](#servers) - [`"ssl ca"`](#ssl-ca) - [`"ssl certificate"`](#ssl-certificate) - [`"ssl key"`](#ssl-key) - - [`"curve server key"`](#curve-server-key) - - [`"curve public key"`](#curve-public-key) - - [`"curve secret key"`](#curve-secret-key) - [`"timeout"`](#timeout) - - [`"reconnect"`](#reconnect) - - [`"max pending payloads"`](#max-pending-payloads) + - [`"transport"`](#transport) - [`"files"`](#files) - - [`"paths"`](#paths) - - 
[`"fields"`](#fields) - - [`"dead time"`](#dead-time) - [`"codec"`](#codec) + - [`"dead time"`](#dead-time) + - [`"fields"`](#fields) + - [`"paths"`](#paths) - [`"includes"`](#includes) @@ -177,6 +178,51 @@ Examples: tcp:127.0.0.1:1234 unix:/var/run/log-courier/admin.socket +### `"log file"` + +*Filepath. Optional* +*Requires restart* + +A log file to save Log Courier's internal log into. May be used in conjunction with `"log stdout"` and `"log syslog"`. + +### `"log level"` + +*String. Optional. Default: "info". +Available values: "critical", "error", "warning", "notice", "info", "debug"* +*Requires restart* + +The maximum level of detail to produce in Log Courier's internal log. + +### `"log stdout"` + +*Boolean. Optional. Default: true* +*Requires restart* + +Enables sending of Log Courier's internal log to the console (stdout). May be used in conjunction with `"log syslog"` and `"log file"`. + +### `"log syslog"` + +*Boolean. Optional. Default: false* +*Requires restart* + +Enables sending of Log Courier's internal log to syslog. May be used in conjunction with `"log stdout"` and `"log file"`. + +*This option is ignored by Windows builds.* + +### `"max line bytes"` + +*Number. Optional. Default: 1048576* + +The maxmimum line length to process. If a line exceeds this length, it will be +split across multiple events. Each split line will have a "tag" field added +containing the tag "splitline". The final part of the line will not have a "tag" +field added. + +If the `fields` configuration already contained a "tags" entry, and it is an +array, it will be appended to. Otherwise, the "tag" field will be left as is. + +This setting can not be greater than the `spool max bytes` setting. + ### `"persist directory"` *String. Optional. Default: "."* @@ -198,6 +244,18 @@ server acknowledges receipt of the events. How often Log Courier should check for changes on the filesystem, such as the appearance of new log files, rotations and deletions. +### `"spool max bytes"` + +*Number. Optional. Default: 10485760* + +The maximum size of an event spool, before compression. If an incomplete spool +does not have enough room for the next event, it will be flushed immediately. + +If this value is modified, the receiving end should also be configured with the +new limit. For the Logstash plugin, this is the `max_packet_size` setting. + +The maximum value for this setting is 2147483648 (2 GiB). + ### `"spool size"` *Number. Optional. Default: 1024* @@ -214,18 +272,6 @@ usage. easily cope with over 10,000 events a second and uses little memory. It is useful only in very specific circumstances.* -### `"spool max bytes"` - -*Number. Optional. Default: 10485760* - -The maximum size of an event spool, before compression. If an incomplete spool -does not have enough room for the next event, it will be flushed immediately. - -If this value is modified, the receiving end should also be configured with the -new limit. For the Logstash plugin, this is the `max_packet_size` setting. - -The maximum value for this setting is 2147483648 (2 GiB). - ### `"spool timeout"` *Duration. Optional. Default: 5* @@ -233,80 +279,57 @@ The maximum value for this setting is 2147483648 (2 GiB). The maximum amount of time to wait for a full spool. If an incomplete spool is not filled within this time limit, the spool will be flushed immediately. -### `"max line bytes"` - -*Number. Optional. Default: 1048576* - -The maxmimum line length to process. If a line exceeds this length, it will be -split across multiple events. 
Each split line will have a "tag" field added -containing the tag "splitline". The final part of the line will not have a "tag" -field added. - -If the `fields` configuration already contained a "tags" entry, and it is an -array, it will be appended to. Otherwise, the "tag" field will be left as is. - -This setting can not be greater than the `spool max bytes` setting. - -### `"log level"` - -*String. Optional. Default: "info". -Available values: "critical", "error", "warning", "notice", "info", "debug"* -*Requires restart* - -The maximum level of detail to produce in Log Courier's internal log. +## `"network"` -### `"log stdout"` +The network configuration tells Log Courier where to ship the logs, and also +what transport and security to use. -*Boolean. Optional. Default: true* -*Requires restart* +### `"curve server key"` -Enables sending of Log Courier's internal log to the console (stdout). May be used in conjunction with `"log syslog"` and `"log file"`. +*String. Required with "transport" = "zmq". Not allowed otherwise* -### `"log syslog"` +The Z85-encoded public key that corresponds to the server(s) secret key. Used +to verify the server(s) identity. This can be generated using the Genkey tool. -*Boolean. Optional. Default: false* -*Requires restart* +### `"curve public key"` -Enables sending of Log Courier's internal log to syslog. May be used in conjunction with `"log stdout"` and `"log file"`. +*String. Required with "transport" = "zmq". Not allowed otherwise* -*This option is ignored by Windows builds.* +The Z85-encoded public key for this client. This can be generated using the +Genkey tool. -### `"log file"` +### `"curve secret key"` -*Filepath. Optional* -*Requires restart* +*String. Required with "transport" = "zmq". Not allowed otherwise* -A log file to save Log Courier's internal log into. May be used in conjunction with `"log stdout"` and `"log syslog"`. +The Z85-encoded secret key for this client. This can be generated using the +Genkey tool. -## `"network"` +### `"max pending payloads"` -The network configuration tells Log Courier where to ship the logs, and also -what transport and security to use. +*Number. Optional. Default: 10* -### `"transport"` +The maximum number of spools that can be in transit at any one time. Each spool +will be kept in memory until the remote server acknowledges it. -*String. Optional. Default: "tls" -Available values: "tcp", "tls", "plainzmq", "zmq"* +If Log Courier has sent this many spools to the remote server, and has not yet +received acknowledgement responses for them (either because the remote server +is busy or because the link has high latency), it will pause and wait before +sending anymore data. -*Depending on how log-courier was built, some transports may not be available. -Run `log-courier -list-supported` to see the list of transports available in -a specific build of log-courier.* +*For most installations you should leave this at the default as it is high +enough to maintain throughput even on high latency links and low enough not to +cause excessive memory usage.* -Sets the transport to use when sending logs to the servers. "tls" is recommended -for most users and connects to a single server at random, reconnecting to a -different server at random each time the connection fails. "curvezmq" connects -to all specified servers and load balances events across them. 
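For orientation, a minimal `"network"` section using the default "tls" transport ties the options in this section together. This is an illustrative sketch only; the server address and CA path below are hypothetical examples:

    {
        "servers": [ "logstash.example.com:5043" ],
        "transport": "tls",
        "ssl ca": "/etc/log-courier/ca.crt"
    }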
+### `"reconnect"` -"tcp" and "plainzmq" are **insecure** equivalents to "tls" and "zmq" -respectively that do not encrypt traffic or authenticate the identity of -servers. These should only be used on trusted internal networks. If in doubt, -use the secure authenticating transports "tls" and "zmq". +*Duration. Optional. Default: 1* -"plainzmq" is only available if Log Courier was compiled with the "with=zmq3" or -"with=zmq4" options. +Pause this long before reconnecting. If the remote server is completely down, +this slows down the rate of reconnection attempts. -"zmq" is only available if Log Courier was compiled with the "with=zmq4" -option. +When using the ZMQ transport, this is how long to wait before restarting the ZMQ +stack when it was reset. ### `"servers"` @@ -335,27 +358,6 @@ Path to a PEM encoded certificate file to use as the client certificate. Path to a PEM encoded private key to use with the client certificate. -### `"curve server key"` - -*String. Required with "transport" = "zmq". Not allowed otherwise* - -The Z85-encoded public key that corresponds to the server(s) secret key. Used -to verify the server(s) identity. This can be generated using the Genkey tool. - -### `"curve public key"` - -*String. Required with "transport" = "zmq". Not allowed otherwise* - -The Z85-encoded public key for this client. This can be generated using the -Genkey tool. - -### `"curve secret key"` - -*String. Required with "transport" = "zmq". Not allowed otherwise* - -The Z85-encoded secret key for this client. This can be generated using the -Genkey tool. - ### `"timeout"` *Duration. Optional. Default: 15* @@ -369,31 +371,30 @@ will wait for a response to a request. If any response is not received within this time period the corresponding request is retransmitted. If no responses are received within this time period, the entire ZMQ stack is reset. -### `"reconnect"` - -*Duration. Optional. Default: 1* - -Pause this long before reconnecting. If the remote server is completely down, -this slows down the rate of reconnection attempts. +### `"transport"` -When using the ZMQ transport, this is how long to wait before restarting the ZMQ -stack when it was reset. +*String. Optional. Default: "tls" +Available values: "tcp", "tls", "plainzmq", "zmq"* -### `"max pending payloads"` +*Depending on how log-courier was built, some transports may not be available. +Run `log-courier -list-supported` to see the list of transports available in +a specific build of log-courier.* -*Number. Optional. Default: 10* +Sets the transport to use when sending logs to the servers. "tls" is recommended +for most users and connects to a single server at random, reconnecting to a +different server at random each time the connection fails. "curvezmq" connects +to all specified servers and load balances events across them. -The maximum number of spools that can be in transit at any one time. Each spool -will be kept in memory until the remote server acknowledges it. +"tcp" and "plainzmq" are **insecure** equivalents to "tls" and "zmq" +respectively that do not encrypt traffic or authenticate the identity of +servers. These should only be used on trusted internal networks. If in doubt, +use the secure authenticating transports "tls" and "zmq". -If Log Courier has sent this many spools to the remote server, and has not yet -received acknowledgement responses for them (either because the remote server -is busy or because the link has high latency), it will pause and wait before -sending anymore data. 
+"plainzmq" is only available if Log Courier was compiled with the "with=zmq3" or +"with=zmq4" options. -*For most installations you should leave this at the default as it is high -enough to maintain throughput even on high latency links and low enough not to -cause excessive memory usage.* +"zmq" is only available if Log Courier was compiled with the "with=zmq4" +option. ## `"files"` @@ -412,34 +413,29 @@ configuration must be specified. ] ``` -### `"paths"` - -*Array of Fileglobs. Required* - -At least one Fileglob must be specified and all matching files for all provided -globs will be tailed. - -See above for a description of the Fileglob field type. +### `"codec"` -Examples: +*Codec configuration. Optional. Default: `{ "name": "plain" }`* +*Configuration reload will only affect new or resumed files* -* `[ "/var/log/*.log" ]` -* `[ "/var/log/program/log_????.log" ]` -* `[ "/var/log/httpd/access.log", "/var/log/httpd/access.log.[0-9]" ]` +*Depending on how log-courier was built, some codecs may not be available. Run +`log-courier -list-supported` to see the list of codecs available in a specific +build of log-courier.* -### `"fields"` +The specified codec will receive the lines read from the log stream and perform +any decoding necessary to generate events. The plain codec does nothing and +simply ships the events unchanged. -*Dictionary. Optional* -*Configuration reload will only affect new or resumed files* +All configurations are a dictionary with at least a "name" key. Additional +options can be provided if the specified codec allows. -Extra fields to attach the event prior to shipping. These can be simple strings, -numbers or even arrays and dictionaries. + { "name": "codec-name" } + { "name": "codec-name", "option1": "value", "option2": "42" } -Examples: +Aside from "plain", the following codecs are available at this time. -* `{ "type": "syslog" }` -* `{ "type": "apache", "server_names": [ "example.com", "www.example.com" ] }` -* `{ "type": "program", "program": { "exec": "program.py", "args": [ "--run", "--daemon" ] } }` +* [Filter](codecs/Filter.md) +* [Multiline](codecs/Multiline.md) ### `"dead time"` @@ -454,29 +450,34 @@ If a log file that is being harvested is deleted, it will remain on disk until Log Courier closes it. Therefore it is important to keep this value sensible to ensure old log files are not kept open preventing deletion. -### `"codec"` +### `"fields"` -*Codec configuration. Optional. Default: `{ "name": "plain" }`* +*Dictionary. Optional* *Configuration reload will only affect new or resumed files* -*Depending on how log-courier was built, some codecs may not be available. Run -`log-courier -list-supported` to see the list of codecs available in a specific -build of log-courier.* +Extra fields to attach the event prior to shipping. These can be simple strings, +numbers or even arrays and dictionaries. -The specified codec will receive the lines read from the log stream and perform -any decoding necessary to generate events. The plain codec does nothing and -simply ships the events unchanged. +Examples: -All configurations are a dictionary with at least a "name" key. Additional -options can be provided if the specified codec allows. 
+* `{ "type": "syslog" }` +* `{ "type": "apache", "server_names": [ "example.com", "www.example.com" ] }` +* `{ "type": "program", "program": { "exec": "program.py", "args": [ "--run", "--daemon" ] } }` - { "name": "codec-name" } - { "name": "codec-name", "option1": "value", "option2": "42" } +### `"paths"` -Aside from "plain", the following codecs are available at this time. +*Array of Fileglobs. Required* -* [Filter](codecs/Filter.md) -* [Multiline](codecs/Multiline.md) +At least one Fileglob must be specified and all matching files for all provided +globs will be tailed. + +See above for a description of the Fileglob field type. + +Examples: + +* `[ "/var/log/*.log" ]` +* `[ "/var/log/program/log_????.log" ]` +* `[ "/var/log/httpd/access.log", "/var/log/httpd/access.log.[0-9]" ]` ## `"includes"` diff --git a/docs/codecs/Filter.md b/docs/codecs/Filter.md index c46fb9c8..111dac8b 100644 --- a/docs/codecs/Filter.md +++ b/docs/codecs/Filter.md @@ -8,8 +8,8 @@ The filter codec strips out unwanted events, shipping only those desired. - [Example](#example) - [Options](#options) - - [`"patterns"`](#patterns) - [`"negate"`](#negate) + - [`"patterns"`](#patterns) @@ -22,6 +22,13 @@ The filter codec strips out unwanted events, shipping only those desired. ## Options +### `"negate"` + +*Boolean. Optional. Default: false* + +Negates `patterns` so that an event is only shipped if none of the patterns +matched. + ### `"patterns"` *Array of Strings. Required* @@ -34,10 +41,3 @@ until the next event. As such, patterns with higher hit rates should be specified first. The pattern syntax is detailed at https://code.google.com/p/re2/wiki/Syntax. - -### `"negate"` - -*Boolean. Optional. Default: false* - -Negates `patterns` so that an event is only shipped if none of the patterns -matched. diff --git a/docs/codecs/Multiline.md b/docs/codecs/Multiline.md index 8306bd56..93504deb 100644 --- a/docs/codecs/Multiline.md +++ b/docs/codecs/Multiline.md @@ -12,11 +12,11 @@ option. - [Example](#example) - [Options](#options) - - [`"pattern"`](#pattern) + - [`"max multiline bytes"`](#max-multiline-bytes) - [`"negate"`](#negate) - - [`"what"`](#what) + - [`"pattern"`](#pattern) - [`"previous timeout"`](#previous-timeout) - - [`"max multiline bytes"`](#max-multiline-bytes) + - [`"what"`](#what) @@ -32,13 +32,14 @@ option. ## Options -### `"pattern"` +### `"max multiline bytes"` -*String. Required* +*Number. Optional. Default: `spool max bytes`* -A regular expression to match against each line. +The maximum multiline length to process. If a multiline block exeeds this +length, it will be split across multiple events. -The syntax is detailed at https://code.google.com/p/re2/wiki/Syntax. +This setting can not be greater than the `spool max bytes` setting. ### `"negate"` @@ -47,6 +48,22 @@ The syntax is detailed at https://code.google.com/p/re2/wiki/Syntax. Negates `pattern` so that a match becomes a non-match and a non-match becomes a match. +### `"pattern"` + +*String. Required* + +A regular expression to match against each line. + +The syntax is detailed at https://code.google.com/p/re2/wiki/Syntax. + +### `"previous timeout"` + +*Duration. Optional. Default: 0. Ignored when "what" != "previous"* + +When using `"previous"`, if `"previous timeout"` is not 0 any buffered lines +will be flushed as a single event if no more lines are received within the +specified time period. + ### `"what"` *String. Optional. Default: "previous" @@ -65,20 +82,3 @@ single event and start a new buffer. 
A side effect of using `"previous"` is that an event will not be flushed until the first line of the next event is encountered. The `"previous timeout"` option offers a solution to this. - -### `"previous timeout"` - -*Duration. Optional. Default: 0. Ignored when "what" != "previous"* - -When using `"previous"`, if `"previous timeout"` is not 0 any buffered lines -will be flushed as a single event if no more lines are received within the -specified time period. - -### `"max multiline bytes"` - -*Number. Optional. Default: `spool max bytes`* - -The maximum multiline length to process. If a multiline block exeeds this -length, it will be split across multiple events. - -This setting can not be greater than the `spool max bytes` setting. diff --git a/lib/log-courier/client.rb b/lib/log-courier/client.rb index ec546461..0a443a0c 100644 --- a/lib/log-courier/client.rb +++ b/lib/log-courier/client.rb @@ -42,7 +42,7 @@ def initialize(options = {}) @ack_events = 0 options.each do |k, v| - raise ArgumentError unless self.respond_to?(k) + fail ArgumentError unless self.respond_to?(k) instance_variable_set "@#{k}", v end end @@ -58,6 +58,7 @@ def initialize(options = {}) }.merge!(options) @logger = @options[:logger] + @logger['plugin'] = 'output/courier' require 'log-courier/client_tls' @client = ClientTls.new(@options) @@ -90,6 +91,7 @@ def initialize(options = {}) def publish(event) # Pass the event into the spooler @event_queue << event + return end def shutdown @@ -98,8 +100,11 @@ def shutdown @io_thread.raise ShutdownSignal @spooler_thread.join @io_thread.join + return end + private + def run_spooler loop do spooled = [] @@ -124,9 +129,9 @@ def run_spooler @io_control << ['E', spooled] end end + return rescue ShutdownSignal - # Just shutdown - 0 + return end def run_io @@ -207,12 +212,12 @@ def run_io # Keepalive timeout hit, send a PING unless we were awaiting a PONG if @pending_ping # Timed out, break into reconnect - raise TimeoutError + fail TimeoutError end # Is send full? can_send will be false if so # We should've started receiving ACK by now so time out - raise TimeoutError unless can_send + fail TimeoutError unless can_send # Send PING send_ping @@ -227,17 +232,16 @@ def run_io end rescue ProtocolError => e # Reconnect required due to a protocol error - @logger.warn("[LogCourierClient] Protocol error: #{e}") unless @logger.nil? + @logger.warn 'Protocol error', :error => e.message unless @logger.nil? rescue TimeoutError # Reconnect due to timeout - @logger.warn('[LogCourierClient] Timeout occurred') unless @logger.nil? + @logger.warn 'Timeout occurred' unless @logger.nil? rescue ShutdownSignal # Shutdown, break out break - rescue => e + rescue StandardError, NativeException => e # Unknown error occurred - @logger.warn("[LogCourierClient] Unknown error: #{e}") unless @logger.nil? - @logger.warn("[LogCourierClient] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? + @logger.warn e, :hint => 'Unknown error' unless @logger.nil? 
end # Disconnect and retry payloads @@ -249,10 +253,12 @@ def run_io end @client.disconnect + return end def reset_keepalive @keepalive_next = Time.now.to_i + @keepalive_timeout + return end def generate_nonce @@ -262,6 +268,7 @@ def send_ping # Send it @client.send 'PING', '' + return end def send_jdat(events) @@ -288,6 +295,7 @@ # Send it @client.send 'JDAT', payload.data + return end def buffer_jdat_data(events, nonce) @@ -307,23 +315,21 @@ def buffer_jdat_data_event(buffer, event) # Add length and then the data buffer << [json_data.length].pack('N') << json_data + return end def process_pong(message) # Sanity - if message.length != 0 - raise ProtocolError, "Unexpected data attached to pong message (#{message.length})" - end + fail ProtocolError, "Unexpected data attached to pong message (#{message.length})" if message.length != 0 # No longer pending a PONG @ping_pending = false + return end def process_ackn(message) # Sanity - if message.length != 20 - raise ProtocolError, "ACKN message size invalid (#{message.length})" - end + fail ProtocolError, "ACKN message size invalid (#{message.length})" if message.length != 20 # Grab nonce sequence, nonce = message[0...4].unpack('N').first, message[4..-1] @@ -348,6 +354,7 @@ payload.data = nil end end + return end end end diff --git a/lib/log-courier/client_tls.rb index fdf0bd91..44c7ac24 100644 --- a/lib/log-courier/client_tls.rb +++ b/lib/log-courier/client_tls.rb @@ -38,17 +38,17 @@ def initialize(options = {}) @logger = @options[:logger] [:port, :ssl_ca].each do |k| - raise "[LogCourierClient] '#{k}' is required" if @options[k].nil? + fail "output/courier: '#{k}' is required" if @options[k].nil? end - raise '[LogCourierClient] \'addresses\' must contain at least one address' if @options[:addresses].empty? + fail 'output/courier: \'addresses\' must contain at least one address' if @options[:addresses].empty? c = 0 [:ssl_certificate, :ssl_key].each do |k| c += 1 unless @options[k].nil? end - raise '[LogCourierClient] \'ssl_certificate\' and \'ssl_key\' must be specified together' if c == 1 + fail 'output/courier: \'ssl_certificate\' and \'ssl_key\' must be specified together' if c == 1 end def connect(io_control) @@ -71,6 +71,7 @@ @recv_thread = Thread.new do run_recv io_control end + return end def disconnect @@ -78,8 +79,17 @@ @send_thread.join @recv_thread.raise ShutdownSignal @recv_thread.join + return end + def send(signature, message) + # Add to send queue + @send_q << [signature, message.length].pack('A4N') + message + return + end + + private + def run_send(io_control) # Ask for something to send io_control << ['S'] @@ -108,29 +118,31 @@ @ssl_client.write message end end + return rescue OpenSSL::SSL::SSLError, IOError, Errno::ECONNRESET => e - @logger.warn("[LogCourierClient] SSL write error: #{e}") unless @logger.nil? + @logger.warn 'SSL write error', :error => e.message unless @logger.nil? io_control << ['F'] + return rescue ShutdownSignal - # Just shutdown - rescue => e - @logger.warn("[LogCourierClient] Unknown SSL write error: #{e}") unless @logger.nil? - @logger.warn("[LogCourierClient] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? + return + rescue StandardError, NativeException => e + @logger.warn e, :hint => 'Unknown SSL write error' unless @logger.nil? 
io_control << ['F'] + return end def run_recv(io_control) loop do # Grab a header header = @ssl_client.read(8) - raise EOFError if header.nil? + fail EOFError if header.nil? # Decode signature and length signature, length = header.unpack('A4N') if length > 1048576 # Too big raise error - @logger.warn("[LogCourierClient] Invalid message: data too big (#{length})") unless @logger.nil? + @logger.warn 'Invalid message: data too big', :data_length => length unless @logger.nil? io_control << ['F'] break end @@ -141,29 +153,28 @@ def run_recv(io_control) # Pass through to receive io_control << ['R', signature, message] end + return rescue OpenSSL::SSL::SSLError, IOError, Errno::ECONNRESET => e - @logger.warn("[LogCourierClient] SSL read error: #{e}") unless @logger.nil? + @logger.warn 'SSL read error', :error => e.message unless @logger.nil? io_control << ['F'] + return rescue EOFError - @logger.warn("[LogCourierClient] Connection closed by server") unless @logger.nil? + @logger.warn 'Connection closed by server' unless @logger.nil? io_control << ['F'] + return rescue ShutdownSignal - # Just shutdown + return rescue => e - @logger.warn("[LogCourierClient] Unknown SSL read error: #{e}") unless @logger.nil? - @logger.warn("[LogCourierClient] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? + @logger.warn e, :hint => 'Unknown SSL read error' unless @logger.nil? io_control << ['F'] - end - - def send(signature, message) - # Add to send queue - @send_q << [signature, message.length].pack('A4N') + message + return end def pause_send return if @send_paused @send_paused = true @send_q << nil + return end def send_paused @@ -175,44 +186,53 @@ def resume_send @send_paused = false @send_q << nil end + return end def tls_connect # TODO: Implement random selection - and don't use separate :port - remember to update post_connection_check too - @logger.info("[LogCourierClient] Connecting to #{@options[:addresses][0]}:#{@options[:port]}") unless @logger.nil? - tcp_socket = TCPSocket.new(@options[:addresses][0], @options[:port]) + address = @options[:addresses][0] + port = @options[:port] - ssl = OpenSSL::SSL::SSLContext.new + @logger.info 'Connecting', :address => address, :port => port unless @logger.nil? - unless @options[:ssl_certificate].nil? - ssl.cert = OpenSSL::X509::Certificate.new(File.read(@options[:ssl_certificate])) - ssl.key = OpenSSL::PKey::RSA.new(File.read(@options[:ssl_key]), @options[:ssl_key_passphrase]) - end + begin + tcp_socket = TCPSocket.new(address, port) - cert_store = OpenSSL::X509::Store.new - cert_store.add_file(@options[:ssl_ca]) - #ssl.cert_store = cert_store - ssl.verify_mode = OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT + ssl = OpenSSL::SSL::SSLContext.new - @ssl_client = OpenSSL::SSL::SSLSocket.new(tcp_socket) + unless @options[:ssl_certificate].nil? + ssl.cert = OpenSSL::X509::Certificate.new(File.read(@options[:ssl_certificate])) + ssl.key = OpenSSL::PKey::RSA.new(File.read(@options[:ssl_key]), @options[:ssl_key_passphrase]) + end - socket = @ssl_client.connect + cert_store = OpenSSL::X509::Store.new + cert_store.add_file(@options[:ssl_ca]) + #ssl.cert_store = cert_store + ssl.verify_mode = OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT - # Verify certificate - socket.post_connection_check(@options[:addresses][0]) + @ssl_client = OpenSSL::SSL::SSLSocket.new(tcp_socket) - @logger.info("[LogCourierClient] Connected successfully") unless @logger.nil? 
+ socket = @ssl_client.connect - socket - rescue OpenSSL::SSL::SSLError, IOError, Errno::ECONNRESET => e - @logger.warn("[LogCourierClient] Connection to #{@options[:addresses][0]}:#{@options[:port]} failed: #{e}") unless @logger.nil? - rescue ShutdownSignal - # Just shutdown - 0 - rescue StandardError, NativeException => e - @logger.warn("[LogCourierClient] Unknown connection failure to #{@options[:addresses][0]}:#{@options[:port]}: #{e}") unless @logger.nil? - @logger.warn("[LogCourierClient] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? - raise e + # Verify certificate + socket.post_connection_check(address) + + # Add extra logging data now we're connected + @logger['address'] = address + @logger['port'] = port + + @logger.info 'Connected successfully' unless @logger.nil? + return + rescue OpenSSL::SSL::SSLError, IOError, Errno::ECONNRESET => e + @logger.warn 'Connection failed', :error => e.message, :address => address, :port => port unless @logger.nil? + return + rescue ShutdownSignal + return + rescue StandardError, NativeException => e + @logger.warn e, :hint => 'Unknown connection failure', :address => address, :port => port unless @logger.nil? + raise e + end end end end diff --git a/lib/log-courier/event_queue.rb b/lib/log-courier/event_queue.rb index ae16342e..c65fe9c4 100644 --- a/lib/log-courier/event_queue.rb +++ b/lib/log-courier/event_queue.rb @@ -33,7 +33,7 @@ class EventQueue # Creates a fixed-length queue with a maximum size of +max+. # def initialize(max) - raise ArgumentError, "queue size must be positive" unless max > 0 + fail ArgumentError, "queue size must be positive" unless max > 0 @max = max @enque_cond = ConditionVariable.new @num_enqueue_waiting = 0 @@ -44,20 +44,19 @@ def initialize(max) self.taint @mutex = Mutex.new @cond = ConditionVariable.new + return end # # Returns the maximum size of the queue. # - def max - @max - end + attr_reader :max # # Sets the maximum size of the queue. # def max=(max) - raise ArgumentError, "queue size must be positive" unless max > 0 + fail ArgumentError, "queue size must be positive" unless max > 0 @mutex.synchronize do if max <= @max @@ -90,7 +89,7 @@ def push(obj, timeout = nil) ensure @num_enqueue_waiting -= 1 end - raise TimeoutError if !timeout.nil? and Time.now - start >= timeout + fail TimeoutError if !timeout.nil? and Time.now - start >= timeout end @que.push obj @@ -113,7 +112,7 @@ def push(obj, timeout = nil) # Retrieves data from the queue and runs a waiting thread, if any. # def pop(*args) - retval = _pop_timeout *args + retval = pop_timeout *args @mutex.synchronize do if @que.length < @max @enque_cond.signal @@ -122,31 +121,6 @@ def pop(*args) retval end - # - # Retrieves data from the queue. If the queue is empty, the calling thread is - # suspended until data is pushed onto the queue or, if set, +timeout+ seconds - # passes. If +timeout+ is 0, the thread isn't suspended, and an exception is - # raised. - # - def _pop_timeout(timeout = nil) - unless timeout.nil? - start = Time.now - end - @mutex.synchronize do - loop do - return @que.shift unless @que.empty? - raise TimeoutError if timeout == 0 - begin - @num_waiting += 1 - @cond.wait @mutex, timeout - ensure - @num_waiting -= 1 - end - raise TimeoutError if !timeout.nil? and Time.now - start >= timeout - end - end - end - # # Alias of pop # @@ -190,5 +164,33 @@ def length def num_waiting @num_waiting + @num_enqueue_waiting end + + private + + # + # Retrieves data from the queue. 
If the queue is empty, the calling thread is + # suspended until data is pushed onto the queue or, if set, +timeout+ seconds + # passes. If +timeout+ is 0, the thread isn't suspended, and an exception is + # raised. + # + def pop_timeout(timeout = nil) + unless timeout.nil? + start = Time.now + end + @mutex.synchronize do + loop do + return @que.shift unless @que.empty? + fail TimeoutError if timeout == 0 + begin + @num_waiting += 1 + @cond.wait @mutex, timeout + ensure + @num_waiting -= 1 + end + fail TimeoutError if !timeout.nil? and Time.now - start >= timeout + end + end + return + end end end diff --git a/lib/log-courier/server.rb b/lib/log-courier/server.rb index 9c2d2036..04d91272 100644 --- a/lib/log-courier/server.rb +++ b/lib/log-courier/server.rb @@ -40,6 +40,7 @@ def initialize(options = {}) }.merge!(options) @logger = @options[:logger] + @logger['plugin'] = 'input/courier' case @options[:transport] when 'tcp', 'tls' @@ -49,11 +50,12 @@ def initialize(options = {}) require 'log-courier/server_zmq' @server = ServerZmq.new(@options) else - raise '[LogCourierServer] \'transport\' must be tcp, tls, plainzmq or zmq' + fail 'input/courier: \'transport\' must be tcp, tls, plainzmq or zmq' end - # Grab the port back + # Grab the port back and update the logger context @port = @server.port + @logger['port'] = @port unless @logger.nil? # Load the json adapter @json_adapter = MultiJson.adapter.instance @@ -75,7 +77,11 @@ def run(&block) when 'JDAT' process_jdat message, comm, event_queue else - @logger.warn("[LogCourierServer] Unknown message received from #{comm.peer}") unless @logger.nil? + if comm.peer.nil? + @logger.warn 'Unknown message received', :from => 'unknown' unless @logger.nil? + else + @logger.warn 'Unknown message received', :from => comm.peer unless @logger.nil? + end # Don't kill a client that sends a bad message # Just reject it and let it send it again, potentially to another server comm.send '????', '' @@ -93,17 +99,21 @@ def run(&block) server_thread.join end end + return end + private + def process_ping(message, comm) # Size of message should be 0 if message.length != 0 - raise ProtocolError, "unexpected data attached to ping message (#{message.length})" + fail ProtocolError, "unexpected data attached to ping message (#{message.length})" end # PONG! # NOTE: comm.send can raise a Timeout::Error of its own comm.send 'PONG', '' + return end def process_jdat(message, comm, event_queue) @@ -114,11 +124,17 @@ def process_jdat(message, comm, event_queue) # This allows the client to know what is being acknowledged # Nonce is 16 so check we have enough if message.length < 17 - raise ProtocolError, "JDAT message too small (#{message.length})" + fail ProtocolError, "JDAT message too small (#{message.length})" end nonce = message[0...16] + if !@logger.nil? && @logger.debug? + nonce_str = nonce.each_byte.map do |b| + b.to_s(16).rjust(2, '0') + end + end + # The remainder of the message is the compressed data block message = StringIO.new Zlib::Inflate.inflate(message[16...message.length]) @@ -136,7 +152,7 @@ def process_jdat(message, comm, event_queue) # Finished! break elsif length_buf.length < 4 - raise ProtocolError, "JDAT length extraction failed (#{ret} #{length_buf.length})" + fail ProtocolError, "JDAT length extraction failed (#{ret} #{length_buf.length})" end length = length_buf.unpack('N').first @@ -145,7 +161,7 @@ def process_jdat(message, comm, event_queue) ret = message.read length, data_buf if ret.nil? 
or data_buf.length < length @logger.warn() - raise ProtocolError, "JDAT message extraction failed #{ret} #{data_buf.length}" + fail ProtocolError, "JDAT message extraction failed #{ret} #{data_buf.length}" end data_buf.force_encoding('utf-8') @@ -161,7 +177,7 @@ begin event = @json_adapter.load(data_buf, @json_options) rescue MultiJson::ParseError => e - @logger.warn("[LogCourierServer] JSON parse failure, falling back to plain-text: #{e}") unless @logger.nil? + @logger.warn e, :hint => 'JSON parse failure, falling back to plain-text' unless @logger.nil? event = { 'message' => data_buf } end @@ -171,7 +187,7 @@ rescue TimeoutError # Full pipeline, partial ack # NOTE: comm.send can raise a Timeout::Error of its own - @logger.debug "[LogCourierServer] Partially acknowledging message #{nonce.hash} sequence #{sequence}" unless @logger.nil? + @logger.debug 'Partially acknowledging message', :nonce => nonce_str.join, :sequence => sequence if !@logger.nil? && @logger.debug? comm.send 'ACKN', [nonce, sequence].pack('A*N') ack_timeout = Time.now.to_i + 5 retry @@ -182,8 +198,9 @@ # Acknowledge the full message # NOTE: comm.send can raise a Timeout::Error - @logger.debug "[LogCourierServer] Acknowledging message #{nonce.hash} sequence #{sequence}" unless @logger.nil? + @logger.debug 'Acknowledging message', :nonce => nonce_str.join, :sequence => sequence if !@logger.nil? && @logger.debug? comm.send 'ACKN', [nonce, sequence].pack('A*N') + return end end end diff --git a/lib/log-courier/server_tcp.rb index ec6f5340..8b44225b 100644 --- a/lib/log-courier/server_tcp.rb +++ b/lib/log-courier/server_tcp.rb @@ -24,13 +24,25 @@ module LogCourier # Wrap around TCPServer to grab last error for use in reporting which peer had an error class ExtendedTCPServer < TCPServer - # Yield the peer + attr_reader :peer + + def initialize + reset_peer + super + end + + # Save the peer def accept sock = super peer = sock.peeraddr(:numeric) - Thread.current['LogCourierPeer'] = "#{peer[2]}:#{peer[1]}" + @peer = "#{peer[2]}:#{peer[1]}" return sock end + + def reset_peer + @peer = 'unknown' + return + end end # TLS transport implementation for server @@ -57,11 +69,11 @@ def initialize(options = {}) if @options[:transport] == 'tls' [:ssl_certificate, :ssl_key].each do |k| - raise "[LogCourierServer] '#{k}' is required" if @options[k].nil? + fail "input/courier: '#{k}' is required" if @options[k].nil? end if @options[:ssl_verify] and (!@options[:ssl_verify_default_ca] && @options[:ssl_verify_ca].nil?) - raise '[LogCourierServer] Either \'ssl_verify_default_ca\' or \'ssl_verify_ca\' must be specified when ssl_verify is true' + fail 'input/courier: Either \'ssl_verify_default_ca\' or \'ssl_verify_ca\' must be specified when ssl_verify is true' end end @@ -94,16 +106,18 @@ def initialize(options = {}) ssl.verify_mode = OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT end + # Create the OpenSSL server - set start_immediately to false so we can multithread handshake @server = OpenSSL::SSL::SSLServer.new(@tcp_server, ssl) + @server.start_immediately = false else @server = @tcp_server end if @options[:port] == 0 - @logger.warn '[LogCourierServer] Transport ' + @options[:transport] + ' is listening on ephemeral port ' + @port.to_s unless @logger.nil? 
+ @logger.warn 'Ephemeral port allocated', :transport => @options[:transport], :port => @port unless @logger.nil? end rescue => e - raise "[LogCourierServer] Failed to initialise: #{e}" + raise "input/courier: Failed to initialise: #{e}" end end # def initialize @@ -111,20 +125,20 @@ def run(&block) client_threads = {} loop do - # This means ssl accepting is single-threaded. + # Because start_immediately is false, TCP accept is single-threaded but + # the handshake is essentially multithreaded, as we defer it to the client thread + @tcp_server.reset_peer + client = nil begin client = @server.accept rescue EOFError, OpenSSL::SSL::SSLError, IOError => e - # Handshake failure or other issue - peer = Thread.current['LogCourierPeer'] || 'unknown' - @logger.warn "[LogCourierServer] Connection from #{peer} failed to initialise: #{e}" unless @logger.nil? - client.close rescue nil + # Accept failure or other issue + @logger.warn 'Connection failed to accept', :error => e.message, :peer => @tcp_server.peer unless @logger.nil? + client.close rescue nil unless client.nil? next end - peer = Thread.current['LogCourierPeer'] || 'unknown' - - @logger.info "[LogCourierServer] New connection from #{peer}" unless @logger.nil? + @logger.info 'New connection', :peer => @tcp_server.peer unless @logger.nil? # Clear up finished threads client_threads.delete_if do |_, thr| end # Start a new connection thread - client_threads[client] = Thread.new(client, peer) do |client_copy, peer_copy| - ConnectionTcp.new(@logger, client_copy, peer_copy, @options).run(&block) + client_threads[client] = Thread.new(client, @tcp_server.peer) do |client_copy, peer_copy| + run_thread client_copy, peer_copy, &block end end + return rescue ShutdownSignal - # Capture shutting down signal - 0 + return rescue StandardError, NativeException => e # Some other unknown problem - @logger.warn("[LogCourierServer] Unknown error: #{e}") unless @logger.nil? - @logger.warn("[LogCourierServer] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? + @logger.warn e, :hint => 'Unknown error, shutting down' unless @logger.nil? raise e ensure # Raise shutdown in all client threads and join then @tcp_server.close end + + private + + def run_thread(client, peer, &block) + # Perform the handshake inside the new thread so we don't block TCP accept + if @options[:transport] == 'tls' + begin + client.accept + rescue EOFError, OpenSSL::SSL::SSLError, IOError => e + # Handshake failure or other issue + @logger.warn 'Connection failed to initialise', :error => e.message, :peer => peer unless @logger.nil? + client.close + return + end + end + + ConnectionTcp.new(@logger, client, peer, @options).run(&block) + end end # Representation of a single connected client @@ -179,7 +210,7 @@ def run # Sanity if length > @options[:max_packet_size] - raise ProtocolError, "packet too large (#{length} > #{@options[:max_packet_size]})" + fail ProtocolError, "packet too large (#{length} > #{@options[:max_packet_size]})" end # While we're processing, EOF is bad as it may occur during send @@ -198,32 +229,61 @@ # If we EOF next it's a graceful close @in_progress = false end + return rescue TimeoutError # Timeout of the connection, we were idle too long without a ping/pong - @logger.warn("[LogCourierServer] Connection from #{@peer} timed out") unless @logger.nil? + @logger.warn 'Connection timed out', :peer => @peer unless @logger.nil? 
+ return rescue EOFError if @in_progress - @logger.warn("[LogCourierServer] Premature connection close on connection from #{@peer}") unless @logger.nil? + @logger.warn 'Unexpected EOF', :peer => @peer unless @logger.nil? else - @logger.info("[LogCourierServer] Connection from #{@peer} closed") unless @logger.nil? + @logger.info 'Connection closed', :peer => @peer unless @logger.nil? end + return rescue OpenSSL::SSL::SSLError, IOError, Errno::ECONNRESET => e # Read errors, only action is to shutdown which we'll do in ensure - @logger.warn("[LogCourierServer] SSL error on connection from #{@peer}: #{e}") unless @logger.nil? + @logger.warn 'SSL error, connection aborted', :error => e.message, :peer => @peer unless @logger.nil? + return rescue ProtocolError => e # Connection abort request due to a protocol error - @logger.warn("[LogCourierServer] Protocol error on connection from #{@peer}: #{e}") unless @logger.nil? + @logger.warn 'Protocol error, connection aborted', :error => e.message, :peer => @peer unless @logger.nil? + return rescue ShutdownSignal # Shutting down - @logger.warn("[LogCourierServer] Closing connecting from #{@peer}: server shutting down") unless @logger.nil? - rescue => e + @logger.info 'Server shutting down, closing connection', :peer => @peer unless @logger.nil? + return + rescue StandardError, NativeException => e # Some other unknown problem - @logger.warn("[LogCourierServer] Unknown error on connection from #{@peer}: #{e}") unless @logger.nil? - @logger.warn("[LogCourierServer] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? + @logger.warn e, :hint => 'Unknown error, connection aborted', :peer => @peer unless @logger.nil? + return ensure @fd.close rescue nil end + def send(signature, message) + reset_timeout + data = signature + [message.length].pack('N') + message + done = 0 + loop do + begin + written = @fd.write_nonblock(data[done...data.length]) + rescue IO::WaitReadable + fail TimeoutError if IO.select([@fd], nil, [@fd], @timeout - Time.now.to_i).nil? + retry + rescue IO::WaitWritable + fail TimeoutError if IO.select(nil, [@fd], [@fd], @timeout - Time.now.to_i).nil? + retry + end + fail ProtocolError, "write failure (#{done}/#{data.length})" if written == 0 + done += written + break if done >= data.length + end + return + end + + private + def recv(need) reset_timeout have = '' @@ -231,16 +291,16 @@ def recv(need) begin buffer = @fd.read_nonblock need - have.length rescue IO::WaitReadable - raise TimeoutError if IO.select([@fd], nil, [@fd], @timeout - Time.now.to_i).nil? + fail TimeoutError if IO.select([@fd], nil, [@fd], @timeout - Time.now.to_i).nil? retry rescue IO::WaitWritable - raise TimeoutError if IO.select(nil, [@fd], [@fd], @timeout - Time.now.to_i).nil? + fail TimeoutError if IO.select(nil, [@fd], [@fd], @timeout - Time.now.to_i).nil? retry end if buffer.nil? - raise EOFError + fail EOFError elsif buffer.length == 0 - raise ProtocolError, "read failure (#{have.length}/#{need})" + fail ProtocolError, "read failure (#{have.length}/#{need})" end if have.length == 0 have = buffer @@ -252,29 +312,10 @@ def recv(need) have end - def send(signature, message) - reset_timeout - data = signature + [message.length].pack('N') + message - done = 0 - loop do - begin - written = @fd.write_nonblock(data[done...data.length]) - rescue IO::WaitReadable - raise TimeoutError if IO.select([@fd], nil, [@fd], @timeout - Time.now.to_i).nil? - retry - rescue IO::WaitWritable - raise TimeoutError if IO.select(nil, [@fd], [@fd], @timeout - Time.now.to_i).nil? 
- retry - end - raise ProtocolError, "write failure (#{done}/#{data.length})" if written == 0 - done += written - break if done >= data.length - end - end - def reset_timeout # TODO: Make configurable @timeout = Time.now.to_i + 1_800 + return end end end diff --git a/lib/log-courier/server_zmq.rb b/lib/log-courier/server_zmq.rb index 36b8569f..3a3f20d8 100644 --- a/lib/log-courier/server_zmq.rb +++ b/lib/log-courier/server_zmq.rb @@ -14,20 +14,31 @@ # See the License for the specific language governing permissions and # limitations under the License. -begin - require 'ffi-rzmq-core' - require 'ffi-rzmq-core/version' - require 'ffi-rzmq' - require 'ffi-rzmq/version' -rescue LoadError => e - raise "[LogCourierServer] Could not initialise: #{e}" -end +require 'thread' +require 'log-courier/zmq_qpoll' module LogCourier # ZMQ transport implementation for the server class ServerZmq class ZMQError < StandardError; end + class << self + @print_zmq_versions = false + + def print_zmq_versions(logger) + return if @print_zmq_versions || logger.nil? + + libversion = LibZMQ.version + libversion = "#{libversion[:major]}.#{libversion[:minor]}.#{libversion[:patch]}" + + logger.info 'libzmq', :version => libversion + logger.info 'ffi-rzmq-core', :version => LibZMQ::VERSION + logger.info 'ffi-rzmq', :version => ZMQ.version + + @print_zmq_versions = true + end + end + attr_reader :port def initialize(options = {}) @@ -38,19 +49,19 @@ def initialize(options = {}) address: '0.0.0.0', curve_secret_key: nil, max_packet_size: 10_485_760, + peer_recv_queue: 10, }.merge!(options) @logger = @options[:logger] - libversion = LibZMQ.version - libversion = "#{libversion[:major]}.#{libversion[:minor]}.#{libversion[:patch]}" + self.class.print_zmq_versions @logger if @options[:transport] == 'zmq' - raise "[LogCourierServer] Transport 'zmq' requires libzmq version >= 4 (the current version is #{libversion})" unless LibZMQ.version4? + fail "input/courier: Transport 'zmq' requires libzmq version >= 4" unless LibZMQ.version4? - raise '[LogCourierServer] \'curve_secret_key\' is required' if @options[:curve_secret_key].nil? + fail 'input/courier: \'curve_secret_key\' is required' if @options[:curve_secret_key].nil? - raise '[LogCourierServer] \'curve_secret_key\' must be a valid 40 character Z85 encoded string' if @options[:curve_secret_key].length != 40 || !z85validate(@options[:curve_secret_key]) + fail 'input/courier: \'curve_secret_key\' must be a valid 40 character Z85 encoded string' if @options[:curve_secret_key].length != 40 || !z85validate(@options[:curve_secret_key]) end begin @@ -60,128 +71,306 @@ def initialize(options = {}) if @options[:transport] == 'zmq' rc = @socket.setsockopt(ZMQ::CURVE_SERVER, 1) - raise ZMQError, 'setsockopt CURVE_SERVER failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc) + fail 'setsockopt CURVE_SERVER failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc) rc = @socket.setsockopt(ZMQ::CURVE_SECRETKEY, @options[:curve_secret_key]) - raise ZMQError, 'setsockopt CURVE_SECRETKEY failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc) + fail 'setsockopt CURVE_SECRETKEY failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc) end bind = 'tcp://' + @options[:address] + (@options[:port] == 0 ? 
':*' : ':' + @options[:port].to_s) rc = @socket.bind(bind) - raise ZMQError, 'failed to bind at ' + bind + ': ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc) + fail 'failed to bind at ' + bind + ': ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc) # Lookup port number that was allocated in case it was set to 0 endpoint = '' rc = @socket.getsockopt(ZMQ::LAST_ENDPOINT, endpoint) - raise ZMQError, 'getsockopt LAST_ENDPOINT failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc) && %r{\Atcp://(?:.*):(?<endpoint_port>\d+)\0\z} =~ endpoint + fail 'getsockopt LAST_ENDPOINT failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc) && %r{\Atcp://(?:.*):(?<endpoint_port>\d+)\0\z} =~ endpoint @port = endpoint_port.to_i - @poller = ZMQ::Poller.new - if @options[:port] == 0 - @logger.warn '[LogCourierServer] Transport ' + @options[:transport] + ' is listening on ephemeral port ' + @port.to_s unless @logger.nil? + @logger.warn 'Ephemeral port allocated', :transport => @options[:transport], :port => @port unless @logger.nil? end rescue => e - raise "[LogCourierServer] Failed to initialise: #{e}" + raise "input/courier: Failed to initialise: #{e}" end - @logger.info "[LogCourierServer] libzmq version #{libversion}" unless @logger.nil? - @logger.info "[LogCourierServer] ffi-rzmq-core version #{LibZMQ::VERSION}" unless @logger.nil? - @logger.info "[LogCourierServer] ffi-rzmq version #{ZMQ.version}" unless @logger.nil? - # TODO: Implement workers option by receiving on a ROUTER and proxying to a DEALER, with workers connecting to the DEALER - @return_route = [] + # TODO: Make this send queue configurable? + @send_queue = EventQueue.new 2 + @factory = ClientFactoryZmq.new(@options, @send_queue) + + # Setup poller + @poller = ZMQPoll::ZMQPoll.new(@context) + @poller.register_socket @socket, ZMQ::POLLIN + @poller.register_queue_to_socket @send_queue, @socket + + # Register a finaliser that sets @context to nil + # This allows us to detect the JRuby bug where during "exit!" finalisers + # are run but threads are not killed - which leaves us in a situation of + # a terminated @context (it has a terminate finalizer) and an IO thread + # looping retries + # JRuby will still crash and burn, but at least we don't spam STDOUT with + # errors + ObjectSpace.define_finalizer(self, Proc.new do + @context = nil + end) + end + + def run(&block) + loop do + begin + @poller.poll(5_000) do |socket, r, w| + next if socket != @socket + next if !r - reset_timeout + receive &block + end + rescue ZMQPoll::ZMQError => e + # Detect JRuby bug + fail e if @context.nil? + @logger.warn e, :hint => 'ZMQ recv_string failure' unless @logger.nil? + next + rescue ZMQPoll::TimeoutError + # We'll let ZeroMQ manage reconnections and new connections + # There is no point in us doing any form of reconnect ourselves + next + end + end + return + rescue ShutdownSignal + # Shutting down + @logger.warn 'Server shutting down' unless @logger.nil? + return + rescue StandardError, NativeException => e + # Some other unknown problem + @logger.warn e, :hint => 'Unknown error, shutting down' unless @logger.nil? + raise e + ensure + @poller.shutdown + @factory.shutdown + @socket.close + @context.terminate end + private + def z85validate(z85) # ffi-rzmq does not implement decode - but we want to validate during startup decoded = FFI::MemoryPointer.from_string(' ' * (8 * z85.length / 10)) ret = LibZMQ.zmq_z85_decode decoded, z85 return false if ret.nil?
- true end - def run(&block) - loop do - begin - begin - # Try to receive a message - reset_timeout - data = [] - rc = @socket.recv_strings(data, ZMQ::DONTWAIT) - unless ZMQ::Util.resultcode_ok?(rc) - raise ZMQError, 'recv_string error: ' + ZMQ::Util.error_string if ZMQ::Util.errno != ZMQ::EAGAIN - - # Wait for a message to arrive, handling timeouts - @poller.deregister @socket, ZMQ::POLLIN | ZMQ::POLLOUT - @poller.register @socket, ZMQ::POLLIN - while @poller.poll(1_000) == 0 - # Using this inner while triggers pollThreadEvents in JRuby which checks for Thread.raise immediately - raise TimeoutError while Time.now.to_i >= @timeout - end - next - end - rescue ZMQError => e - @logger.warn "[LogCourierServer] ZMQ recv_string failed: #{e}" unless @logger.nil? - next - end + def receive(&block) + # Try to receive a message + data = [] + rc = @socket.recv_strings(data, ZMQ::DONTWAIT) + unless ZMQ::Util.resultcode_ok?(rc) + fail ZMQError, 'recv_string error: ' + ZMQ::Util.error_string if ZMQ::Util.errno != ZMQ::EAGAIN + return + end - # Save the routing information that appears before the null messages - @return_route = [] - @return_route.push data.shift until data.length == 0 || data[0] == '' + # Save the source information that appears before the null messages + source = [] + source.push data.shift until data.length == 0 || data[0] == '' - if data.length == 0 - @logger.warn '[LogCourierServer] Invalid message: no data' unless @logger.nil? - next - elsif data.length == 1 - @logger.warn '[LogCourierServer] Invalid message: empty data' unless @logger.nil? - next + if data.length == 0 + @logger.warn 'Invalid message: no data', :source_length => source.length unless @logger.nil? + return + elsif data.length == 1 + @logger.warn 'Invalid message: empty data', :source_length => source.length unless @logger.nil? + return + end + + # Drop the null message separator + data.shift + + if data.length != 1 + @logger.warn 'Invalid message: multipart unexpected', :source_length => source.length, :data_length => data.length unless @logger.nil? + if !@logger.nil? && @logger.debug? + i = 0 + parts = {} + data.each do |msg| + i += 1 + parts[i] = "#{msg.length}:[#{msg[0..31].gsub(/[^[:print:]]/, '.')}]" end + @logger.debug 'Data', parts + end + return + end + + @factory.deliver source, data.first, &block + return + end + end + + class ClientFactoryZmq + attr_reader :options + attr_reader :send_queue + + def initialize(options, send_queue) + @options = options + @logger = @options[:logger] + + @send_queue = send_queue + @index = {} + @client_threads = {} + @mutex = Mutex.new + end + + def shutdown + # Stop other threads from colliding with try_drop + client_threads = @mutex.synchronize do + client_threads = @client_threads + @client_threads = {} + client_threads + end + + client_threads.each_value do |thr| + thr.raise ShutdownSignal + end + + client_threads.each_value(&:join) + return + end - # Drop the null message separator - data.shift - - if data.length != 1 - @logger.warn "[LogCourierServer] Invalid message: multipart unexpected (#{data.length})" unless @logger.nil? - if !@logger.nil? && @logger.debug?
- i = 0 - data.each do |msg| - i += 1 - part = msg[0..31].gsub(/[^[:print:]]/, '.') - @logger.debug "[LogCourierServer] Part #{i}: #{part.length}:[#{part}]" - end + def deliver(source, data, &block) + # Find the handling thread + # We separate each source into threads so that each thread can respond + # with partial ACKs if we hit a slow down + # If we processed in a single thread, we'd only be able to respond to + # a single client with partial ACKs + @mutex.synchronize do + index = @index + source.each do |identity| + index[identity] = {} if !index.key?(identity) + index = index[identity] + end + + if !index.key?('') + source_str = source.map do |s| + s.each_byte.map do |b| + b.to_s(16).rjust(2, '0') end + end.join + + @logger.info 'New source', :source => source_str unless @logger.nil? + + # Create the client and associated thread + client = ClientZmq.new(self, source, source_str) do + try_drop(source, source_str) + end + + thread = Thread.new do + client.run &block + end + + @client_threads[thread] = thread + + index[''] = { + 'client' => client, + 'thread' => thread, + } + end + + # Existing thread, throw on the queue, if not enough room drop the message + index['']['client'].push data, 0 + end + return + end + + private + + def try_drop(source, source_str) + # This is called when a client goes idle, to cleanup resources + # We may tie this into zmq monitor + @mutex.synchronize do + index = @index + parents = [] + source.each do |identity| + if !index.key?(identity) + @logger.warn 'Unknown idle source failed to shutdown', :source => source_str unless @logger.nil? + break + end + parents.push [index, identity] + index = index[identity] + end + + if !index.key?('') + @logger.warn 'Unknown idle source failed to shutdown', :source => source_str unless @logger.nil? + return true + end + + # Don't allow drop if we have messages in the queue + if index['']['client'].length != 0 + @logger.warn 'Failed idle source shutdown as message queue is not empty', :source => source_str unless @logger.nil? + return false + end + + @logger.info 'Idle source shutting down', :source => source_str unless @logger.nil? + + # Delete the entry + @client_threads.delete(index['']['thread']) + index.delete('') + + # Cleanup orphaned leafs + parents.reverse_each do |path| + path[0].delete(path[1]) if path[0][path[1]].length == 0 + end + end + + return true + end + end + + class ClientZmq < EventQueue + def initialize(factory, source, source_str, &try_drop) + @factory = factory + @logger = @factory.options[:logger] + @send_queue = @factory.send_queue + @source = source + @source_str = source_str + @try_drop = try_drop + + # Setup the queue for receiving events to process + super @factory.options[:peer_recv_queue] + end + + def run(&block) + loop do + begin + # TODO: Make timeout configurable? + data = self.pop(30) + recv(data, &block) rescue TimeoutError - # We'll let ZeroMQ manage reconnections and new connections - # There is no point in us doing any form of reconnect ourselves - # We will keep this timeout in however, for shutdown checks - reset_timeout - next + # Try to clean up resources - if we fail, new messages have arrived + retry if !@try_drop.call(@source) + break end end + return rescue ShutdownSignal # Shutting down - @logger.warn('[LogCourierServer] Server shutting down') unless @logger.nil? + @logger.info 'Source shutting down', :source => @source_str unless @logger.nil?
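+ # ShutdownSignal is raised into this thread by ClientFactoryZmq#shutdown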
+ return rescue StandardError, NativeException => e # Some other unknown problem - @logger.warn("[LogCourierServer] Unknown error: #{e}") unless @logger.nil? - @logger.warn("[LogCourierServer] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil? + @logger.warn e, :hint => 'Unknown error, connection aborted', :source => @source_str unless @logger.nil? raise e - ensure - @socket.close - @context.terminate end + def send(signature, message) + data = signature + [message.length].pack('N') + message + @send_queue.push @source + ['', data] + return + end + + private + def recv(data) if data.length < 8 - @logger.warn '[LogCourierServer] Invalid message: not enough data' unless @logger.nil? + @logger.warn 'Invalid message: not enough data', :data_length => data.length, :source => @source_str unless @logger.nil? return end @@ -190,52 +379,16 @@ def recv(data) # Verify length if data.length - 8 != length - @logger.warn "[LogCourierServer] Invalid message: data has invalid length (#{data.length - 8} != #{length})" unless @logger.nil? + @logger.warn 'Invalid message: data has invalid length', :data_length => data.length - 8, :encoded_length => length, :source => @source_str unless @logger.nil? return - elsif length > @options[:max_packet_size] - @logger.warn "[LogCourierServer] Invalid message: packet too large (#{length} > #{@options[:max_packet_size]})" unless @logger.nil? + elsif length > @factory.options[:max_packet_size] + @logger.warn 'Invalid message: packet too large', :size => length, :max_packet_size => @factory.options[:max_packet_size], :source => @source_str unless @logger.nil? return end # Yield the parts yield signature, data[8, length], self - end - - def send(signature, message) - data = signature + [message.length].pack('N') + message - - # Send the return route and then the message - reset_timeout - @return_route.each do |msg| - send_with_poll msg, true - end - send_with_poll '', true - send_with_poll data - end - - def send_with_poll(data, more = false) - loop do - # Try to send a message but never block - rc = @socket.send_string(data, (more ? ZMQ::SNDMORE : 0) | ZMQ::DONTWAIT) - break if ZMQ::Util.resultcode_ok?(rc) - if ZMQ::Util.errno != ZMQ::EAGAIN - @logger.warn "[LogCourierServer] Message send failed: #{ZMQ::Util.error_string}" unless @logger.nil? - raise TimeoutError - end - - # Wait for send to become available, handling timeouts - @poller.deregister @socket, ZMQ::POLLIN | ZMQ::POLLOUT - @poller.register @socket, ZMQ::POLLOUT - while @poller.poll(1_000) == 0 - # Using this inner while triggers pollThreadEvents in JRuby which checks for Thread.raise immediately - raise TimeoutError while Time.now.to_i >= @timeout - end - end - end - - def reset_timeout - # TODO: Make configurable? - @timeout = Time.now.to_i + 1_800 + return end end end diff --git a/lib/log-courier/zmq_qpoll.rb b/lib/log-courier/zmq_qpoll.rb new file mode 100644 index 00000000..0e283f60 --- /dev/null +++ b/lib/log-courier/zmq_qpoll.rb @@ -0,0 +1,306 @@ +# encoding: utf-8 + +# Copyright 2014 Jason Woods. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and +# limitations under the License. + +begin + require 'ffi-rzmq' + require 'ffi-rzmq/version' + require 'ffi-rzmq-core/version' +rescue LoadError => e + raise "ZMQPoll could not initialise: #{e}" +end + +module ZMQPoll + class ZMQError < StandardError; end + class TimeoutError < StandardError; end + + class ZMQPoll + def initialize(context, logger=nil) + @logger = logger + @context = context + @poller = ZMQ::Poller.new + @sockets = [] + @socket_to_socket = {} + @handlers = {} + @queues = {} + end + + def readables + @poller.readables + end + + def writables + @poller.writables + end + + def shutdown + @queues.each_key do |queue| + deregister_queue queue + end + + @socket_to_socket.each_value do |state| + _close_socket_to_socket state + end + + @sockets.each do |socket| + socket.close + end + return + end + + def register_socket(socket, flags) + @poller.register socket, flags + return + end + + def deregister_socket(socket) + return if @handlers.key?(socket) + + @poller.delete socket + return + end + + def register_queue_to_socket(queue, socket) + s2s_state = _create_socket_to_socket(socket) + + state = { + state: s2s_state, + mutex: Mutex.new, + shutdown: false, + } + + state[:thread] = Thread.new do + loop do + data = queue.pop + break if data.nil? + begin + send s2s_state[:sender], data + rescue TimeoutError + break if state[:mutex].synchronize { state[:shutdown] } + retry + end + end + end + + @queues[queue] = state + return + end + + def deregister_queue(queue) + return if !@queues.key?(queue) + + # Push nil so if we're idle we jump into action and exit + # But also set shutdown to true so if we're mid-send and timeout, we exit + @queues[queue][:mutex].synchronize do + queue.push nil + @queues[queue][:shutdown] = true + end + @queues[queue][:thread].join + + _close_socket_to_socket @queues[queue][:state] + @queues.delete queue + return + end + + def create_socket_to_socket(socket) + state = _create_socket_to_socket(socket) + @socket_to_socket[state[:sender]] = state + state[:sender] + end + + def close_socket_to_socket(socket) + return if !@socket_to_socket.include?(socket) + state = @socket_to_socket[socket] + @socket_to_socket.delete socket + _close_socket_to_socket(state) + return + end + + def poll(timeout) + if @poller.size == 0 + fail ZMQError, 'poll called with zero sockets/queues' + end + + rc = @poller.poll(timeout) + if rc == -1 + fail ZMQError, 'poll error: ' + ZMQ::Util.error_string + end + + return if rc == 0 + + ready = (@poller.readables|@poller.writables) + + ready.each do |socket| + if @handlers.key?(socket) + __send__ @handlers[socket][:callback], @handlers[socket] + end + + yield socket, @poller.readables.include?(socket), @poller.writables.include?(socket) + end + + return + end + + private + + def _create_socket_to_socket(socket) + receiver = @context.socket(ZMQ::PULL) + fail ZMQError, 'socket creation error: ' + ZMQ::Util.error_string if receiver.nil? + + rc = receiver.bind("inproc://zmqpollreceiver-#{receiver.hash}") + fail ZMQError, 'bind error: ' + ZMQ::Util.error_string if !ZMQ::Util.resultcode_ok?(rc) + + sender = @context.socket(ZMQ::PUSH) + fail ZMQError, 'socket creation error: ' + ZMQ::Util.error_string if sender.nil?
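+ # Connect the PUSH sender to the PULL receiver over inproc; queued data funnels through this pair out to the real socket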
+ + rc = sender.connect("inproc://zmqpollreceiver-#{receiver.hash}") + fail ZMQError, 'connect error: ' + ZMQ::Util.error_string if !ZMQ::Util.resultcode_ok?(rc) + + state = { + :callback => :handle_socket_to_socket, + :sender => sender, + :receiver => receiver, + :socket => socket, + :buffer => nil, + :send_ok => false, + :recv_ok => false, + } + + @poller.register receiver, ZMQ::POLLIN + @poller.register socket, ZMQ::POLLOUT + @handlers[receiver] = state + @handlers[socket] = state + + @sockets.push sender + + state + end + + def _close_socket_to_socket(state) + @sockets.delete state[:sender] + + @poller.delete state[:receiver] + @poller.delete state[:socket] + + state[:sender].close + state[:receiver].close + + @handlers.delete state[:receiver] + @handlers.delete state[:socket] + + return + end + + def handle_socket_to_socket(state) + state[:recv_ok] = @poller.readables.include?(state[:receiver]) || state[:recv_ok] + state[:send_ok] = @poller.writables.include?(state[:socket]) || state[:send_ok] + + loop do + if state[:send_ok] && !state[:buffer].nil? + begin + send state[:socket], state[:buffer] + rescue TimeoutError + end + state[:buffer] = nil if state[:buffer].length == 0 + state[:send_ok] = false + end + + break if !state[:recv_ok] + + if state[:recv_ok] && state[:buffer].nil? + begin + state[:buffer] = recv(state[:receiver]) + rescue TimeoutError + end + state[:recv_ok] = false + end + + break if !state[:send_ok] + end + + if state[:recv_ok] + @poller.deregister state[:receiver], ZMQ::POLLIN + else + @poller.register state[:receiver], ZMQ::POLLIN + end + + if state[:send_ok] + @poller.deregister state[:socket], ZMQ::POLLOUT + else + @poller.register state[:socket], ZMQ::POLLOUT + end + + return + end + + def recv(socket) + data = [] + + poll_eagain(socket, ZMQ::POLLIN, 5) do + # recv_strings appears to be safe, ZMQ documents that a client will either + # receive 0 parts or all parts + socket.recv_strings(data, ZMQ::DONTWAIT) + end + + data + end + + def send(socket, data) + while data.length != 1 + send_part socket, data.shift, true + end + send_part socket, data.shift + return + end + + def send_part(socket, data, more=false) + poll_eagain(socket, ZMQ::POLLOUT, 5) do + # Try to send a message but never block + # We could use send_strings but it is vague on if ZMQ can return an + # error midway through sending parts... + socket.send_string(data, (more ? ZMQ::SNDMORE : 0) | ZMQ::DONTWAIT) + end + + return + end + + def poll_eagain(socket, flag, timeout, &block) + poller = nil + timeout = Time.now.to_i + timeout + loop do + rc = block.call() + break if ZMQ::Util.resultcode_ok?(rc) + if ZMQ::Util.errno != ZMQ::EAGAIN + fail ZMQError, 'message receive failed: ' + ZMQ::Util.error_string if flag == ZMQ::POLLIN + fail ZMQError, 'message send failed: ' + ZMQ::Util.error_string + end + + # Wait for the socket to become available, handling timeouts + if poller.nil?
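+ # Create the poller lazily, only once we actually hit EAGAIN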
+ poller = ZMQ::Poller.new + poller.register socket, flag + end + + while poller.poll(1_000) == 0 + # Using this inner while triggers pollThreadEvents in JRuby which checks for Thread.raise immediately + fail TimeoutError while Time.now.to_i >= timeout + end + end + return + end + end +end diff --git a/lib/logstash/inputs/courier.rb b/lib/logstash/inputs/courier.rb index 0a5a505f..a5f935ef 100644 --- a/lib/logstash/inputs/courier.rb +++ b/lib/logstash/inputs/courier.rb @@ -59,6 +59,9 @@ class Courier < LogStash::Inputs::Base # Max packet size config :max_packet_size, :validate => :number + # Peer recv queue + config :peer_recv_queue, :validate => :number + public def register @@ -75,10 +78,11 @@ def register ssl_verify: @ssl_verify, ssl_verify_default_ca: @ssl_verify_default_ca, ssl_verify_ca: @ssl_verify_ca, - curve_secret_key: @curve_secret_key + curve_secret_key: @curve_secret_key, } options[:max_packet_size] = @max_packet_size unless @max_packet_size.nil? + options[:peer_recv_queue] = @peer_recv_queue unless @peer_recv_queue.nil? require 'log-courier/server' @log_courier = LogCourier::Server.new options diff --git a/log-courier.gemspec b/log-courier.gemspec index 339d6b91..c5ecb0c9 100644 --- a/log-courier.gemspec +++ b/log-courier.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |gem| gem.name = 'log-courier' - gem.version = '1.1' + gem.version = '1.2' gem.description = 'Log Courier library' gem.summary = 'Receive events from Log Courier and transmit between LogStash instances' gem.homepage = 'https://github.com/driskell/log-courier' @@ -18,6 +18,7 @@ Gem::Specification.new do |gem| lib/log-courier/server_zmq.rb ) - gem.add_runtime_dependency 'ffi-rzmq', '~> 2.0' - gem.add_runtime_dependency 'multi_json', '~> 1.0' + gem.add_runtime_dependency 'cabin', '~> 0.6' + gem.add_runtime_dependency 'ffi-rzmq', '~> 2.0' + gem.add_runtime_dependency 'multi_json', '~> 1.10' end diff --git a/logstash-input-log-courier.gemspec b/logstash-input-log-courier.gemspec index 3fe867ed..1e0d7143 100644 --- a/logstash-input-log-courier.gemspec +++ b/logstash-input-log-courier.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |gem| gem.name = 'logstash-input-log-courier' - gem.version = '1.1' + gem.version = '1.2' gem.description = 'Log Courier Input Logstash Plugin' gem.summary = 'Receive events from Log Courier and Logstash using the Log Courier protocol' gem.homepage = 'https://github.com/driskell/log-courier' @@ -16,5 +16,5 @@ Gem::Specification.new do |gem| gem.metadata = { 'logstash_plugin' => 'true', 'group' => 'input' } gem.add_runtime_dependency 'logstash', '~> 1.4' - gem.add_runtime_dependency 'log-courier', '= 1.1' + gem.add_runtime_dependency 'log-courier', '= 1.2' end diff --git a/logstash-output-log-courier.gemspec b/logstash-output-log-courier.gemspec index 194f48d0..c4022079 100644 --- a/logstash-output-log-courier.gemspec +++ b/logstash-output-log-courier.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |gem| gem.name = 'logstash-output-log-courier' - gem.version = '1.1' + gem.version = '1.2' gem.description = 'Log Courier Output Logstash Plugin' gem.summary = 'Transmit events from one Logstash instance to another using the Log Courier protocol' gem.homepage = 'https://github.com/driskell/log-courier' @@ -16,5 +16,5 @@ Gem::Specification.new do |gem| gem.metadata = { 'logstash_plugin' => 'true', 'group' => 'input' } gem.add_runtime_dependency 'logstash', '~> 1.4' - gem.add_runtime_dependency 'log-courier', '= 1.1' + gem.add_runtime_dependency 'log-courier', '= 1.2' end diff --git 
a/spec/benchmark_spec.rb b/spec/benchmark_spec.rb index e5d809f3..0746b3f9 100644 --- a/spec/benchmark_spec.rb +++ b/spec/benchmark_spec.rb @@ -57,6 +57,11 @@ startup verbose: false, args: '-from-beginning=true', config: <<-config { + "general": { + "admin enabled": true, + "admin listen address": "tcp:127.0.0.1:12350", + "log level": "debug" + }, "network": { "ssl ca": "#{@ssl_cert.path}", "servers": [ "127.0.0.1:#{server_port}" ] diff --git a/spec/courier_spec.rb b/spec/courier_spec.rb index 5bf23576..17291f79 100644 --- a/spec/courier_spec.rb +++ b/spec/courier_spec.rb @@ -27,7 +27,7 @@ { "network": { "ssl ca": "#{@ssl_cert.path}", - "servers": [ "127.0.0.1:#{server_port}" ] + "servers": [ "localhost:#{server_port}" ] }, "files": [ { diff --git a/spec/gem_spec.rb b/spec/gem_spec.rb index ecb422bb..a642749a 100644 --- a/spec/gem_spec.rb +++ b/spec/gem_spec.rb @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'logger' +require 'cabin' require 'timeout' require 'lib/common' @@ -28,8 +28,9 @@ end def startup - logger = Logger.new(STDOUT) - logger.level = Logger::DEBUG + logger = Cabin::Channel.new + logger.subscribe STDOUT + logger.level = :debug # Reset server for each test @client = LogCourier::Client.new( diff --git a/spec/lib/helpers/common.rb b/spec/lib/helpers/common.rb index 3f7342d0..eab265ca 100644 --- a/spec/lib/helpers/common.rb +++ b/spec/lib/helpers/common.rb @@ -15,7 +15,7 @@ # limitations under the License. require 'thread' -require 'logger' +require 'cabin' require 'log-courier/server' # Common helpers for testing both ruby client and the courier @@ -73,9 +73,10 @@ def start_server(args = {}) id = args[:id] - logger = Logger.new(STDOUT) - logger.progname = "Server #{id}" - logger.level = Logger::DEBUG + logger = Cabin::Channel.new + logger.subscribe STDOUT + logger['instance'] = id + logger.level = :debug raise 'Server already initialised' if @servers.key?(id) diff --git a/spec/lib/helpers/log-courier.rb b/spec/lib/helpers/log-courier.rb index d7087844..12a4f089 100644 --- a/spec/lib/helpers/log-courier.rb +++ b/spec/lib/helpers/log-courier.rb @@ -88,9 +88,9 @@ def startup(args = {}) loop do line = log_courier.gets break if line.nil? 
- puts 'SO: ' + line + puts ':' + line end - puts 'SO- END' + puts ':END' end # Needs some time to startup - to ensure when we create new files AFTER this, they are not detected during startup diff --git a/src/lc-admin/lc-admin.go b/src/lc-admin/lc-admin.go index edd7d613..e4d46c09 100644 --- a/src/lc-admin/lc-admin.go +++ b/src/lc-admin/lc-admin.go @@ -88,17 +88,14 @@ func (a *Admin) ProcessCommand(command string) bool { fmt.Printf("Configuration reload successful\n") case "status": - var snapshots []*core.Snapshot + var snaps *core.Snapshot - snapshots, err = a.client.FetchSnapshot() + snaps, err = a.client.FetchSnapshot() if err != nil { break } - for _, snap := range snapshots { - fmt.Printf("%s:\n", snap.Description()) - a.renderSnap(" ", snap) - } + a.renderSnap("", snaps) case "help": PrintHelp() default: @@ -135,6 +132,8 @@ func (a *Admin) renderSnap(indent string, snap *core.Snapshot) { fmt.Printf(indent + "%s: %.2f\n", k, t) } else if t, ok := v.(time.Time); ok { fmt.Printf(indent + "%s: %s\n", k, t.Format("_2 Jan 2006 15.04.05")) + } else if t, ok := v.(time.Duration); ok { + fmt.Printf(indent + "%s: %v\n", k, t-(t%time.Second)) } else { fmt.Printf(indent + "%s: %v\n", k, v) } diff --git a/src/lc-lib/admin/client.go b/src/lc-lib/admin/client.go index 3822fe1a..0109be0f 100644 --- a/src/lc-lib/admin/client.go +++ b/src/lc-lib/admin/client.go @@ -129,15 +129,25 @@ func (c *Client) Reload() error { return c.resolveError(response) } -func (c *Client) FetchSnapshot() ([]*core.Snapshot, error) { +func (c *Client) FetchSnapshot() (*core.Snapshot, error) { response, err := c.request("SNAP") if err != nil { return nil, err } - if ret, ok := response.Response.([]*core.Snapshot); ok { + if ret, ok := response.Response.(*core.Snapshot); ok { return ret, nil } + // Backwards compatibility + if ret, ok := response.Response.([]*core.Snapshot); ok { + snap := core.NewSnapshot("Log Courier") + for _, sub := range ret { + snap.AddSub(sub) + } + snap.Sort() + return snap, nil + } + return nil, c.resolveError(response) } diff --git a/src/lc-lib/admin/responses.go b/src/lc-lib/admin/responses.go index 56072735..5545b5f6 100644 --- a/src/lc-lib/admin/responses.go +++ b/src/lc-lib/admin/responses.go @@ -54,6 +54,9 @@ func init() { gob.Register(&ReloadResponse{}) // SNAP - gob.Register([]*core.Snapshot{&core.Snapshot{}}) + gob.Register(&core.Snapshot{}) + // SNAP - time.Time gob.Register(time.Now()) + // SNAP - time.Duration + gob.Register(time.Since(time.Now())) } diff --git a/src/lc-lib/core/pipeline.go b/src/lc-lib/core/pipeline.go index 88b900b0..480e4ddb 100644 --- a/src/lc-lib/core/pipeline.go +++ b/src/lc-lib/core/pipeline.go @@ -80,14 +80,16 @@ func (p *Pipeline) SendConfig(config *Config) { } } -func (p *Pipeline) Snapshot() []*Snapshot { - snapshots := make([]*Snapshot, 0) +func (p *Pipeline) Snapshot() *Snapshot { + snap := NewSnapshot("Log Courier") for _, sink := range p.snapshot_pipes { - snapshots = append(snapshots, sink.Snapshot()...) 
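+ // Merge every segment's snapshots as subs of a single root snapshot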
+ for _, sub := range sink.Snapshot() { + snap.AddSub(sub) + } } - return snapshots + return snap } type IPipelineSegment interface { diff --git a/src/lc-lib/core/snapshot.go b/src/lc-lib/core/snapshot.go index 5b430f79..12551f41 100644 --- a/src/lc-lib/core/snapshot.go +++ b/src/lc-lib/core/snapshot.go @@ -16,11 +16,16 @@ package core +import ( + "sort" +) + type Snapshot struct { Desc string Entries map[string]interface{} Keys []string - Subs []*Snapshot + Subs map[string]*Snapshot + SubKeys []string } func NewSnapshot(desc string) *Snapshot { @@ -28,10 +33,16 @@ func NewSnapshot(desc string) *Snapshot { Desc: desc, Entries: make(map[string]interface{}), Keys: make([]string, 0), - Subs: make([]*Snapshot, 0), + Subs: make(map[string]*Snapshot), + SubKeys: make([]string, 0), } } +func (s *Snapshot) Sort() { + sort.Strings(s.Keys) + sort.Strings(s.SubKeys) +} + func (s *Snapshot) Description() string { return s.Desc } @@ -62,17 +73,19 @@ func (s *Snapshot) NumEntries() int { } func (s *Snapshot) AddSub(sub *Snapshot) { - s.Subs = append(s.Subs, sub) + desc := sub.Description() + s.Subs[desc] = sub + s.SubKeys = append(s.SubKeys, desc) } func (s *Snapshot) Sub(i int) *Snapshot { - if i < 0 || i >= len(s.Subs) { + if i < 0 || i >= len(s.SubKeys) { panic("Out of bounds") } - return s.Subs[i] + return s.Subs[s.SubKeys[i]] } func (s *Snapshot) NumSubs() int { - return len(s.Subs) + return len(s.SubKeys) } diff --git a/src/lc-lib/core/version.go b/src/lc-lib/core/version.go index 00d966cb..d0bab1eb 100644 --- a/src/lc-lib/core/version.go +++ b/src/lc-lib/core/version.go @@ -16,4 +16,4 @@ package core -const Log_Courier_Version string = "v1.1" +const Log_Courier_Version string = "1.2" diff --git a/src/lc-lib/harvester/harvester.go b/src/lc-lib/harvester/harvester.go index ac4517dc..80dac7a0 100644 --- a/src/lc-lib/harvester/harvester.go +++ b/src/lc-lib/harvester/harvester.go @@ -66,7 +66,8 @@ type Harvester struct { byte_speed float64 line_count uint64 byte_count uint64 - last_eof *int64 + last_eof_off *int64 + last_eof *time.Time } func NewHarvester(stream core.Stream, config *core.Config, fileconfig *core.FileConfig, offset int64) *Harvester { @@ -90,6 +91,7 @@ func NewHarvester(stream core.Stream, config *core.Config, fileconfig *core.File config: config, fileconfig: fileconfig, offset: offset, + last_eof: nil, } ret.codec = fileconfig.CodecFactory.NewCodec(ret.eventCallback, ret.offset) @@ -170,6 +172,8 @@ ReadLoop: h.codec.Meter() + h.last_eof = nil + h.Unlock() // Check shutdown @@ -223,8 +227,15 @@ ReadLoop: } h.Lock() - last_eof := h.offset - h.last_eof = &last_eof + if h.last_eof_off == nil { + h.last_eof_off = new(int64) + } + *h.last_eof_off = h.offset + + if h.last_eof == nil { + h.last_eof = new(time.Time) + } + *h.last_eof = last_read_time h.Unlock() // Don't check for truncation until we hit the full read_timeout @@ -385,10 +396,20 @@ func (h *Harvester) Snapshot() *core.Snapshot { ret.AddEntry("Speed (Bps)", h.byte_speed) ret.AddEntry("Processed lines", h.line_count) ret.AddEntry("Current offset", h.offset) + if h.last_eof_off == nil { + ret.AddEntry("Last EOF Offset", "Never") + } else { + ret.AddEntry("Last EOF Offset", *h.last_eof_off) + } if h.last_eof == nil { - ret.AddEntry("Last EOF", "Never") + ret.AddEntry("Status", "Alive") } else { - ret.AddEntry("Last EOF", h.last_eof) + ret.AddEntry("Status", "Idle") + if age := time.Since(*h.last_eof); age < h.fileconfig.DeadTime { + ret.AddEntry("Dead timer", h.fileconfig.DeadTime-age) + } else { + ret.AddEntry("Dead timer", "0s") +
} } if sub_snap := h.codec.Snapshot(); sub_snap != nil { diff --git a/src/lc-lib/publisher/pending_payload.go b/src/lc-lib/publisher/pending_payload.go index fb2f8b3f..bc545cfa 100644 --- a/src/lc-lib/publisher/pending_payload.go +++ b/src/lc-lib/publisher/pending_payload.go @@ -20,17 +20,23 @@ import ( "bytes" "compress/zlib" "encoding/binary" + "errors" "lc-lib/core" "time" ) +var ( + ErrPayloadCorrupt = errors.New("Payload is corrupt") +) + type pendingPayload struct { next *pendingPayload nonce string events []*core.EventDescriptor - num_events int + last_sequence int + sequence_len int ack_events int - payload_start int + processed int payload []byte timeout time.Time } @@ -39,7 +45,6 @@ func newPendingPayload(events []*core.EventDescriptor, nonce string, timeout tim payload := &pendingPayload{ events: events, nonce: nonce, - num_events: len(events), timeout: time.Now().Add(timeout), } @@ -53,6 +58,11 @@ func newPendingPayload(events []*core.EventDescriptor, nonce string, timeout tim func (pp *pendingPayload) Generate() (err error) { var buffer bytes.Buffer + // Assertion + if len(pp.events) == 0 { + return ErrPayloadCorrupt + } + // Begin with the nonce if _, err = buffer.Write([]byte(pp.nonce)[0:16]); err != nil { return @@ -76,7 +86,44 @@ func (pp *pendingPayload) Generate() (err error) { compressor.Close() pp.payload = buffer.Bytes() - pp.payload_start = pp.ack_events + pp.last_sequence = 0 + pp.sequence_len = len(pp.events) - pp.ack_events return } + +func (pp *pendingPayload) Ack(sequence int) (int, bool) { + if sequence <= pp.last_sequence { + // No change + return 0, false + } else if sequence >= pp.sequence_len { + // Full ACK + lines := pp.sequence_len - pp.last_sequence + pp.ack_events = len(pp.events) + pp.last_sequence = sequence + pp.payload = nil + return lines, true + } + + lines := sequence - pp.last_sequence + pp.ack_events += lines + pp.last_sequence = sequence + pp.payload = nil + return lines, false +} + +func (pp *pendingPayload) HasAck() bool { + return pp.ack_events != 0 +} + +func (pp *pendingPayload) Complete() bool { + return len(pp.events) == 0 +} + +func (pp *pendingPayload) Rollup() []*core.EventDescriptor { + pp.processed += pp.ack_events + rollup := pp.events[:pp.ack_events] + pp.events = pp.events[pp.ack_events:] + pp.ack_events = 0 + return rollup +} diff --git a/src/lc-lib/publisher/pending_payload_test.go b/src/lc-lib/publisher/pending_payload_test.go new file mode 100644 index 00000000..f7ddba37 --- /dev/null +++ b/src/lc-lib/publisher/pending_payload_test.go @@ -0,0 +1,159 @@ +/* +* Copyright 2014 Jason Woods. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package publisher + +import ( + "lc-lib/core" + "time" + "testing" +) + +const ( + test_nonce = "12345678901234567890123456" +) + +func createTestPayload(t *testing.T, num_events int) *pendingPayload { + test_events := make([]*core.EventDescriptor, num_events) + for idx := range test_events { + test_events[idx] = &core.EventDescriptor{ + Stream: nil, + Offset: int64(idx), + Event: []byte(""), + } + } + + payload, err := newPendingPayload(test_events, test_nonce, time.Second) + if err != nil { + t.Log("Failed to create pending payload structure") + t.FailNow() + } + + return payload +} + +func verifyPayload(t *testing.T, payload *pendingPayload, ack bool, complete bool, num_events int, start_event int) { + if got := payload.HasAck(); got != ack { + t.Logf("Payload has ack flag wrong, got: %t, should be: %t", got, ack) + t.FailNow() + } + + if got := payload.Complete(); got != complete { + t.Logf("Payload has completed flag wrong, got: %t, should be: %t", got, complete) + t.FailNow() + } + + events := payload.Rollup() + if len(events) != num_events { + t.Logf("Payload rollup event count wrong, got: %d, should be: %d", len(events), num_events) + t.FailNow() + } + + for _, event := range events { + if event.Offset != int64(start_event) { + t.Logf("Payload rollup event offset wrong, got: %d, should be: %d", event.Offset, start_event) + t.FailNow() + } + start_event++ + } +} + +func TestPayloadNew(t *testing.T) { + payload := createTestPayload(t, 1024) + + verifyPayload(t, payload, false, false, 0, 0) +} + +func TestPayloadFullAck(t *testing.T) { + payload := createTestPayload(t, 1024) + + payload.Ack(1024) + verifyPayload(t, payload, true, false, 1024, 0) + verifyPayload(t, payload, false, true, 0, 0) +} + +func TestPayloadPartialAck(t *testing.T) { + payload := createTestPayload(t, 1024) + + t.Log("Initial partial ack") + payload.Ack(64) + verifyPayload(t, payload, true, false, 64, 0) + verifyPayload(t, payload, false, false, 0, 0) + + t.Log("Second partial ack") + payload.Ack(132) + verifyPayload(t, payload, true, false, 68, 64) + verifyPayload(t, payload, false, false, 0, 0) + + t.Log("Repeated partial ack") + payload.Ack(132) + verifyPayload(t, payload, false, false, 0, 0) + + t.Log("Double ack") + payload.Ack(148) + payload.Ack(192) + verifyPayload(t, payload, true, false, 60, 132) + + t.Log("Final ack") + payload.Ack(1024) + verifyPayload(t, payload, true, false, 832, 192) + verifyPayload(t, payload, false, true, 0, 0) +} + +func TestPayloadResend(t *testing.T) { + payload := createTestPayload(t, 1024) + + t.Log("Initial partial ack") + payload.Ack(512) + verifyPayload(t, payload, true, false, 512, 0) + verifyPayload(t, payload, false, false, 0, 0) + + payload.Generate() + + t.Log("Initial partial ack on new sequence") + payload.Ack(256) + verifyPayload(t, payload, true, false, 256, 512) + verifyPayload(t, payload, false, false, 0, 0) + t.Log("Final ack on new sequence") + payload.Ack(512) + verifyPayload(t, payload, true, false, 256, 768) + verifyPayload(t, payload, false, true, 0, 0) +} + +func TestPayloadEdgeCases(t *testing.T) { + payload := createTestPayload(t, 1024) + + t.Log("Invalid sequence < 0") + payload.Ack(-1024) + verifyPayload(t, payload, false, false, 0, 0) + + t.Log("Sequence revert - initial ack") + payload.Ack(500) + verifyPayload(t, payload, true, false, 500, 0) + verifyPayload(t, payload, false, false, 0, 0) + t.Log("Sequence revert - reverted ack") + payload.Ack(246) + verifyPayload(t, payload, false, false, 0, 0) + t.Log("Sequence revert - next ack") + 
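+ // Acks at or below the highest sequence seen (500) are ignored, so this acks 12 lines starting at offset 500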
payload.Ack(512) + verifyPayload(t, payload, true, false, 12, 500) + verifyPayload(t, payload, false, false, 0, 0) + + t.Log("Sequence past end") + payload.Ack(2048) + verifyPayload(t, payload, true, false, 512, 512) + verifyPayload(t, payload, false, true, 0, 0) +} diff --git a/src/lc-lib/publisher/publisher.go b/src/lc-lib/publisher/publisher.go index 96b79eaf..84f276ea 100644 --- a/src/lc-lib/publisher/publisher.go +++ b/src/lc-lib/publisher/publisher.go @@ -31,6 +31,11 @@ import ( "time" ) +var ( + ErrNetworkTimeout = errors.New("Server did not respond within network timeout") + ErrNetworkPing = errors.New("Server did not respond to PING") +) + const ( // TODO(driskell): Make the idle timeout configurable like the network timeout is? keepalive_timeout time.Duration = 900 * time.Second @@ -63,10 +68,13 @@ type Publisher struct { registrar_spool *registrar.RegistrarEventSpool shutdown bool line_count int64 + retry_count int64 seconds_no_ack int + timeout_count int64 line_speed float64 last_line_count int64 + last_retry_count int64 last_measurement time.Time } @@ -219,6 +227,8 @@ PublishLoop: // Reset timeout retry_payload.timeout = time.Now().Add(p.config.Timeout) + log.Debug("Send now open: Retrying next payload") + // Send the payload again if err = p.transport.Write("JDAT", retry_payload.payload); err != nil { break SelectLoop @@ -243,6 +253,7 @@ PublishLoop: if resent, err = p.checkResend(); err != nil { break SelectLoop } else if resent { + log.Debug("Send now open: Resent a timed out payload") // Expect an ACK within network timeout timer.Reset(p.config.Timeout) break @@ -254,9 +265,13 @@ PublishLoop: break } + log.Debug("Send now open: Awaiting events for new payload") + // Enable event wait input_toggle = p.input case events := <-input_toggle: + log.Debug("Sending new payload of %d events", len(events)) + // Send if err = p.sendNewPayload(events); err != nil { break SelectLoop @@ -268,6 +283,7 @@ PublishLoop: if p.num_payloads >= p.config.MaxPendingPayloads { // Too many pending payloads, disable send temporarily p.can_send = nil + log.Debug("Pending payload limit reached") } // Expect an ACK within network timeout if this is first payload after idle @@ -310,23 +326,27 @@ PublishLoop: } else if reload != core.Reload_None { break SelectLoop } + log.Debug("No more pending payloads, entering idle") timer.Reset(keepalive_timeout) } else { + log.Debug("%d payloads still pending, resetting timeout", p.num_payloads) timer.Reset(p.config.Timeout) } case <-timer.C: // If we have pending payloads, we should've received something by now if p.num_payloads != 0 { - err = errors.New("Server did not respond within network timeout") + err = ErrNetworkTimeout break SelectLoop } // If we haven't received a PONG yet this is a timeout if p.pending_ping { - err = errors.New("Server did not respond to PING") + err = ErrNetworkPing break SelectLoop } + log.Debug("Idle timeout: sending PING") + // Send a ping and expect a pong back (eventually) // If we receive an ACK first, that's fine we'll reset timer // But after those ACKs we should get a PONG @@ -359,7 +379,7 @@ PublishLoop: p.can_send = nil case <-stats_timer.C: - p.updateStatistics(Status_Connected) + p.updateStatistics(Status_Connected, nil) stats_timer.Reset(time.Second) } } @@ -372,7 +392,7 @@ PublishLoop: break PublishLoop } - p.updateStatistics(Status_Reconnecting) + p.updateStatistics(Status_Reconnecting, err) // An error occurred, reconnect after timeout log.Error("Transport error, will try again: %s", err) @@ -380,7 +400,7 @@ 
PublishLoop: } else { log.Info("Reconnecting transport") - p.updateStatistics(Status_Reconnecting) + p.updateStatistics(Status_Reconnecting, nil) } retry_payload = p.first_payload @@ -418,7 +438,7 @@ func (p *Publisher) reloadConfig(new_config *core.NetworkConfig) int { return reload } -func (p *Publisher) updateStatistics(status int) { +func (p *Publisher) updateStatistics(status int, err error) { p.Lock() p.status = status @@ -426,8 +446,13 @@ func (p *Publisher) updateStatistics(status int) { p.line_speed = core.CalculateSpeed(time.Since(p.last_measurement), p.line_speed, float64(p.line_count - p.last_line_count), &p.seconds_no_ack) p.last_line_count = p.line_count + p.last_retry_count = p.retry_count p.last_measurement = time.Now() + if err == ErrNetworkTimeout || err == ErrNetworkPing { + p.timeout_count++ + } + p.Unlock() } @@ -435,6 +460,8 @@ func (p *Publisher) checkResend() (bool, error) { // We're out of sync (received ACKs for later payloads but not earlier ones) // Check timeouts of earlier payloads and resend if necessary if payload := p.first_payload; payload.timeout.Before(time.Now()) { + p.retry_count++ + // Do we need to regenerate the payload? if payload.payload == nil { if err := payload.Generate(); err != nil { @@ -447,6 +474,7 @@ func (p *Publisher) checkResend() (bool, error) { // Requeue the payload p.first_payload = payload.next + payload.next = nil p.last_payload.next = payload p.last_payload = payload @@ -512,6 +540,8 @@ func (p *Publisher) processPong(message []byte) error { return errors.New("Unexpected PONG received") } + log.Debug("PONG message received") + p.pending_ping = false return nil } @@ -525,6 +555,8 @@ func (p *Publisher) processAck(message []byte) (err error) { // Read the nonce and sequence number acked nonce, sequence := string(message[:16]), binary.BigEndian.Uint32(message[16:20]) + log.Debug("ACKN message received for payload %x sequence %d", nonce, sequence) + // Grab the payload the ACK corresponds to by using nonce payload, found := p.pending_payloads[nonce] if !found { @@ -534,44 +566,33 @@ func (p *Publisher) processAck(message []byte) (err error) { ack_events := payload.ack_events - // Full ACK? - // TODO: Protocol error if sequence is too large? 
- if int(sequence) >= payload.num_events-payload.payload_start { - p.line_count += int64(payload.num_events-payload.ack_events) + // Process ACK + lines, complete := payload.Ack(int(sequence)) + p.line_count += int64(lines) - // No more events left for this payload, free the payload memory - payload.ack_events = len(payload.events) - payload.payload = nil + if complete { + // No more events left for this payload, remove from pending list delete(p.pending_payloads, nonce) - } else { - p.line_count += int64(sequence)-int64(payload.ack_events-payload.payload_start) - - // Only process the ACK if something was actually processed - if int(sequence) > payload.ack_events-payload.payload_start { - payload.ack_events = int(sequence) + payload.payload_start - // If we need to resend, we'll need to regenerate payload, so free that memory early - payload.payload = nil - } } // We potentially receive out-of-order ACKs due to payloads distributed across servers // This is where we enforce ordering again to ensure registrar receives ACK in order if payload == p.first_payload { + // The out of sync count we have will never include the first payload, so + // take the value +1 out_of_sync := p.out_of_sync + 1 - for payload.ack_events != 0 { - if payload.ack_events != len(payload.events) { - p.registrar_spool.Add(registrar.NewAckEvent(payload.events[:payload.ack_events])) - p.registrar_spool.Send() - payload.events = payload.events[payload.ack_events:] - payload.num_events = len(payload.events) - payload.ack_events = 0 - payload.payload_start = 0 + // For each full payload we mark off, we decrease this count, the first we + // mark off will always be the first payload - thus the +1. Subsequent + // payloads are the out of sync ones - so if we mark them off we decrease + // the out of sync count + for payload.HasAck() { + p.registrar_spool.Add(registrar.NewAckEvent(payload.Rollup())) + + if !payload.Complete() { break } - p.registrar_spool.Add(registrar.NewAckEvent(payload.events)) - p.registrar_spool.Send() payload = payload.next p.first_payload = payload out_of_sync-- @@ -590,8 +611,11 @@ func (p *Publisher) processAck(message []byte) (err error) { break } } + + p.registrar_spool.Send() } else if ack_events == 0 { - // Mark out of sync so we resend earlier packets in case they were lost + // If this is NOT the first payload, and this is the first acknowledgement + // for this payload, then increase out of sync payload count p.out_of_sync++ } @@ -615,6 +639,8 @@ func (p *Publisher) Snapshot() []*core.Snapshot { snapshot.AddEntry("Speed (Lps)", p.line_speed) snapshot.AddEntry("Published lines", p.last_line_count) snapshot.AddEntry("Pending Payloads", p.num_payloads) + snapshot.AddEntry("Timeouts", p.timeout_count) + snapshot.AddEntry("Retransmissions", p.last_retry_count) p.RUnlock() diff --git a/src/lc-lib/transports/tcp.go b/src/lc-lib/transports/tcp.go index 06f5f97d..fb8d9ec6 100644 --- a/src/lc-lib/transports/tcp.go +++ b/src/lc-lib/transports/tcp.go @@ -72,6 +72,12 @@ type TransportTcp struct { recv_chan chan interface{} can_send chan int + + roundrobin int + host_is_ip bool + host string + port string + addresses []net.IP } func NewTcpTransportFactory(config *core.Config, config_path string, unused map[string]interface{}, name string) (core.TransportFactory, error) { @@ -156,37 +162,59 @@ func (t *TransportTcp) Init() error { t.disconnect() } - // Pick a random server from the list. 
- hostport := t.net_config.Servers[rand.Int()%len(t.net_config.Servers)] - // TODO: Parse and lookup using net.ResolveTCPAddr - submatch := t.config.hostport_re.FindSubmatch([]byte(hostport)) - if submatch == nil { - return fmt.Errorf("Invalid host:port given: %s", hostport) + // Have we exhausted the address list we had? + if t.addresses == nil { + var err error + + // Round robin to the next server + selected := t.net_config.Servers[t.roundrobin%len(t.net_config.Servers)] + t.roundrobin++ + + t.host, t.port, err = net.SplitHostPort(selected) + if err != nil { + return fmt.Errorf("Invalid hostport given: %s", selected) + } + + // Are we an IP? + if ip := net.ParseIP(t.host); ip != nil { + t.host_is_ip = true + t.addresses = []net.IP{ip} + } else { + // Lookup the server in DNS + t.host_is_ip = false + t.addresses, err = net.LookupIP(t.host) + if err != nil { + return fmt.Errorf("DNS lookup failure \"%s\": %s", t.host, err) + } + } } - // Lookup the server in DNS (if this is IP it will implicitly return) - host := string(submatch[1]) - port := string(submatch[2]) - addresses, err := net.LookupHost(host) - if err != nil { - return fmt.Errorf("DNS lookup failure \"%s\": %s", host, err) + // Try next address and drop it from our list + addressport := net.JoinHostPort(t.addresses[0].String(), t.port) + if len(t.addresses) > 1 { + t.addresses = t.addresses[1:] + } else { + t.addresses = nil } - // Select a random address from the pool of addresses provided by DNS - address := addresses[rand.Int()%len(addresses)] - addressport := net.JoinHostPort(address, port) + var desc string + if t.host_is_ip { + desc = fmt.Sprintf("%s", addressport) + } else { + desc = fmt.Sprintf("%s (%s)", addressport, t.host) + } - log.Info("Attempting to connect to %s (%s)", addressport, host) + log.Info("Attempting to connect to %s", desc) tcpsocket, err := net.DialTimeout("tcp", addressport, t.net_config.Timeout) if err != nil { - return fmt.Errorf("Failed to connect to %s: %s", address, err) + return fmt.Errorf("Failed to connect to %s: %s", desc, err) } // Now wrap in TLS if this is the "tls" transport if t.config.transport == "tls" { - // Set the tlsconfig server name for server validation (since Go 1.3) - t.config.tls_config.ServerName = host + // Set the tlsconfig server name for server validation (required since Go 1.3) + t.config.tls_config.ServerName = t.host t.tlssocket = tls.Client(&transportTcpWrap{transport: t, tcpsocket: tcpsocket}, &t.config.tls_config) t.tlssocket.SetDeadline(time.Now().Add(t.net_config.Timeout)) @@ -194,7 +222,7 @@ func (t *TransportTcp) Init() error { if err != nil { t.tlssocket.Close() tcpsocket.Close() - return fmt.Errorf("TLS Handshake failure with %s: %s", address, err) + return fmt.Errorf("TLS Handshake failure with %s: %s", desc, err) } t.socket = t.tlssocket @@ -202,7 +230,7 @@ func (t *TransportTcp) Init() error { t.socket = tcpsocket } - log.Info("Connected to %s", address) + log.Info("Connected to %s", desc) // Signal channels t.shutdown = make(chan interface{}, 1) diff --git a/src/lc-lib/transports/zmq.go b/src/lc-lib/transports/zmq.go index cffbc35d..e7e971a0 100644 --- a/src/lc-lib/transports/zmq.go +++ b/src/lc-lib/transports/zmq.go @@ -44,6 +44,10 @@ const ( Monitor_Part_Extraneous ) +const ( + default_NetworkConfig_PeerSendQueue int64 = 2 +) + type TransportZmqFactory struct { transport string @@ -51,6 +55,8 @@ type TransportZmqFactory struct { CurvePublickey string `config:"curve public key"` CurveSecretkey string `config:"curve secret key"` + PeerSendQueue int64 + 
hostport_re *regexp.Regexp } @@ -142,10 +148,32 @@ func NewZmqTransportFactory(config *core.Config, config_path string, unused map[ if err := ret.processConfig(config_path); err != nil { return nil, err } - } else { - if err := config.ReportUnusedConfig(config_path, unused); err != nil { - return nil, err - } + + return ret, nil + } + + // Don't allow curve settings + if _, ok := unused["CurveServerkey"]; ok { + goto CheckUnused + } + if _, ok := unused["CurvePublickey"]; ok { + goto CheckUnused + } + if _, ok := unused["CurveSecretkey"]; ok { + goto CheckUnused + } + + if err = config.PopulateConfig(ret, config_path, unused); err != nil { + return nil, err + } + + if ret.PeerSendQueue == 0 { + ret.PeerSendQueue = default_NetworkConfig_PeerSendQueue + } + +CheckUnused: + if err := config.ReportUnusedConfig(config_path, unused); err != nil { + return nil, err } return ret, nil @@ -249,6 +277,11 @@ func (t *TransportZmq) Init() (err error) { return fmt.Errorf("Failed to set ZMQ linger period: %s", err) } + // Set the outbound queue + if err = t.dealer.SetSndHWM(int(t.config.PeerSendQueue)); err != nil { + return fmt.Errorf("Failed to set ZMQ send highwater: %s", err) + } + // Monitor socket if t.monitor, err = t.context.NewSocket(zmq.PULL); err != nil { return fmt.Errorf("Failed to create monitor ZMQ PULL socket: %s", err) @@ -344,18 +377,27 @@ BridgeLoop: break BridgeLoop } case message = <-t.recv_bridge_chan: - case func() chan<- interface{} { - if message != nil { - return t.recv_chan - } - return nil - }() <- message: // The reason we flush recv through the bridge and not directly to recv_chan is so that if // the poller was quick and had to cache a receive as the channel was full, it will stop // polling - flushing through bridge allows us to signal poller to start polling again // It is not the publisher's responsibility to do this, and TLS wouldn't need it bridge_in.Send([]byte(zmq_signal_input), 0) - message = nil + + // Keep trying to forward on the message + ForwardLoop: + for { + select { + case notify := <-t.bridge_chan: + bridge_in.Send(notify, 0) + + // Shutdown? 
+ if string(notify) == zmq_signal_shutdown { + break BridgeLoop + } + case t.recv_chan <- message: + break ForwardLoop + } + } } } @@ -408,16 +450,16 @@ func (t *TransportZmq) poller(bridge_out *zmq.Socket) { } } - // Process dealer send - if t.poll_items[1].REvents&zmq.POLLOUT != 0 { - if !t.processDealerOut() { + // Process dealer receive + if t.poll_items[1].REvents&zmq.POLLIN != 0 { + if !t.processDealerIn() { break } } - // Process dealer receive - if t.poll_items[1].REvents&zmq.POLLIN != 0 { - if !t.processDealerIn() { + // Process dealer send + if t.poll_items[1].REvents&zmq.POLLOUT != 0 { + if !t.processDealerOut() { break } } @@ -436,52 +478,49 @@ func (t *TransportZmq) poller(bridge_out *zmq.Socket) { } func (t *TransportZmq) processControlIn(bridge_out *zmq.Socket) (ok bool) { - var err error + for { + RetryControl: + msg, err := bridge_out.Recv(zmq.DONTWAIT) + if err != nil { + switch err { + case syscall.EINTR: + // Try again + goto RetryControl + case syscall.EAGAIN: + // No more messages + return true + } -RetryControl: - msg, err := bridge_out.Recv(zmq.DONTWAIT) - if err != nil { - switch err { - case syscall.EINTR: - // Try again - goto RetryControl - case syscall.EAGAIN: - // No more messages - return true + // Failure + t.recv_chan <- fmt.Errorf("Pull zmq.Socket.Recv failure %s", err) + return } - // Failure - t.recv_chan <- fmt.Errorf("Pull zmq.Socket.Recv failure %s", err) - return - } - - switch string(msg) { - case zmq_signal_output: - // Start polling for send - t.poll_items[1].Events = t.poll_items[1].Events | zmq.POLLOUT - case zmq_signal_input: - // If we staged a receive, process that - if t.recv_buff != nil { - select { - case t.recv_bridge_chan <- t.recv_buff: - t.recv_buff = nil - + switch string(msg) { + case zmq_signal_output: + // Start polling for send + t.poll_items[1].Events = t.poll_items[1].Events | zmq.POLLOUT + case zmq_signal_input: + // If we staged a receive, process that + if t.recv_buff != nil { + select { + case t.recv_bridge_chan <- t.recv_buff: + t.recv_buff = nil + + // Start polling for receive + t.poll_items[1].Events = t.poll_items[1].Events | zmq.POLLIN + default: + // Do nothing, we were asked for receive but channel is already full + } + } else { // Start polling for receive t.poll_items[1].Events = t.poll_items[1].Events | zmq.POLLIN - default: - // Do nothing, we were asked for receive but channel is already full } - } else { - // Start polling for receive - t.poll_items[1].Events = t.poll_items[1].Events | zmq.POLLIN + case zmq_signal_shutdown: + // Shutdown + return } - case zmq_signal_shutdown: - // Shutdown - return } - - ok = true - return } func (t *TransportZmq) processDealerOut() (ok bool) { diff --git a/src/log-courier/log-courier.go b/src/log-courier/log-courier.go index 0ee4bcbb..c600b497 100644 --- a/src/log-courier/log-courier.go +++ b/src/log-courier/log-courier.go @@ -51,6 +51,8 @@ type LogCourier struct { config_file string from_beginning bool log_file *os.File + last_snapshot time.Time + snapshot *core.Snapshot } func NewLogCourier() *LogCourier { @@ -265,8 +267,12 @@ func (lc *LogCourier) processCommand(command string) *admin.Response { } return &admin.Response{&admin.ReloadResponse{}} case "SNAP": - snaps := lc.pipeline.Snapshot() - return &admin.Response{snaps} + if lc.snapshot == nil || time.Since(lc.last_snapshot) >= time.Second { + lc.snapshot = lc.pipeline.Snapshot() + lc.snapshot.Sort() + lc.last_snapshot = time.Now() + } + return &admin.Response{lc.snapshot} }