Skip to content

Commit 4dce0b6

Browse files
author
James Carman
committed
Adding new schema migration library. This is a WIP.
1 parent 450a65b commit 4dce0b6

File tree

9 files changed

+553
-0
lines changed

9 files changed

+553
-0
lines changed

karaf/features/src/main/resources/features.xml

+9
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
<feature version="${project.version}">hecate-pojo</feature>
2323
<feature version="${project.version}">hecate-gson</feature>
2424
<feature version="${project.version}">hecate-joda-time</feature>
25+
<feature version="${project.version}">hecate-migrator</feature>
2526
</feature>
2627

2728
<feature name="hecate-core" version="${project.version}">
@@ -54,6 +55,14 @@
5455
<bundle>mvn:com.savoirtech.hecate/hecate-gson/${project.version}</bundle>
5556
</feature>
5657

58+
<feature name="hecate-migrator" version="${project.version}">
59+
<feature version="${project.version}">hecate-core</feature>
60+
61+
<bundle dependency="true">mvn:commons-codec/commons-codec/${commons.codec.version}</bundle>
62+
63+
<bundle>mvn:com.savoirtech.hecate/hecate-migrator/${project.version}</bundle>
64+
</feature>
65+
5766
<feature name="hecate-joda-time" version="${project.version}">
5867
<feature version="${project.version}">hecate-pojo</feature>
5968

