From 3961719f004a6ff33f7a72fa7d45b4a8efacd010 Mon Sep 17 00:00:00 2001 From: Simonas Gelazevicius Date: Fri, 11 Oct 2024 09:49:27 +0300 Subject: [PATCH 1/2] fix: Do not evaluate isClassArityUsageDisabled on every serialize/deserialize function call --- .../api/serializer/CaseClassSerializer.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala index 62b5e7c..b71437f 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala @@ -39,6 +39,14 @@ abstract class CaseClassSerializer[T <: Product]( @transient lazy val log: Logger = LoggerFactory.getLogger(this.getClass) + @transient private val isClassArityUsageDisabled = + sys.env + .get("DISABLE_CASE_CLASS_ARITY_USAGE") + .exists(v => + Try(v.toBoolean) + .getOrElse(false) + ) + override def isImmutableType: Boolean = scalaFieldSerializers.forall(s => Option(s).exists(_.isImmutableType)) @@ -77,13 +85,6 @@ abstract class CaseClassSerializer[T <: Product]( createInstance(fields.toArray) } - private def isClassArityUsageDisabled = - sys.env - .get("DISABLE_CASE_CLASS_ARITY_USAGE") - .exists(v => - Try(v.toBoolean) - .getOrElse(false) - ) def serialize(value: T, target: DataOutputView): Unit = { if (arity > 0 && !isClassArityUsageDisabled) From a8d462dc6e4c141272abb6ced19e21e31bb68791 Mon Sep 17 00:00:00 2001 From: Simonas Gelazevicius Date: Fri, 11 Oct 2024 14:15:56 +0300 Subject: [PATCH 2/2] chore: Move isClassArityUsageDisabled to object --- .../api/serializer/CaseClassSerializer.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala index b71437f..4515d19 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase import org.apache.flink.core.memory.{DataInputView, DataOutputView} import org.apache.flink.types.NullFieldException +import org.apache.flinkx.api.serializer.CaseClassSerializer.isClassArityUsageDisabled import org.slf4j.{Logger, LoggerFactory} import scala.util.{Failure, Success, Try} @@ -39,14 +40,6 @@ abstract class CaseClassSerializer[T <: Product]( @transient lazy val log: Logger = LoggerFactory.getLogger(this.getClass) - @transient private val isClassArityUsageDisabled = - sys.env - .get("DISABLE_CASE_CLASS_ARITY_USAGE") - .exists(v => - Try(v.toBoolean) - .getOrElse(false) - ) - override def isImmutableType: Boolean = scalaFieldSerializers.forall(s => Option(s).exists(_.isImmutableType)) @@ -85,7 +78,6 @@ abstract class CaseClassSerializer[T <: Product]( createInstance(fields.toArray) } - def serialize(value: T, target: DataOutputView): Unit = { if (arity > 0 && !isClassArityUsageDisabled) target.writeInt(value.productArity) @@ -125,3 +117,13 @@ abstract class CaseClassSerializer[T <: Product]( createInstance(fields.filter(_ != null)) } } + +object CaseClassSerializer { + private val isClassArityUsageDisabled = + sys.env + .get("DISABLE_CASE_CLASS_ARITY_USAGE") + .exists(v => + Try(v.toBoolean) + .getOrElse(false) + ) +}