
Fluency

Yet another fluentd logger.

Features

  • Better performance (4 times faster than fluent-logger-java)
  • Asynchronous / synchronous flush to Fluentd
  • TCP / UDP heartbeat with Fluentd
  • PackedForward format
  • Failover with multiple Fluentds
  • Enable / disable ack response mode

Install

Gradle

dependencies {
    compile 'org.komamitsu:fluency:1.8.1'
}

Maven

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency</artifactId>
    <version>1.8.1</version>
</dependency>

Usage

Create Fluency instance

For single Fluentd

// Single Fluentd(localhost:24224 by default)
//   - TCP heartbeat (by default)
//   - Asynchronous flush (by default)
//   - Without ack response (by default)
//   - Flush interval is 600ms (by default)
//   - Initial chunk buffer size is 1MB (by default)
//   - Threshold chunk buffer size to flush is 4MB (by default)
//   - Max total buffer size is 512MB (by default)
//   - Use off heap memory for buffer pool (by default)
//   - Max retry of sending events is 8 (by default)
//   - Max wait until all buffers are flushed is 10 seconds (by default)
//   - Max wait until the flusher is terminated is 10 seconds (by default)
Fluency fluency = Fluency.defaultFluency();

For multiple Fluentd with failover

// Multiple Fluentd(localhost:24224, localhost:24225)
Fluency fluency = Fluency.defaultFluency(
			Arrays.asList(new InetSocketAddress(24224), new InetSocketAddress(24225)));
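
When the Fluentd endpoints come from configuration rather than being hard-coded, the address list has to be built at runtime. The following is a minimal sketch using only the JDK; `FluentdEndpoints` and its `parse` method are hypothetical helpers introduced here for illustration and are not part of Fluency.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Fluency): turns "host:port,host:port"
// into the List<InetSocketAddress> shape used in the failover example.
final class FluentdEndpoints {
    private FluentdEndpoints() {}

    static List<InetSocketAddress> parse(String commaSeparated) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : commaSeparated.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            // Reject entries with no host or no port part.
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            // A non-numeric port raises NumberFormatException,
            // which is a subclass of IllegalArgumentException.
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(new InetSocketAddress(host, port));
        }
        return result;
    }
}
```

The returned list could then be passed to `Fluency.defaultFluency(...)` in place of the `Arrays.asList(...)` call above.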

Enable ACK response mode

// Single Fluentd(localhost:24224)
//   - With ack response
Fluency fluency = Fluency.defaultFluency(new Fluency.Config().setAckResponseMode(true));

Enable file backup mode

In this mode, Fluency backs up unsent in-memory buffers to files when it is closed and resends them when it is restarted.

// Single Fluentd(localhost:24224)
//   - Backup directory is the temporary directory
Fluency fluency = Fluency.defaultFluency(new Fluency.Config().setFileBackupDir(System.getProperty("java.io.tmpdir")));
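
The example above points the backup at the temporary directory itself. If you would rather use a dedicated subdirectory, one option is to create it up front and check that it is writable. This is only a sketch using the JDK; `BackupDirs.prepare` and the `-fluency-backup` suffix are made-up names for illustration, not Fluency APIs or conventions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper (not part of Fluency): prepares a per-application
// backup directory under java.io.tmpdir and returns its absolute path.
final class BackupDirs {
    private BackupDirs() {}

    static String prepare(String appName) {
        Path dir = Paths.get(System.getProperty("java.io.tmpdir"), appName + "-fluency-backup");
        try {
            // No-op if the directory already exists.
            Files.createDirectories(dir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (!Files.isDirectory(dir) || !Files.isWritable(dir)) {
            throw new IllegalStateException("Backup directory is not writable: " + dir);
        }
        return dir.toAbsolutePath().toString();
    }
}
```

The returned string could be passed to `setFileBackupDir(...)` instead of the raw `java.io.tmpdir` value.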

Buffer configuration

// Single Fluentd(xxx.xxx.xxx.xxx:24224)
//   - Initial chunk buffer size = 4MB
//   - Threshold chunk buffer size to flush = 16MB
//     Keep this value (BufferChunkRetentionSize) between `Initial chunk buffer size` and `Max total buffer size`
//   - Max total buffer size = 256MB
Fluency fluency = Fluency.defaultFluency("xxx.xxx.xxx.xxx", 24224,
	new Fluency.Config()
	    .setBufferChunkInitialSize(4 * 1024 * 1024)
	    .setBufferChunkRetentionSize(16 * 1024 * 1024)
	    .setMaxBufferSize(256 * 1024 * 1024L));
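
The ordering constraint in the comment above (initial chunk size, then retention size, then max total size) can be checked before the values are handed to the config. The sketch below is a hypothetical helper, not part of Fluency, and it assumes the bounds are inclusive, which the original comment does not spell out.

```java
// Hypothetical helper (not part of Fluency): validates the relationship
// initial chunk size <= chunk retention size <= max total buffer size.
// Inclusive bounds are an assumption made for this sketch.
final class BufferSizes {
    private BufferSizes() {}

    static void check(long chunkInitialSize, long chunkRetentionSize, long maxBufferSize) {
        if (chunkInitialSize <= 0) {
            throw new IllegalArgumentException("Initial chunk size must be positive");
        }
        if (chunkInitialSize > chunkRetentionSize) {
            throw new IllegalArgumentException(
                "Retention size " + chunkRetentionSize
                    + " is smaller than initial chunk size " + chunkInitialSize);
        }
        if (chunkRetentionSize > maxBufferSize) {
            throw new IllegalArgumentException(
                "Retention size " + chunkRetentionSize
                    + " exceeds max buffer size " + maxBufferSize);
        }
    }
}
```

For the values in the example above, `BufferSizes.check(4 * 1024 * 1024, 16 * 1024 * 1024, 256 * 1024 * 1024L)` returns without throwing.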

Waits on close sequence

// Single Fluentd(localhost:24224)
//   - Max wait until all buffers are flushed is 30 seconds
//   - Max wait until the flusher is terminated is 40 seconds
Fluency fluency = Fluency.defaultFluency(
	new Fluency.Config()
        .setWaitUntilBufferFlushed(30)
        .setWaitUntilFlusherTerminated(40));

Register Jackson modules

// Single Fluentd(localhost:24224)
//   - SimpleModule that has FooSerializer is enabled
Sender sender = new TCPSender.Config().createInstance();

SimpleModule simpleModule = new SimpleModule();
simpleModule.addSerializer(Foo.class, new FooSerializer());

PackedForwardBuffer.Config bufferConfig = new PackedForwardBuffer.Config()
	.setJacksonModules(Collections.<Module>singletonList(simpleModule));

Fluency fluency = new Fluency.Builder(sender).setBufferConfig(bufferConfig).build();

Set a custom error handler

Fluency fluency = Fluency.defaultFluency(
    new Fluency.Config()
        .setSenderErrorHandler(new SenderErrorHandler()
        {
            @Override
            public void handle(Throwable e)
            {
                // Send a notification
            }
        }));

	:

// If flushing events to Fluentd keeps failing until the retries are exhausted, the error handler is called back.
fluency.emit("foo.bar", event);

Send requests over SSL/TLS

// Single Fluentd(localhost:24224)
//   - Enable SSL/TLS
Fluency fluency = Fluency.defaultFluency(
	new Fluency.Config()
        .setUseSsl(true));

If you want to use a custom truststore, specify the JKS file path using -Djavax.net.ssl.trustStore (and -Djavax.net.ssl.trustStorePassword if needed). You can create a custom truststore like this:

$ keytool -import -file server.crt -alias mytruststore -keystore truststore.jks

For server side configuration, see https://docs.fluentd.org/v1.0/articles/in_forward#how-to-enable-tls/ssl-encryption .

Other configurations

// Multiple Fluentd(localhost:24224, localhost:24225)
//   - Flush interval = 200ms
//   - Max retry of sending events = 12
//   - Use JVM heap memory for buffer pool
Fluency fluency = Fluency.defaultFluency(
    Arrays.asList(new InetSocketAddress(24224), new InetSocketAddress(24225)),
    new Fluency.Config()
        .setFlushIntervalMillis(200)
        .setSenderMaxRetryCount(12)
        .setJvmHeapBufferMode(true));

Emit event

String tag = "foo_db.bar_tbl";
Map<String, Object> event = new HashMap<String, Object>();
event.put("name", "komamitsu");
event.put("age", 42);
event.put("rate", 3.14);
fluency.emit(tag, event);

If you want to use EventTime as a timestamp, call Fluency#emit with an EventTime object as follows:

int epochSeconds;
int nanoSeconds;
    :
EventTime eventTime = EventTime.fromEpoch(epochSeconds, nanoSeconds);

// You can also create an EventTime object like this
// EventTime eventTime = EventTime.fromEpochMilli(System.currentTimeMillis());

fluency.emit(tag, eventTime, event);
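
The `epochSeconds` and `nanoSeconds` values elided above can be derived from a `java.time.Instant`. The following sketch uses only the JDK; `EpochParts` is a hypothetical helper name, and whether a narrowing cast is needed before calling `EventTime.fromEpoch` depends on that method's parameter types in the Fluency version you use.

```java
import java.time.Instant;

// Hypothetical helper (not part of Fluency): splits an Instant into the
// two values that the EventTime example above expects.
final class EpochParts {
    private EpochParts() {}

    // Returns { epochSeconds, nanoSeconds }.
    static long[] of(Instant instant) {
        // getEpochSecond() is whole seconds since 1970-01-01T00:00:00Z;
        // getNano() is the nanosecond adjustment within that second (0..999,999,999).
        return new long[] { instant.getEpochSecond(), instant.getNano() };
    }
}
```

For the current time, `EpochParts.of(Instant.now())` gives the pair to pass on.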

Release resources

fluency.close();

Check how much memory Fluency has allocated

LOG.debug("Memory size allocated by Fluency is {}", fluency.getAllocatedBufferSize());

Check how much unsent data Fluency is buffering in memory

LOG.debug("Unsent data size buffered by Fluency in memory is {}", fluency.getBufferedDataSize());
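
A raw byte count can be easier to read as a share of the configured maximum buffer size. The sketch below is a hypothetical helper, not part of Fluency; it assumes you pass in the buffered size reported above together with whatever max buffer size you configured (512MB by default, per the comments earlier in this document).

```java
import java.util.Locale;

// Hypothetical helper (not part of Fluency): expresses buffered bytes as a
// percentage of the configured max total buffer size.
final class BufferUsage {
    private BufferUsage() {}

    static double percent(long bufferedBytes, long maxBufferBytes) {
        if (maxBufferBytes <= 0) {
            throw new IllegalArgumentException("maxBufferBytes must be positive");
        }
        return 100.0 * bufferedBytes / maxBufferBytes;
    }

    // Formats the percentage with one decimal place, independent of the default locale.
    static String describe(long bufferedBytes, long maxBufferBytes) {
        return String.format(Locale.ROOT, "%.1f%%", percent(bufferedBytes, maxBufferBytes));
    }
}
```

For example, the result of `BufferUsage.describe(...)` could be logged next to the raw value to make a filling buffer easier to spot.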