2929import  org .apache .cassandra .db .ColumnFamilyStore ;
3030import  org .apache .cassandra .distributed .shared .ClusterUtils ;
3131import  org .apache .cassandra .utils .concurrent .Condition ;
32+ 
3233import  org .junit .AfterClass ;
3334import  org .junit .Assert ;
34- import  org .junit .BeforeClass ;
3535import  org .junit .Test ;
36+ import  org .junit .runner .RunWith ;
37+ import  org .junit .runners .Parameterized ;
3638
3739import  org .apache .cassandra .distributed .Cluster ;
3840import  org .apache .cassandra .distributed .api .ICluster ;
3941import  org .apache .cassandra .distributed .api .IInstanceConfig ;
4042import  org .apache .cassandra .distributed .api .IInvokableInstance ;
43+ import  org .apache .cassandra .schema .SchemaConstants ;
44+ import  org .apache .cassandra .schema .SystemDistributedKeyspace ;
4145import  org .apache .cassandra .service .StorageService ;
4246
4347import  static  com .google .common .collect .ImmutableList .of ;
48+ 
4449import  static  java .util .concurrent .TimeUnit .MINUTES ;
50+ 
4551import  static  org .apache .cassandra .distributed .api .Feature .GOSSIP ;
4652import  static  org .apache .cassandra .distributed .api .Feature .NETWORK ;
4753import  static  org .apache .cassandra .distributed .shared .AssertUtils .assertRows ;
5056import  static  org .apache .cassandra .utils .concurrent .Condition .newOneTimeCondition ;
5157import  static  org .apache .cassandra .utils .progress .ProgressEventType .COMPLETE ;
5258
59+ @ RunWith (Parameterized .class )
5360public  class  RepairTest  extends  TestBaseImpl 
5461{
62+     private  static  boolean  nodesHaveCDC ;
63+     private  static  boolean  tableHasCDC ;
5564    private  static  ICluster <IInvokableInstance > cluster ;
5665
66+     @ Parameterized .Parameters (name  = "nodesHaveCDC={0}, tableHasCDC={1}" )
67+     public  static  Iterable <Object []> data ()
68+     {
69+         return  Arrays .asList (new  Object [][] {{ false , false  }, { false , true  } , { true , false  }, { true , true  }});
70+     }
71+ 
5772    private  static  void  insert (ICluster <IInvokableInstance > cluster , String  keyspace , int  start , int  end , int  ... nodes )
5873    {
5974        String  insert  = String .format ("INSERT INTO %s.test (k, c1, c2) VALUES (?, 'value1', 'value2');" , keyspace );
@@ -85,7 +100,7 @@ private static void flush(ICluster<IInvokableInstance> cluster, String keyspace,
85100                                                                                                     ColumnFamilyStore .FlushReason .UNIT_TESTS )));
86101    }
87102
88-     private  static   ICluster  create (Consumer <IInstanceConfig > configModifier ) throws  IOException 
103+     private  ICluster  create (Consumer <IInstanceConfig > configModifier ) throws  IOException 
89104    {
90105        configModifier  = configModifier .andThen (
91106        config  -> config .set ("hinted_handoff_enabled" , false )
@@ -98,6 +113,15 @@ private static ICluster create(Consumer<IInstanceConfig> configModifier) throws
98113
99114    static  void  repair (ICluster <IInvokableInstance > cluster , String  keyspace , Map <String , String > options )
100115    {
116+         long [] startPositions  = new  long [cluster .size ()];
117+         for  (int  i  = 1 ; i  <= cluster .size (); i ++)
118+         {
119+             IInvokableInstance  node  = cluster .get (i );
120+             if  (node .isShutdown ())
121+                 continue ;
122+             startPositions [i  - 1 ] = node .logs ().mark ();
123+         }
124+ 
101125        cluster .get (1 ).runOnInstance (rethrow (() -> {
102126            Condition  await  = newOneTimeCondition ();
103127            instance .repair (keyspace , options , of ((tag , event ) -> {
@@ -106,14 +130,25 @@ static void repair(ICluster<IInvokableInstance> cluster, String keyspace, Map<St
106130            })).right .get ();
107131            await .await (1L , MINUTES );
108132        }));
133+ 
134+         for  (int  i  = 1 ; i  <= cluster .size (); i ++)
135+         {
136+             IInvokableInstance  node  = cluster .get (i );
137+             if  (node .isShutdown ())
138+                 continue ;
139+             Assert .assertEquals ("We should use the local write path (which requires flushing) if CDC is enabled on both a node and table-level" ,
140+                                 nodesHaveCDC  && tableHasCDC ,
141+                                 !node .logs ().grep (startPositions [i  - 1 ],
142+                                                  String .format ("Enqueuing flush of %s.test, Reason\\ : STREAMS_RECEIVED" , keyspace )).getResult ().isEmpty ());
143+         }
109144    }
110145
111-     static   void  populate (ICluster <IInvokableInstance > cluster , String  keyspace , String  compression ) throws  Exception 
146+     void  populate (ICluster <IInvokableInstance > cluster , String  keyspace , String  compression ) throws  Exception 
112147    {
113148        try 
114149        {
115150            cluster .schemaChange (String .format ("DROP TABLE IF EXISTS %s.test;" , keyspace ));
116-             cluster .schemaChange (String .format ("CREATE TABLE %s.test (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compression = %s" , keyspace , compression ));
151+             cluster .schemaChange (String .format ("CREATE TABLE %s.test (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compression = %s AND cdc = %s; " , keyspace , compression ,  tableHasCDC ));
117152
118153            insert (cluster , keyspace , 0 , 1000 , 1 , 2 , 3 );
119154            flush (cluster , keyspace , 1 );
@@ -147,10 +182,21 @@ void shutDownNodesAndForceRepair(ICluster<IInvokableInstance> cluster, String ke
147182        repair (cluster , keyspace , ImmutableMap .of ("forceRepair" , "true" ));
148183    }
149184
150-     @ BeforeClass 
151-     public  static  void  setupCluster () throws  IOException 
185+     public  RepairTest (boolean  nodesHaveCDC , boolean  tableHasCDC ) throws  Exception 
152186    {
153-         cluster  = create (config  -> {});
187+         // This runs per method, but we only want to rebuild the cluster if nodesHaveCDC has changed since the last 
188+         // build and we need to update the configuration accordingly 
189+         if  (cluster  != null  && RepairTest .nodesHaveCDC  != nodesHaveCDC )
190+         {
191+             cluster .close ();
192+             cluster  = null ;
193+         }
194+ 
195+         if  (cluster  == null )
196+             cluster  = create (config  -> config .set ("cdc_enabled" , nodesHaveCDC ));
197+ 
198+         RepairTest .nodesHaveCDC  = nodesHaveCDC ;
199+         RepairTest .tableHasCDC  = tableHasCDC ;
154200    }
155201
156202    @ AfterClass 
@@ -203,7 +249,12 @@ public void testForcedNormalRepairWithOneNodeDown() throws Exception
203249        String  forceRepairKeyspace  = "test_force_repair_keyspace" ;
204250        int  rf  = 2 ;
205251        int  tokenCount  = ClusterUtils .getTokenCount (cluster .get (1 ));
206-         cluster .schemaChange ("CREATE KEYSPACE "  + forceRepairKeyspace  + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': "  + rf  + "};" );
252+ 
253+         cluster .schemaChange ("CREATE KEYSPACE IF NOT EXISTS "  + forceRepairKeyspace  + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': "  + rf  + "};" );
254+ 
255+         // Truncate distributed repair keyspace due to test class parameterization. We only want results 
256+         // from our run 
257+         cluster .schemaChange ("TRUNCATE TABLE "  + SchemaConstants .DISTRIBUTED_KEYSPACE_NAME  + "."  + SystemDistributedKeyspace .PARENT_REPAIR_HISTORY );
207258
208259        try 
209260        {
0 commit comments