From 3702d741d2f5669023073001536cf64b5309f4ff Mon Sep 17 00:00:00 2001 From: rahulbhatia023 Date: Sat, 23 Dec 2023 17:55:22 -0500 Subject: [PATCH 1/7] Added code for supporting compression in response body --- .../__files/sample_gzip_csv_response_body.gz | Bin 0 -> 245 bytes .../expected_gzip_csv_response.json | 26 +++++ ...gzip_csv_response_dataframe_converter.json | 10 ++ ...gzip_csv_response_dataframe_converter.conf | 25 ++++ ...eToDataFrameConverterIntegrationTest.scala | 34 ++++++ .../restonomer/app/RestonomerWorkflow.scala | 1 + .../body/RestonomerResponseBody.scala | 5 +- .../common/ResponseBodyCompressionTypes.scala | 23 ++++ .../restonomer/http/RestonomerResponse.scala | 110 +++++++++++------- 9 files changed, 191 insertions(+), 43 deletions(-) create mode 100644 src/it/resources/mock_data/response_body/__files/sample_gzip_csv_response_body.gz create mode 100644 src/it/resources/mock_data/response_body/expected_gzip_csv_response.json create mode 100644 src/it/resources/mock_data/response_body/mappings/mocked_gzip_csv_response_dataframe_converter.json create mode 100644 src/it/resources/restonomer_context/checkpoints/response_body/checkpoint_gzip_csv_response_dataframe_converter.conf create mode 100644 src/it/scala/com/clairvoyant/restonomer/response_body/GzipCSVResponseToDataFrameConverterIntegrationTest.scala create mode 100644 src/main/scala/com/clairvoyant/restonomer/common/ResponseBodyCompressionTypes.scala diff --git a/src/it/resources/mock_data/response_body/__files/sample_gzip_csv_response_body.gz b/src/it/resources/mock_data/response_body/__files/sample_gzip_csv_response_body.gz new file mode 100644 index 0000000000000000000000000000000000000000..ebd7ab85137705f6e50739e311cad18d7a2b7370 GIT binary patch literal 245 zcmVeb1Wpi+EZgXW{VsB)50Huw=N&_(z zMfZaL@F?i&O_<4yP;gbK3s+X#A9zh(+h8UMFNvVP=E6_vFEogPdzY7UIp_TOwUKD6 zDd1gT+_Lv9xDzt3E*#-;jcY0l`Q%b!iyLu<@r4g8LLLsxAdTa5rBGaXZ|IFfWqcqB zO^|V0_pqbm_0=j-yT%Nb!Bl6-Mo26<2J*O?=_3Zk89B3`Yp}7?11_FSiF+0pdj#F< vVS&@2^n^>*_cu8FtWk~F{VCkiobLYCZE&%`lHTZJz literal 0 HcmV?d00001 diff --git a/src/it/resources/mock_data/response_body/expected_gzip_csv_response.json b/src/it/resources/mock_data/response_body/expected_gzip_csv_response.json new file mode 100644 index 00000000..5f86fe14 --- /dev/null +++ b/src/it/resources/mock_data/response_body/expected_gzip_csv_response.json @@ -0,0 +1,26 @@ +[ + { + "Product line": "Aparatos de cuidado personal", + "Seller": "Amazon.es", + "Tracking ID": "eselmundo-21", + "Date shipped": "October 30, 2021", + "Price": 41.24, + "Referral fee rate": "8.00%", + "Quantity": 1, + "Revenue": 41.24, + "Earnings": 3.3, + "Sub Tag": "-" + }, + { + "Product line": "Aparatos & cuidado personal", + "Seller": "Amazon.es", + "Tracking ID": "eselmundo-21", + "Date shipped": "October 30, 2021", + "Price": 82.3, + "Referral fee rate": "8.00%", + "Quantity": 1, + "Revenue": 82.3, + "Earnings": 4.5, + "Sub Tag": "-" + } +] diff --git a/src/it/resources/mock_data/response_body/mappings/mocked_gzip_csv_response_dataframe_converter.json b/src/it/resources/mock_data/response_body/mappings/mocked_gzip_csv_response_dataframe_converter.json new file mode 100644 index 00000000..1d94b7c8 --- /dev/null +++ b/src/it/resources/mock_data/response_body/mappings/mocked_gzip_csv_response_dataframe_converter.json @@ -0,0 +1,10 @@ +{ + "request": { + "method": "GET", + "url": "/gzip-csv-response-converter" + }, + "response": { + "status": 200, + "bodyFileName": "sample_gzip_csv_response_body.gz" + } +} diff --git a/src/it/resources/restonomer_context/checkpoints/response_body/checkpoint_gzip_csv_response_dataframe_converter.conf b/src/it/resources/restonomer_context/checkpoints/response_body/checkpoint_gzip_csv_response_dataframe_converter.conf new file mode 100644 index 00000000..df4f997c --- /dev/null +++ b/src/it/resources/restonomer_context/checkpoints/response_body/checkpoint_gzip_csv_response_dataframe_converter.conf @@ -0,0 +1,25 @@ +name = "checkpoint_gzip_csv_response_dataframe_converter" + +data = { + data-request = { + url = "http://localhost:8080/gzip-csv-response-converter" + } + + data-response = { + body = { + type = "Text" + text-format = { + type = "CSVTextFormat" + } + compression = "GZIP" + } + + persistence = { + type = "LocalFileSystem" + file-format = { + type = "ParquetFileFormat" + } + file-path = "/tmp/response_body" + } + } +} diff --git a/src/it/scala/com/clairvoyant/restonomer/response_body/GzipCSVResponseToDataFrameConverterIntegrationTest.scala b/src/it/scala/com/clairvoyant/restonomer/response_body/GzipCSVResponseToDataFrameConverterIntegrationTest.scala new file mode 100644 index 00000000..626197da --- /dev/null +++ b/src/it/scala/com/clairvoyant/restonomer/response_body/GzipCSVResponseToDataFrameConverterIntegrationTest.scala @@ -0,0 +1,34 @@ +package com.clairvoyant.restonomer.response_body + +import com.clairvoyant.restonomer.common.{IntegrationTestDependencies, MockFileSystemPersistence} +import org.apache.spark.sql.types.* + +class GzipCSVResponseToDataFrameConverterIntegrationTest + extends IntegrationTestDependencies + with MockFileSystemPersistence { + + override val mappingsDirectory: String = "response_body" + + override val expectedDFSchema: Option[StructType] = Some( + StructType( + List( + StructField(name = "Product line", dataType = StringType), + StructField(name = "Seller", dataType = StringType), + StructField(name = "Tracking ID", dataType = StringType), + StructField(name = "Date shipped", dataType = StringType), + StructField(name = "Price", dataType = DoubleType), + StructField(name = "Referral fee rate", dataType = StringType), + StructField(name = "Quantity", dataType = IntegerType), + StructField(name = "Revenue", dataType = DoubleType), + StructField(name = "Earnings", dataType = DoubleType), + StructField(name = "Sub Tag", dataType = StringType) + ) + ) + ) + + it should "convert the gzip CSV file response body into a dataframe" in { + runCheckpoint(checkpointFileName = "checkpoint_gzip_csv_response_dataframe_converter.conf") + outputDF should matchExpectedDataFrame("expected_gzip_csv_response.json") + } + +} diff --git a/src/main/scala/com/clairvoyant/restonomer/app/RestonomerWorkflow.scala b/src/main/scala/com/clairvoyant/restonomer/app/RestonomerWorkflow.scala index a503ad28..217d8ff1 100644 --- a/src/main/scala/com/clairvoyant/restonomer/app/RestonomerWorkflow.scala +++ b/src/main/scala/com/clairvoyant/restonomer/app/RestonomerWorkflow.scala @@ -58,6 +58,7 @@ object RestonomerWorkflow { val dataRestonomerResponse = RestonomerResponse.fetchFromRequest( restonomerRequest = dataRestonomerRequest, + compression = checkpointConfig.data.dataResponse.body.compression, retryConfig = checkpointConfig.data.dataRequest.retry, restonomerPagination = checkpointConfig.data.dataResponse.pagination ) diff --git a/src/main/scala/com/clairvoyant/restonomer/body/RestonomerResponseBody.scala b/src/main/scala/com/clairvoyant/restonomer/body/RestonomerResponseBody.scala index 53fe2cbb..6cc4f27c 100644 --- a/src/main/scala/com/clairvoyant/restonomer/body/RestonomerResponseBody.scala +++ b/src/main/scala/com/clairvoyant/restonomer/body/RestonomerResponseBody.scala @@ -8,10 +8,13 @@ import zio.config.derivation.nameWithLabel @nameWithLabel sealed trait RestonomerResponseBody: + val compression: Option[String] + def read(restonomerResponseBody: Seq[String])(using sparkSession: SparkSession): DataFrame case class Text( - textFormat: TextFormat + textFormat: TextFormat, + override val compression: Option[String] = None ) extends RestonomerResponseBody: override def read(restonomerResponseBody: Seq[String])(using sparkSession: SparkSession): DataFrame = diff --git a/src/main/scala/com/clairvoyant/restonomer/common/ResponseBodyCompressionTypes.scala b/src/main/scala/com/clairvoyant/restonomer/common/ResponseBodyCompressionTypes.scala new file mode 100644 index 00000000..29fece15 --- /dev/null +++ b/src/main/scala/com/clairvoyant/restonomer/common/ResponseBodyCompressionTypes.scala @@ -0,0 +1,23 @@ +package com.clairvoyant.restonomer.common + +import cats.syntax.eq.* +import com.clairvoyant.restonomer.exception.RestonomerException +import sttp.client3.* + +enum ResponseBodyCompressionTypes: + case GZIP + +object ResponseBodyCompressionTypes { + + def apply(responseBodyCompressionType: String): ResponseBodyCompressionTypes = + if (isValidResponseBodyCompressionType(responseBodyCompressionType)) + valueOf(responseBodyCompressionType) + else + throw new RestonomerException( + s"The response body compression type: $responseBodyCompressionType is not supported." + ) + + private def isValidResponseBodyCompressionType(responseBodyCompressionType: String): Boolean = + values.exists(_.toString === responseBodyCompressionType) + +} diff --git a/src/main/scala/com/clairvoyant/restonomer/http/RestonomerResponse.scala b/src/main/scala/com/clairvoyant/restonomer/http/RestonomerResponse.scala index f766f8ba..ee2c02c8 100644 --- a/src/main/scala/com/clairvoyant/restonomer/http/RestonomerResponse.scala +++ b/src/main/scala/com/clairvoyant/restonomer/http/RestonomerResponse.scala @@ -1,6 +1,8 @@ package com.clairvoyant.restonomer.http import cats.syntax.eq.* +import com.clairvoyant.restonomer.common.* +import com.clairvoyant.restonomer.common.ResponseBodyCompressionTypes.* import com.clairvoyant.restonomer.exception.RestonomerException import com.clairvoyant.restonomer.model.RetryConfig import com.clairvoyant.restonomer.pagination.RestonomerPagination @@ -10,6 +12,8 @@ import sttp.client3.* import sttp.model.HeaderNames.Location import sttp.model.{Header, StatusCode} +import java.io.{BufferedReader, ByteArrayInputStream, InputStreamReader} +import java.util.zip.GZIPInputStream import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import scala.concurrent.duration.DurationInt @@ -23,6 +27,7 @@ object RestonomerResponse { def fetchFromRequest( restonomerRequest: RestonomerRequest, + compression: Option[String], retryConfig: RetryConfig, restonomerPagination: Option[RestonomerPagination] ): RestonomerResponse = { @@ -48,6 +53,7 @@ object RestonomerResponse { ) ) ), + compression = compression, statusCodesToRetry = retryConfig.statusCodesToRetry.map(StatusCode(_)), maxRetries = retryConfig.maxRetries ).map(httpResponseBodySeq ++ _) @@ -64,6 +70,7 @@ object RestonomerResponse { restonomerRequest = restonomerRequest, httpResponseBody = getBody( restonomerRequest = restonomerRequest, + compression = compression, statusCodesToRetry = retryConfig.statusCodesToRetry.map(StatusCode(_)), maxRetries = retryConfig.maxRetries ) @@ -75,60 +82,79 @@ object RestonomerResponse { private def getBody( restonomerRequest: RestonomerRequest, + compression: Option[String], statusCodesToRetry: List[StatusCode], maxRetries: Int, currentRetryAttemptNumber: Int = 0 ): Future[Seq[String]] = { - restonomerRequest.httpRequest - .send(sttpBackend) - .flatMap { - case Response(body, StatusCode.Ok, _, _, _, _) => Future(body.toSeq) - - case response @ Response(_, statusCode, _, headers, _, _) - if statusCodesToRetry.contains(statusCode) && currentRetryAttemptNumber < maxRetries => - waitBeforeRetry( - whatToRetry = getBody( + def getResponseBody[T](req: Request[Either[String, T], Any]): Future[Seq[String]] = { + req + .send(sttpBackend) + .flatMap { + case Response(body, StatusCode.Ok, _, _, _, _) => + body match { + case Right(b: String) => Future(Seq(b)) + case Right(b: Array[Byte]) => + val gzipStream = new GZIPInputStream(new ByteArrayInputStream(b)) + val bufferedReader = new BufferedReader(new InputStreamReader(gzipStream)) + Future(Iterator.continually(bufferedReader.readLine()).takeWhile(_ != null).toSeq) + case _ => Future(Seq.empty) + } + + case response @ Response(_, statusCode, _, headers, _, _) + if statusCodesToRetry.contains(statusCode) && currentRetryAttemptNumber < maxRetries => + waitBeforeRetry( + whatToRetry = getBody( + restonomerRequest = restonomerRequest + .copy( + httpRequest = restonomerRequest.httpRequest + .header( + k = "retry-attempt", + v = (currentRetryAttemptNumber + 1).toString, + replaceExisting = true + ) + ), + compression = compression, + statusCodesToRetry = statusCodesToRetry, + maxRetries = maxRetries, + currentRetryAttemptNumber = currentRetryAttemptNumber + 1 + ), + message = response.toString(), + maxRetries = maxRetries, + currentRetryAttemptNumber = currentRetryAttemptNumber, + headers = headers + ) + + case Response(_, StatusCode.Found, _, headers, _, requestMetadata) => + getBody( restonomerRequest = restonomerRequest - .copy( - httpRequest = restonomerRequest.httpRequest - .header( - k = "retry-attempt", - v = (currentRetryAttemptNumber + 1).toString, - replaceExisting = true - ) + .copy(httpRequest = + restonomerRequest.httpRequest.method( + method = requestMetadata.method, + uri = uri"${headers.find(_.name == Location).get}" + ) ), + compression = compression, statusCodesToRetry = statusCodesToRetry, maxRetries = maxRetries, currentRetryAttemptNumber = currentRetryAttemptNumber + 1 - ), - message = response.toString(), - maxRetries = maxRetries, - currentRetryAttemptNumber = currentRetryAttemptNumber, - headers = headers - ) - - case Response(_, StatusCode.Found, _, headers, _, requestMetadata) => - getBody( - restonomerRequest = restonomerRequest - .copy(httpRequest = - restonomerRequest.httpRequest.method( - method = requestMetadata.method, - uri = uri"${headers.find(_.name == Location).get}" - ) - ), - statusCodesToRetry = statusCodesToRetry, - maxRetries = maxRetries, - currentRetryAttemptNumber = currentRetryAttemptNumber + 1 - ) + ) - case Response(_, StatusCode.NoContent, _, _, _, _) => throw new RestonomerException("No Content.") + case Response(_, StatusCode.NoContent, _, _, _, _) => throw new RestonomerException("No Content.") - case _ => - throw new RestonomerException( - s"Something totally unexpected bad happened while calling the API ${currentRetryAttemptNumber + 1} times." - ) - } + case _ => + throw new RestonomerException( + s"Something totally unexpected bad happened while calling the API ${currentRetryAttemptNumber + 1} times." + ) + } + } + compression + .map { + ResponseBodyCompressionTypes(_) match + case GZIP => getResponseBody(restonomerRequest.httpRequest.response(asByteArray)) + } + .getOrElse(getResponseBody(restonomerRequest.httpRequest.response(asString))) } private def waitBeforeRetry[T]( From cbe7b6b5d8ec35da7b6f860339c9b8c61c2b78d9 Mon Sep 17 00:00:00 2001 From: rahulbhatia023 Date: Sat, 23 Dec 2023 18:39:30 -0500 Subject: [PATCH 2/7] Added code for supporting compression in response body --- .../restonomer/app/RestonomerWorkflow.scala | 2 +- .../restonomer/http/RestonomerResponse.scala | 145 ++++++++---------- 2 files changed, 64 insertions(+), 83 deletions(-) diff --git a/src/main/scala/com/clairvoyant/restonomer/app/RestonomerWorkflow.scala b/src/main/scala/com/clairvoyant/restonomer/app/RestonomerWorkflow.scala index 217d8ff1..d63c3cdb 100644 --- a/src/main/scala/com/clairvoyant/restonomer/app/RestonomerWorkflow.scala +++ b/src/main/scala/com/clairvoyant/restonomer/app/RestonomerWorkflow.scala @@ -57,7 +57,7 @@ object RestonomerWorkflow { .build val dataRestonomerResponse = RestonomerResponse.fetchFromRequest( - restonomerRequest = dataRestonomerRequest, + httpRequest = dataRestonomerRequest.httpRequest, compression = checkpointConfig.data.dataResponse.body.compression, retryConfig = checkpointConfig.data.dataRequest.retry, restonomerPagination = checkpointConfig.data.dataResponse.pagination diff --git a/src/main/scala/com/clairvoyant/restonomer/http/RestonomerResponse.scala b/src/main/scala/com/clairvoyant/restonomer/http/RestonomerResponse.scala index ee2c02c8..c3affd70 100644 --- a/src/main/scala/com/clairvoyant/restonomer/http/RestonomerResponse.scala +++ b/src/main/scala/com/clairvoyant/restonomer/http/RestonomerResponse.scala @@ -26,15 +26,12 @@ object RestonomerResponse { private val random: Random.type = scala.util.Random def fetchFromRequest( - restonomerRequest: RestonomerRequest, + httpRequest: Request[Either[String, String], Any], compression: Option[String], retryConfig: RetryConfig, restonomerPagination: Option[RestonomerPagination] ): RestonomerResponse = { - def getPages( - restonomerRequest: RestonomerRequest, - httpResponseBody: Future[Seq[String]] - ): Future[Seq[String]] = { + def getPages(httpResponseBody: Future[Seq[String]]): Future[Seq[String]] = { restonomerPagination .map { pagination => httpResponseBody.flatMap { httpResponseBodySeq => @@ -42,18 +39,14 @@ object RestonomerResponse { .getNextPageToken(httpResponseBodySeq.last) .map { nextPageToken => getPages( - restonomerRequest = restonomerRequest, httpResponseBody = getBody( - restonomerRequest = restonomerRequest.copy(httpRequest = - restonomerRequest.httpRequest.method( - method = restonomerRequest.httpRequest.method, - uri = pagination.placeNextTokenInURL( - uri = restonomerRequest.httpRequest.uri, - nextPageToken = nextPageToken - ) + httpRequest = httpRequest.method( + method = httpRequest.method, + uri = pagination.placeNextTokenInURL( + uri = httpRequest.uri, + nextPageToken = nextPageToken ) ), - compression = compression, statusCodesToRetry = retryConfig.statusCodesToRetry.map(StatusCode(_)), maxRetries = retryConfig.maxRetries ).map(httpResponseBodySeq ++ _) @@ -67,10 +60,13 @@ object RestonomerResponse { RestonomerResponse { getPages( - restonomerRequest = restonomerRequest, httpResponseBody = getBody( - restonomerRequest = restonomerRequest, - compression = compression, + httpRequest = compression + .map { + ResponseBodyCompressionTypes(_) match + case GZIP => httpRequest.response(asByteArray) + } + .getOrElse(httpRequest.response(asString)), statusCodesToRetry = retryConfig.statusCodesToRetry.map(StatusCode(_)), maxRetries = retryConfig.maxRetries ) @@ -80,81 +76,66 @@ object RestonomerResponse { private def sleepTimeInSeconds: Int = 10 + random.nextInt(10) + 1 - private def getBody( - restonomerRequest: RestonomerRequest, - compression: Option[String], + private def getBody[T]( + httpRequest: Request[Either[String, T], Any], statusCodesToRetry: List[StatusCode], maxRetries: Int, currentRetryAttemptNumber: Int = 0 ): Future[Seq[String]] = { - def getResponseBody[T](req: Request[Either[String, T], Any]): Future[Seq[String]] = { - req - .send(sttpBackend) - .flatMap { - case Response(body, StatusCode.Ok, _, _, _, _) => - body match { - case Right(b: String) => Future(Seq(b)) - case Right(b: Array[Byte]) => - val gzipStream = new GZIPInputStream(new ByteArrayInputStream(b)) - val bufferedReader = new BufferedReader(new InputStreamReader(gzipStream)) - Future(Iterator.continually(bufferedReader.readLine()).takeWhile(_ != null).toSeq) - case _ => Future(Seq.empty) - } + httpRequest + .send(sttpBackend) + .flatMap { + case Response(body, StatusCode.Ok, _, _, _, _) => + body match { + case Right(responseBody: String) => Future(Seq(responseBody)) + + case Right(responseBody: Array[Byte]) => + val gzipStream = new GZIPInputStream(new ByteArrayInputStream(responseBody)) + val inputStreamReader = new InputStreamReader(gzipStream) + val bufferedReader = new BufferedReader(inputStreamReader) + Future(Iterator.continually(bufferedReader.readLine()).takeWhile(_ != null).toSeq) + + case _ => Future(Seq.empty) + } - case response @ Response(_, statusCode, _, headers, _, _) - if statusCodesToRetry.contains(statusCode) && currentRetryAttemptNumber < maxRetries => - waitBeforeRetry( - whatToRetry = getBody( - restonomerRequest = restonomerRequest - .copy( - httpRequest = restonomerRequest.httpRequest - .header( - k = "retry-attempt", - v = (currentRetryAttemptNumber + 1).toString, - replaceExisting = true - ) - ), - compression = compression, - statusCodesToRetry = statusCodesToRetry, - maxRetries = maxRetries, - currentRetryAttemptNumber = currentRetryAttemptNumber + 1 - ), - message = response.toString(), - maxRetries = maxRetries, - currentRetryAttemptNumber = currentRetryAttemptNumber, - headers = headers - ) - - case Response(_, StatusCode.Found, _, headers, _, requestMetadata) => - getBody( - restonomerRequest = restonomerRequest - .copy(httpRequest = - restonomerRequest.httpRequest.method( - method = requestMetadata.method, - uri = uri"${headers.find(_.name == Location).get}" - ) + case response @ Response(_, statusCode, _, headers, _, _) + if statusCodesToRetry.contains(statusCode) && currentRetryAttemptNumber < maxRetries => + waitBeforeRetry( + whatToRetry = getBody( + httpRequest = httpRequest + .header( + k = "retry-attempt", + v = (currentRetryAttemptNumber + 1).toString, + replaceExisting = true ), - compression = compression, statusCodesToRetry = statusCodesToRetry, maxRetries = maxRetries, currentRetryAttemptNumber = currentRetryAttemptNumber + 1 - ) - - case Response(_, StatusCode.NoContent, _, _, _, _) => throw new RestonomerException("No Content.") - - case _ => - throw new RestonomerException( - s"Something totally unexpected bad happened while calling the API ${currentRetryAttemptNumber + 1} times." - ) - } - } - - compression - .map { - ResponseBodyCompressionTypes(_) match - case GZIP => getResponseBody(restonomerRequest.httpRequest.response(asByteArray)) + ), + message = response.toString(), + maxRetries = maxRetries, + currentRetryAttemptNumber = currentRetryAttemptNumber, + headers = headers + ) + + case Response(_, StatusCode.Found, _, headers, _, requestMetadata) => + getBody( + httpRequest = httpRequest.method( + method = requestMetadata.method, + uri = uri"${headers.find(_.name == Location).get}" + ), + statusCodesToRetry = statusCodesToRetry, + maxRetries = maxRetries, + currentRetryAttemptNumber = currentRetryAttemptNumber + 1 + ) + + case Response(_, StatusCode.NoContent, _, _, _, _) => throw new RestonomerException("No Content.") + + case _ => + throw new RestonomerException( + s"Something totally unexpected bad happened while calling the API ${currentRetryAttemptNumber + 1} times." + ) } - .getOrElse(getResponseBody(restonomerRequest.httpRequest.response(asString))) } private def waitBeforeRetry[T]( From 5f0575bf6a19a6cf786f13a09611499490371278 Mon Sep 17 00:00:00 2001 From: rahulbhatia023 Date: Sat, 23 Dec 2023 18:40:47 -0500 Subject: [PATCH 3/7] Scalafmt fix --- .../clairvoyant/restonomer/body/RestonomerResponseBody.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/clairvoyant/restonomer/body/RestonomerResponseBody.scala b/src/main/scala/com/clairvoyant/restonomer/body/RestonomerResponseBody.scala index 6cc4f27c..cc3faf16 100644 --- a/src/main/scala/com/clairvoyant/restonomer/body/RestonomerResponseBody.scala +++ b/src/main/scala/com/clairvoyant/restonomer/body/RestonomerResponseBody.scala @@ -9,7 +9,7 @@ import zio.config.derivation.nameWithLabel @nameWithLabel sealed trait RestonomerResponseBody: val compression: Option[String] - + def read(restonomerResponseBody: Seq[String])(using sparkSession: SparkSession): DataFrame case class Text( From 26f378de9c97d3be00cde46165378c127b8903ef Mon Sep 17 00:00:00 2001 From: rahulbhatia023 Date: Sat, 23 Dec 2023 18:46:38 -0500 Subject: [PATCH 4/7] Scalafix fix --- .../com/clairvoyant/restonomer/http/RestonomerResponse.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/clairvoyant/restonomer/http/RestonomerResponse.scala b/src/main/scala/com/clairvoyant/restonomer/http/RestonomerResponse.scala index c3affd70..050d170f 100644 --- a/src/main/scala/com/clairvoyant/restonomer/http/RestonomerResponse.scala +++ b/src/main/scala/com/clairvoyant/restonomer/http/RestonomerResponse.scala @@ -1,8 +1,8 @@ package com.clairvoyant.restonomer.http import cats.syntax.eq.* -import com.clairvoyant.restonomer.common.* import com.clairvoyant.restonomer.common.ResponseBodyCompressionTypes.* +import com.clairvoyant.restonomer.common.* import com.clairvoyant.restonomer.exception.RestonomerException import com.clairvoyant.restonomer.model.RetryConfig import com.clairvoyant.restonomer.pagination.RestonomerPagination From 97b9816d607746dc7b0954d3cebe002e72eccbaa Mon Sep 17 00:00:00 2001 From: rahulbhatia023 Date: Wed, 27 Dec 2023 12:37:51 -0500 Subject: [PATCH 5/7] fixed config file --- .../checkpoint_json_response_dataframe_converter.conf | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/it/resources/restonomer_context/checkpoints/response_body/checkpoint_json_response_dataframe_converter.conf b/src/it/resources/restonomer_context/checkpoints/response_body/checkpoint_json_response_dataframe_converter.conf index 738b7c41..dec35273 100644 --- a/src/it/resources/restonomer_context/checkpoints/response_body/checkpoint_json_response_dataframe_converter.conf +++ b/src/it/resources/restonomer_context/checkpoints/response_body/checkpoint_json_response_dataframe_converter.conf @@ -6,11 +6,6 @@ data = { } data-response = { - body = { - type = "JSON" - primitives-as-string = true - } - body = { type = "Text" text-format = { From 65ae7b5fcd690605b5339496f415779e24a18dce Mon Sep 17 00:00:00 2001 From: rahulbhatia023 Date: Thu, 4 Jan 2024 13:00:59 -0500 Subject: [PATCH 6/7] Few improvements --- site/docs/config_classes/checkpoint_config.md | 12 ++++++------ site/docs/config_classes/data_response_config.md | 2 +- site/docs/persistence/redshift_persistence.md | 3 ++- site/docs/response_body/text/csv_text.md | 2 +- .../restonomer/body/RestonomerResponseBody.scala | 4 ++-- 5 files changed, 12 insertions(+), 11 deletions(-) diff --git a/site/docs/config_classes/checkpoint_config.md b/site/docs/config_classes/checkpoint_config.md index a84fbaeb..b0cf5746 100644 --- a/site/docs/config_classes/checkpoint_config.md +++ b/site/docs/config_classes/checkpoint_config.md @@ -6,12 +6,12 @@ trigger a checkpoint. The checkpoint configuration contains below config options to be provided by the user: -| Config Name | Mandatory | Default Value | Description | -|:-------------|:---------:|:-------------:|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| name | Yes | - | Unique name for your checkpoint | -| token | No | - | Token request configuration represented by `TokenConfig` class | -| data | Yes | - | Main data request configuration represented by `DataConfig` class | -| sparkConfigs | No | - | Map of spark configurations specific to the checkpoint.
If the same config is also present in `application.conf` file, then checkpoint specific config gets the priority. | +| Config Name | Mandatory | Default Value | Description | +|:-------------|:---------:|:-------------:|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| name | Yes | - | Unique name for your checkpoint | +| token | No | - | Token request configuration represented by `TokenConfig` class | +| data | Yes | - | Main data request configuration represented by `DataConfig` class | +| sparkConfigs | No | - | Map of spark configurations specific to the checkpoint.
If the same config is also present in `application.conf` file, then checkpoint specific config gets the priority. | User can provide checkpoint configuration file in HOCON format in the below format: diff --git a/site/docs/config_classes/data_response_config.md b/site/docs/config_classes/data_response_config.md index 8edd8cc1..73387a29 100644 --- a/site/docs/config_classes/data_response_config.md +++ b/site/docs/config_classes/data_response_config.md @@ -6,7 +6,7 @@ User need to provide below configs for Data Response Configuration: | Config Name | Mandatory | Default Value | Description | |:----------------|:---------:|:-------------:|:----------------------------------------------------------------------------------------------------| -| body | Yes | - | The body config represented by `DataResponseBodyConfig` | +| body | Yes | - | The body config represented by `RestonomerResponseBody`. | | transformations | No | - | List of transformations to be applied on the restonomer response dataframe | | persistence | Yes | - | The persistence attribute that tells where to persist the transformed restonomer response dataframe | diff --git a/site/docs/persistence/redshift_persistence.md b/site/docs/persistence/redshift_persistence.md index ce668001..bc28ea89 100644 --- a/site/docs/persistence/redshift_persistence.md +++ b/site/docs/persistence/redshift_persistence.md @@ -116,7 +116,8 @@ User can pass below options to the `RedshiftWriterOptions` instance: No - -

A description for the table. Will be set using the SQL COMMENT command, and should show up in most query tools. See also the description metadata to set descriptions on individual columns. +

A description for the table. Will be set using the SQL COMMENT command, and should show up in most query tools. See also the description metadata to set descriptions on individual columns.

+ pre-actions diff --git a/site/docs/response_body/text/csv_text.md b/site/docs/response_body/text/csv_text.md index b0838dd7..e552e1d0 100644 --- a/site/docs/response_body/text/csv_text.md +++ b/site/docs/response_body/text/csv_text.md @@ -33,7 +33,7 @@ data = { Just like `sep`, user can configure below other properties for CSV text format that will help restonomer for parsing: | Parameter Name | Default Value | Description | -| :-------------------------------- | :-------------------------: || +|:----------------------------------|:---------------------------:|| | char-to-escape-quote-escaping | \ | Sets a single character used for escaping the escape for the quote character. | | column-name-of-corrupt-record | _corrupt_record | Allows renaming the new field having malformed string created by PERMISSIVE mode.
This overrides `spark.sql.columnNameOfCorruptRecord`. | | comment | # | Sets a single character used for skipping lines beginning with this character. | diff --git a/src/main/scala/com/clairvoyant/restonomer/body/RestonomerResponseBody.scala b/src/main/scala/com/clairvoyant/restonomer/body/RestonomerResponseBody.scala index cc3faf16..b76464f9 100644 --- a/src/main/scala/com/clairvoyant/restonomer/body/RestonomerResponseBody.scala +++ b/src/main/scala/com/clairvoyant/restonomer/body/RestonomerResponseBody.scala @@ -46,11 +46,11 @@ case class Text( adaptSchemaColumns = identity ) - case htmlTableTextFomat: HTMLTableTextFormat => + case htmlTableTextFormat: HTMLTableTextFormat => TextToDataFrameReader[HTMLTableTextFormat] .read( text = restonomerResponseBody, - textFormat = htmlTableTextFomat, + textFormat = htmlTableTextFormat, originalSchema = None, adaptSchemaColumns = identity ) From 967c9051dffe9a831167c0e0c175092bd664722d Mon Sep 17 00:00:00 2001 From: rahulbhatia023 Date: Fri, 5 Jan 2024 11:58:57 -0500 Subject: [PATCH 7/7] Added documentation for compression --- site/docs/response_body/text/csv_text.md | 37 +++++++++++++++++++ site/docs/response_body/text/html_text.md | 41 ++++++++++++++++++++- site/docs/response_body/text/json_text.md | 45 +++++++++++++++++++++-- site/docs/response_body/text/xml_text.md | 42 ++++++++++++++++++++- 4 files changed, 158 insertions(+), 7 deletions(-) diff --git a/site/docs/response_body/text/csv_text.md b/site/docs/response_body/text/csv_text.md index e552e1d0..a2b2ad0d 100644 --- a/site/docs/response_body/text/csv_text.md +++ b/site/docs/response_body/text/csv_text.md @@ -30,6 +30,43 @@ data = { } ``` +## Compression + +In case the csv text that is returned by the api is compressed, user can configure the checkpoint in below format: + +```hocon +name = "checkpoint_csv_response_dataframe_converter" + +data = { + data-request = { + url = "http://localhost:8080/csv-response-converter" + } + + data-response = { + body = { + type = "Text" + compression = "GZIP" + text-format = { + type = "CSVTextFormat" + sep = ";" + } + } + + persistence = { + type = "LocalFileSystem" + file-format = { + type = "ParquetFileFormat" + } + file-path = "/tmp/response_body" + } + } +} +``` + +As of now, restonomer supports only `GZIP` compression format. + +## CSV Text Format Configurations + Just like `sep`, user can configure below other properties for CSV text format that will help restonomer for parsing: | Parameter Name | Default Value | Description | diff --git a/site/docs/response_body/text/html_text.md b/site/docs/response_body/text/html_text.md index 2e322839..287960d6 100644 --- a/site/docs/response_body/text/html_text.md +++ b/site/docs/response_body/text/html_text.md @@ -1,6 +1,7 @@ # HTML Table -Restonomer can parse the api response of text type in HTML table format. User need to configure the checkpoint in below format: +Restonomer can parse the api response of text type in HTML table format. User need to configure the checkpoint in below +format: ```hocon name = "checkpoint_html_response_dataframe_converter" @@ -29,8 +30,44 @@ data = { } ``` +## Compression + +In case the html text that is returned by the api is compressed, user can configure the checkpoint in below format: + +```hocon +name = "checkpoint_html_response_dataframe_converter" + +data = { + data-request = { + url = "http://localhost:8080/html-response-converter" + } + + data-response = { + body = { + type = "Text" + compression = "GZIP" + text-format = { + type = "HTMLTableTextFormat" + } + } + + persistence = { + type = "LocalFileSystem" + file-format = { + type = "ParquetFileFormat" + } + file-path = "/tmp/response_body" + } + } +} +``` + +As of now, restonomer supports only `GZIP` compression format. + +## HTML Table Text Format Configurations + User can configure below other properties for HTML text format that will help restonomer for parsing: | Parameter Name | Default Value | Mandatory | Description | -| :------------- | :-----------: | :-------: | :---------------------------------------------------------------------------- | +|:---------------|:-------------:|:---------:|:------------------------------------------------------------------------------| | tableName | None | No | The name of the table in the `table` tag that you want to read the data from. | diff --git a/site/docs/response_body/text/json_text.md b/site/docs/response_body/text/json_text.md index fb73697d..ff381371 100644 --- a/site/docs/response_body/text/json_text.md +++ b/site/docs/response_body/text/json_text.md @@ -1,6 +1,7 @@ # JSON -Restonomer can parse the api response of text type in JSON format. User need to configure the checkpoint in below format: +Restonomer can parse the api response of text type in JSON format. User need to configure the checkpoint in below +format: ```hocon name = "checkpoint_json_response_dataframe_converter" @@ -30,10 +31,48 @@ data = { } ``` -Just like `primitives-as-string`, user can configure below other properties for JSON text format that will help restonomer for parsing: +## Compression + +In case the json text that is returned by the api is compressed, user can configure the checkpoint in below format: + +```hocon +name = "checkpoint_json_response_dataframe_converter" + +data = { + data-request = { + url = "http://localhost:8080/json-response-converter" + } + + data-response = { + body = { + type = "Text" + compression = "GZIP" + text-format = { + type = "JSONTextFormat" + primitives-as-string = true + } + } + + persistence = { + type = "LocalFileSystem" + file-format = { + type = "ParquetFileFormat" + } + file-path = "/tmp/response_body" + } + } +} +``` + +As of now, restonomer supports only `GZIP` compression format. + +## JSON Text Format Configurations + +Just like `primitives-as-string`, user can configure below other properties for JSON text format that will help +restonomer for parsing: | Parameter Name | Default Value | Description | -| :------------------------------------- | :-------------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------------- | +|:---------------------------------------|:---------------------------:|:-------------------------------------------------------------------------------------------------------------------------------------------------------------| | allow-backslash-escaping-any-character | false | Allows accepting quoting of all character using backslash quoting mechanism. | | allow-comments | false | Ignores Java/C++ style comment in JSON records. | | allow-non-numeric-numbers | true | Allows JSON parser to recognize set of “Not-a-Number” (NaN) tokens as legal floating number values. | diff --git a/site/docs/response_body/text/xml_text.md b/site/docs/response_body/text/xml_text.md index b03d0dcc..1598d1f1 100644 --- a/site/docs/response_body/text/xml_text.md +++ b/site/docs/response_body/text/xml_text.md @@ -30,10 +30,48 @@ data = { } ``` -Just like `row-tag`, user can configure below other properties for XML text format that will help restonomer for parsing: +## Compression + +In case the xml text that is returned by the api is compressed, user can configure the checkpoint in below format: + +```hocon +name = "checkpoint_xml_response_dataframe_converter" + +data = { + data-request = { + url = "http://localhost:8080/xml-response-converter" + } + + data-response = { + body = { + type = "Text" + compression = "GZIP" + text-format = { + type = "XMLTextFormat" + row-tag = "ROW" + } + } + + persistence = { + type = "LocalFileSystem" + file-format = { + type = "ParquetFileFormat" + } + file-path = "/tmp/response_body" + } + } +} +``` + +As of now, restonomer supports only `GZIP` compression format. + +## XML Text Format Configurations + +Just like `row-tag`, user can configure below other properties for XML text format that will help restonomer for +parsing: | Parameter Name | Default Value | Description | -| :---------------------------- | :-----------------: | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +|:------------------------------|:-------------------:|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | attribute-prefix | _ | The prefix for attributes so that we can differentiate attributes and elements. | | charset | UTF-8 | Defaults to 'UTF-8' but can be set to other valid charset names. | | column-name-of-corrupt-record | _corrupt_record | Allows renaming the new field having malformed string created by PERMISSIVE mode. This overrides spark.sql.columnNameOfCorruptRecord. |