From 22862bd3cdebfc2416923ea36688993a11fee1ef Mon Sep 17 00:00:00 2001 From: alisa <60156973+alisakotliarova@users.noreply.github.com> Date: Wed, 15 Sep 2021 15:11:18 -0400 Subject: [PATCH] Add v3.0-1.0.2 --- build.gradle | 4 +- .../org/alvearie/hri/api/MgmtClient.scala | 49 +++++++++++++- .../hri/flink/core/BaseValidationJob.scala | 2 +- .../alvearie/hri/flink/core/MgmtApiSink.scala | 9 ++- .../core/ValidationProcessFunction.scala | 4 +- src/test/resources/truststore.jks | Bin 0 -> 3909 bytes .../org/alvearie/hri/api/MgmtClientTest.scala | 61 +++++++++++++++++- .../hri/flink/core/MgmtApiSinkTest.scala | 10 ++- .../alvearie/hri/flink/core/TestHelper.scala | 16 +++++ .../core/ValidationProcessFunctionTest.scala | 2 +- .../sources/HriTestRecsSourceFunction.scala | 2 +- .../sources/NotificationSourceFunction.scala | 1 - 12 files changed, 146 insertions(+), 14 deletions(-) create mode 100644 src/test/resources/truststore.jks diff --git a/build.gradle b/build.gradle index 063da04..2314852 100644 --- a/build.gradle +++ b/build.gradle @@ -13,7 +13,7 @@ plugins { } group = 'org.alvearie.hri.flink' -version = '2.1-1.0.1' +version = '3.0-1.0.2' description = """HRI Flink Pipeline Core Library""" ext { @@ -92,7 +92,7 @@ dependencies { // Dependencies that library users should include in their job // shadow jar // -------------------------------------------------------------- - implementation "org.alvearie.hri:hri-api-batch-notification:2.1-2.0.1" + implementation "org.alvearie.hri:hri-api-batch-notification:3.0-2.0.1" implementation "org.apache.flink:flink-connector-kafka_${scalaBinaryVersion}:${flinkVersion}" implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" implementation "com.fasterxml.jackson.module:jackson-module-scala_${scalaBinaryVersion}:${jacksonVersion}" diff --git a/src/main/scala/org/alvearie/hri/api/MgmtClient.scala b/src/main/scala/org/alvearie/hri/api/MgmtClient.scala index 9d32b09..2f452cf 100644 --- a/src/main/scala/org/alvearie/hri/api/MgmtClient.scala +++ b/src/main/scala/org/alvearie/hri/api/MgmtClient.scala @@ -18,14 +18,17 @@ import com.fasterxml.jackson.databind.json.JsonMapper import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.alvearie.hri.flink.core.serialization.NotificationDeserializer +import org.apache.http.ssl.SSLContexts + import java.util.Base64 import java.util.ArrayList import org.slf4j.LoggerFactory +import java.io.{File, FileNotFoundException} import scala.util.{Failure, Success, Try} class MgmtClient(val baseUri: String, val clientId: String, val clientSecret: String, val audience: String, val oauthServiceBaseUrl: String, - val httpClient: CloseableHttpClient = HttpClients.createDefault()) extends Serializable with BatchLookup { + val httpClient: CloseableHttpClient = MgmtClient.createHttpClient()) extends Serializable with BatchLookup { private val log = LoggerFactory.getLogger(this.getClass) log.info("Creating HRI MgmtClient for {}", baseUri) @@ -127,11 +130,11 @@ class MgmtClient(val baseUri: String, val clientId: String, val clientSecret: St response = httpClient.execute(request) response.getStatusLine.getStatusCode match { case HttpStatus.SC_OK => - log.info("MgmtApi action call successful") + log.info("HRI MgmtApi action call successful") Success(entityMapper(response.getEntity)) case status => val msg = status.intValue + ": " + EntityUtils.toString(response.getEntity) - log.info("MgmtApi action call failed: {}", msg) + log.info("HRI MgmtApi action call failed: {}", msg) Failure(new RequestException(msg, status)) } } catch { @@ -159,4 +162,44 @@ object MgmtClient { val hriConsumerScope = "hri_consumer" val accessTokenField = "access_token" val audienceField = "audience" + + val trustStoreEnv = "HRI_TRUSTSTORE" + val trustStorePasswordEnv = "HRI_TRUSTSTORE_PASSWORD" + + /** + * If 'HRI_TRUSTSTORE' and 'HRI_TRUSTSTORE_PASSWORD' are set, constructs an Http client using the specified trust + * store. If not set, creates a default Http client. + * If unable to load the trust store or create the client, an Exception is thrown. + * @return a Http client + */ + def createHttpClient() : CloseableHttpClient = { + val log = LoggerFactory.getLogger(this.getClass) + val trustStorePath = System.getenv(trustStoreEnv) + val password = System.getenv(trustStorePasswordEnv) + + if( trustStorePath == null || trustStorePath.isEmpty ) { + log.info("HRI_TRUSTSTORE is not set, so creating default Http client") + return HttpClients.createDefault() + } else if ( password == null || password.isEmpty ) { + val msg = trustStoreEnv + " is set, but " + trustStorePasswordEnv + " is not. Both must be empty or set." + log.error(msg) + throw new IllegalArgumentException(msg) + } + log.info("Creating Http client with trust store {}", trustStorePath) + + val trustStoreFile = new File(trustStorePath); + if (!trustStoreFile.exists() || !trustStoreFile.isFile) { + val msg = "Not found or not a file: " + trustStoreFile.getPath + log.error(msg); + throw new FileNotFoundException(msg); + } + + val sslContext = SSLContexts.custom + .loadTrustMaterial(trustStoreFile, password.toCharArray) + .build + + return HttpClients.custom + .setSSLContext(sslContext) + .build + } } diff --git a/src/main/scala/org/alvearie/hri/flink/core/BaseValidationJob.scala b/src/main/scala/org/alvearie/hri/flink/core/BaseValidationJob.scala index a705a1a..6434933 100644 --- a/src/main/scala/org/alvearie/hri/flink/core/BaseValidationJob.scala +++ b/src/main/scala/org/alvearie/hri/flink/core/BaseValidationJob.scala @@ -272,7 +272,7 @@ class BaseValidationJob( // used for functional testing of the Validation Jobs without the HRI Management API. def getRecordCountSink(props: Properties): SinkFunction[NotificationRecord] = { if (useMgmtApi) { - log.info("Creating MgmtApiSink({}) for Tracker output", mgmtApiUrl) + log.info("Creating HRI MgmtApiSink({}) for Tracker output", mgmtApiUrl) return new MgmtApiSink(tenantId, mgmtApiUrl, mgmtClientId, mgmtClientSecret, mgmtClientAudience, oauthServiceBaseUrl) } else { log.info("Creating KafkaProducer({}) for Tracker output", notificationTopic) diff --git a/src/main/scala/org/alvearie/hri/flink/core/MgmtApiSink.scala b/src/main/scala/org/alvearie/hri/flink/core/MgmtApiSink.scala index 323c308..a820c4d 100644 --- a/src/main/scala/org/alvearie/hri/flink/core/MgmtApiSink.scala +++ b/src/main/scala/org/alvearie/hri/flink/core/MgmtApiSink.scala @@ -31,7 +31,14 @@ class MgmtApiSink(val tenantId: String, val mgmtApiUrl: String, val mgmtClientId @transient lazy val mgmtClient: MgmtClient = createMgmtClient() // This enables overriding for testing - def createMgmtClient(): MgmtClient = new MgmtClient(mgmtApiUrl, mgmtClientId, mgmtClientSecret, mgmtClientAudience, oauthServiceBaseUrl) + def createMgmtClient(): MgmtClient = { + try { + new MgmtClient(mgmtApiUrl, mgmtClientId, mgmtClientSecret, mgmtClientAudience, oauthServiceBaseUrl) + } catch { + case ex: Throwable => + throw new FlinkException(ex) // can't recover from this + } + } override def invoke(record: NotificationRecord, context: Context[_]): Unit = { val batch = record.value diff --git a/src/main/scala/org/alvearie/hri/flink/core/ValidationProcessFunction.scala b/src/main/scala/org/alvearie/hri/flink/core/ValidationProcessFunction.scala index 61ef984..f55b42d 100644 --- a/src/main/scala/org/alvearie/hri/flink/core/ValidationProcessFunction.scala +++ b/src/main/scala/org/alvearie/hri/flink/core/ValidationProcessFunction.scala @@ -62,7 +62,7 @@ class ValidationProcessFunction( def createMgmtClient(): BatchLookup = new MgmtClient(mgmtApiUrl, mgmtClientId, mgmtClientSecret, mgmtClientAudience, oauthServiceBaseUrl) /** - * constructor for testing purposes without a MgmtApiClient + * constructor for testing purposes without a HRI MgmtApiClient */ def this(notificationDescriptor: MapStateDescriptor[String, BatchNotification], invalidOutputTag: OutputTag[InvalidRecord], @@ -204,7 +204,7 @@ class ValidationProcessFunction( } } case Failure(e) => - log.error("unexpected exception from mgmtClient", e) + log.error("unexpected exception from HRI mgmtClient", e) throw new FlinkException(e) } } diff --git a/src/test/resources/truststore.jks b/src/test/resources/truststore.jks new file mode 100644 index 0000000000000000000000000000000000000000..a4ad69a45c49c6bd840a50447e974ac53097654e GIT binary patch literal 3909 zcmchZS5Om*)`gP*p<}>Mq)0CZ1VRZ_q!$GNL3))c0-^U7B1O7%q)I{wy$N`b8ajf~ zyOCZ)5d{Q6e&^2och1atzYptSeNTJ#tUY`EKKOkA002OL1pJSPeI0##|Cr#a`7dn% z0C_YFMIH&G0aFNpfy6*T5G@c$0w6|_yW>?VQt3&jojNwMcMN^A`h@1_h&9lrWvSn51D)*@S+z9GNx9OI%C9b;OP)VMCGF2l}>P!vlL~_Z!(Zeo# zo^MsIB_(dxE$BU&cm2*lz>%!=O6nbG66l-?7UDhgr5;Pkc#JVT4cnQ>`4C2bDE!WN zv^xFSZF0Dxt_MBjfl!)D1U8qZ2Hc(H`3o&lC2En)&eVm51gmr2Oz0+!Q<+Oe1m0FX z55e6VXcB9ks@B*WxxKGLGI?4*TVmAdUP36;>XIjnBu!i=IX_!;kQY<-SK>=gz84Xh zrPK*k(4g4?MZEHv3lJn3&~oACZFo8+m;aOfOhxB_+hm;$9Rrhk;%s86N8cItcs94h zg3eVv2v-^t?6?DUA}yTWD~&ekoT>i`+l;3SO%V>Q))M<0;lGerFA2}KGUZ4OOGf`p zsA7?dsVR)wSQ5nux(0+@DPa}z{JQW= z%0#(~7j=uRCrE!NC3Dr7V+M!3fXu@P&(R||&JId^MGB_We$QscJv@=4-rT4Y-o4pV zHumOc!fIh1ieo(7D>XK@h;d-cL2a&f9mOB2I!Wpvhn-5Al4TCX8AGujm!F-;Huy0j z3ynox)iJIMn!S$a@!r)Ag=2CA6(gl4AmG{Byf@dq&d8vecPdb{;5YN(Zsta76zx7jwswXRzMV|v1+rSQNC za7Nb=e*F+yu<;T9{d}mofgcd4Z>iyk{pM8rEm8)FCP{~C>r=h8nL{B(Ow1CxY>Jw` zzD|@q+FaF>;4(=W_R7emPu{Wx307;m*9bxa1&*Zchm>Y@I9FaT-fCujrf1DjB6O%U zodpZ?HBS*_SY-}*j^BINfPTEv9DcFV&ZgXupV3tDvgAS~qs-{eXk4w^6+w~P(jv0J zcK)E-iBF4UlZmOBl$?Ux#VSI*S$-3%`})Xfu29CzF1JQL!?LIwb}e`q0?M4VWvKw9 zPG*kzmJ_a&ML#)(kL|ROgRJ14J}b3yGKQV|s_Xq6m0tp=F8O^rJ_8U5Okr6GwVrv|^I~iH z-K*@MO+2@W^$gwL^exeg`vH)KBU&b}$gAdvBdxz*6M97jhPrF~&faiwe(-fMO$l0* zJOQwv;{&TlPq>o8M=uV7lTUj^mNK8VSnUp&R&4$KD~`#h458N-xtr#{fazG0Keeb> zeLD76Ok4B*w`xmN7m?;R^a=k|Z8Blfd$OY+oez4Z-82~+veI(ncxrKwoqF6ocgx9=_v4Dj_v17MZeebWADn{*@2aYEuLTrnTFq8p^z6SI{`vTge7`h>OY}p@ zA_a*^f!j!PnfhI^AK^f)(wZ!@d}dbC(SC$F_mz${C{UfvGT-5LS>5x)%D7|MFZIB% zLjfbtSisQBE?pleApQ~h^my!%td`vHjCPmq99S=g3xN;h7Swza-yb;UWcBM_;N&CD z;&$9eX#?a9fyqI5;rzAYh8`i(YZtv;yq|g$q~%)06l_(NPBBpL;}Zmr)LW8%UFRMQjkY;14*q(*j{{YnvA*WUikD~m-(<}5X|4=W zVrtRG3%ie~61R9**B!IG(<(j-=~?>9)+BU zQq8j=@xH2Y15sB*FBEbgFE;26v!GTdW;LP@+}8_wG{|IC%33QLHx6^9*fU7OgK0df zxtdL!hF(Q)sh#igCvlAqUmo~Usrq?mU3~>4CZPsKijUdS_QmLs9-NMq!>?Tt0`5Gy z1Sg5I2XN}3qO5AzY%8Q9=Po!K^5V4Jp*Y(s9D=nJC+nl`C;R-D67a#I5fg^A}ZJ4P`gfJw~`#nAgFJf?5nvB8|A)WfnO1kXxeV8jX}-_Fk5(1@1c zXMi7*)$s@KZHWr_|%B7hv4z`LG zjt$|dO0@0+V)8k669oEn3ou2gsFaybw*n?oWj|F;-O_TSSIpR&!}gr}ChH;>)|QHI zZ#b`V6fC=!Qs+S5l~?_G}P%xTfjg3Ru;;9=!|*tQp#^i zCev6`wxgIjy||Eptr1(eXPyIu#MFG4IR$yEAN1>7PKC`hbL;;8p7-MYt;UU`8FVB% z$;ijCinnGK!aPC5Bk;1cmGAic^M~^f88YLMfop{Lmxkzzhtt%L*2VZ;rTj0 zjS#K~j{wB~6S991(ti<(e-X-m5u*PR*j+|yaWR;*7+f4C4u?yc|FIJPu>a#}P#~^< zuH#Q(B|?F20bnQ)1qcNK0_qK!F^*?{o=##q1xJT2=Z~F|2SgD<&tG!NRWs6@`hQ9% zZCC1kG068G-^3l{oXxVtdDRLGea7m!yI!TF-)GwD=%8~{LF)}`4gD(VAe}--INGFh zt~y9KPG(b;UPT5fQCk}?C~{Rxwo=PkOxiR?J(A8;e?a*W)FQttRQwSxeTXKa9rbbC z#je<457#o2mOR0cud>vB=d?OK;Ki3#4B&f~aAoCz+2AIHvvd6J9Ul$%6O;brCF&g8 zyyeb((+49_Bo+>ak`C!o$wE>iiwZXW-&O?m6Qt6H@tx&!b`a$!_mPoVyY`6%CqklE z$t`EE4j(gHhh_?pw|2+DS91MQY4skyUOo8T|UP(abapx6rD|j zrGlw^cX7X{dC2+O2ya&pj&@Q?vt4aPkea@!cdxBkue2pLqnToF%vVo`r#oX1S|Y-Q zgCjFRJq;Lp5D^dn#BjpcVQhacgpG!nffzaj4=*|Y+`JG$2j;%;jXg-NcJcdXi2mHv zpAZ27uZ3w|D%_He$dqzO3h5wl6p}9S;Fp6&J3c4rxvybbef;&yy7BA6%6 z4jv+JOqCgBYUMg4$H_;@?BQ3w9@k^kh{SDMaEq*YrgqkotC*<)&%goi(dp|R;k5Qe zfLlwxb*=IQqZ=phzD&wvf89-E--4Ss@4pPhLzz~b(reQ^vyzd^ zgPWJM5(t?IJG#`M6Q(c5tJO{h(cgJV(>xZQCf|wT;*9E-<}+Zl-@J!^tdnGDmcj)s zFy6O}?(EeOrRg#_!!a5^W%WuKc3w?lemdIuR-05o!1~yPM>}r?c9fp7;9?t43gm)! zk>z8M6dp?Xz0Y_|#Oi(}Q0#L?h-3S6+g>jbPz4ru+TADv?|vV=t!(w~ONa}6j*Q>2 z0>SL>3VzxPD5+ob71fVu$ZdA$Y|&SgXDcz;(v4~D7@83Bb^w#>?Pp%4COS?Nw&XVX zScdxVxO8{S8W^a5^`;nJLj!&026x$2k3%w++ceVi5-$ssye>nq-sL)2$N34in6RVM z;ai-2Wg9VF{U&xCz_nubU~xgBk&IUgyK_^XWlXp{K5gamt|&tqBX8im3_cpN@N&;~ VSJN5taHVCcfNO)@K+BBu{{X)LBaHw6 literal 0 HcmV?d00001 diff --git a/src/test/scala/org/alvearie/hri/api/MgmtClientTest.scala b/src/test/scala/org/alvearie/hri/api/MgmtClientTest.scala index c210ad4..bc729fe 100644 --- a/src/test/scala/org/alvearie/hri/api/MgmtClientTest.scala +++ b/src/test/scala/org/alvearie/hri/api/MgmtClientTest.scala @@ -9,7 +9,6 @@ package org.alvearie.hri.api import java.time.temporal.ChronoUnit import java.time.{OffsetDateTime, ZoneOffset} import java.util.Base64 - import org.apache.http.{Header, HttpStatus, HttpVersion, ProtocolVersion} import org.apache.http.message.BasicHttpResponse import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost, HttpPut, HttpUriRequest} @@ -19,11 +18,13 @@ import org.apache.http.util.EntityUtils import com.fasterxml.jackson.databind.DeserializationFeature import com.fasterxml.jackson.databind.json.JsonMapper import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule +import org.alvearie.hri.flink.core.TestHelper import org.mockito.ArgumentMatcher import org.mockito.scalatest.MockitoSugar import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers._ +import java.io.{FileNotFoundException, IOException} import scala.util.{Failure, Success} class MgmtClientTest extends AnyFunSuite with MockitoSugar{ @@ -41,6 +42,9 @@ class MgmtClientTest extends AnyFunSuite with MockitoSugar{ private val audience = "myAudience" private val expectedTokenRequestParams = Array("grant_type=client_credentials", "scope=", s"${MgmtClient.hriInternalScope}", s"${MgmtClient.hriConsumerScope}", s"tenant_$tenantId", s"audience=$audience") + private val trustStorePath = "src/test/resources/truststore.jks" + private val trustStorePassword = "test_password" + class RequestMatcherPut(uri: String, bodyElements: Seq[String]) extends ArgumentMatcher[HttpUriRequest] { override def matches(request: HttpUriRequest): Boolean = { if (request.getMethod != "PUT") return false @@ -362,6 +366,61 @@ class MgmtClientTest extends AnyFunSuite with MockitoSugar{ } } + test("It should return a custom Http client when environment variables are set") { + TestHelper.setEnv(MgmtClient.trustStoreEnv, trustStorePath) + TestHelper.setEnv(MgmtClient.trustStorePasswordEnv, trustStorePassword) + + val client = MgmtClient.createHttpClient() + assert(client != null) + // there isn't a way to inspect the client's configuration to ensure the trust store was added + + /** + * This is for manual testing against an actual instance running on Kubernetes + * 1. Copy the Kubernetes ca.crt from any pod at /var/run/secrets/kubernetes.io/serviceaccount/ca.crt + * 2. Import it into src/test/resources/k-truststore.jks with + keytool -import -file ca.crt -keystore src/test/resources/k-truststore.jks -storepass test_password -alias kubernetes_ca + * 3. Edit /etc/hosts and append `127.0.0.1 hri-mgmt-api` + * 4. Then uncomment the lines below and run the test + */ + //val response = client.execute(new HttpGet("https://hri-mgmt-api:1323/hri/healthcheck")) + //System.out.println(response.toString) + + // reset environment + TestHelper.removeEnv(MgmtClient.trustStoreEnv) + TestHelper.removeEnv(MgmtClient.trustStorePasswordEnv) + } + + test("It should throw an IllegalArgumentException when the trust store password variable is not set") { + TestHelper.setEnv(MgmtClient.trustStoreEnv, trustStorePath) + + assertThrows[IllegalArgumentException](MgmtClient.createHttpClient()) + + // reset environment + TestHelper.removeEnv(MgmtClient.trustStoreEnv) + } + + test("It should throw an IOException when the trust store path is wrong") { + TestHelper.setEnv(MgmtClient.trustStoreEnv, "bad/path/to/truststore.jks") + TestHelper.setEnv(MgmtClient.trustStorePasswordEnv, trustStorePassword) + + assertThrows[FileNotFoundException](MgmtClient.createHttpClient()) + + // reset environment + TestHelper.removeEnv(MgmtClient.trustStoreEnv) + TestHelper.removeEnv(MgmtClient.trustStorePasswordEnv) + } + + test("It should throw an IOException when the trust store password variable is wrong") { + TestHelper.setEnv(MgmtClient.trustStoreEnv, trustStorePath) + TestHelper.setEnv(MgmtClient.trustStorePasswordEnv, "wrong_password") + + assertThrows[IOException](MgmtClient.createHttpClient()) + + // reset environment + TestHelper.removeEnv(MgmtClient.trustStoreEnv) + TestHelper.removeEnv(MgmtClient.trustStorePasswordEnv) + } + } private class FakeHttpResponse(ver: ProtocolVersion, code: Integer, reason: String) extends BasicHttpResponse(ver, code, reason) with CloseableHttpResponse { diff --git a/src/test/scala/org/alvearie/hri/flink/core/MgmtApiSinkTest.scala b/src/test/scala/org/alvearie/hri/flink/core/MgmtApiSinkTest.scala index dc3e845..fbabb2a 100644 --- a/src/test/scala/org/alvearie/hri/flink/core/MgmtApiSinkTest.scala +++ b/src/test/scala/org/alvearie/hri/flink/core/MgmtApiSinkTest.scala @@ -8,7 +8,6 @@ package org.alvearie.hri.flink.core import java.time.OffsetDateTime import java.util.concurrent.TimeUnit - import org.alvearie.hri.api.BatchNotification.Status import org.alvearie.hri.api.{BatchNotification, MgmtClient, RequestException} import org.alvearie.hri.flink.core.serialization.NotificationRecord @@ -167,6 +166,15 @@ class MgmtApiSinkTest extends AnyFunSuite with MockitoSugar{ exMsg should equal("Call to HRI Management API failed: Unauthorized.") } + test("it should throw a FlinkException when environment variables are incorrect") { + TestHelper.setEnv(MgmtClient.trustStoreEnv, "bad/path/to/truststore.jks") + + val sink = new MgmtApiSink("tenant", "https://hri-mgmt-api", "client_id", "password", "audience", "https://oauth") + assertThrows[FlinkException](sink.createMgmtClient()) + + TestHelper.removeEnv(MgmtClient.trustStoreEnv) + } + def createTestBatchNotification(status: BatchNotification.Status, actualRecordCount: Int, invalidRecordCount: Int, failMsg: String): BatchNotification = { new BatchNotification() .withId(batchId) diff --git a/src/test/scala/org/alvearie/hri/flink/core/TestHelper.scala b/src/test/scala/org/alvearie/hri/flink/core/TestHelper.scala index 0da420a..974dc87 100644 --- a/src/test/scala/org/alvearie/hri/flink/core/TestHelper.scala +++ b/src/test/scala/org/alvearie/hri/flink/core/TestHelper.scala @@ -88,4 +88,20 @@ object TestHelper { buffer.getLong } + // This sets an OS environment variable using reflection, which isn't normally allowed + def setEnv(name: String, value: String): Unit = { + val env = System.getenv + val field = env.getClass.getDeclaredField("m") + field.setAccessible(true) + field.get(env).asInstanceOf[java.util.Map[String,String]].put(name, value) + } + + // This removes an OS environment variable using reflection, which isn't normally allowed + def removeEnv(name: String): Unit = { + val env = System.getenv + val field = env.getClass.getDeclaredField("m") + field.setAccessible(true) + field.get(env).asInstanceOf[java.util.Map[String,String]].remove(name) + } + } diff --git a/src/test/scala/org/alvearie/hri/flink/core/ValidationProcessFunctionTest.scala b/src/test/scala/org/alvearie/hri/flink/core/ValidationProcessFunctionTest.scala index c67a720..51a034a 100644 --- a/src/test/scala/org/alvearie/hri/flink/core/ValidationProcessFunctionTest.scala +++ b/src/test/scala/org/alvearie/hri/flink/core/ValidationProcessFunctionTest.scala @@ -157,7 +157,7 @@ class ValidationProcessFunctionTest extends AnyFunSuite with MockitoSugar { verify(mockClient, times(1)).getBatchId(tenantId, DefaultTestBatchId) } - test("processElement should call getBatch and unexpected error should be thrown from mgmtApi") { + test("processElement should call getBatch and unexpected error should be thrown from HRI mgmtApi") { val mockClient = mock[MgmtClient] val validator = new TestValidationProcessFunction( diff --git a/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/HriTestRecsSourceFunction.scala b/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/HriTestRecsSourceFunction.scala index a052ac1..80fbb6b 100644 --- a/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/HriTestRecsSourceFunction.scala +++ b/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/HriTestRecsSourceFunction.scala @@ -50,7 +50,7 @@ class HriTestRecsSourceFunction() extends RichParallelSourceFunction[HriRecord] } val numHriRecs = hriRecs.size - println(s"Starting Hri Test Recs Sources Processing of " + + println(s"Starting HRI Test Recs Sources Processing of " + s"${numHriRecs} recs at startTime: $startTime") val itr = hriRecs.iterator while(isRunning && itr.hasNext) { diff --git a/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/NotificationSourceFunction.scala b/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/NotificationSourceFunction.scala index 0185f6a..607684b 100644 --- a/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/NotificationSourceFunction.scala +++ b/src/test/scala/org/alvearie/hri/flink/core/jobtest/sources/NotificationSourceFunction.scala @@ -87,5 +87,4 @@ class NotificationSourceFunction() extends RichParallelSourceFunction[Notificati delayProcessing } - }