Skip to content

Commit

Permalink
Add v3.0-1.0.2
Browse files Browse the repository at this point in the history
  • Loading branch information
alisakotliarova authored Sep 15, 2021
1 parent 8fc04bd commit 22862bd
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 14 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ plugins {
}

group = 'org.alvearie.hri.flink'
version = '2.1-1.0.1'
version = '3.0-1.0.2'
description = """HRI Flink Pipeline Core Library"""

ext {
Expand Down Expand Up @@ -92,7 +92,7 @@ dependencies {
// Dependencies that library users should include in their job
// shadow jar
// --------------------------------------------------------------
implementation "org.alvearie.hri:hri-api-batch-notification:2.1-2.0.1"
implementation "org.alvearie.hri:hri-api-batch-notification:3.0-2.0.1"
implementation "org.apache.flink:flink-connector-kafka_${scalaBinaryVersion}:${flinkVersion}"
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
implementation "com.fasterxml.jackson.module:jackson-module-scala_${scalaBinaryVersion}:${jacksonVersion}"
Expand Down
49 changes: 46 additions & 3 deletions src/main/scala/org/alvearie/hri/api/MgmtClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.alvearie.hri.flink.core.serialization.NotificationDeserializer
import org.apache.http.ssl.SSLContexts

import java.util.Base64
import java.util.ArrayList
import org.slf4j.LoggerFactory

import java.io.{File, FileNotFoundException}
import scala.util.{Failure, Success, Try}

class MgmtClient(val baseUri: String, val clientId: String, val clientSecret: String, val audience: String, val oauthServiceBaseUrl: String,
val httpClient: CloseableHttpClient = HttpClients.createDefault()) extends Serializable with BatchLookup {
val httpClient: CloseableHttpClient = MgmtClient.createHttpClient()) extends Serializable with BatchLookup {
private val log = LoggerFactory.getLogger(this.getClass)
log.info("Creating HRI MgmtClient for {}", baseUri)

Expand Down Expand Up @@ -127,11 +130,11 @@ class MgmtClient(val baseUri: String, val clientId: String, val clientSecret: St
response = httpClient.execute(request)
response.getStatusLine.getStatusCode match {
case HttpStatus.SC_OK =>
log.info("MgmtApi action call successful")
log.info("HRI MgmtApi action call successful")
Success(entityMapper(response.getEntity))
case status =>
val msg = status.intValue + ": " + EntityUtils.toString(response.getEntity)
log.info("MgmtApi action call failed: {}", msg)
log.info("HRI MgmtApi action call failed: {}", msg)
Failure(new RequestException(msg, status))
}
} catch {
Expand Down Expand Up @@ -159,4 +162,44 @@ object MgmtClient {
val hriConsumerScope = "hri_consumer"
val accessTokenField = "access_token"
val audienceField = "audience"

val trustStoreEnv = "HRI_TRUSTSTORE"
val trustStorePasswordEnv = "HRI_TRUSTSTORE_PASSWORD"

/**
* If 'HRI_TRUSTSTORE' and 'HRI_TRUSTSTORE_PASSWORD' are set, constructs an Http client using the specified trust
* store. If not set, creates a default Http client.
* If unable to load the trust store or create the client, an Exception is thrown.
* @return a Http client
*/
def createHttpClient() : CloseableHttpClient = {
val log = LoggerFactory.getLogger(this.getClass)
val trustStorePath = System.getenv(trustStoreEnv)
val password = System.getenv(trustStorePasswordEnv)

if( trustStorePath == null || trustStorePath.isEmpty ) {
log.info("HRI_TRUSTSTORE is not set, so creating default Http client")
return HttpClients.createDefault()
} else if ( password == null || password.isEmpty ) {
val msg = trustStoreEnv + " is set, but " + trustStorePasswordEnv + " is not. Both must be empty or set."
log.error(msg)
throw new IllegalArgumentException(msg)
}
log.info("Creating Http client with trust store {}", trustStorePath)

val trustStoreFile = new File(trustStorePath);
if (!trustStoreFile.exists() || !trustStoreFile.isFile) {
val msg = "Not found or not a file: " + trustStoreFile.getPath
log.error(msg);
throw new FileNotFoundException(msg);
}

val sslContext = SSLContexts.custom
.loadTrustMaterial(trustStoreFile, password.toCharArray)
.build

return HttpClients.custom
.setSSLContext(sslContext)
.build
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ class BaseValidationJob(
// used for functional testing of the Validation Jobs without the HRI Management API.
def getRecordCountSink(props: Properties): SinkFunction[NotificationRecord] = {
if (useMgmtApi) {
log.info("Creating MgmtApiSink({}) for Tracker output", mgmtApiUrl)
log.info("Creating HRI MgmtApiSink({}) for Tracker output", mgmtApiUrl)
return new MgmtApiSink(tenantId, mgmtApiUrl, mgmtClientId, mgmtClientSecret, mgmtClientAudience, oauthServiceBaseUrl)
} else {
log.info("Creating KafkaProducer({}) for Tracker output", notificationTopic)
Expand Down
9 changes: 8 additions & 1 deletion src/main/scala/org/alvearie/hri/flink/core/MgmtApiSink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ class MgmtApiSink(val tenantId: String, val mgmtApiUrl: String, val mgmtClientId
@transient lazy val mgmtClient: MgmtClient = createMgmtClient()

// This enables overriding for testing
def createMgmtClient(): MgmtClient = new MgmtClient(mgmtApiUrl, mgmtClientId, mgmtClientSecret, mgmtClientAudience, oauthServiceBaseUrl)
def createMgmtClient(): MgmtClient = {
try {
new MgmtClient(mgmtApiUrl, mgmtClientId, mgmtClientSecret, mgmtClientAudience, oauthServiceBaseUrl)
} catch {
case ex: Throwable =>
throw new FlinkException(ex) // can't recover from this
}
}

override def invoke(record: NotificationRecord, context: Context[_]): Unit = {
val batch = record.value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class ValidationProcessFunction(
def createMgmtClient(): BatchLookup = new MgmtClient(mgmtApiUrl, mgmtClientId, mgmtClientSecret, mgmtClientAudience, oauthServiceBaseUrl)

/**
* constructor for testing purposes without a MgmtApiClient
* constructor for testing purposes without a HRI MgmtApiClient
*/
def this(notificationDescriptor: MapStateDescriptor[String, BatchNotification],
invalidOutputTag: OutputTag[InvalidRecord],
Expand Down Expand Up @@ -204,7 +204,7 @@ class ValidationProcessFunction(
}
}
case Failure(e) =>
log.error("unexpected exception from mgmtClient", e)
log.error("unexpected exception from HRI mgmtClient", e)
throw new FlinkException(e)
}
}
Expand Down
Binary file added src/test/resources/truststore.jks
Binary file not shown.
61 changes: 60 additions & 1 deletion src/test/scala/org/alvearie/hri/api/MgmtClientTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package org.alvearie.hri.api
import java.time.temporal.ChronoUnit
import java.time.{OffsetDateTime, ZoneOffset}
import java.util.Base64

import org.apache.http.{Header, HttpStatus, HttpVersion, ProtocolVersion}
import org.apache.http.message.BasicHttpResponse
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost, HttpPut, HttpUriRequest}
Expand All @@ -19,11 +18,13 @@ import org.apache.http.util.EntityUtils
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import org.alvearie.hri.flink.core.TestHelper
import org.mockito.ArgumentMatcher
import org.mockito.scalatest.MockitoSugar
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers._

import java.io.{FileNotFoundException, IOException}
import scala.util.{Failure, Success}

class MgmtClientTest extends AnyFunSuite with MockitoSugar{
Expand All @@ -41,6 +42,9 @@ class MgmtClientTest extends AnyFunSuite with MockitoSugar{
private val audience = "myAudience"
private val expectedTokenRequestParams = Array("grant_type=client_credentials", "scope=", s"${MgmtClient.hriInternalScope}", s"${MgmtClient.hriConsumerScope}", s"tenant_$tenantId", s"audience=$audience")

private val trustStorePath = "src/test/resources/truststore.jks"
private val trustStorePassword = "test_password"

class RequestMatcherPut(uri: String, bodyElements: Seq[String]) extends ArgumentMatcher[HttpUriRequest] {
override def matches(request: HttpUriRequest): Boolean = {
if (request.getMethod != "PUT") return false
Expand Down Expand Up @@ -362,6 +366,61 @@ class MgmtClientTest extends AnyFunSuite with MockitoSugar{
}
}

test("It should return a custom Http client when environment variables are set") {
TestHelper.setEnv(MgmtClient.trustStoreEnv, trustStorePath)
TestHelper.setEnv(MgmtClient.trustStorePasswordEnv, trustStorePassword)

val client = MgmtClient.createHttpClient()
assert(client != null)
// there isn't a way to inspect the client's configuration to ensure the trust store was added

/**
* This is for manual testing against an actual instance running on Kubernetes
* 1. Copy the Kubernetes ca.crt from any pod at /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
* 2. Import it into src/test/resources/k-truststore.jks with
keytool -import -file ca.crt -keystore src/test/resources/k-truststore.jks -storepass test_password -alias kubernetes_ca
* 3. Edit /etc/hosts and append `127.0.0.1 hri-mgmt-api`
* 4. Then uncomment the lines below and run the test
*/
//val response = client.execute(new HttpGet("https://hri-mgmt-api:1323/hri/healthcheck"))
//System.out.println(response.toString)

// reset environment
TestHelper.removeEnv(MgmtClient.trustStoreEnv)
TestHelper.removeEnv(MgmtClient.trustStorePasswordEnv)
}

test("It should throw an IllegalArgumentException when the trust store password variable is not set") {
TestHelper.setEnv(MgmtClient.trustStoreEnv, trustStorePath)

assertThrows[IllegalArgumentException](MgmtClient.createHttpClient())

// reset environment
TestHelper.removeEnv(MgmtClient.trustStoreEnv)
}

test("It should throw an IOException when the trust store path is wrong") {
TestHelper.setEnv(MgmtClient.trustStoreEnv, "bad/path/to/truststore.jks")
TestHelper.setEnv(MgmtClient.trustStorePasswordEnv, trustStorePassword)

assertThrows[FileNotFoundException](MgmtClient.createHttpClient())

// reset environment
TestHelper.removeEnv(MgmtClient.trustStoreEnv)
TestHelper.removeEnv(MgmtClient.trustStorePasswordEnv)
}

test("It should throw an IOException when the trust store password variable is wrong") {
TestHelper.setEnv(MgmtClient.trustStoreEnv, trustStorePath)
TestHelper.setEnv(MgmtClient.trustStorePasswordEnv, "wrong_password")

assertThrows[IOException](MgmtClient.createHttpClient())

// reset environment
TestHelper.removeEnv(MgmtClient.trustStoreEnv)
TestHelper.removeEnv(MgmtClient.trustStorePasswordEnv)
}

}

private class FakeHttpResponse(ver: ProtocolVersion, code: Integer, reason: String) extends BasicHttpResponse(ver, code, reason) with CloseableHttpResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package org.alvearie.hri.flink.core

import java.time.OffsetDateTime
import java.util.concurrent.TimeUnit

import org.alvearie.hri.api.BatchNotification.Status
import org.alvearie.hri.api.{BatchNotification, MgmtClient, RequestException}
import org.alvearie.hri.flink.core.serialization.NotificationRecord
Expand Down Expand Up @@ -167,6 +166,15 @@ class MgmtApiSinkTest extends AnyFunSuite with MockitoSugar{
exMsg should equal("Call to HRI Management API failed: Unauthorized.")
}

test("it should throw a FlinkException when environment variables are incorrect") {
TestHelper.setEnv(MgmtClient.trustStoreEnv, "bad/path/to/truststore.jks")

val sink = new MgmtApiSink("tenant", "https://hri-mgmt-api", "client_id", "password", "audience", "https://oauth")
assertThrows[FlinkException](sink.createMgmtClient())

TestHelper.removeEnv(MgmtClient.trustStoreEnv)
}

def createTestBatchNotification(status: BatchNotification.Status, actualRecordCount: Int, invalidRecordCount: Int, failMsg: String): BatchNotification = {
new BatchNotification()
.withId(batchId)
Expand Down
16 changes: 16 additions & 0 deletions src/test/scala/org/alvearie/hri/flink/core/TestHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,20 @@ object TestHelper {
buffer.getLong
}

// This sets an OS environment variable using reflection, which isn't normally allowed
def setEnv(name: String, value: String): Unit = {
val env = System.getenv
val field = env.getClass.getDeclaredField("m")
field.setAccessible(true)
field.get(env).asInstanceOf[java.util.Map[String,String]].put(name, value)
}

// This removes an OS environment variable using reflection, which isn't normally allowed
def removeEnv(name: String): Unit = {
val env = System.getenv
val field = env.getClass.getDeclaredField("m")
field.setAccessible(true)
field.get(env).asInstanceOf[java.util.Map[String,String]].remove(name)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class ValidationProcessFunctionTest extends AnyFunSuite with MockitoSugar {
verify(mockClient, times(1)).getBatchId(tenantId, DefaultTestBatchId)
}

test("processElement should call getBatch and unexpected error should be thrown from mgmtApi") {
test("processElement should call getBatch and unexpected error should be thrown from HRI mgmtApi") {
val mockClient = mock[MgmtClient]

val validator = new TestValidationProcessFunction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class HriTestRecsSourceFunction() extends RichParallelSourceFunction[HriRecord]
}

val numHriRecs = hriRecs.size
println(s"Starting Hri Test Recs Sources Processing of " +
println(s"Starting HRI Test Recs Sources Processing of " +
s"${numHriRecs} recs at startTime: $startTime")
val itr = hriRecs.iterator
while(isRunning && itr.hasNext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,4 @@ class NotificationSourceFunction() extends RichParallelSourceFunction[Notificati
delayProcessing
}


}

0 comments on commit 22862bd

Please sign in to comment.