Skip to content
This repository was archived by the owner on Dec 19, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions suro-core/src/main/java/com/netflix/suro/TagKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ public class TagKey {
public static final String RETRIED_COUNT = "retriedCount";
public static final String ROUTING_KEY = "routingKey";
public static final String REJECTED_REASON = "rejectedReason";
public static final String INPUT = "suro.input";
}
2 changes: 2 additions & 0 deletions suro-core/src/main/java/com/netflix/suro/input/SuroInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ public interface SuroInput {
void shutdown();

void setPause(long ms);

String getStat();
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ public void setPause(long ms) {
pausedTime.addAndGet(ms);
}

@Override
public String getStat() {
return "n/a";
}

private void stop() {
running = false;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ public void setPause(long ms) {
pausedTime.addAndGet(ms);
}

@Override
public String getStat() {
return "n/a";
}

public static TypeReference<Map<String, Object>> typeReference = new TypeReference<Map<String, Object>>() {};
private static final int retryCount = 5;
private static final int sleepOnS3Exception = 5000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,13 @@ public void shutdown() {
input.shutdown();
}
}

public String reportInputStat() {
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, SuroInput> entry : inputMap.entrySet()) {
sb.append(entry.getKey()).append(':').append(entry.getValue().getStat()).append("\n\n");
}

return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package com.netflix.suro.input.thrift;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.inject.Inject;
import com.netflix.governator.guice.lazy.LazySingleton;
import com.netflix.servo.DefaultMonitorRegistry;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import com.netflix.servo.monitor.DynamicCounter;
import com.netflix.servo.monitor.BasicCounter;
import com.netflix.servo.monitor.MonitorConfig;
import com.netflix.servo.monitor.Monitors;
import com.netflix.suro.ClientConfig;
Expand All @@ -33,6 +35,7 @@
import com.netflix.suro.message.MessageSetReader;
import com.netflix.suro.queue.Queue4Server;
import com.netflix.suro.routing.MessageRouter;
import com.netflix.suro.servo.Servo;
import com.netflix.suro.thrift.*;
import org.apache.thrift.TException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -127,9 +130,12 @@ public Result process(TMessageSet messageSet) throws TException {
try {
// Stop adding chunks if it's no running
if ( !isRunning) {
DynamicCounter.increment(rejectedMessageCountMetrics,
TagKey.APP, messageSet.getApp(),
TagKey.REJECTED_REASON, "SURO_STOPPED");
Servo.getCounter(
MonitorConfig.builder(rejectedMessageCountMetrics)
.withTag(TagKey.APP, messageSet.getApp())
.withTag(TagKey.INPUT, this.input.getId())
.withTag(TagKey.REJECTED_REASON, "SURO_STOPPED")
.build()).increment();

log.warn("Message processor is not running. Message rejected");
result.setMessage("Suro server stopped");
Expand All @@ -138,9 +144,12 @@ public Result process(TMessageSet messageSet) throws TException {
}

if ( !isTakingTraffic ) {
DynamicCounter.increment(rejectedMessageCountMetrics,
TagKey.APP, messageSet.getApp(),
TagKey.REJECTED_REASON, "SURO_THROTTLING");
Servo.getCounter(
MonitorConfig.builder(rejectedMessageCountMetrics)
.withTag(TagKey.APP, messageSet.getApp())
.withTag(TagKey.INPUT, this.input.getId())
.withTag(TagKey.REJECTED_REASON, "SURO_THROTTLING")
.build()).increment();

log.warn("Suro is not taking traffic. Message rejected. ");
result.setMessage("Suro server is not taking traffic");
Expand All @@ -150,24 +159,32 @@ public Result process(TMessageSet messageSet) throws TException {

MessageSetReader reader = new MessageSetReader(messageSet);
if (!reader.checkCRC()) {
DynamicCounter.increment(dataCorruptionCountMetrics, TagKey.APP, messageSet.getApp());
Servo.getCounter(
MonitorConfig.builder(dataCorruptionCountMetrics)
.withTag(TagKey.APP, messageSet.getApp())
.withTag(TagKey.INPUT, this.input.getId())
.build()).increment();

result.setMessage("data corrupted");
result.setResultCode(ResultCode.CRC_CORRUPTED);
return result;
}

if (queue.offer(messageSet)) {
DynamicCounter.increment(
MonitorConfig.builder(messageCountMetrics)
.withTag(TagKey.APP, messageSet.getApp())
.build(),
messageSet.getNumMessages());
Servo.getCounter(
MonitorConfig.builder(messageCountMetrics)
.withTag(TagKey.APP, messageSet.getApp())
.withTag(TagKey.INPUT, this.input.getId())
.build()).increment();

result.setMessage(Long.toString(messageSet.getCrc()));
result.setResultCode(ResultCode.OK);
} else {
DynamicCounter.increment(retryCountMetrics, TagKey.APP, messageSet.getApp());
Servo.getCounter(
MonitorConfig.builder(retryCountMetrics)
.withTag(TagKey.APP, messageSet.getApp())
.withTag(TagKey.INPUT, this.input.getId())
.build()).increment();

result.setMessage(Long.toString(messageSet.getCrc()));
result.setResultCode(ResultCode.QUEUE_FULL);
Expand Down Expand Up @@ -234,9 +251,12 @@ private void processMessageSet(TMessageSet tMessageSet) {
try {
router.process(input, new DefaultMessageContainer(message, jsonMapper));
} catch (Exception e) {
DynamicCounter.increment(messageProcessErrorMetrics,
TagKey.APP, tMessageSet.getApp(),
TagKey.DATA_SOURCE, message.getRoutingKey());
Servo.getCounter(
MonitorConfig.builder(messageProcessErrorMetrics)
.withTag(TagKey.APP, tMessageSet.getApp())
.withTag(TagKey.DATA_SOURCE, message.getRoutingKey())
.withTag(TagKey.INPUT, this.input.getId())
.build()).increment();

log.error(String.format("Failed to process message %s: %s", message, e.getMessage()), e);
}
Expand Down Expand Up @@ -276,4 +296,74 @@ public TMessageSet poll(long timeout, TimeUnit unit) {
public void setInput(SuroInput input) {
this.input = input;
}

public String getStat() {
StringBuilder sb = new StringBuilder();

StringBuilder messageCount = new StringBuilder();
StringBuilder retryCount = new StringBuilder();
StringBuilder dataCorruptionCount = new StringBuilder();
StringBuilder rejectedMessageCount = new StringBuilder();
StringBuilder messageProcessError = new StringBuilder();

/*
private static final String messageCountMetrics = "messageCount";
private static final String retryCountMetrics = "retryCount";
private static final String dataCorruptionCountMetrics = "corruptedMessageCount";
private static final String rejectedMessageCountMetrics = "rejectedMessageCount";
private static final String messageProcessErrorMetrics = "processErrorCount";
*/

for (com.netflix.servo.monitor.Monitor<?> m : DefaultMonitorRegistry.getInstance().getRegisteredMonitors()) {
log.debug("Got monitor of type {}", m);

if(m instanceof BasicCounter) {
BasicCounter counter = (BasicCounter)m;
String inputId = counter.getConfig().getTags().getValue(TagKey.INPUT);
if(!Strings.isNullOrEmpty(inputId) && inputId.equals(input.getId())){
if (counter.getConfig().getName().equals(messageCountMetrics)) {
messageCount
.append(counter.getConfig().getTags().getValue(TagKey.APP))
.append(":")
.append(counter.getValue()).append('\n');
}
if (counter.getConfig().getName().equals(retryCountMetrics)) {
retryCount
.append(counter.getConfig().getTags().getValue(TagKey.APP))
.append(":")
.append(counter.getValue()).append('\n');
}
if (counter.getConfig().getName().equals(dataCorruptionCountMetrics)) {
dataCorruptionCount
.append(counter.getConfig().getTags().getValue(TagKey.APP))
.append(":")
.append(counter.getValue()).append('\n');
}
if (counter.getConfig().getName().equals(rejectedMessageCountMetrics)) {
rejectedMessageCount
.append(counter.getConfig().getTags().getValue(TagKey.APP))
.append(":")
.append(counter.getConfig().getTags().getValue(TagKey.REJECTED_REASON))
.append(":")
.append(counter.getValue()).append('\n');
}
if (counter.getConfig().getName().equals(messageProcessErrorMetrics)) {
messageProcessError
.append(counter.getConfig().getTags().getValue(TagKey.APP))
.append(":")
.append(counter.getConfig().getTags().getValue(TagKey.DATA_SOURCE))
.append(":")
.append(counter.getValue()).append('\n');
}
}
}
}

sb.append('\n').append(messageCountMetrics).append('\n').append(messageCount.toString());
sb.append('\n').append(retryCountMetrics).append('\n').append(retryCount.toString());
sb.append('\n').append(dataCorruptionCountMetrics).append('\n').append(dataCorruptionCount.toString());
sb.append('\n').append(rejectedMessageCountMetrics).append('\n').append(rejectedMessageCount.toString());
sb.append('\n').append(messageProcessErrorMetrics).append('\n').append(messageProcessError.toString());
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ public void run() {
}
}

@Override
public String getStat() {
return msgProcessor.getStat();
}

// for testing purpose
public int getPort() {
return port;
Expand Down
29 changes: 29 additions & 0 deletions suro-server/src/main/java/com/netflix/suro/server/InputStat.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.netflix.suro.server;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.suro.input.InputManager;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;

/**
* Created by adamschmidt on 18/02/15.
*/
@Singleton
@Path("/suroinputstat")
public class InputStat {
private final InputManager inputManager;

@Inject
public InputStat(InputManager inputManager) {
this.inputManager = inputManager;
}

@GET
@Produces("text/plain")
public String get() {
return inputManager.reportInputStat();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public static ServletModule createJerseyServletModule() {
protected void configureServlets() {
bind(HealthCheck.class);
bind(SinkStat.class);
bind(InputStat.class);
bind(GuiceContainer.class).asEagerSingleton();
serve("/*").with(GuiceContainer.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ public void shutdown() {
public void setPause(long ms) {

}

@Override
public String getStat() {
return "n/a";
}
}

@Test
Expand Down