Skip to content

Commit 9678f1a

Browse files
committed
chore: Add thread priority for streaming synchronizer.
1 parent b50f684 commit 9678f1a

File tree

3 files changed

+62
-31
lines changed

3 files changed

+62
-31
lines changed

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSystemComponents.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ public Synchronizer build(DataSourceBuildInputs context) {
9797
context.getBaseLogger(),
9898
context.getSelectorSource(),
9999
payloadFilter,
100-
initialReconnectDelay
100+
initialReconnectDelay,
101+
context.getThreadPriority()
101102
);
102103
}
103104
}

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,21 +61,25 @@ class StreamingSynchronizerImpl implements Synchronizer {
6161

6262
private final AtomicBoolean started = new AtomicBoolean(false);
6363

64+
private final int threadPriority;
65+
6466
public StreamingSynchronizerImpl(
6567
HttpProperties httpProperties,
6668
URI baseUri,
6769
String requestPath,
6870
LDLogger logger,
6971
SelectorSource selectorSource,
7072
String payloadFilter,
71-
Duration initialReconnectDelaySeconds
73+
Duration initialReconnectDelaySeconds,
74+
int threadPriority
7275
) {
7376
this.httpProperties = httpProperties;
7477
this.selectorSource = selectorSource;
7578
this.logger = logger.subLogger(Loggers.STREAMING_SYNCHRONIZER);
7679
this.payloadFilter = payloadFilter;
7780
this.streamUri = HttpHelpers.concatenateUriPath(baseUri, requestPath);
7881
this.initialReconnectDelay = initialReconnectDelaySeconds;
82+
this.threadPriority = threadPriority;
7983

8084
// The stream will lazily start when `next` is called.
8185
}
@@ -173,8 +177,7 @@ private Thread getRunThread() {
173177
}
174178
});
175179
thread.setName("LaunchDarkly-FDv2-streaming-synchronizer");
176-
// TODO: Implement thread priority.
177-
//streamThread.setPriority();
180+
thread.setPriority(threadPriority);
178181
thread.setDaemon(true);
179182
return thread;
180183
}

lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java

Lines changed: 54 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ public void receivesMultipleChangesets() throws Exception {
6767
testLogger,
6868
selectorSource,
6969
null,
70-
Duration.ofMillis(100)
70+
Duration.ofMillis(100),
71+
Thread.NORM_PRIORITY
7172
);
7273

7374
// First changeset
@@ -103,7 +104,8 @@ public void httpNonRecoverableError() throws Exception {
103104
testLogger,
104105
selectorSource,
105106
null,
106-
Duration.ofMillis(100)
107+
Duration.ofMillis(100),
108+
Thread.NORM_PRIORITY
107109
);
108110

109111
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -131,7 +133,8 @@ public void httpRecoverableError() throws Exception {
131133
testLogger,
132134
selectorSource,
133135
null,
134-
Duration.ofMillis(100)
136+
Duration.ofMillis(100),
137+
Thread.NORM_PRIORITY
135138
);
136139

137140
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -159,7 +162,8 @@ public void networkError() throws Exception {
159162
testLogger,
160163
selectorSource,
161164
null,
162-
Duration.ofMillis(100)
165+
Duration.ofMillis(100),
166+
Thread.NORM_PRIORITY
163167
);
164168

165169
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -192,7 +196,8 @@ public void invalidEventData() throws Exception {
192196
testLogger,
193197
selectorSource,
194198
null,
195-
Duration.ofMillis(100)
199+
Duration.ofMillis(100),
200+
Thread.NORM_PRIORITY
196201
);
197202

198203
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -223,7 +228,8 @@ public void shutdownBeforeEventReceived() throws Exception {
223228
testLogger,
224229
selectorSource,
225230
null,
226-
Duration.ofMillis(100)
231+
Duration.ofMillis(100),
232+
Thread.NORM_PRIORITY
227233
);
228234

229235
CompletableFuture<FDv2SourceResult> nextFuture = synchronizer.next();
@@ -262,7 +268,8 @@ public void shutdownAfterEventReceived() throws Exception {
262268
testLogger,
263269
selectorSource,
264270
null,
265-
Duration.ofMillis(100)
271+
Duration.ofMillis(100),
272+
Thread.NORM_PRIORITY
266273
);
267274

268275
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -304,7 +311,8 @@ public void goodbyeEventInResponse() throws Exception {
304311
testLogger,
305312
selectorSource,
306313
null,
307-
Duration.ofMillis(100)
314+
Duration.ofMillis(100),
315+
Thread.NORM_PRIORITY
308316
);
309317

310318
// First result should be goodbye
@@ -353,7 +361,8 @@ public void heartbeatEvent() throws Exception {
353361
testLogger,
354362
selectorSource,
355363
null,
356-
Duration.ofMillis(100)
364+
Duration.ofMillis(100),
365+
Thread.NORM_PRIORITY
357366
);
358367

359368
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -391,7 +400,8 @@ public void selectorWithVersionAndState() throws Exception {
391400
testLogger,
392401
selectorSource,
393402
null,
394-
Duration.ofMillis(100)
403+
Duration.ofMillis(100),
404+
Thread.NORM_PRIORITY
395405
);
396406

397407
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -445,7 +455,8 @@ public void selectorRefetchedOnReconnection() throws Exception {
445455
testLogger,
446456
selectorSource,
447457
null,
448-
Duration.ofMillis(100)
458+
Duration.ofMillis(100),
459+
Thread.NORM_PRIORITY
449460
);
450461

