Skip to content

Commit

Permalink
Parse Event from a ByteBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter authored and pondzix committed Dec 1, 2023
1 parent ee988bc commit 06e0225
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
*/
package com.snowplowanalytics.snowplow.analytics.scalasdk.decode

import cats.implicits._
import shapeless._
import shapeless.ops.record._
import shapeless.ops.hlist._
import cats.data.{NonEmptyList, Validated}
import java.nio.ByteBuffer
import scala.collection.mutable.ListBuffer
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.{FieldNumberMismatch, NotTSV, RowDecodingError}

private[scalasdk] trait Parser[A] extends TSVParser[A] {
Expand All @@ -42,10 +45,37 @@ private[scalasdk] trait Parser[A] extends TSVParser[A] {
decoded.map(decodedValue => generic.from(decodedValue))
}
}

/** Parse a TSV event held in a `ByteBuffer` into an `A`.
  *
  * Splits the buffer on tab bytes without copying, then decodes each field.
  * A buffer with no tab at all (one field) is rejected as not-TSV.
  */
def parseBytes(row: ByteBuffer): DecodeResult[A] = {
  val fields = Parser.splitBuffer(row)
  fields.length match {
    case 1 =>
      // a single field means no tab byte was found, so this is not a TSV line
      Validated.Invalid(NotTSV)
    case n if n != expectedNumFields =>
      Validated.Invalid(FieldNumberMismatch(n))
    case _ =>
      decoder
        .decodeBytes(fields.result())
        .leftMap(RowDecodingError(_))
        .map(generic.from)
  }
}
}

