Skip to content

Commit

Permalink
Add support for Cassandra decimal as Elasticsearch #139
Browse files Browse the repository at this point in the history
  • Loading branch information
vroyer committed Jun 2, 2019
1 parent a28a6a7 commit a929686
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 15 deletions.
2 changes: 2 additions & 0 deletions docs/elassandra/source/mapping.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ Below is the mapping from Elasticsearch field basic types to CQL3 types :
+--------------------+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| long | bigint | |
+--------------------+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| keyword | decimal | Existing Cassandra *decimal* columns are mapped to an Elasticsearch keyword. |
+--------------------+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| long | time | Existing Cassandra *time* columns (64-bit signed integer representing |
| | | the number of nanoseconds since midnight) stored as long in Elasticsearch. |
+--------------------+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public void validate(KeyspaceMetadata ksm , CFMetaData cfm) throws Configuration
if (!existingCql3.equals(inferedCql) &&
!(existingCql3.endsWith("uuid") && inferedCql.equals("text")) && // #74 uuid is mapped as keyword
!(existingCql3.equals("timeuuid") && (inferedCql.equals("timestamp") || inferedCql.equals("text"))) &&
!(existingCql3.equals("decimal") && inferedCql.equals("text")) &&
!(existingCql3.equals("date") && inferedCql.equals("timestamp")) &&
!(existingCql3.equals("time") && inferedCql.equals("bigint"))
) // timeuuid can be mapped to date
Expand Down
3 changes: 3 additions & 0 deletions server/src/main/java/org/elassandra/cluster/QueryManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,9 @@ public Object[] rowAsArray(final IndexService indexService, final String type, U
case FLOAT:
values[i] = value(fieldMapper, row.getFloat(columnName), valueForSearch);
break;
case DECIMAL:
values[i] = value(fieldMapper, row.getDecimal(columnName), valueForSearch);
break;
case BLOB:
values[i] = value(fieldMapper,
row.getBlob(columnName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public class SchemaManager extends AbstractComponent {
.put("inet", "ip" )
.put("uuid", "keyword" )
.put("timeuuid", "keyword" )
.put("decimal", "keyword" )
.build();

public SchemaManager(Settings settings, ClusterService clusterService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,7 @@ private Query buildQuery(ColumnDefinition cd, FieldMapper mapper, Object start,
case ASCII:
case TEXT:
case VARCHAR:
case DECIMAL:
query = start != null && end != null && ((Comparable) start).compareTo(end) == 0 ?
new TermQuery(new Term(mapper.name(), BytesRefs.toBytesRef(start))) :
new TermRangeQuery(mapper.name(), BytesRefs.toBytesRef(start), BytesRefs.toBytesRef(end), includeLower, includeUpper);
Expand Down Expand Up @@ -1119,8 +1120,6 @@ private Query buildQuery(ColumnDefinition cd, FieldMapper mapper, Object start,
case BOOLEAN:
query = ((BooleanFieldMapper) mapper).fieldType().rangeQuery(start, end, includeLower, includeUpper, null);
break;
case DECIMAL:
throw new UnsupportedOperationException("Unsupported type [decimal] in primary key");
case BLOB:
throw new UnsupportedOperationException("Unsupported type [blob] in primary key");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.elasticsearch.index.mapper;

import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.DecimalType;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.lucene.analysis.TokenStream;
Expand All @@ -45,6 +45,7 @@
import org.elasticsearch.index.query.QueryShardContext;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -264,6 +265,9 @@ public Object cqlValue(Object value, AbstractType atype) {
// #74 workaround
return UUID.fromString(value.toString());
}
if (atype instanceof DecimalType) {
return new BigDecimal(value.toString());
}
return value.toString();
}

Expand Down Expand Up @@ -389,7 +393,7 @@ protected void parseCreateField(ParseContext context, List<IndexableField> field

@Override
public void createField(ParseContext context, Object object) throws IOException {
String value = (object instanceof UUID) ? object.toString() : (String) object; // #74 uuid stored as string
String value = (object == null || object instanceof String) ? (String) object : object.toString(); // #74 uuid stored as string
if (value == null)
value = fieldType().nullValueAsString();

Expand Down
25 changes: 14 additions & 11 deletions server/src/test/java/org/elassandra/CqlTypesTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.junit.Test;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
Expand Down Expand Up @@ -107,7 +108,7 @@ public void testAllTypesTest() throws Exception {
ensureGreen("ks1");

process(ConsistencyLevel.ONE,
"CREATE TABLE ks1.natives (c1 text primary key, c2 text, c3 timestamp, c4 int, c5 bigint, c6 double, c7 float, c8 boolean, c9 blob, c10 uuid, c11 timeuuid, c12 smallint, c13 tinyint)");
"CREATE TABLE ks1.natives (c1 text primary key, c2 text, c3 timestamp, c4 int, c5 bigint, c6 double, c7 float, c8 boolean, c9 blob, c10 uuid, c11 timeuuid, c12 smallint, c13 tinyint, c14 decimal)");
assertAcked(client().admin().indices()
.preparePutMapping("ks1")
.setType("natives")
Expand All @@ -116,7 +117,7 @@ public void testAllTypesTest() throws Exception {

// {"c2": "toto", "c3" : "2016-10-10", "c4": 1, "c5":44, "c6":1.0, "c7":2.22, "c8": true, "c9":"U29tZSBiaW5hcnkgYmxvYg==" }
assertThat(client().prepareIndex("ks1", "natives", "1")
.setSource("{\"c2\": \"toto\", \"c3\" : \"2016-10-10\", \"c4\": 1, \"c5\":44, \"c6\":1.0, \"c7\":2.22, \"c8\": true, \"c9\":\"U29tZSBiaW5hcnkgYmxvYg==\", \"c10\":\"ae8c9260-dd02-11e6-b9d5-bbfb41c263ba\",\"c11\":\"ae8c9260-dd02-11e6-b9d5-bbfb41c263ba\", \"c12\":1, \"c13\":1 }", XContentType.JSON)
.setSource("{\"c2\": \"toto\", \"c3\" : \"2016-10-10T00:00:00.000Z\", \"c4\": 1, \"c5\":44, \"c6\":1.0, \"c7\":2.22, \"c8\": true, \"c9\":\"U29tZSBiaW5hcnkgYmxvYg==\", \"c10\":\"ae8c9260-dd02-11e6-b9d5-bbfb41c263ba\",\"c11\":\"ae8c9260-dd02-11e6-b9d5-bbfb41c263ba\", \"c12\":1, \"c13\":1, \"c14\":\"3.1416\" }", XContentType.JSON)
.get().getResult(), equalTo(DocWriteResponse.Result.CREATED));
Map<String,Object> fields = client().prepareSearch("ks1").setTypes("natives").setQuery(QueryBuilders.queryStringQuery("c2:toto"))
.get().getHits().getHits()[0]
Expand All @@ -131,30 +132,31 @@ public void testAllTypesTest() throws Exception {
assertThat(fields.get("c9"),equalTo("U29tZSBiaW5hcnkgYmxvYg=="));
assertThat(fields.get("c12"),equalTo(1));
assertThat(fields.get("c13"),equalTo(1));
assertThat(fields.get("c14"),equalTo("3.1416"));

process(ConsistencyLevel.ONE,"insert into ks1.natives (c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13) VALUES ('tutu', 'titi', '2016-11-11', 1, 45, 1.0, 2.23, false,textAsBlob('bdb14fbe076f6b94444c660e36a400151f26fc6f'),ae8c9260-dd02-11e6-b9d5-bbfb41c263ba,ae8c9260-dd02-11e6-b9d5-bbfb41c263ba, 1, 1)");
process(ConsistencyLevel.ONE,"insert into ks1.natives (c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14) VALUES ('tutu', 'titi', '2016-11-11T00:00:00.000Z', 1, 45, 1.0, 2.23, false,textAsBlob('bdb14fbe076f6b94444c660e36a400151f26fc6f'),ae8c9260-dd02-11e6-b9d5-bbfb41c263ba,ae8c9260-dd02-11e6-b9d5-bbfb41c263ba, 1, 1, 3.1416)");
assertThat(client().prepareSearch().setIndices("ks1").setTypes("natives").setQuery(QueryBuilders.matchAllQuery()).get().getHits().getTotalHits(), equalTo(2L));

fields = client().prepareSearch().setIndices("ks1").setTypes("natives").setQuery(QueryBuilders.queryStringQuery("c5:45")).get().getHits().getHits()[0].getSourceAsMap();

assertThat(fields.get("c2"), equalTo("titi"));
//assertThat(fields.get("c3"), equalTo(new SimpleDateFormat("yyyy-MM-dd").parse("2016-11-11").toLocaleString()));
assertThat(fields.get("c3"), equalTo("2016-11-11T00:00:00.000Z"));
assertThat(fields.get("c4"),equalTo(1));
assertThat(fields.get("c5"),equalTo(45));
assertThat(fields.get("c6"),equalTo(1.0));
assertThat(fields.get("c7"),equalTo(2.23));
assertThat(fields.get("c8"),equalTo(false));
assertThat(fields.get("c12"),equalTo(1));
assertThat(fields.get("c13"),equalTo(1));
assertThat(fields.get("c14"),equalTo("3.1416"));
}

@Test
public void testSinglePkTypesTest() throws Exception {
createIndex("ks");
ensureGreen("ks");

String[] types = new String[] { "text","int","smallint","tinyint","bigint","double","float","boolean","blob","timestamp","date","inet","uuid" };
Object[] values = new Object[] { "foo", 1, (short)1, (byte)1, 2L, new Double(3.14), new Float(3.14), true, ByteBuffer.wrap("toto".getBytes("UTF-8")), new Date(), (int)LocalDate.now().toEpochDay(), InetAddresses.forString("127.0.0.1"), UUID.randomUUID() };
String[] types = new String[] { "text","int","smallint","tinyint","bigint","double","float","boolean","blob","timestamp","date","inet","uuid","decimal" };
Object[] values = new Object[] { "foo", 1, (short)1, (byte)1, 2L, new Double(3.14), new Float(3.14), true, ByteBuffer.wrap("toto".getBytes("UTF-8")), new Date(), (int)LocalDate.now().toEpochDay(), InetAddresses.forString("127.0.0.1"), UUID.randomUUID(), new BigDecimal("3.1416") };
for(int i=0; i < types.length; i++) {
String type = types[i];
Object value = values[i];
Expand Down Expand Up @@ -197,9 +199,9 @@ public void testCompoundPkTypesTest() throws Exception {
ensureGreen("ks");

Date now = new Date();
String[] types = new String[] { "text", "int","smallint","tinyint", "bigint","double","float","boolean","blob","timestamp","date", "inet","uuid","timeuuid","timeuuid" };
String[] names = new String[] { "text", "int","smallint","tinyint", "bigint","double","float","boolean","blob","timestamp","date2", "inet","uuid","timeuuid","timeuuid2" };
Object[] values = new Object[] { "foo", 1, (short)1, (byte)1, 2L, new Double(3.14), new Float(3.14), true, ByteBuffer.wrap("toto".getBytes("UTF-8")), new Date(), (int)LocalDate.now().toEpochDay(), InetAddresses.forString("127.0.0.1"), UUID.randomUUID(), UUIDGen.getTimeUUID(now.getTime()), UUIDGen.getTimeUUID(now.getTime()) };
String[] types = new String[] { "text", "int","smallint","tinyint", "bigint","double","float","boolean","blob","timestamp","date", "inet","uuid","timeuuid","timeuuid","decimal" };
String[] names = new String[] { "text", "int","smallint","tinyint", "bigint","double","float","boolean","blob","timestamp","date2", "inet","uuid","timeuuid","timeuuid2","decimal"};
Object[] values = new Object[] { "foo", 1, (short)1, (byte)1, 2L, new Double(3.14), new Float(3.14), true, ByteBuffer.wrap("toto".getBytes("UTF-8")), new Date(), (int)LocalDate.now().toEpochDay(), InetAddresses.forString("127.0.0.1"), UUID.randomUUID(), UUIDGen.getTimeUUID(now.getTime()), UUIDGen.getTimeUUID(now.getTime()), new BigDecimal("3.1416") };
int randomCk = randomInt(types.length-1);
int randomVal= randomInt(types.length-1);
for(int i=0; i < types.length; i++) {
Expand Down Expand Up @@ -249,7 +251,8 @@ public void testCompoundPkTypesTest() throws Exception {
System.out.println("delete pk name="+name+" type="+type+" value="+values[i]+" ck type="+types[randomCk]+" value="+values[randomCk]);
process(ConsistencyLevel.ONE,String.format(Locale.ROOT,"DELETE FROM ks.t%s WHERE pk%s = ? AND ck >= ?", name, name), values[i], values[randomCk]);
// blob not supported for delete by query
assertThat(client().prepareSearch()
assertThat("search in ks"+i+" type="+String.format(Locale.ROOT,"t%s", name)+" cql_type="+type,
client().prepareSearch()
.setIndices("ks"+i)
.setTypes(String.format(Locale.ROOT,"t%s", name))
.setQuery(QueryBuilders.matchAllQuery())
Expand Down

0 comments on commit a929686

Please sign in to comment.