Skip to content

Commit

Permalink
make writer wait configurable
Browse files Browse the repository at this point in the history
Adds the config property `parfait.writer.wait` that can be used to
adjust the amount of time an update metric operation will wait for the
writer to start.
  • Loading branch information
pwinckles committed Nov 20, 2024
1 parent b4e6791 commit 5489587
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 3 deletions.
13 changes: 11 additions & 2 deletions dxm/src/main/java/io/pcp/parfait/dxm/PcpMmvWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public void putBytes(ByteBuffer buffer, String value) {
private volatile State state = State.STOPPED;
private final Monitor stateMonitor = new Monitor();
private final Monitor.Guard isStarted = stateMonitor.newGuard(() -> state == State.STARTED);
private volatile Duration maxWaitStart = Duration.ofSeconds(10);
private volatile boolean usePerMetricLock = true;
private final Map<PcpValueInfo,ByteBuffer> perMetricByteBuffers = newConcurrentMap();
private final Object globalLock = new Object();
Expand Down Expand Up @@ -231,7 +232,6 @@ public PcpMmvWriter(ByteBufferFactory byteBufferFactory, IdentifierSourceSet ide
this.instanceDomainStore = mmvVersion.createInstanceDomainStore(identifierSources, stringStore);
this.mmvVersion = mmvVersion;
this.metricNameValidator = mmvVersion.createMetricNameValidator();

registerType(String.class, MMV_STRING_HANDLER);
}

Expand Down Expand Up @@ -324,7 +324,7 @@ public final void updateMetric(MetricName name, Object value) {
if (state == State.STARTED) {
doUpdateMetric(name, value);
} else if (state == State.STARTING) {
if (stateMonitor.enterWhenUninterruptibly(isStarted, Duration.ofSeconds(10))) {
if (stateMonitor.enterWhenUninterruptibly(isStarted, maxWaitStart)) {
// Leave the monitor immediately because we only care about being notified about the state change
stateMonitor.leave();
doUpdateMetric(name, value);
Expand Down Expand Up @@ -422,6 +422,15 @@ public void setFlags(Set<MmvFlag> flags) {
this.flags = EnumSet.copyOf(flags);
}

/**
* Sets the maximum amount of time to wait for the writer to start when attempting to update a metric.
*
* @param maxWaitStart the maximum amount of time to wait
*/
public void setMaxWaitStart(Duration maxWaitStart) {
this.maxWaitStart = Preconditions.checkNotNull(maxWaitStart, "maxWaitStart cannot be null");
}

private synchronized void addMetricInfo(MetricName name, Semantics semantics, Unit<?> unit,
Object initialValue, TypeHandler<?> pcpType) {
if (metricData.containsKey(name)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.pcp.parfait.ValueSemantics;

import java.io.IOException;
import java.time.Duration;
import java.util.EnumSet;

import javax.management.AttributeNotFoundException;
Expand Down Expand Up @@ -73,6 +74,7 @@ public void start() {
writer = new PcpMmvWriter(name, IdentifierSourceSet.DEFAULT_SET);
writer.setClusterIdentifier(MonitoringViewProperties.getCluster());
writer.setFlags(EnumSet.of(PcpMmvWriter.MmvFlag.MMV_FLAG_PROCESS));
writer.setMaxWaitStart(Duration.ofMillis(MonitoringViewProperties.getWriterWait()));

DynamicMonitoringView view;
view = new DynamicMonitoringView(registry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.lang.management.ManagementFactory;
import java.util.Collections;

import io.pcp.parfait.DynamicMonitoringView;
import io.pcp.parfait.dxm.HashingIdentifierSource;
import io.pcp.parfait.dxm.IdentifierSource;

Expand All @@ -31,15 +30,18 @@ public class MonitoringViewProperties {
private static final String INTERVAL = "interval";
private static final String STARTUP = "startup";
private static final String CONNECT = "connect";
private static final String WRITER_WAIT = "writer.wait";

public static final String PARFAIT_NAME = PARFAIT + "." + NAME;
public static final String PARFAIT_CLUSTER = PARFAIT + "." + CLUSTER;
public static final String PARFAIT_INTERVAL = PARFAIT + "." + INTERVAL;
public static final String PARFAIT_STARTUP = PARFAIT + "." + STARTUP;
public static final String PARFAIT_CONNECT = PARFAIT + "." + CONNECT;
public static final String PARFAIT_WRITER_WAIT = PARFAIT + "." + WRITER_WAIT;

private static final String DEFAULT_INTERVAL = "1000"; // milliseconds
private static final String DEFAULT_CONNECT = "localhost:9875";
private static final String DEFAULT_WRITER_WAIT_MS = "10000";

public static String getCommandBasename(String command) {
// trim away arguments, produce a generally sanitized basename
Expand Down Expand Up @@ -139,6 +141,14 @@ public static String getDefaultConnection() {
return connect;
}

public static String getDefaultWriterWait() {
String writerWait = System.getProperty(PARFAIT_WRITER_WAIT);
if (writerWait == null || writerWait.isEmpty()) {
return DEFAULT_WRITER_WAIT_MS;
}
return writerWait;
}

public static void setupProperties() {
String name = getDefaultName(getParfaitName(), getDefaultCommand(), getRuntimeName());
System.setProperty(PARFAIT_NAME, name);
Expand All @@ -154,6 +164,9 @@ public static void setupProperties() {

String connect = getDefaultConnection();
System.setProperty(PARFAIT_CONNECT, connect);

String writerWait = getDefaultWriterWait();
System.setProperty(PARFAIT_WRITER_WAIT, writerWait);
}

//
Expand All @@ -174,4 +187,13 @@ public static Long getStartup() {
public static String getConnection() {
return System.getProperty(PARFAIT_CONNECT);
}

/**
* The maximum number of milliseconds to wait for PcpMmvWriter to start when attempting to update a metric.
*
* @return maximum number of milliseconds to wait
*/
public static long getWriterWait() {
return Long.parseLong(System.getProperty(PARFAIT_WRITER_WAIT));
}
}

0 comments on commit 5489587

Please sign in to comment.