@@ -52,6 +52,41 @@ public class TestDataV2Test {
5252
5353 private final CapturingDataSourceUpdates updates = new CapturingDataSourceUpdates ();
5454
55+ /**
56+ * Helper class that consumes FDv2SourceResult objects in a background thread.
57+ * This is necessary because update() and delete() block until results are closed,
58+ * so we can't call sync.next() on the same thread.
59+ */
60+ private static class ResultConsumer {
61+ private final BlockingQueue <FDv2SourceResult > results = new LinkedBlockingQueue <>();
62+ private final Thread consumerThread ;
63+ private volatile boolean stopped = false ;
64+
65+ ResultConsumer (Synchronizer sync ) {
66+ consumerThread = new Thread (() -> {
67+ while (!stopped ) {
68+ try {
69+ FDv2SourceResult result = sync .next ().get (5 , TimeUnit .SECONDS );
70+ result .close (); // Close immediately to unblock put()
71+ results .put (result );
72+ } catch (Exception e ) {
73+ break ;
74+ }
75+ }
76+ });
77+ consumerThread .start ();
78+ }
79+
80+ FDv2SourceResult next () throws Exception {
81+ return results .poll (5 , TimeUnit .SECONDS );
82+ }
83+
84+ void stop () {
85+ stopped = true ;
86+ consumerThread .interrupt ();
87+ }
88+ }
89+
5590 private DataSourceBuildInputs dataSourceBuildInputs () {
5691 ClientContext context = clientContext ("" , new LDConfig .Builder ().build (), updates );
5792 SelectorSource selectorSource = () -> Selector .EMPTY ;
@@ -70,8 +105,9 @@ private DataSourceBuildInputs dataSourceBuildInputs() {
70105 public void initializesWithEmptyData () throws Exception {
71106 TestDataV2 td = TestDataV2 .synchronizer ();
72107 Synchronizer sync = td .build (dataSourceBuildInputs ());
108+ ResultConsumer consumer = new ResultConsumer (sync );
73109
74- FDv2SourceResult result = sync .next (). get ( 5 , TimeUnit . SECONDS );
110+ FDv2SourceResult result = consumer .next ();
75111
76112 assertThat (result .getResultType (), equalTo (FDv2SourceResult .ResultType .CHANGE_SET ));
77113 ChangeSet <ItemDescriptor > changeSet = result .getChangeSet ();
@@ -80,6 +116,8 @@ public void initializesWithEmptyData() throws Exception {
80116 assertThat (changeSet .getData (), iterableWithSize (1 ));
81117 assertThat (get (changeSet .getData (), 0 ).getKey (), equalTo (DataModel .FEATURES ));
82118 assertThat (get (changeSet .getData (), 0 ).getValue ().getItems (), emptyIterable ());
119+
120+ consumer .stop ();
83121 }
84122
85123 @ Test
@@ -89,7 +127,9 @@ public void initializesWithFlags() throws Exception {
89127 .update (td .flag ("flag2" ).on (false ));
90128
91129 Synchronizer sync = td .build (dataSourceBuildInputs ());
92- FDv2SourceResult result = sync .next ().get (5 , TimeUnit .SECONDS );
130+ ResultConsumer consumer = new ResultConsumer (sync );
131+
132+ FDv2SourceResult result = consumer .next ();
93133
94134 assertThat (result .getResultType (), equalTo (FDv2SourceResult .ResultType .CHANGE_SET ));
95135 ChangeSet <ItemDescriptor > changeSet = result .getChangeSet ();
@@ -113,20 +153,23 @@ public void initializesWithFlags() throws Exception {
113153
114154 assertJsonEquals (flagJson (expectedFlag1 , 1 ), flagJson (flag1 ));
115155 assertJsonEquals (flagJson (expectedFlag2 , 1 ), flagJson (flag2 ));
156+
157+ consumer .stop ();
116158 }
117159
118160 @ Test
119161 public void addsFlag () throws Exception {
120162 TestDataV2 td = TestDataV2 .synchronizer ();
121163 Synchronizer sync = td .build (dataSourceBuildInputs ());
164+ ResultConsumer consumer = new ResultConsumer (sync );
122165
123- FDv2SourceResult initResult = sync .next (). get ( 5 , TimeUnit . SECONDS );
166+ FDv2SourceResult initResult = consumer .next ();
124167 assertThat (initResult .getResultType (), equalTo (FDv2SourceResult .ResultType .CHANGE_SET ));
125168 assertThat (initResult .getChangeSet ().getType (), equalTo (ChangeSetType .Full ));
126169
127170 td .update (td .flag ("flag1" ).on (true ));
128171
129- FDv2SourceResult updateResult = sync .next (). get ( 5 , TimeUnit . SECONDS );
172+ FDv2SourceResult updateResult = consumer .next ();
130173 assertThat (updateResult .getResultType (), equalTo (FDv2SourceResult .ResultType .CHANGE_SET ));
131174 ChangeSet <ItemDescriptor > changeSet = updateResult .getChangeSet ();
132175 assertThat (changeSet .getType (), equalTo (ChangeSetType .Partial ));
@@ -141,6 +184,8 @@ public void addsFlag() throws Exception {
141184 ModelBuilders .FlagBuilder expectedFlag = flagBuilder ("flag1" ).version (1 ).salt ("" )
142185 .on (true ).offVariation (1 ).fallthroughVariation (0 ).variations (true , false );
143186 assertJsonEquals (flagJson (expectedFlag , 1 ), flagJson (flag1 ));
187+
188+ consumer .stop ();
144189 }
145190
146191 @ Test
@@ -152,12 +197,14 @@ public void updatesFlag() throws Exception {
152197 .ifMatch ("name" , LDValue .of ("Lucy" )).thenReturn (true ));
153198
154199 Synchronizer sync = td .build (dataSourceBuildInputs ());
155- FDv2SourceResult initResult = sync .next ().get (5 , TimeUnit .SECONDS );
200+ ResultConsumer consumer = new ResultConsumer (sync );
201+
202+ FDv2SourceResult initResult = consumer .next ();
156203 assertThat (initResult .getResultType (), equalTo (FDv2SourceResult .ResultType .CHANGE_SET ));
157204
158205 td .update (td .flag ("flag1" ).on (true ));
159206
160- FDv2SourceResult updateResult = sync .next (). get ( 5 , TimeUnit . SECONDS );
207+ FDv2SourceResult updateResult = consumer .next ();
161208 ChangeSet <ItemDescriptor > changeSet = updateResult .getChangeSet ();
162209 Map <String , ItemDescriptor > items = ImmutableMap .copyOf (get (changeSet .getData (), 0 ).getValue ().getItems ());
163210 ItemDescriptor flag1 = items .get ("flag1" );
@@ -168,29 +215,33 @@ public void updatesFlag() throws Exception {
168215 .addTarget (0 , "a" ).addContextTarget (ContextKind .DEFAULT , 0 )
169216 .addRule ("rule0" , 0 , "{\" contextKind\" :\" user\" ,\" attribute\" :\" name\" ,\" op\" :\" in\" ,\" values\" :[\" Lucy\" ]}" );
170217 assertJsonEquals (flagJson (expectedFlag , 2 ), flagJson (flag1 ));
218+
219+ consumer .stop ();
171220 }
172221
173222 @ Test
174223 public void deletesFlag () throws Exception {
175224 TestDataV2 td = TestDataV2 .synchronizer ();
176225 Synchronizer sync = td .build (dataSourceBuildInputs ());
226+ ResultConsumer consumer = new ResultConsumer (sync );
177227
178- sync .next (). get ( 5 , TimeUnit . SECONDS );
228+ consumer .next (); // Consume initial result
179229
180230 td .update (td .flag ("foo" ).on (false ).valueForAll (LDValue .of ("bar" )));
181- FDv2SourceResult addResult = sync .next (). get ( 5 , TimeUnit . SECONDS );
231+ FDv2SourceResult addResult = consumer .next ();
182232 assertThat (addResult .getChangeSet ().getType (), equalTo (ChangeSetType .Partial ));
183233 Map <String , ItemDescriptor > addItems = ImmutableMap .copyOf (get (addResult .getChangeSet ().getData (), 0 ).getValue ().getItems ());
184234 assertThat (addItems .get ("foo" ).getVersion (), equalTo (1 ));
185235 assertThat (addItems .get ("foo" ).getItem (), notNullValue ());
186236
187237 td .delete ("foo" );
188- FDv2SourceResult deleteResult = sync .next (). get ( 5 , TimeUnit . SECONDS );
238+ FDv2SourceResult deleteResult = consumer .next ();
189239 assertThat (deleteResult .getChangeSet ().getType (), equalTo (ChangeSetType .Partial ));
190240 Map <String , ItemDescriptor > deleteItems = ImmutableMap .copyOf (get (deleteResult .getChangeSet ().getData (), 0 ).getValue ().getItems ());
191241 assertThat (deleteItems .get ("foo" ).getVersion (), equalTo (2 ));
192242 assertThat (deleteItems .get ("foo" ).getItem (), nullValue ());
193243
244+ consumer .stop ();
194245 sync .close ();
195246 }
196247
@@ -248,17 +299,21 @@ private void verifyFlag(
248299
249300 TestDataV2 td = TestDataV2 .synchronizer ();
250301 Synchronizer sync = td .build (dataSourceBuildInputs ());
251- sync .next ().get (5 , TimeUnit .SECONDS );
302+ ResultConsumer consumer = new ResultConsumer (sync );
303+
304+ consumer .next (); // Consume initial result
252305
253306 td .update (configureFlag .apply (td .flag ("flagkey" )));
254307
255- FDv2SourceResult result = sync .next (). get ( 5 , TimeUnit . SECONDS );
308+ FDv2SourceResult result = consumer .next ();
256309 assertThat (result .getResultType (), equalTo (FDv2SourceResult .ResultType .CHANGE_SET ));
257310 ChangeSet <ItemDescriptor > changeSet = result .getChangeSet ();
258311 Map <String , ItemDescriptor > items = ImmutableMap .copyOf (get (changeSet .getData (), 0 ).getValue ().getItems ());
259312 ItemDescriptor flag = items .get ("flagkey" );
260313 assertThat (flag .getVersion (), equalTo (1 ));
261314 assertJsonEquals (flagJson (expectedFlag , 1 ), flagJson (flag ));
315+
316+ consumer .stop ();
262317 }
263318
264319 private static String flagJson (ModelBuilders .FlagBuilder flagBuilder , int version ) {
0 commit comments