Skip to content

Commit

Permalink
support java time types
Browse files Browse the repository at this point in the history
  • Loading branch information
novakov-alexey committed Sep 26, 2023
1 parent 9039cc7 commit 3b95cf2
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 18 deletions.
32 changes: 17 additions & 15 deletions src/main/scala/org/apache/flinkx/api/serializers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import org.apache.flinkx.api.serializer.MappedSerializer.TypeMapper
import org.apache.flinkx.api.serializer._
import org.apache.flinkx.api.typeinfo._
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, LocalTimeTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.common.typeutils.base.array._

Expand All @@ -20,8 +20,7 @@ import java.lang.{Integer => JInteger}
import java.lang.{Character => JCharacter}
import java.math.{BigInteger => JBigInteger}
import java.math.{BigDecimal => JBigDecimal}
import java.time.Instant

import java.time.{Instant, LocalDate, LocalDateTime, LocalTime}
import scala.collection.mutable
import scala.reflect.{ClassTag, classTag}

Expand Down Expand Up @@ -132,18 +131,21 @@ object serializers extends LowPrioImplicits {
implicit lazy val shortSerializer: TypeSerializer[Short] = shortInfo.createSerializer(config)

// java
implicit lazy val jIntegerInfo: TypeInformation[JInteger] = BasicTypeInfo.INT_TYPE_INFO
implicit lazy val jLongInfo: TypeInformation[JLong] = BasicTypeInfo.LONG_TYPE_INFO
implicit lazy val jFloatInfo: TypeInformation[JFloat] = BasicTypeInfo.FLOAT_TYPE_INFO
implicit lazy val jDoubleInfo: TypeInformation[JDouble] = BasicTypeInfo.DOUBLE_TYPE_INFO
implicit lazy val jBooleanInfo: TypeInformation[JBoolean] = BasicTypeInfo.BOOLEAN_TYPE_INFO
implicit lazy val jByteInfo: TypeInformation[JByte] = BasicTypeInfo.BYTE_TYPE_INFO
implicit lazy val jCharInfo: TypeInformation[JCharacter] = BasicTypeInfo.CHAR_TYPE_INFO
implicit lazy val jShortInfo: TypeInformation[JShort] = BasicTypeInfo.SHORT_TYPE_INFO
implicit lazy val jVoidInfo: TypeInformation[java.lang.Void] = BasicTypeInfo.VOID_TYPE_INFO
implicit lazy val jBigIntInfo: TypeInformation[BigInteger] = BasicTypeInfo.BIG_INT_TYPE_INFO
implicit lazy val jBigDecInfo: TypeInformation[JBigDecimal] = BasicTypeInfo.BIG_DEC_TYPE_INFO
implicit lazy val jInstantInfo: TypeInformation[Instant] = BasicTypeInfo.INSTANT_TYPE_INFO
implicit lazy val jIntegerInfo: TypeInformation[JInteger] = BasicTypeInfo.INT_TYPE_INFO
implicit lazy val jLongInfo: TypeInformation[JLong] = BasicTypeInfo.LONG_TYPE_INFO
implicit lazy val jFloatInfo: TypeInformation[JFloat] = BasicTypeInfo.FLOAT_TYPE_INFO
implicit lazy val jDoubleInfo: TypeInformation[JDouble] = BasicTypeInfo.DOUBLE_TYPE_INFO
implicit lazy val jBooleanInfo: TypeInformation[JBoolean] = BasicTypeInfo.BOOLEAN_TYPE_INFO
implicit lazy val jByteInfo: TypeInformation[JByte] = BasicTypeInfo.BYTE_TYPE_INFO
implicit lazy val jCharInfo: TypeInformation[JCharacter] = BasicTypeInfo.CHAR_TYPE_INFO
implicit lazy val jShortInfo: TypeInformation[JShort] = BasicTypeInfo.SHORT_TYPE_INFO
implicit lazy val jVoidInfo: TypeInformation[java.lang.Void] = BasicTypeInfo.VOID_TYPE_INFO
implicit lazy val jBigIntInfo: TypeInformation[BigInteger] = BasicTypeInfo.BIG_INT_TYPE_INFO
implicit lazy val jBigDecInfo: TypeInformation[JBigDecimal] = BasicTypeInfo.BIG_DEC_TYPE_INFO
implicit lazy val jInstantInfo: TypeInformation[Instant] = BasicTypeInfo.INSTANT_TYPE_INFO
implicit lazy val jLocalDateTypeInfo: TypeInformation[LocalDate] = LocalTimeTypeInfo.LOCAL_DATE
implicit lazy val jLocalDateTimeTypeInfo: TypeInformation[LocalDateTime] = LocalTimeTypeInfo.LOCAL_DATE_TIME
implicit lazy val jLocalTimeTypeInfo: TypeInformation[LocalTime] = LocalTimeTypeInfo.LOCAL_TIME

implicit def listCCInfo[T](implicit lc: ClassTag[T], ls: TypeSerializer[::[T]]): TypeInformation[::[T]] = {
drop(lc)
Expand Down
13 changes: 10 additions & 3 deletions src/test/scala/org/apache/flinkx/api/SerializerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.apache.flinkx.api.SerializerTest.{
Foo,
Foo2,
Generic,
JavaTime,
ListADT,
Nested,
P2,
Expand All @@ -33,7 +34,7 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.apache.flinkx.api.serializers._

import java.time.Instant
import java.time.{Instant, LocalDate, LocalDateTime}

class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with TestUtils {

Expand All @@ -44,7 +45,12 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with Test

it should "derive serializer for java classes" in {
val ser = implicitly[TypeInformation[SimpleJava]].createSerializer(null)
all(ser, SimpleJava(1, "foo", Instant.now()))
all(ser, SimpleJava(1, "foo"))
}

it should "derive serializer for java.time classes" in {
val ser = implicitly[TypeInformation[JavaTime]].createSerializer(null)
all(ser, JavaTime(Instant.now(), LocalDate.now(), LocalDateTime.now()))
}

it should "derive nested classes" in {
Expand Down Expand Up @@ -202,7 +208,8 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with Test
object SerializerTest {
case class Simple(a: Int, b: String)
case class SimpleList(a: List[Int])
case class SimpleJava(a: Integer, b: String, c: Instant)
case class SimpleJava(a: Integer, b: String)
case class JavaTime(a: Instant, b: LocalDate, c: LocalDateTime)
case class Nested(a: Simple)

case class SimpleSeq(a: Seq[Simple])
Expand Down

0 comments on commit 3b95cf2

Please sign in to comment.