migrator/pom.xml

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Copyright (c) 2012-2015 Savoir Technologies, Inc.
4+
~
5+
~ Licensed under the Apache License, Version 2.0 (the "License");
6+
~ you may not use this file except in compliance with the License.
7+
~ You may obtain a copy of the License at
8+
~
9+
~ http://www.apache.org/licenses/LICENSE-2.0
10+
~
11+
~ Unless required by applicable law or agreed to in writing, software
12+
~ distributed under the License is distributed on an "AS IS" BASIS,
13+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
~ See the License for the specific language governing permissions and
15+
~ limitations under the License.
16+
-->
17+
18+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
19+
<parent>
20+
<artifactId>hecate-parent</artifactId>
21+
<groupId>com.savoirtech.hecate</groupId>
22+
<version>2.0.9-SNAPSHOT</version>
23+
</parent>
24+
<modelVersion>4.0.0</modelVersion>
25+
26+
<name>SavoirTech ::: Hecate ::: Migrator</name>
27+
28+
<artifactId>hecate-migrator</artifactId>
29+
30+
<packaging>bundle</packaging>
31+
32+
<dependencies>
33+
<dependency>
34+
<groupId>commons-codec</groupId>
35+
<artifactId>commons-codec</artifactId>
36+
<version>${commons.codec.version}</version>
37+
</dependency>
38+
<dependency>
39+
<groupId>com.savoirtech.hecate</groupId>
40+
<artifactId>hecate-core</artifactId>
41+
<version>2.0.9-SNAPSHOT</version>
42+
</dependency>
43+
<dependency>
44+
<groupId>com.savoirtech.hecate</groupId>
45+
<artifactId>hecate-test</artifactId>
46+
<version>2.0.9-SNAPSHOT</version>
47+
<scope>test</scope>
48+
</dependency>
49+
<dependency>
50+
<groupId>org.mockito</groupId>
51+
<artifactId>mockito-core</artifactId>
52+
<version>${mockito.version}</version>
53+
</dependency>
54+
</dependencies>
55+
56+
<build>
57+
<plugins>
58+
<plugin>
59+
<groupId>org.apache.felix</groupId>
60+
<artifactId>maven-bundle-plugin</artifactId>
61+
</plugin>
62+
</plugins>
63+
</build>
64+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/*
2+
* Copyright (c) 2012-2015 Savoir Technologies, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.savoirtech.hecate.migrator;
18+
19+
import java.io.ByteArrayOutputStream;
20+
import java.io.IOException;
21+
import java.io.ObjectOutputStream;
22+
import java.io.ObjectStreamClass;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.UUID;
26+
27+
import com.datastax.driver.core.ConsistencyLevel;
28+
import com.datastax.driver.core.KeyspaceMetadata;
29+
import com.datastax.driver.core.ResultSet;
30+
import com.datastax.driver.core.Session;
31+
import com.savoirtech.hecate.core.mapping.MappedQueryResult;
32+
import com.savoirtech.hecate.migrator.exception.SchemaMigrationException;
33+
import org.apache.commons.codec.binary.StringUtils;
34+
import org.apache.commons.codec.digest.DigestUtils;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
38+
import static com.datastax.driver.core.DataType.cint;
39+
import static com.datastax.driver.core.DataType.varchar;
40+
import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
41+
import static com.datastax.driver.core.schemabuilder.SchemaBuilder.createTable;
42+
43+
public class SchemaMigration {
44+
//----------------------------------------------------------------------------------------------------------------------
45+
// Fields
46+
//----------------------------------------------------------------------------------------------------------------------
47+
48+
public static final String MIGRATION_TABLE = "hecate_migration";
49+
public static final String ID_COL = "id";
50+
public static final String INDEX_COL = "ndx";
51+
public static final String TOKEN_COL = "sync_token";
52+
public static final String STATUS_COL = "status";
53+
public static final String FINGERPRINT_COL = "fingerprint";
54+
private static Logger LOGGER = LoggerFactory.getLogger(SchemaMigration.class);
55+
56+
private List<IdAndStep> agenda = new ArrayList<>();
57+
58+
//----------------------------------------------------------------------------------------------------------------------
59+
// Static Methods
60+
//----------------------------------------------------------------------------------------------------------------------
61+
62+
public static void createMigrationTable(Session session) {
63+
session.execute(createTable(MIGRATION_TABLE)
64+
.addPartitionKey(ID_COL, varchar())
65+
.addColumn(INDEX_COL, cint())
66+
.addColumn(TOKEN_COL, varchar())
67+
.addColumn(STATUS_COL, varchar())
68+
.addColumn(FINGERPRINT_COL, varchar()));
69+
}
70+
71+
protected static String fingerprint(IdAndStep idAndStep) {
72+
try {
73+
try (ByteArrayOutputStream bout = new ByteArrayOutputStream(); ObjectOutputStream out = new ObjectOutputStream(bout)) {
74+
ObjectStreamClass osc = ObjectStreamClass.lookup(idAndStep.getStep().getClass());
75+
out.writeLong(osc.getSerialVersionUID());
76+
out.writeObject(idAndStep.getStep());
77+
out.close();
78+
return DigestUtils.md5Hex(bout.toByteArray());
79+
}
80+
} catch (IOException e) {
81+
throw new SchemaMigrationException(e, "Unable to calculate fingerprint for migration step \"{}\".", idAndStep.getId());
82+
}
83+
}
84+
85+
//----------------------------------------------------------------------------------------------------------------------
86+
// Other Methods
87+
//----------------------------------------------------------------------------------------------------------------------
88+
89+
public SchemaMigration addStep(String id, SchemaMigrationStep step) {
90+
agenda.add(new IdAndStep(id, step));
91+
return this;
92+
}
93+
94+
public void execute(Session session) {
95+
session.getCluster().getConfiguration().getQueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
96+
KeyspaceMetadata keyspace = session.getCluster().getMetadata().getKeyspace(session.getLoggedKeyspace());
97+
if (keyspace.getTable(MIGRATION_TABLE) == null) {
98+
throw new SchemaMigrationException("Schema migration table \"%s\" not found.", MIGRATION_TABLE);
99+
}
100+
101+
int currentIndex = 0;
102+
for (IdAndStep idAndStep : agenda) {
103+
final String fingerprint = fingerprint(idAndStep);
104+
SchemaMigrationStepTracking tracking = getStepTracking(idAndStep.getId(), session);
105+
if (tracking != null) {
106+
if (currentIndex != tracking.getIndex()) {
107+
throw new SchemaMigrationException("Schema migration step \"%s\" is out of sync. Expected index %d, but was %d.", idAndStep.getId(), currentIndex, tracking.getIndex());
108+
} else if (requiresAbort(tracking, fingerprint)) {
109+
return;
110+
}
111+
} else {
112+
String syncToken = newSyncToken();
113+
tracking = new SchemaMigrationStepTracking();
114+
tracking.setId(idAndStep.getId());
115+
tracking.setIndex(currentIndex);
116+
tracking.setToken(syncToken);
117+
tracking.setStatus(SchemaMigrationStepStatus.Running);
118+
tracking.setFingerprint(fingerprint);
119+
createStepTracking(tracking, session);
120+
tracking = getStepTracking(idAndStep.getId(), session);
121+
if (syncToken.equals(tracking.getToken())) {
122+
LOGGER.info("Executing migration step \"{}\"...", tracking.getId());
123+
idAndStep.getStep().execute(session);
124+
tracking.setStatus(SchemaMigrationStepStatus.Complete);
125+
updateStepTracking(tracking, session);
126+
} else if (requiresAbort(tracking, fingerprint)) {
127+
return;
128+
}
129+
}
130+
currentIndex++;
131+
}
132+
}
133+
134+
private void createStepTracking(SchemaMigrationStepTracking tracking, Session session) {
135+
session.execute(
136+
insertInto(MIGRATION_TABLE)
137+
.ifNotExists()
138+
.value(ID_COL, tracking.getId())
139+
.value(INDEX_COL, tracking.getIndex())
140+
.value(TOKEN_COL, tracking.getToken())
141+
.value(STATUS_COL, tracking.getStatus().name())
142+
.value(FINGERPRINT_COL, tracking.getFingerprint()));
143+
}
144+
145+
private SchemaMigrationStepTracking getStepTracking(String id, Session session) {
146+
ResultSet resultSet = session.execute(select(ID_COL, INDEX_COL, TOKEN_COL, STATUS_COL, FINGERPRINT_COL).from(MIGRATION_TABLE).where(eq(ID_COL, id)));
147+
return new MappedQueryResult<>(resultSet, row -> {
148+
SchemaMigrationStepTracking tracking = new SchemaMigrationStepTracking();
149+
tracking.setId(row.getString(0));
150+
tracking.setIndex(row.getInt(1));
151+
tracking.setToken(row.getString(2));
152+
tracking.setStatus(SchemaMigrationStepStatus.valueOf(row.getString(3)));
153+
tracking.setFingerprint(row.getString(4));
154+
return tracking;
155+
}).one();
156+
}
157+
158+
private void updateStepTracking(SchemaMigrationStepTracking tracking, Session session) {
159+
session.execute(
160+
insertInto(MIGRATION_TABLE)
161+
.value(ID_COL, tracking.getId())
162+
.value(INDEX_COL, tracking.getIndex())
163+
.value(TOKEN_COL, tracking.getToken())
164+
.value(STATUS_COL, tracking.getStatus().name())
165+
.value(FINGERPRINT_COL, tracking.getFingerprint()));
166+
}
167+
168+
private boolean requiresAbort(SchemaMigrationStepTracking existing, String expectedFingerprint) {
169+
if(SchemaMigrationStepStatus.Running.equals(existing.getStatus())) {
170+
LOGGER.error("Another process is current executing migration step \"{}\", aborting.", existing.getId());
171+
return true;
172+
}
173+
174+
if (!StringUtils.equals(expectedFingerprint, existing.getFingerprint())) {
175+
LOGGER.error("Fingerprint for migration step \"{}\" does not match expected value, aborting.", existing.getId());
176+
return true;
177+
}
178+
179+
LOGGER.info("Migration step \"{}\" already complete, continuing...", existing.getId());
180+
return false;
181+
}
182+
183+
protected String newSyncToken() {
184+
return UUID.randomUUID().toString();
185+
}
186+
187+
//----------------------------------------------------------------------------------------------------------------------
188+
// Inner Classes
189+
//----------------------------------------------------------------------------------------------------------------------
190+
191+
protected static class IdAndStep {
192+
private final String id;
193+
private final SchemaMigrationStep step;
194+
195+
public IdAndStep(String id, SchemaMigrationStep step) {
196+
this.id = id;
197+
this.step = step;
198+
}
199+
200+
public String getId() {
201+
return id;
202+
}
203+
204+
public SchemaMigrationStep getStep() {
205+
return step;
206+
}
207+
}
208+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (c) 2012-2015 Savoir Technologies, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.savoirtech.hecate.migrator;
18+
19+
import java.io.Serializable;
20+
21+
import com.datastax.driver.core.Session;
22+
23+
@FunctionalInterface
24+
public interface SchemaMigrationStep extends Serializable {
25+
//----------------------------------------------------------------------------------------------------------------------
26+
// Other Methods
27+
//----------------------------------------------------------------------------------------------------------------------
28+
29+
void execute(Session session);
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright (c) 2012-2015 Savoir Technologies, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.savoirtech.hecate.migrator;
18+
19+
public enum SchemaMigrationStepStatus {
20+
Running, Complete
21+
}

0 commit comments

Comments
 (0)