451462
// First result should be an error from the 503
@@ -505,7 +516,8 @@ public void errorEventFromServer() throws Exception {
505516
testLogger,
506517
selectorSource,
507518
null,
508-
Duration.ofMillis(100)
519+
Duration.ofMillis(100),
520+
Thread.NORM_PRIORITY
509521
);
510522

511523
// Error event should be logged but not queued, so we should get the changeset
@@ -543,7 +555,8 @@ public void selectorWithVersionOnly() throws Exception {
543555
testLogger,
544556
selectorSource,
545557
null,
546-
Duration.ofMillis(100)
558+
Duration.ofMillis(100),
559+
Thread.NORM_PRIORITY
547560
);
548561

549562
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -583,7 +596,8 @@ public void selectorWithEmptyState() throws Exception {
583596
testLogger,
584597
selectorSource,
585598
null,
586-
Duration.ofMillis(100)
599+
Duration.ofMillis(100),
600+
Thread.NORM_PRIORITY
587601
);
588602

589603
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -616,7 +630,8 @@ public void closeCalledMultipleTimes() throws Exception {
616630
testLogger,
617631
selectorSource,
618632
null,
619-
Duration.ofMillis(100)
633+
Duration.ofMillis(100),
634+
Thread.NORM_PRIORITY
620635
);
621636

622637
// Call close multiple times - should not throw exceptions
@@ -652,7 +667,8 @@ public void invalidEventStructureCausesInterrupt() throws Exception {
652667
testLogger,
653668
selectorSource,
654669
null,
655-
Duration.ofMillis(100)
670+
Duration.ofMillis(100),
671+
Thread.NORM_PRIORITY
656672
);
657673

658674
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -687,7 +703,8 @@ public void payloadFilterIsAddedToRequest() throws Exception {
687703
testLogger,
688704
selectorSource,
689705
"myFilter",
690-
Duration.ofMillis(100)
706+
Duration.ofMillis(100),
707+
Thread.NORM_PRIORITY
691708
);
692709

693710
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -728,7 +745,8 @@ public void payloadFilterWithSelectorBothAddedToRequest() throws Exception {
728745
testLogger,
729746
selectorSource,
730747
"testFilter",
731-
Duration.ofMillis(100)
748+
Duration.ofMillis(100),
749+
Thread.NORM_PRIORITY
732750
);
733751

734752
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -767,7 +785,8 @@ public void emptyPayloadFilterNotAddedToRequest() throws Exception {
767785
testLogger,
768786
selectorSource,
769787
"",
770-
Duration.ofMillis(100)
788+
Duration.ofMillis(100),
789+
Thread.NORM_PRIORITY
771790
);
772791

773792
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -806,7 +825,8 @@ public void nullPayloadFilterNotAddedToRequest() throws Exception {
806825
testLogger,
807826
selectorSource,
808827
null,
809-
Duration.ofMillis(100)
828+
Duration.ofMillis(100),
829+
Thread.NORM_PRIORITY
810830
);
811831

812832
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -846,7 +866,8 @@ public void fdv1FallbackFlagSetToTrueInSuccessResponse() throws Exception {
846866
testLogger,
847867
selectorSource,
848868
null,
849-
Duration.ofMillis(100)
869+
Duration.ofMillis(100),
870+
Thread.NORM_PRIORITY
850871
);
851872

852873
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -881,7 +902,8 @@ public void fdv1FallbackFlagSetToFalseWhenHeaderNotPresent() throws Exception {
881902
testLogger,
882903
selectorSource,
883904
null,
884-
Duration.ofMillis(100)
905+
Duration.ofMillis(100),
906+
Thread.NORM_PRIORITY
885907
);
886908

887909
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -911,7 +933,8 @@ public void fdv1FallbackFlagSetToTrueInErrorResponse() throws Exception {
911933
testLogger,
912934
selectorSource,
913935
null,
914-
Duration.ofMillis(100)
936+
Duration.ofMillis(100),
937+
Thread.NORM_PRIORITY
915938
);
916939

917940
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -948,7 +971,8 @@ public void environmentIdExtractedFromHeaders() throws Exception {
948971
testLogger,
949972
selectorSource,
950973
null,
951-
Duration.ofMillis(100)
974+
Duration.ofMillis(100),
975+
Thread.NORM_PRIORITY
952976
);
953977

954978
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -986,7 +1010,8 @@ public void bothFdv1FallbackAndEnvironmentIdExtractedFromHeaders() throws Except
9861010
testLogger,
9871011
selectorSource,
9881012
null,
989-
Duration.ofMillis(100)
1013+
Duration.ofMillis(100),
1014+
Thread.NORM_PRIORITY
9901015
);
9911016

9921017
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -1023,7 +1048,8 @@ public void serializationExceptionWithoutFallbackHeader() throws Exception {
10231048
testLogger,
10241049
selectorSource,
10251050
null,
1026-
Duration.ofMillis(100)
1051+
Duration.ofMillis(100),
1052+
Thread.NORM_PRIORITY
10271053
);
10281054

10291055
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();
@@ -1061,7 +1087,8 @@ public void serializationExceptionPreservesFallbackHeader() throws Exception {
10611087
testLogger,
10621088
selectorSource,
10631089
null,
1064-
Duration.ofMillis(100)
1090+
Duration.ofMillis(100),
1091+
Thread.NORM_PRIORITY
10651092
);
10661093

10671094
CompletableFuture<FDv2SourceResult> resultFuture = synchronizer.next();

0 commit comments

Comments
 (0)