From a9296860b5fea5fdf79222f967db291a9b3c9161 Mon Sep 17 00:00:00 2001 From: vroyer Date: Sun, 2 Jun 2019 22:57:00 +0200 Subject: [PATCH] Add support for Cassandra decimal as Elasticsearch #139 --- docs/elassandra/source/mapping.rst | 2 ++ .../elassandra/cluster/ColumnDescriptor.java | 1 + .../org/elassandra/cluster/QueryManager.java | 3 +++ .../org/elassandra/cluster/SchemaManager.java | 1 + .../index/ElasticSecondaryIndex.java | 3 +-- .../index/mapper/KeywordFieldMapper.java | 8 ++++-- .../java/org/elassandra/CqlTypesTests.java | 25 +++++++++++-------- 7 files changed, 28 insertions(+), 15 deletions(-) diff --git a/docs/elassandra/source/mapping.rst b/docs/elassandra/source/mapping.rst index 9ad36fe3697..0841df4b90c 100644 --- a/docs/elassandra/source/mapping.rst +++ b/docs/elassandra/source/mapping.rst @@ -30,6 +30,8 @@ Below is the mapping from Elasticsearch field basic types to CQL3 types : +--------------------+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | long | bigint | | +--------------------+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| keyword | decimal | Existing Cassandra *decimal* columns are mapped to an Elasticsearch keyword. | ++--------------------+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | long | time | Existing Cassandra *time* columns (64-bit signed integer representing | | | | the number of nanoseconds since midnight) stored as long in Elasticsearch. | +--------------------+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ diff --git a/server/src/main/java/org/elassandra/cluster/ColumnDescriptor.java b/server/src/main/java/org/elassandra/cluster/ColumnDescriptor.java index 977c3244e7b..9982ad22642 100644 --- a/server/src/main/java/org/elassandra/cluster/ColumnDescriptor.java +++ b/server/src/main/java/org/elassandra/cluster/ColumnDescriptor.java @@ -75,6 +75,7 @@ public void validate(KeyspaceMetadata ksm , CFMetaData cfm) throws Configuration if (!existingCql3.equals(inferedCql) && !(existingCql3.endsWith("uuid") && inferedCql.equals("text")) && // #74 uuid is mapped as keyword !(existingCql3.equals("timeuuid") && (inferedCql.equals("timestamp") || inferedCql.equals("text"))) && + !(existingCql3.equals("decimal") && inferedCql.equals("text")) && !(existingCql3.equals("date") && inferedCql.equals("timestamp")) && !(existingCql3.equals("time") && inferedCql.equals("bigint")) ) // timeuuid can be mapped to date diff --git a/server/src/main/java/org/elassandra/cluster/QueryManager.java b/server/src/main/java/org/elassandra/cluster/QueryManager.java index 04abcce77ca..382338b75e7 100644 --- a/server/src/main/java/org/elassandra/cluster/QueryManager.java +++ b/server/src/main/java/org/elassandra/cluster/QueryManager.java @@ -561,6 +561,9 @@ public Object[] rowAsArray(final IndexService indexService, final String type, U case FLOAT: values[i] = value(fieldMapper, row.getFloat(columnName), valueForSearch); break; + case DECIMAL: + values[i] = value(fieldMapper, row.getDecimal(columnName), valueForSearch); + break; case BLOB: values[i] = value(fieldMapper, row.getBlob(columnName), diff --git a/server/src/main/java/org/elassandra/cluster/SchemaManager.java b/server/src/main/java/org/elassandra/cluster/SchemaManager.java index 2f5c8da33ef..192c2990b13 100644 --- a/server/src/main/java/org/elassandra/cluster/SchemaManager.java +++ b/server/src/main/java/org/elassandra/cluster/SchemaManager.java @@ -176,6 +176,7 @@ public class SchemaManager extends AbstractComponent { .put("inet", "ip" ) .put("uuid", "keyword" ) .put("timeuuid", "keyword" ) + .put("decimal", "keyword" ) .build(); public SchemaManager(Settings settings, ClusterService clusterService) { diff --git a/server/src/main/java/org/elassandra/index/ElasticSecondaryIndex.java b/server/src/main/java/org/elassandra/index/ElasticSecondaryIndex.java index 44107c1af65..4afb85580f5 100644 --- a/server/src/main/java/org/elassandra/index/ElasticSecondaryIndex.java +++ b/server/src/main/java/org/elassandra/index/ElasticSecondaryIndex.java @@ -1004,6 +1004,7 @@ private Query buildQuery(ColumnDefinition cd, FieldMapper mapper, Object start, case ASCII: case TEXT: case VARCHAR: + case DECIMAL: query = start != null && end != null && ((Comparable) start).compareTo(end) == 0 ? new TermQuery(new Term(mapper.name(), BytesRefs.toBytesRef(start))) : new TermRangeQuery(mapper.name(), BytesRefs.toBytesRef(start), BytesRefs.toBytesRef(end), includeLower, includeUpper); @@ -1119,8 +1120,6 @@ private Query buildQuery(ColumnDefinition cd, FieldMapper mapper, Object start, case BOOLEAN: query = ((BooleanFieldMapper) mapper).fieldType().rangeQuery(start, end, includeLower, includeUpper, null); break; - case DECIMAL: - throw new UnsupportedOperationException("Unsupported type [decimal] in primary key"); case BLOB: throw new UnsupportedOperationException("Unsupported type [blob] in primary key"); } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java index 47489d19bad..24fd70e2a36 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java @@ -19,8 +19,8 @@ package org.elasticsearch.index.mapper; -import org.apache.cassandra.cql3.CQL3Type; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.DecimalType; import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.lucene.analysis.TokenStream; @@ -45,6 +45,7 @@ import org.elasticsearch.index.query.QueryShardContext; import java.io.IOException; +import java.math.BigDecimal; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -264,6 +265,9 @@ public Object cqlValue(Object value, AbstractType atype) { // #74 workaround return UUID.fromString(value.toString()); } + if (atype instanceof DecimalType) { + return new BigDecimal(value.toString()); + } return value.toString(); } @@ -389,7 +393,7 @@ protected void parseCreateField(ParseContext context, List field @Override public void createField(ParseContext context, Object object) throws IOException { - String value = (object instanceof UUID) ? object.toString() : (String) object; // #74 uuid stored as string + String value = (object == null || object instanceof String) ? (String) object : object.toString(); // #74 uuid stored as string if (value == null) value = fieldType().nullValueAsString(); diff --git a/server/src/test/java/org/elassandra/CqlTypesTests.java b/server/src/test/java/org/elassandra/CqlTypesTests.java index db861c8b676..ad2b3a27978 100644 --- a/server/src/test/java/org/elassandra/CqlTypesTests.java +++ b/server/src/test/java/org/elassandra/CqlTypesTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.junit.Test; +import java.math.BigDecimal; import java.nio.ByteBuffer; import java.time.Instant; import java.time.LocalDate; @@ -107,7 +108,7 @@ public void testAllTypesTest() throws Exception { ensureGreen("ks1"); process(ConsistencyLevel.ONE, - "CREATE TABLE ks1.natives (c1 text primary key, c2 text, c3 timestamp, c4 int, c5 bigint, c6 double, c7 float, c8 boolean, c9 blob, c10 uuid, c11 timeuuid, c12 smallint, c13 tinyint)"); + "CREATE TABLE ks1.natives (c1 text primary key, c2 text, c3 timestamp, c4 int, c5 bigint, c6 double, c7 float, c8 boolean, c9 blob, c10 uuid, c11 timeuuid, c12 smallint, c13 tinyint, c14 decimal)"); assertAcked(client().admin().indices() .preparePutMapping("ks1") .setType("natives") @@ -116,7 +117,7 @@ public void testAllTypesTest() throws Exception { // {"c2": "toto", "c3" : "2016-10-10", "c4": 1, "c5":44, "c6":1.0, "c7":2.22, "c8": true, "c9":"U29tZSBiaW5hcnkgYmxvYg==" } assertThat(client().prepareIndex("ks1", "natives", "1") - .setSource("{\"c2\": \"toto\", \"c3\" : \"2016-10-10\", \"c4\": 1, \"c5\":44, \"c6\":1.0, \"c7\":2.22, \"c8\": true, \"c9\":\"U29tZSBiaW5hcnkgYmxvYg==\", \"c10\":\"ae8c9260-dd02-11e6-b9d5-bbfb41c263ba\",\"c11\":\"ae8c9260-dd02-11e6-b9d5-bbfb41c263ba\", \"c12\":1, \"c13\":1 }", XContentType.JSON) + .setSource("{\"c2\": \"toto\", \"c3\" : \"2016-10-10T00:00:00.000Z\", \"c4\": 1, \"c5\":44, \"c6\":1.0, \"c7\":2.22, \"c8\": true, \"c9\":\"U29tZSBiaW5hcnkgYmxvYg==\", \"c10\":\"ae8c9260-dd02-11e6-b9d5-bbfb41c263ba\",\"c11\":\"ae8c9260-dd02-11e6-b9d5-bbfb41c263ba\", \"c12\":1, \"c13\":1, \"c14\":\"3.1416\" }", XContentType.JSON) .get().getResult(), equalTo(DocWriteResponse.Result.CREATED)); Map fields = client().prepareSearch("ks1").setTypes("natives").setQuery(QueryBuilders.queryStringQuery("c2:toto")) .get().getHits().getHits()[0] @@ -131,14 +132,14 @@ public void testAllTypesTest() throws Exception { assertThat(fields.get("c9"),equalTo("U29tZSBiaW5hcnkgYmxvYg==")); assertThat(fields.get("c12"),equalTo(1)); assertThat(fields.get("c13"),equalTo(1)); + assertThat(fields.get("c14"),equalTo("3.1416")); - process(ConsistencyLevel.ONE,"insert into ks1.natives (c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13) VALUES ('tutu', 'titi', '2016-11-11', 1, 45, 1.0, 2.23, false,textAsBlob('bdb14fbe076f6b94444c660e36a400151f26fc6f'),ae8c9260-dd02-11e6-b9d5-bbfb41c263ba,ae8c9260-dd02-11e6-b9d5-bbfb41c263ba, 1, 1)"); + process(ConsistencyLevel.ONE,"insert into ks1.natives (c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14) VALUES ('tutu', 'titi', '2016-11-11T00:00:00.000Z', 1, 45, 1.0, 2.23, false,textAsBlob('bdb14fbe076f6b94444c660e36a400151f26fc6f'),ae8c9260-dd02-11e6-b9d5-bbfb41c263ba,ae8c9260-dd02-11e6-b9d5-bbfb41c263ba, 1, 1, 3.1416)"); assertThat(client().prepareSearch().setIndices("ks1").setTypes("natives").setQuery(QueryBuilders.matchAllQuery()).get().getHits().getTotalHits(), equalTo(2L)); fields = client().prepareSearch().setIndices("ks1").setTypes("natives").setQuery(QueryBuilders.queryStringQuery("c5:45")).get().getHits().getHits()[0].getSourceAsMap(); - assertThat(fields.get("c2"), equalTo("titi")); - //assertThat(fields.get("c3"), equalTo(new SimpleDateFormat("yyyy-MM-dd").parse("2016-11-11").toLocaleString())); + assertThat(fields.get("c3"), equalTo("2016-11-11T00:00:00.000Z")); assertThat(fields.get("c4"),equalTo(1)); assertThat(fields.get("c5"),equalTo(45)); assertThat(fields.get("c6"),equalTo(1.0)); @@ -146,6 +147,7 @@ public void testAllTypesTest() throws Exception { assertThat(fields.get("c8"),equalTo(false)); assertThat(fields.get("c12"),equalTo(1)); assertThat(fields.get("c13"),equalTo(1)); + assertThat(fields.get("c14"),equalTo("3.1416")); } @Test @@ -153,8 +155,8 @@ public void testSinglePkTypesTest() throws Exception { createIndex("ks"); ensureGreen("ks"); - String[] types = new String[] { "text","int","smallint","tinyint","bigint","double","float","boolean","blob","timestamp","date","inet","uuid" }; - Object[] values = new Object[] { "foo", 1, (short)1, (byte)1, 2L, new Double(3.14), new Float(3.14), true, ByteBuffer.wrap("toto".getBytes("UTF-8")), new Date(), (int)LocalDate.now().toEpochDay(), InetAddresses.forString("127.0.0.1"), UUID.randomUUID() }; + String[] types = new String[] { "text","int","smallint","tinyint","bigint","double","float","boolean","blob","timestamp","date","inet","uuid","decimal" }; + Object[] values = new Object[] { "foo", 1, (short)1, (byte)1, 2L, new Double(3.14), new Float(3.14), true, ByteBuffer.wrap("toto".getBytes("UTF-8")), new Date(), (int)LocalDate.now().toEpochDay(), InetAddresses.forString("127.0.0.1"), UUID.randomUUID(), new BigDecimal("3.1416") }; for(int i=0; i < types.length; i++) { String type = types[i]; Object value = values[i]; @@ -197,9 +199,9 @@ public void testCompoundPkTypesTest() throws Exception { ensureGreen("ks"); Date now = new Date(); - String[] types = new String[] { "text", "int","smallint","tinyint", "bigint","double","float","boolean","blob","timestamp","date", "inet","uuid","timeuuid","timeuuid" }; - String[] names = new String[] { "text", "int","smallint","tinyint", "bigint","double","float","boolean","blob","timestamp","date2", "inet","uuid","timeuuid","timeuuid2" }; - Object[] values = new Object[] { "foo", 1, (short)1, (byte)1, 2L, new Double(3.14), new Float(3.14), true, ByteBuffer.wrap("toto".getBytes("UTF-8")), new Date(), (int)LocalDate.now().toEpochDay(), InetAddresses.forString("127.0.0.1"), UUID.randomUUID(), UUIDGen.getTimeUUID(now.getTime()), UUIDGen.getTimeUUID(now.getTime()) }; + String[] types = new String[] { "text", "int","smallint","tinyint", "bigint","double","float","boolean","blob","timestamp","date", "inet","uuid","timeuuid","timeuuid","decimal" }; + String[] names = new String[] { "text", "int","smallint","tinyint", "bigint","double","float","boolean","blob","timestamp","date2", "inet","uuid","timeuuid","timeuuid2","decimal"}; + Object[] values = new Object[] { "foo", 1, (short)1, (byte)1, 2L, new Double(3.14), new Float(3.14), true, ByteBuffer.wrap("toto".getBytes("UTF-8")), new Date(), (int)LocalDate.now().toEpochDay(), InetAddresses.forString("127.0.0.1"), UUID.randomUUID(), UUIDGen.getTimeUUID(now.getTime()), UUIDGen.getTimeUUID(now.getTime()), new BigDecimal("3.1416") }; int randomCk = randomInt(types.length-1); int randomVal= randomInt(types.length-1); for(int i=0; i < types.length; i++) { @@ -249,7 +251,8 @@ public void testCompoundPkTypesTest() throws Exception { System.out.println("delete pk name="+name+" type="+type+" value="+values[i]+" ck type="+types[randomCk]+" value="+values[randomCk]); process(ConsistencyLevel.ONE,String.format(Locale.ROOT,"DELETE FROM ks.t%s WHERE pk%s = ? AND ck >= ?", name, name), values[i], values[randomCk]); // blob not supported for delete by query - assertThat(client().prepareSearch() + assertThat("search in ks"+i+" type="+String.format(Locale.ROOT,"t%s", name)+" cql_type="+type, + client().prepareSearch() .setIndices("ks"+i) .setTypes(String.format(Locale.ROOT,"t%s", name)) .setQuery(QueryBuilders.matchAllQuery())