Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.CollectionUtil;

import org.apache.avro.util.Utf8;
import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
Expand All @@ -51,7 +51,7 @@
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for interoperability with Avro types. */
public class AvroTypesITCase extends AbstractTestBaseJUnit4 {
class AvroTypesITCase extends AbstractTestBase {

private static final User USER_1 =
User.newBuilder()
Expand Down Expand Up @@ -79,7 +79,7 @@ public class AvroTypesITCase extends AbstractTestBaseJUnit4 {
.build())
.setTypeBytes(ByteBuffer.allocate(10))
.setTypeDate(LocalDate.parse("2014-03-01"))
.setTypeTimeMillis(LocalTime.parse("12:12:12"))
.setTypeTimeMillis(LocalTime.parse("12:12:12.345"))
.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
.setTypeTimestampMicros(
Expand Down Expand Up @@ -152,7 +152,7 @@ public class AvroTypesITCase extends AbstractTestBaseJUnit4 {
.build();

@Test
public void testAvroToRow() throws Exception {
void testAvroToRow() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Expand All @@ -167,7 +167,7 @@ public void testAvroToRow() throws Exception {
String expected =
"+I[Charlie, null, blue, 1337, 1.337, null, false, [], [], null, RED, {}, null, null, "
+ "{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", \"state\": \"Berlin\", \"zip\": \"12049\"}, "
+ "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10], 2014-03-01, 12:12:12, 00:00:00.123456, 2014-03-01T12:12:12.321Z, "
+ "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10], 2014-03-01, 12:12:12.345, 00:00:00.123456, 2014-03-01T12:12:12.321Z, "
+ "1970-01-01T00:00:00.123456Z, java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48]]\n"
+ "+I[Whatever, null, black, 42, 0.0, null, true, [hello], [true], null, GREEN, {}, "
+ "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], null, null, "
Expand All @@ -181,7 +181,7 @@ public void testAvroToRow() throws Exception {
}

@Test
public void testAvroStringAccess() {
void testAvroStringAccess() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Expand All @@ -198,7 +198,7 @@ public void testAvroStringAccess() {
}

@Test
public void testAvroObjectAccess() throws Exception {
void testAvroObjectAccess() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Expand All @@ -217,7 +217,7 @@ public void testAvroObjectAccess() throws Exception {
}

@Test
public void testAvroToAvro() throws Exception {
void testAvroToAvro() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private int convertToTime(JsonParser jsonNode) throws IOException {
LocalTime localTime = parsedTime.query(TemporalQueries.localTime());

// get number of milliseconds of the day
return localTime.toSecondOfDay() * 1000;
return (int) (localTime.toNanoOfDay() / 1000_000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we change the way that the json format parses times, then the output will change for existing users, which has implications to the logic of their processing. I suggest that this way of parsing be enabled by a new json format configuration option - so it can be knowingly adopted.

}

private TimestampData convertToTimestamp(JsonParser jp) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ private int convertToTime(JsonNode jsonNode) {
LocalTime localTime = parsedTime.query(TemporalQueries.localTime());

// get number of milliseconds of the day
return localTime.toSecondOfDay() * 1000;
return (int) (localTime.toNanoOfDay() / 1000_000);
}

private TimestampData convertToTimestamp(JsonNode jsonNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ private RowDataToJsonConverter createDateConverter() {
private RowDataToJsonConverter createTimeConverter() {
return (mapper, reuse, value) -> {
int millisecond = (int) value;
LocalTime time = LocalTime.ofSecondOfDay(millisecond / 1000L);
LocalTime time = LocalTime.ofNanoOfDay(millisecond * 1000_000L);
return mapper.getNodeFactory().textNode(SQL_TIME_FORMAT.format(time));
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ void testSerDe() throws Exception {
Double[] doubles = new Double[] {1.1, 2.2, 3.3};
LocalDate date = LocalDate.parse("1990-10-14");
LocalTime time = LocalTime.parse("12:12:43");
LocalTime time3 = LocalTime.parse("12:12:43.123");
Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123");
Timestamp timestamp9 = Timestamp.valueOf("1990-10-14 12:12:43.123456789");
Instant timestampWithLocalZone =
Expand Down Expand Up @@ -152,6 +153,7 @@ void testSerDe() throws Exception {
root.set("doubles", doubleNode);
root.put("date", "1990-10-14");
root.put("time", "12:12:43");
root.put("time3", "12:12:43.123");
root.put("timestamp3", "1990-10-14T12:12:43.123");
root.put("timestamp9", "1990-10-14T12:12:43.123456789");
root.put("timestampWithLocalZone", "1990-10-14T12:12:43.123456789Z");
Expand All @@ -175,6 +177,7 @@ void testSerDe() throws Exception {
FIELD("doubles", ARRAY(DOUBLE())),
FIELD("date", DATE()),
FIELD("time", TIME(0)),
FIELD("time3", TIME(3)),
FIELD("timestamp3", TIMESTAMP(3)),
FIELD("timestamp9", TIMESTAMP(9)),
FIELD("timestampWithLocalZone", TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)),
Expand All @@ -188,7 +191,7 @@ void testSerDe() throws Exception {
isJsonParser, schema, false, false, TimestampFormat.ISO_8601);
open(deserializationSchema);

Row expected = new Row(18);
Row expected = new Row(19);
expected.setField(0, true);
expected.setField(1, tinyint);
expected.setField(2, smallint);
Expand All @@ -201,12 +204,13 @@ void testSerDe() throws Exception {
expected.setField(9, doubles);
expected.setField(10, date);
expected.setField(11, time);
expected.setField(12, timestamp3.toLocalDateTime());
expected.setField(13, timestamp9.toLocalDateTime());
expected.setField(14, timestampWithLocalZone);
expected.setField(15, map);
expected.setField(16, multiSet);
expected.setField(17, nestedMap);
expected.setField(12, time3);
expected.setField(13, timestamp3.toLocalDateTime());
expected.setField(14, timestamp9.toLocalDateTime());
expected.setField(15, timestampWithLocalZone);
expected.setField(16, map);
expected.setField(17, multiSet);
expected.setField(18, nestedMap);

RowData rowData = deserializationSchema.deserialize(serializedJson);
Row actual = convertToExternal(rowData, dataType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1696,6 +1696,10 @@ public static TimestampData truncate(TimestampData ts, int precision) {
}
}

public static int truncate(int time, int precision) {
return (int) zeroLastDigits(time, 3 - precision);
}

private static long zeroLastDigits(long l, int n) {
long tenToTheN = (long) Math.pow(10, n);
return (l / tenToTheN) * tenToTheN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,16 @@ public DataType getResultType() {
/** TimeLeadLagAggFunction. */
public static class TimeLeadLagAggFunction extends LeadLagAggFunction {

public TimeLeadLagAggFunction(int operandCount) {
private final TimeType type;

public TimeLeadLagAggFunction(TimeType type, int operandCount) {
super(operandCount);
this.type = type;
}

@Override
public DataType getResultType() {
return DataTypes.TIME(TimeType.DEFAULT_PRECISION);
return DataTypes.TIME(type.getPrecision());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,15 @@ public DataType getResultType() {

/** Built-in Time Max aggregate function. */
public static class TimeMaxAggFunction extends MaxAggFunction {
private final TimeType type;

public TimeMaxAggFunction(TimeType type) {
this.type = type;
}

@Override
public DataType getResultType() {
return DataTypes.TIME(TimeType.DEFAULT_PRECISION);
return DataTypes.TIME(type.getPrecision());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,15 @@ public DataType getResultType() {

/** Built-in Time Min aggregate function. */
public static class TimeMinAggFunction extends MinAggFunction {
private final TimeType type;

public TimeMinAggFunction(TimeType type) {
this.type = type;
}

@Override
public DataType getResultType() {
return DataTypes.TIME(TimeType.DEFAULT_PRECISION);
return DataTypes.TIME(type.getPrecision());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,15 @@ public static final class TimeSingleValueAggFunction extends SingleValueAggFunct

private static final long serialVersionUID = 320495723666949978L;

private final TimeType type;

public TimeSingleValueAggFunction(TimeType type) {
this.type = type;
}

@Override
public DataType getResultType() {
return DataTypes.TIME(TimeType.DEFAULT_PRECISION);
return DataTypes.TIME(type.getPrecision());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class CastRuleProvider {
.addRule(TimeToTimestampCastRule.INSTANCE)
.addRule(NumericToTimestampCastRule.INSTANCE)
.addRule(TimestampToNumericCastRule.INSTANCE)
.addRule(TimeToTimeCastRule.INSTANCE)
// To binary rules
.addRule(BinaryToBinaryCastRule.INSTANCE)
.addRule(RawToBinaryCastRule.INSTANCE)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.functions.casting;

import org.apache.flink.table.planner.codegen.calls.BuiltInMethods;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;

import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;

/**
* {@link LogicalTypeRoot#TIME_WITHOUT_TIME_ZONE} to {@link LogicalTypeRoot#TIME_WITHOUT_TIME_ZONE}.
*/
class TimeToTimeCastRule extends AbstractExpressionCodeGeneratorCastRule<Number, Number> {

static final TimeToTimeCastRule INSTANCE = new TimeToTimeCastRule();

private TimeToTimeCastRule() {
super(
CastRulePredicate.builder()
.input(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE)
.target(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE)
.build());
}

@Override
public String generateExpression(
CodeGeneratorCastRule.Context context,
String inputTerm,
LogicalType inputLogicalType,
LogicalType targetLogicalType) {
final int inputPrecision = LogicalTypeChecks.getPrecision(inputLogicalType);
int targetPrecision = LogicalTypeChecks.getPrecision(targetLogicalType);

return inputPrecision <= targetPrecision
? inputTerm
: staticCall(BuiltInMethods.TRUNCATE_SQL_TIME(), inputTerm, targetPrecision);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.apache.flink.table.legacy.api.TableSchema
import org.apache.flink.table.legacy.types.logical.TypeInformationRawType
import org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType
import org.apache.flink.table.planner.plan.schema._
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
import org.apache.flink.table.runtime.types.{LogicalTypeDataTypeConverter, PlannerTypeUtils}
import org.apache.flink.table.types.logical._
Expand Down Expand Up @@ -87,7 +86,8 @@ class FlinkTypeFactory(

// temporal types
case LogicalTypeRoot.DATE => createSqlType(DATE)
case LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE => createSqlType(TIME)
case LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE =>
createSqlType(TIME, t.asInstanceOf[TimeType].getPrecision)

// interval types
case LogicalTypeRoot.INTERVAL_YEAR_MONTH =>
Expand Down Expand Up @@ -630,11 +630,7 @@ object FlinkTypeFactory {
// temporal types
case DATE => new DateType()
case TIME =>
if (relDataType.getPrecision > 3) {
throw new TableException(s"TIME precision is not supported: ${relDataType.getPrecision}")
}
// the planner supports precision 3, but for consistency with old planner, we set it to 0.
new TimeType()
new TimeType(relDataType.getPrecision)
case TIMESTAMP =>
new TimestampType(relDataType.getPrecision)
case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ object BuiltInMethods {
val TRUNCATE_SQL_TIMESTAMP =
Types.lookupMethod(classOf[DateTimeUtils], "truncate", classOf[TimestampData], classOf[Int])

val TRUNCATE_SQL_TIME =
Types.lookupMethod(classOf[DateTimeUtils], "truncate", classOf[Int], classOf[Int])

val ADD_MONTHS =
Types.lookupMethod(classOf[DateTimeUtils], "addMonths", classOf[Long], classOf[Int])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,22 @@ object ScalarOperatorGens {
}

case (TIME_WITHOUT_TIME_ZONE, INTERVAL_DAY_TIME) =>
generateOperatorIfNotNull(ctx, new TimeType(), left, right) {
generateOperatorIfNotNull(
ctx,
new TimeType(LogicalTypeChecks.getPrecision(left.resultType)),
left,
right) {
(l, r) =>
s"java.lang.Math.toIntExact((($l + ${MILLIS_PER_DAY}L) $op (" +
s"java.lang.Math.toIntExact($r % ${MILLIS_PER_DAY}L))) % ${MILLIS_PER_DAY}L)"
}

case (TIME_WITHOUT_TIME_ZONE, INTERVAL_YEAR_MONTH) =>
generateOperatorIfNotNull(ctx, new TimeType(), left, right)((l, r) => s"$l")
generateOperatorIfNotNull(
ctx,
new TimeType(LogicalTypeChecks.getPrecision(left.resultType)),
left,
right)((l, r) => s"$l")

case (TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE, INTERVAL_DAY_TIME) =>
generateOperatorIfNotNull(ctx, left.resultType, left, right) {
Expand Down Expand Up @@ -609,9 +617,11 @@ object ScalarOperatorGens {
}
}
// both sides are numeric
else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
(leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
}
else if (
isNumeric(left.resultType) && isNumeric(right.resultType)
|| isTime(left.resultType) &&
isTime(right.resultType)
) { (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm" }

// both sides are timestamp
else if (isTimestamp(left.resultType) && isTimestamp(right.resultType)) {
Expand Down
Loading