diff --git a/dxm/src/main/java/io/pcp/parfait/dxm/PcpMmvWriter.java b/dxm/src/main/java/io/pcp/parfait/dxm/PcpMmvWriter.java index bfc15184..dbbd2a8f 100644 --- a/dxm/src/main/java/io/pcp/parfait/dxm/PcpMmvWriter.java +++ b/dxm/src/main/java/io/pcp/parfait/dxm/PcpMmvWriter.java @@ -167,6 +167,7 @@ public void putBytes(ByteBuffer buffer, String value) { private volatile State state = State.STOPPED; private final Monitor stateMonitor = new Monitor(); private final Monitor.Guard isStarted = stateMonitor.newGuard(() -> state == State.STARTED); + private volatile Duration maxWaitStart = Duration.ofSeconds(10); private volatile boolean usePerMetricLock = true; private final Map perMetricByteBuffers = newConcurrentMap(); private final Object globalLock = new Object(); @@ -231,7 +232,6 @@ public PcpMmvWriter(ByteBufferFactory byteBufferFactory, IdentifierSourceSet ide this.instanceDomainStore = mmvVersion.createInstanceDomainStore(identifierSources, stringStore); this.mmvVersion = mmvVersion; this.metricNameValidator = mmvVersion.createMetricNameValidator(); - registerType(String.class, MMV_STRING_HANDLER); } @@ -324,7 +324,7 @@ public final void updateMetric(MetricName name, Object value) { if (state == State.STARTED) { doUpdateMetric(name, value); } else if (state == State.STARTING) { - if (stateMonitor.enterWhenUninterruptibly(isStarted, Duration.ofSeconds(10))) { + if (stateMonitor.enterWhenUninterruptibly(isStarted, maxWaitStart)) { // Leave the monitor immediately because we only care about being notified about the state change stateMonitor.leave(); doUpdateMetric(name, value); @@ -422,6 +422,15 @@ public void setFlags(Set flags) { this.flags = EnumSet.copyOf(flags); } + /** + * Sets the maximum amount of time to wait for the writer to start when attempting to update a metric. + * + * @param maxWaitStart the maximum amount of time to wait + */ + public void setMaxWaitStart(Duration maxWaitStart) { + this.maxWaitStart = Preconditions.checkNotNull(maxWaitStart, "maxWaitStart cannot be null"); + } + private synchronized void addMetricInfo(MetricName name, Semantics semantics, Unit unit, Object initialValue, TypeHandler pcpType) { if (metricData.containsKey(name)) { diff --git a/parfait-agent/src/main/java/io/pcp/parfait/AgentMonitoringView.java b/parfait-agent/src/main/java/io/pcp/parfait/AgentMonitoringView.java index a6cb7a34..5312a4ce 100644 --- a/parfait-agent/src/main/java/io/pcp/parfait/AgentMonitoringView.java +++ b/parfait-agent/src/main/java/io/pcp/parfait/AgentMonitoringView.java @@ -33,6 +33,7 @@ import io.pcp.parfait.ValueSemantics; import java.io.IOException; +import java.time.Duration; import java.util.EnumSet; import javax.management.AttributeNotFoundException; @@ -73,6 +74,7 @@ public void start() { writer = new PcpMmvWriter(name, IdentifierSourceSet.DEFAULT_SET); writer.setClusterIdentifier(MonitoringViewProperties.getCluster()); writer.setFlags(EnumSet.of(PcpMmvWriter.MmvFlag.MMV_FLAG_PROCESS)); + writer.setMaxWaitStart(Duration.ofMillis(MonitoringViewProperties.getWriterWait())); DynamicMonitoringView view; view = new DynamicMonitoringView(registry, diff --git a/parfait-agent/src/main/java/io/pcp/parfait/MonitoringViewProperties.java b/parfait-agent/src/main/java/io/pcp/parfait/MonitoringViewProperties.java index a5532514..13f5dce6 100644 --- a/parfait-agent/src/main/java/io/pcp/parfait/MonitoringViewProperties.java +++ b/parfait-agent/src/main/java/io/pcp/parfait/MonitoringViewProperties.java @@ -19,7 +19,6 @@ import java.lang.management.ManagementFactory; import java.util.Collections; -import io.pcp.parfait.DynamicMonitoringView; import io.pcp.parfait.dxm.HashingIdentifierSource; import io.pcp.parfait.dxm.IdentifierSource; @@ -31,15 +30,18 @@ public class MonitoringViewProperties { private static final String INTERVAL = "interval"; private static final String STARTUP = "startup"; private static final String CONNECT = "connect"; + private static final String WRITER_WAIT = "writer.wait"; public static final String PARFAIT_NAME = PARFAIT + "." + NAME; public static final String PARFAIT_CLUSTER = PARFAIT + "." + CLUSTER; public static final String PARFAIT_INTERVAL = PARFAIT + "." + INTERVAL; public static final String PARFAIT_STARTUP = PARFAIT + "." + STARTUP; public static final String PARFAIT_CONNECT = PARFAIT + "." + CONNECT; + public static final String PARFAIT_WRITER_WAIT = PARFAIT + "." + WRITER_WAIT; private static final String DEFAULT_INTERVAL = "1000"; // milliseconds private static final String DEFAULT_CONNECT = "localhost:9875"; + private static final String DEFAULT_WRITER_WAIT_MS = "10000"; public static String getCommandBasename(String command) { // trim away arguments, produce a generally sanitized basename @@ -139,6 +141,14 @@ public static String getDefaultConnection() { return connect; } + public static String getDefaultWriterWait() { + String writerWait = System.getProperty(PARFAIT_WRITER_WAIT); + if (writerWait == null || writerWait.isEmpty()) { + return DEFAULT_WRITER_WAIT_MS; + } + return writerWait; + } + public static void setupProperties() { String name = getDefaultName(getParfaitName(), getDefaultCommand(), getRuntimeName()); System.setProperty(PARFAIT_NAME, name); @@ -154,6 +164,9 @@ public static void setupProperties() { String connect = getDefaultConnection(); System.setProperty(PARFAIT_CONNECT, connect); + + String writerWait = getDefaultWriterWait(); + System.setProperty(PARFAIT_WRITER_WAIT, writerWait); } // @@ -174,4 +187,13 @@ public static Long getStartup() { public static String getConnection() { return System.getProperty(PARFAIT_CONNECT); } + + /** + * The maximum number of milliseconds to wait for PcpMmvWriter to start when attempting to update a metric. + * + * @return maximum number of milliseconds to wait + */ + public static long getWriterWait() { + return Long.parseLong(System.getProperty(PARFAIT_WRITER_WAIT)); + } }