object Parser {

private val tab: Byte = '\t'.toByte

/** Split a buffer on tab bytes into zero-copy slices.
  *
  * Each returned element is a `duplicate` of the input with its position/limit
  * narrowed to one field, so the underlying bytes are shared, not copied.
  * An input with no tabs yields exactly one element; a trailing tab yields a
  * final empty field.
  */
private def splitBuffer(row: ByteBuffer): ListBuffer[ByteBuffer] = {
  val fields = ListBuffer.empty[ByteBuffer]
  var field = row.duplicate
  fields += field
  var i = row.position()
  while (i < row.limit()) {
    if (row.get(i) === tab) {
      // close the current field at the tab, then open the next one just after it
      field.limit(i)
      field = row.duplicate.position(i + 1)
      fields += field
    }
    i += 1
  }
  fields
}

private[scalasdk] sealed trait DeriveParser[A] {

def knownKeys[R <: HList, K <: HList, L <: HList](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import shapeless._
import cats.syntax.validated._
import cats.syntax.either._
import cats.syntax.apply._
import java.nio.ByteBuffer
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.RowDecodingErrorInfo.UnhandledRowDecodingError

private[scalasdk] trait RowDecoderCompanion {
Expand Down Expand Up @@ -44,6 +45,21 @@ private[scalasdk] trait RowDecoderCompanion {
case Nil => UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}

/** Decode the head `ByteBuffer` field of a TSV row into `H`, recursing on the
  * tail decoder for the rest of the HList.
  *
  * Head and tail results are combined applicatively, so errors from both sides
  * are accumulated rather than failing fast.
  */
private def parseBytes[H: ValueDecoder, T <: HList](
  key: Key,
  tailDecoder: RowDecoder[T],
  maxLength: Option[Int],
  row: List[ByteBuffer]
): RowDecodeResult[H :: T] =
  row match {
    case head :: rest =>
      val decodedHead = ValueDecoder[H].parseBytes(key, head, maxLength).toValidatedNel
      (decodedHead, tailDecoder.decodeBytes(rest)).mapN(_ :: _)
    case Nil =>
      UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
  }

implicit def hnilFromRow: DeriveRowDecoder[HNil] =
new DeriveRowDecoder[HNil] {
def get(knownKeys: List[Key], maxLengths: Map[String, Int]): RowDecoder[HNil] =
Expand All @@ -55,6 +71,14 @@ private[scalasdk] trait RowDecoderCompanion {
case _ =>
UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}

// HNil decodes only from an exhausted row; leftover buffers mean the field
// count exceeded the schema. NOTE(review): the "Not enough values" message is
// reused here even though this branch means too many values — kept verbatim.
def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[HNil] =
  if (row.isEmpty) HNil.validNel
  else UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}
}

Expand All @@ -67,6 +91,7 @@ private[scalasdk] trait RowDecoderCompanion {
val maxLength = maxLengths.get(key.name)
new RowDecoder[H :: T] {
def apply(row: List[String]): RowDecodeResult[H :: T] = parse(key, tailDecoder, maxLength, row)
def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[H :: T] = parseBytes(key, tailDecoder, maxLength, row)
}
case Nil =>
// Shapeless type checking makes this impossible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
*/
package com.snowplowanalytics.snowplow.analytics.scalasdk.decode

import cats.implicits._
import cats.data.{NonEmptyList, Validated}
import java.nio.ByteBuffer
import scala.collection.mutable.ListBuffer
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.{FieldNumberMismatch, NotTSV, RowDecodingError}
import scala.deriving._
import scala.compiletime._
Expand All @@ -30,10 +33,32 @@ private[scalasdk] trait Parser[A] extends TSVParser[A] {
else if (values.length != expectedNumFields) Validated.Invalid(FieldNumberMismatch(values.length))
else decoder(values.toList).leftMap(e => RowDecodingError(e))
}

/** Parse a TSV event from a `ByteBuffer` without building an intermediate String.
  *
  * A buffer containing no tab (a single field) is rejected as not-TSV.
  */
def parseBytes(row: ByteBuffer): DecodeResult[A] = {
  val fields = Parser.splitBuffer(row)
  val count = fields.length
  if (count == 1) Validated.Invalid(NotTSV) // no tab byte found: not a TSV line
  else if (count != expectedNumFields) Validated.Invalid(FieldNumberMismatch(count))
  else decoder.decodeBytes(fields.result()).leftMap(RowDecodingError(_))
}
}

object Parser {

private val tab: Byte = '\t'.toByte

/** Split a buffer on tab bytes into zero-copy slices.
  *
  * Each returned element is a `duplicate` of the input with its position/limit
  * narrowed to one field, so the underlying bytes are shared, not copied.
  * An input with no tabs yields exactly one element; a trailing tab yields a
  * final empty field.
  */
private def splitBuffer(row: ByteBuffer): ListBuffer[ByteBuffer] = {
  val fields = ListBuffer.empty[ByteBuffer]
  var field = row.duplicate
  fields += field
  var i = row.position()
  while (i < row.limit()) {
    if (row.get(i) === tab) {
      // close the current field at the tab, then open the next one just after it
      field.limit(i)
      field = row.duplicate.position(i + 1)
      fields += field
    }
    i += 1
  }
  fields
}

private[scalasdk] sealed trait DeriveParser[A] {
inline def knownKeys(implicit mirror: Mirror.ProductOf[A]): List[String] =
constValueTuple[mirror.MirroredElemLabels].toArray.map(_.toString).toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import cats.syntax.validated._
import cats.syntax.either._
import cats.syntax.apply._
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.RowDecodingErrorInfo.UnhandledRowDecodingError
import java.nio.ByteBuffer
import scala.deriving._
import scala.compiletime._

Expand Down Expand Up @@ -52,6 +53,20 @@ private[scalasdk] trait RowDecoderCompanion {
case Nil => UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}

/** Decode the head `ByteBuffer` field of a TSV row into `H`, recursing on the
  * tail decoder for the rest of the tuple.
  *
  * Head and tail results are combined applicatively, so errors from both sides
  * are accumulated rather than failing fast.
  */
private def parseBytes[H: ValueDecoder, T <: Tuple](
  key: Key,
  tailDecoder: RowDecoder[T],
  maxLength: Option[Int],
  row: List[ByteBuffer]
): RowDecodeResult[H *: T] =
  row match {
    case head :: rest =>
      val decodedHead = ValueDecoder[H].parseBytes(key, head, maxLength).toValidatedNel
      (decodedHead, tailDecoder.decodeBytes(rest)).mapN(_ *: _)
    case Nil =>
      UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
  }

implicit def hnilFromRow: DeriveRowDecoder[EmptyTuple] =
new DeriveRowDecoder[EmptyTuple] {
def get(knownKeys: List[Key], maxLengths: Map[String, Int]): RowDecoder[EmptyTuple] =
Expand All @@ -63,6 +78,14 @@ private[scalasdk] trait RowDecoderCompanion {
case _ =>
UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}

// EmptyTuple decodes only from an exhausted row; leftover buffers mean the
// field count exceeded the schema. NOTE(review): the "Not enough values"
// message is reused here even though this branch means too many — kept verbatim.
def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[EmptyTuple] =
  if (row.isEmpty) EmptyTuple.validNel
  else UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}
}

Expand All @@ -75,6 +98,7 @@ private[scalasdk] trait RowDecoderCompanion {
val maxLength = maxLengths.get(key.name)
new RowDecoder[H *: T] {
def apply(row: List[String]): RowDecodeResult[H *: T] = parse(key, tailDecoder, maxLength, row)
def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[H *: T] = parseBytes(key, tailDecoder, maxLength, row)
}
case Nil =>
// Shapeless type checking makes this impossible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package com.snowplowanalytics.snowplow.analytics.scalasdk
import java.time.Instant
import java.util.UUID
import java.time.format.DateTimeFormatter
import java.nio.ByteBuffer

// circe
import io.circe.{Decoder, Encoder, Json, JsonObject}
Expand Down Expand Up @@ -280,6 +281,9 @@ object Event {
/** Parse a TSV-encoded enriched event line into an [[Event]], delegating to the standard parser. */
def parse(line: String): DecodeResult[Event] =
  stdParser.parse(line)

/** Parse a TSV-encoded enriched event held in a `ByteBuffer` into an [[Event]], delegating to the standard parser. */
def parseBytes(bytes: ByteBuffer): DecodeResult[Event] =
  stdParser.parseBytes(bytes)

private lazy val fieldNames: List[String] =
Parser.deriveFor[Event].knownKeys

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
package com.snowplowanalytics.snowplow.analytics.scalasdk
package decode

import java.nio.ByteBuffer

/** Decodes one TSV row, already split into fields, into a value of type `L`. */
private[scalasdk] trait RowDecoder[L] extends Serializable { self =>
  /** Decode a row whose fields are already materialised as Strings. */
  def apply(row: List[String]): RowDecodeResult[L]
  /** Decode a row whose fields are raw `ByteBuffer` slices. */
  def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[L]
  /** Build a decoder for `B` by transforming this decoder's successful result with `f`. */
  def map[B](f: L => B): RowDecoder[B] =
    new RowDecoder[B] {
      def apply(row: List[String]): RowDecodeResult[B] = self.apply(row).map(f)
      def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[B] = self.decodeBytes(row).map(f)
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
*/
package com.snowplowanalytics.snowplow.analytics.scalasdk.decode

import java.nio.ByteBuffer

/** Parser for a TSV-encoded event, supplied either as a String or as raw bytes */
trait TSVParser[A] extends Serializable {
  /** Parse an event from a `ByteBuffer` holding the TSV-encoded bytes. */
  def parseBytes(bytes: ByteBuffer): DecodeResult[A]
  /** Parse an event from a TSV-encoded line. */
  def parse(row: String): DecodeResult[A]
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import com.snowplowanalytics.snowplow.analytics.scalasdk.validate.FIELD_SIZES
import java.time.Instant
import java.time.format.DateTimeParseException
import java.util.UUID
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// cats
import cats.syntax.either._
Expand All @@ -30,8 +32,8 @@ import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.implicits._

// circe
import io.circe.parser.{parse => parseJson}
import io.circe.{Error, Json}
import io.circe.jawn.JawnParser
import io.circe.{Error, Json, ParsingFailure}

// This library
import com.snowplowanalytics.snowplow.analytics.scalasdk.Common.{ContextsCriterion, UnstructEventCriterion}
Expand All @@ -45,9 +47,19 @@ private[decode] trait ValueDecoder[A] {
value: String,
maxLength: Option[Int]
): DecodedValue[A]

/** Decode a single field supplied as a `ByteBuffer`.
  *
  * Default implementation decodes the buffer as UTF-8 and delegates to the
  * String-based `parse`; decoders that can operate on bytes directly may override.
  */
def parseBytes(
  key: Key,
  value: ByteBuffer,
  maxLength: Option[Int]
): DecodedValue[A] =
  parse(key, StandardCharsets.UTF_8.decode(value).toString, maxLength)
}

private[decode] object ValueDecoder {

private val parser: JawnParser = new JawnParser

def apply[A](implicit readA: ValueDecoder[A]): ValueDecoder[A] = readA

def fromFunc[A](f: ((Key, String, Option[Int])) => DecodedValue[A]): ValueDecoder[A] =
Expand Down Expand Up @@ -159,41 +171,85 @@ private[decode] object ValueDecoder {
}
}

implicit final val unstructuredJson: ValueDecoder[UnstructEvent] =
fromFunc[UnstructEvent] {
case (key, value, _) =>
def asLeft(error: Error): RowDecodingErrorInfo = InvalidValue(key, value, error.show)
implicit final val unstructuredJson: ValueDecoder[UnstructEvent] = {
def fromJsonParseResult(
result: Either[ParsingFailure, Json],
key: Key,
originalValue: => String
): DecodedValue[UnstructEvent] = {
def asLeft(error: Error): RowDecodingErrorInfo = InvalidValue(key, originalValue, error.show)
result
.flatMap(_.as[SelfDescribingData[Json]])
.leftMap(asLeft) match {
case Right(SelfDescribingData(schema, data)) if UnstructEventCriterion.matches(schema) =>
data.as[SelfDescribingData[Json]].leftMap(asLeft).map(_.some).map(UnstructEvent.apply)
case Right(SelfDescribingData(schema, _)) =>
InvalidValue(key, originalValue, s"Unknown payload: ${schema.toSchemaUri}").asLeft[UnstructEvent]
case Left(error) => error.asLeft[UnstructEvent]
}
}
new ValueDecoder[UnstructEvent] {
def parse(
key: Key,
value: String,
maxLength: Option[Int]
): DecodedValue[UnstructEvent] =
if (value.isEmpty)
UnstructEvent(None).asRight[RowDecodingErrorInfo]
else
parseJson(value)
.flatMap(_.as[SelfDescribingData[Json]])
.leftMap(asLeft) match {
case Right(SelfDescribingData(schema, data)) if UnstructEventCriterion.matches(schema) =>
data.as[SelfDescribingData[Json]].leftMap(asLeft).map(_.some).map(UnstructEvent.apply)
case Right(SelfDescribingData(schema, _)) =>
InvalidValue(key, value, s"Unknown payload: ${schema.toSchemaUri}").asLeft[UnstructEvent]
case Left(error) => error.asLeft[UnstructEvent]
}
fromJsonParseResult(parser.parse(value), key, value)

override def parseBytes(
key: Key,
value: ByteBuffer,
maxLength: Option[Int]
): DecodedValue[UnstructEvent] =
if (!value.hasRemaining())
UnstructEvent(None).asRight[RowDecodingErrorInfo]
else
fromJsonParseResult(parser.parseByteBuffer(value), key, StandardCharsets.UTF_8.decode(value).toString)
}
}

implicit final val contexts: ValueDecoder[Contexts] =
fromFunc[Contexts] {
case (key, value, _) =>
def asLeft(error: Error): RowDecodingErrorInfo = InvalidValue(key, value, error.show)
implicit final val contexts: ValueDecoder[Contexts] = {
def fromJsonParseResult(
result: Either[ParsingFailure, Json],
key: Key,
originalValue: => String
): DecodedValue[Contexts] = {
def asLeft(error: Error): RowDecodingErrorInfo = InvalidValue(key, originalValue, error.show)
result
.flatMap(_.as[SelfDescribingData[Json]])
.leftMap(asLeft) match {
case Right(SelfDescribingData(schema, data)) if ContextsCriterion.matches(schema) =>
data.as[List[SelfDescribingData[Json]]].leftMap(asLeft).map(Contexts.apply)
case Right(SelfDescribingData(schema, _)) =>
InvalidValue(key, originalValue, s"Unknown payload: ${schema.toSchemaUri}").asLeft[Contexts]
case Left(error) => error.asLeft[Contexts]
}
}
new ValueDecoder[Contexts] {
def parse(
key: Key,
value: String,
maxLength: Option[Int]
): DecodedValue[Contexts] =
if (value.isEmpty)
Contexts(List()).asRight[RowDecodingErrorInfo]
Contexts(List.empty).asRight[RowDecodingErrorInfo]
else
parseJson(value)
.flatMap(_.as[SelfDescribingData[Json]])
.leftMap(asLeft) match {
case Right(SelfDescribingData(schema, data)) if ContextsCriterion.matches(schema) =>
data.as[List[SelfDescribingData[Json]]].leftMap(asLeft).map(Contexts.apply)
case Right(SelfDescribingData(schema, _)) =>
InvalidValue(key, value, s"Unknown payload: ${schema.toSchemaUri}").asLeft[Contexts]
case Left(error) => error.asLeft[Contexts]
}
fromJsonParseResult(parser.parse(value), key, value)

override def parseBytes(
key: Key,
value: ByteBuffer,
maxLength: Option[Int]
): DecodedValue[Contexts] =
if (!value.hasRemaining())
Contexts(List.empty).asRight[RowDecodingErrorInfo]
else
fromJsonParseResult(parser.parseByteBuffer(value), key, StandardCharsets.UTF_8.decode(value).toString)
}
}

/**
* Converts a timestamp to an ISO-8601 format usable by Instant.parse()
Expand Down
Loading

0 comments on commit 06e0225

Please sign in to comment.