diff --git a/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/health/KafkaHealthIndicator.java b/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/health/KafkaHealthIndicator.java
index ac947596..6bd9e002 100644
--- a/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/health/KafkaHealthIndicator.java
+++ b/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/health/KafkaHealthIndicator.java
@@ -1,45 +1,45 @@
-/*
- * SPDX-License-Identifier: AGPL-3.0-or-later
- *
- * Copyright (C) 2023 Ministero della Salute
- *
- * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License along with this program. If not, see .
- */
-package it.finanze.sanita.fse2.ms.gtw.dispatcher.config.health;
-
-import org.apache.kafka.clients.admin.AdminClient;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.actuate.health.Health;
-import org.springframework.boot.actuate.health.HealthIndicator;
-import org.springframework.stereotype.Component;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Component
-@Slf4j
-public class KafkaHealthIndicator implements HealthIndicator {
-
- @Autowired
- private AdminClient client;
-
- @Override
- public Health health() {
- Health health = null;
- try {
- client.listTopics().listings().get();
- health = Health.up().build();
- } catch (InterruptedException e) {
- log.warn("Interrupted!", e);
- health = Health.down(e).build();
- // Restore interrupted state...
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- health = Health.down(e).build();
- }
- return health;
- }
-}
\ No newline at end of file
+///*
+// * SPDX-License-Identifier: AGPL-3.0-or-later
+// *
+// * Copyright (C) 2023 Ministero della Salute
+// *
+// * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
+// *
+// * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+// *
+// * You should have received a copy of the GNU Affero General Public License along with this program. If not, see .
+// */
+//package it.finanze.sanita.fse2.ms.gtw.dispatcher.config.health;
+//
+//import org.apache.kafka.clients.admin.AdminClient;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.boot.actuate.health.Health;
+//import org.springframework.boot.actuate.health.HealthIndicator;
+//import org.springframework.stereotype.Component;
+//
+//import lombok.extern.slf4j.Slf4j;
+//
+//@Component
+//@Slf4j
+//public class KafkaHealthIndicator implements HealthIndicator {
+//
+// @Autowired
+// private AdminClient client;
+//
+// @Override
+// public Health health() {
+// Health health = null;
+// try {
+// client.listTopics().listings().get();
+// health = Health.up().build();
+// } catch (InterruptedException e) {
+// log.warn("Interrupted!", e);
+// health = Health.down(e).build();
+// // Restore interrupted state...
+// Thread.currentThread().interrupt();
+// } catch (Exception e) {
+// health = Health.down(e).build();
+// }
+// return health;
+// }
+//}
\ No newline at end of file
diff --git a/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/KafkaProducerCFG.java b/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/KafkaProducerCFG.java
index 154c3028..ffea413f 100644
--- a/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/KafkaProducerCFG.java
+++ b/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/KafkaProducerCFG.java
@@ -72,7 +72,11 @@ public Map producerConfigs() {
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, id + "-" + kafkaProducerPropCFG.getTransactionalId());
props.put(ProducerConfig.ACKS_CONFIG,kafkaProducerPropCFG.getAck());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,kafkaProducerPropCFG.getIdempotence());
+
+ props.put("sasl.login.callback.handler.class","it.finanze.sanita.fse2.ms.gtw.dispatcher.config.kafka.oauth2.OauthAuthenticateLoginCallbackHandler");
+ props.put("sasl.client.callback.handler.class","it.finanze.sanita.fse2.ms.gtw.dispatcher.config.kafka.oauth2.OauthAuthenticateValidatorCallbackHandler");
+
if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getProtocol())) {
props.put("security.protocol", kafkaPropCFG.getProtocol());
}
diff --git a/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/KafkaPropertiesCFG.java b/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/KafkaPropertiesCFG.java
index e30e0716..7337860e 100644
--- a/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/KafkaPropertiesCFG.java
+++ b/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/KafkaPropertiesCFG.java
@@ -78,17 +78,17 @@ public class KafkaPropertiesCFG {
@Autowired
private ProfileUtility profileUtility;
- @Bean
- public AdminClient client() {
- Properties configProperties = new Properties();
- configProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
- if(!profileUtility.isDevOrDockerProfile() && !profileUtility.isTestProfile()) {
- configProperties.put("security.protocol", protocol);
- configProperties.put("sasl.mechanism", mechanism);
- configProperties.put("sasl.jaas.config", configJaas);
- configProperties.put("ssl.truststore.location", trustoreLocation);
- configProperties.put("ssl.truststore.password", String.valueOf(trustorePassword));
- }
- return AdminClient.create(configProperties);
- }
+// @Bean
+// public AdminClient client() {
+// Properties configProperties = new Properties();
+// configProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
+// if(!profileUtility.isDevOrDockerProfile() && !profileUtility.isTestProfile()) {
+// configProperties.put("security.protocol", protocol);
+// configProperties.put("sasl.mechanism", mechanism);
+// configProperties.put("sasl.jaas.config", configJaas);
+// configProperties.put("ssl.truststore.location", trustoreLocation);
+// configProperties.put("ssl.truststore.password", String.valueOf(trustorePassword));
+// }
+// return AdminClient.create(configProperties);
+// }
}
diff --git a/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/oauth2/OauthAuthenticateLoginCallbackHandler.java b/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/oauth2/OauthAuthenticateLoginCallbackHandler.java
new file mode 100644
index 00000000..558ac19e
--- /dev/null
+++ b/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/oauth2/OauthAuthenticateLoginCallbackHandler.java
@@ -0,0 +1,72 @@
+package it.finanze.sanita.fse2.ms.gtw.dispatcher.config.kafka.oauth2;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.IOException;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OauthAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {
+ private final Logger log = LoggerFactory.getLogger(OauthAuthenticateLoginCallbackHandler.class);
+ private Map moduleOptions = null;
+ private boolean configured = false;
+
+ @Override
+ public void configure(Map map, String saslMechanism, List jaasConfigEntries) {
+ if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+ throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
+ if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
+ throw new IllegalArgumentException(
+ String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
+ jaasConfigEntries.size()));
+ this.moduleOptions = Collections.unmodifiableMap((Map) jaasConfigEntries.get(0).getOptions());
+ configured = true;
+ }
+
+ public boolean isConfigured(){
+ return this.configured;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ if (!isConfigured())
+ throw new IllegalStateException("Callback handler not configured");
+ for (Callback callback : callbacks) {
+ if (callback instanceof OAuthBearerTokenCallback)
+ try {
+ handleCallback((OAuthBearerTokenCallback) callback);
+ } catch (KafkaException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ else
+ throw new UnsupportedCallbackException(callback);
+ }
+ }
+
+ private void handleCallback(OAuthBearerTokenCallback callback){
+ if (callback.token() != null)
+ throw new IllegalArgumentException("Callback had a token already");
+
+ log.info("Try to acquire token!");
+// OauthBearerTokenJwt token = OauthHttpCalls.introspectBearer("YnJva2VyLWthZmthOmJyb2tlci1rYWZrYQ==");
+ OauthBearerTokenJwt token = OauthHttpCalls.login("broker-kafka");
+ log.info("Retrieved token..");
+ if(token == null){
+ throw new IllegalArgumentException("Null token returned from server");
+ }
+ callback.token(token);
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/oauth2/OauthAuthenticateValidatorCallbackHandler.java b/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/oauth2/OauthAuthenticateValidatorCallbackHandler.java
new file mode 100644
index 00000000..adccc033
--- /dev/null
+++ b/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/oauth2/OauthAuthenticateValidatorCallbackHandler.java
@@ -0,0 +1,84 @@
+package it.finanze.sanita.fse2.ms.gtw.dispatcher.config.kafka.oauth2;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerValidationResult;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class OauthAuthenticateValidatorCallbackHandler implements AuthenticateCallbackHandler {
+ private final Logger log = LoggerFactory.getLogger(OauthAuthenticateValidatorCallbackHandler.class);
+ private List jaasConfigEntries;
+ private Map moduleOptions = null;
+ private boolean configured = false;
+ private Time time = Time.SYSTEM;
+
+ @Override
+ public void configure(Map map, String saslMechanism, List jaasConfigEntries) {
+ if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+ throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
+ if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
+ throw new IllegalArgumentException(
+ String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
+ jaasConfigEntries.size()));
+ this.moduleOptions = Collections.unmodifiableMap((Map) jaasConfigEntries.get(0).getOptions());
+ configured = true;
+ }
+
+ public boolean isConfigured(){
+ return this.configured;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ if (!isConfigured())
+ throw new IllegalStateException("Callback handler not configured");
+ for (Callback callback : callbacks) {
+ if (callback instanceof OAuthBearerValidatorCallback)
+ try {
+ OAuthBearerValidatorCallback validationCallback = (OAuthBearerValidatorCallback) callback;
+ handleCallback(validationCallback);
+ } catch (KafkaException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ else
+ throw new UnsupportedCallbackException(callback);
+ }
+ }
+
+ private void handleCallback(OAuthBearerValidatorCallback callback){
+ String accessToken = callback.tokenValue();
+ if (accessToken == null)
+ throw new IllegalArgumentException("Callback missing required token value");
+
+ log.info("Trying to introspect Token!");
+ OauthBearerTokenJwt token = OauthHttpCalls.introspectBearer(accessToken);
+ log.info("Trying to introspected");
+
+ // Implement Check Expire Token..
+ long now = time.milliseconds();
+ if(now > token.expirationTime()){
+ OAuthBearerValidationResult.newFailure("Expired Token, needs refresh!");
+ }
+
+ log.info("Validated! token..");
+ callback.token(token);
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/oauth2/OauthBearerTokenJwt.java b/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/oauth2/OauthBearerTokenJwt.java
new file mode 100644
index 00000000..f5a6bccb
--- /dev/null
+++ b/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/oauth2/OauthBearerTokenJwt.java
@@ -0,0 +1,113 @@
+package it.finanze.sanita.fse2.ms.gtw.dispatcher.config.kafka.oauth2;
+
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class OauthBearerTokenJwt implements OAuthBearerToken {
+
+ private String value;
+ private long lifetimeMs;
+ private String principalName;
+ private Long startTimeMs;
+ private Set scope;
+ private long expirationTime;
+ private String jti;
+
+ private Set publicFields;
+ private Set privateFields;
+
+ public OauthBearerTokenJwt(String accessToken, long lifeTimeS, long startTimeMs, String principalName){
+ super();
+ this.value = accessToken;
+ this.principalName= principalName;
+ this.lifetimeMs = startTimeMs + (lifeTimeS * 1000);
+ this.startTimeMs = startTimeMs;
+ this.expirationTime = startTimeMs + (lifeTimeS * 1000);
+ }
+
+ public OauthBearerTokenJwt(Map jwtToken, String accessToken){
+ super();
+ this.value = accessToken;
+ this.principalName = (String) jwtToken.get("sub");
+
+ if(this.scope == null){
+ this.scope = new TreeSet<>();
+ }
+ if(jwtToken.get("scope") instanceof String ){
+ this.scope.add((String) jwtToken.get("scope"));
+ }else if(jwtToken.get("scope") instanceof List){
+ for(String s : (List) jwtToken.get("scope")){
+ this.scope.add(s);
+ }
+ }
+
+ Object exp = jwtToken.get("exp");
+ if(exp instanceof Integer){
+ this.expirationTime = Integer.toUnsignedLong((Integer) jwtToken.get("exp")) ;
+ }else{
+ this.expirationTime = (Long) jwtToken.get("exp");
+ }
+
+ Object iat = jwtToken.get("iat");
+ if(exp instanceof Integer){
+ this.startTimeMs = Integer.toUnsignedLong((Integer) jwtToken.get("iat")) ;
+ }else{
+ this.startTimeMs = (Long) jwtToken.get("iat");
+ }
+
+ this.lifetimeMs = expirationTime;
+ this.jti = (String) jwtToken.get("jti");
+ }
+
+ @Override
+ public String value() {
+ return value;
+ }
+
+ @Override
+ public Set scope() {
+ return scope;
+ }
+
+ @Override
+ public long lifetimeMs() {
+ return lifetimeMs;
+ }
+
+ @Override
+ public String principalName() {
+ return principalName;
+ }
+
+ @Override
+ public Long startTimeMs() {
+ return startTimeMs != null ? startTimeMs : 0;
+ }
+
+ public long expirationTime(){
+ return expirationTime;
+ }
+
+ public String jti(){
+ return jti;
+ }
+
+ @Override
+ public String toString() {
+ return "OauthBearerTokenJwt{" +
+ "value='" + value + '\'' +
+ ", lifetimeMs=" + lifetimeMs +
+ ", principalName='" + principalName + '\'' +
+ ", startTimeMs=" + startTimeMs +
+ ", scope=" + scope +
+ ", expirationTime=" + expirationTime +
+ ", jti='" + jti + '\'' +
+ ", publicFields=" + publicFields +
+ ", privateFields=" + privateFields +
+ '}';
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/oauth2/OauthHttpCalls.java b/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/oauth2/OauthHttpCalls.java
new file mode 100644
index 00000000..eab90f04
--- /dev/null
+++ b/src/main/java/it/finanze/sanita/fse2/ms/gtw/dispatcher/config/kafka/oauth2/OauthHttpCalls.java
@@ -0,0 +1,251 @@
+package it.finanze.sanita.fse2.ms.gtw.dispatcher.config.kafka.oauth2;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.*;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+
+public class OauthHttpCalls {
+
+ private static final Logger log = LoggerFactory.getLogger(OauthHttpCalls.class);
+
+// private static final String OAUTH_LOGIN_SERVER = (String) getEnvironmentVariables("OAUTH_LOGIN_SERVER", "");
+// private static final String OAUTH_LOGIN_ENDPOINT = (String) getEnvironmentVariables("OAUTH_LOGIN_ENDPOINT", "");
+// private static final String OAUTH_LOGIN_GRANT_TYPE = (String) getEnvironmentVariables("OAUTH_LOGIN_GRANT_TYPE", "");
+// private static final String OAUTH_LOGIN_SCOPE = (String) getEnvironmentVariables("OAUTH_LOGIN_SCOPE", "");
+//
+// private static final String OAUTH_INTROSPECT_SERVER = (String) getEnvironmentVariables("OAUTH_INTROSPECT_SERVER", "");
+// private static final String OAUTH_INTROSPECT_ENDPOINT = (String) getEnvironmentVariables("OAUTH_INTROSPECT_ENDPOINT", "");
+//
+// private static final String OAUTH_LOGIN_AUTHORIZATION = (String) getEnvironmentVariables("OAUTH_AUTHORIZATION", "");
+// private static final String OAUTH_INTROSPECT_AUTHORIZATION = (String) getEnvironmentVariables("OAUTH_INTROSPECT_AUTHORIZATION", "");
+
+ // OAuth Login Configuration
+ private static final String OAUTH_LOGIN_SERVER = (String)getEnvironmentVariables("OAUTH_LOGIN_SERVER", "hydra:4444");
+ private static final String OAUTH_LOGIN_ENDPOINT = (String)getEnvironmentVariables("OAUTH_LOGIN_ENDPOINT", "/oauth2/token");
+ private static final String OAUTH_LOGIN_GRANT_TYPE = (String)getEnvironmentVariables("OAUTH_LOGIN_GRANT_TYPE", "client_credentials");
+ private static final String OAUTH_LOGIN_SCOPE = (String)getEnvironmentVariables("OAUTH_LOGIN_SCOPE", "broker.kafka");
+
+ // OAuth Introspect Configuration
+ private static final String OAUTH_INTROSPECT_SERVER = (String)getEnvironmentVariables("OAUTH_INTROSPECT_SERVER", "http://localhost:4445");
+ private static final String OAUTH_INTROSPECT_ENDPOINT = (String)getEnvironmentVariables("OAUTH_INTROSPECT_ENDPOINT", "/oauth2/introspect");
+
+ // OAuth Authorization Headers
+ private static final String OAUTH_LOGIN_AUTHORIZATION = (String)getEnvironmentVariables("OAUTH_AUTHORIZATION", "Basic cHJvZHVjZXIta2Fma2E6cHJvZHVjZXIta2Fma2E=");
+ private static final String OAUTH_INTROSPECT_AUTHORIZATION = (String)getEnvironmentVariables("OAUTH_INTROSPECT_AUTHORIZATION", "Basic cHJvZHVjZXIta2Fma2E6cHJvZHVjZXIta2Fma2E=");
+
+
+ private static final boolean OAUTH_ACCEPT_UNSECURE_SERVER = (Boolean) getEnvironmentVariables("OAUTH_ACCEPT_UNSECURE_SERVER", false);
+ private static final boolean OAUTH_WITH_SSL = (Boolean) getEnvironmentVariables("OAUTH_WITH_SSL", false);
+ private static Time time = Time.SYSTEM;
+
+ public static void acceptUnsecureServer(){
+ if(OAUTH_ACCEPT_UNSECURE_SERVER){
+ TrustManager[] trustAllCerts = new TrustManager[]{
+ new X509TrustManager() {
+ public java.security.cert.X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+ public void checkClientTrusted(
+ java.security.cert.X509Certificate[] certs, String authType) {
+ }
+ public void checkServerTrusted(
+ java.security.cert.X509Certificate[] certs, String authType) {
+ }
+ }
+ };
+ try{
+ SSLContext sc = SSLContext.getInstance("SSL");
+ sc.init(null, trustAllCerts, new java.security.SecureRandom());
+ HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
+ }catch(NoSuchAlgorithmException e){
+ log.error("at acceptUnsecureServer :", e);
+ }catch(KeyManagementException e){
+ log.error("at acceptUnsecureServer :", e);
+ }
+ }
+ }
+
+ public static OauthBearerTokenJwt login(String clientId) {
+ OauthBearerTokenJwt result = null;
+ try {
+ acceptUnsecureServer();
+ long callTime = time.milliseconds();
+
+ //Mount POST data
+ String grantType = "grant_type=" + OAUTH_LOGIN_GRANT_TYPE;
+ String scope = "scope=" + "producer.kafka";//OAUTH_LOGIN_SCOPE;
+ String postDataStr = grantType + "&" + scope;
+
+ log.info("Try to login with oauth!");
+ Map resp = null;
+ if(OAUTH_WITH_SSL){
+ resp = doHttpsCall(OAUTH_LOGIN_SERVER + OAUTH_LOGIN_ENDPOINT, postDataStr, OAUTH_LOGIN_AUTHORIZATION);
+ }else{
+ resp = doHttpCall(OAUTH_LOGIN_SERVER + OAUTH_LOGIN_ENDPOINT, postDataStr, OAUTH_LOGIN_AUTHORIZATION);
+ }
+
+ if(resp != null){
+ String accessToken = (String) resp.get("access_token");
+ long expiresIn = ((Integer) resp.get("expires_in")).longValue();
+ result = new OauthBearerTokenJwt(accessToken, expiresIn, callTime, clientId);
+ } else {
+ throw new Exception("with resp null at login");
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return result;
+ }
+
+ public static OauthBearerTokenJwt introspectBearer(String accessToken){
+ OauthBearerTokenJwt result = null;
+ try {
+ //Mount POST data
+ String token = "token=" + accessToken;
+
+ log.info("Try to introspect with oauth!");
+ Map resp = null;
+ if(OAUTH_WITH_SSL){
+ resp = doHttpsCall(OAUTH_INTROSPECT_SERVER + OAUTH_INTROSPECT_ENDPOINT, token, OAUTH_INTROSPECT_AUTHORIZATION);
+ }else{
+ resp = doHttpCall(OAUTH_INTROSPECT_SERVER + OAUTH_INTROSPECT_ENDPOINT, token, OAUTH_INTROSPECT_AUTHORIZATION);
+ }
+ if(resp != null){
+ if((boolean) resp.get("active")){
+ result = new OauthBearerTokenJwt(resp, accessToken);
+ }else{
+ throw new Exception("Expired Token");
+ }
+ }
+ }catch (Exception e){
+ e.printStackTrace();
+ }
+ return result;
+ }
+
+ private static Map doHttpCall(String urlStr, String postParameters, String oauthToken){
+ try{
+ System.out.println("doHttpCall ->");
+ acceptUnsecureServer();
+
+ byte[] postData = postParameters.getBytes( StandardCharsets.UTF_8 );
+ int postDataLength = postData.length;
+
+ URL url = new URL("http://" + urlStr);
+ HttpURLConnection con = (HttpURLConnection) url.openConnection();
+ con.setInstanceFollowRedirects(true);
+ con.setRequestMethod("POST");
+ con.setRequestProperty("Authorization", oauthToken);
+ con.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
+ con.setRequestProperty("charset", "utf-8");
+ con.setRequestProperty("Content-Length", Integer.toString(postDataLength ));
+ con.setUseCaches(false);
+ con.setDoOutput(true);
+
+ try(DataOutputStream wr = new DataOutputStream(con.getOutputStream())) {
+ wr.write( postData );
+ }
+
+ int responseCode = con.getResponseCode();
+ if (responseCode == 200) {
+ return handleJsonResponse(con.getInputStream());
+ } else {
+ throw new Exception("Return code " + responseCode);
+ }
+ }catch (Exception e){
+ log.error("at doHttpCall", e);
+ }
+ return null;
+ }
+
+ private static Map doHttpsCall(String urlStr, String postParameters, String oauthToken){
+ try{
+ acceptUnsecureServer();
+
+ byte[] postData = postParameters.getBytes( StandardCharsets.UTF_8 );
+ int postDataLength = postData.length;
+
+ URL url = new URL("https://" + urlStr);
+ HttpsURLConnection con = (HttpsURLConnection) url.openConnection();
+ con.setInstanceFollowRedirects(true);
+ con.setRequestMethod("POST");
+ con.setRequestProperty("Authorization", oauthToken);
+ con.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
+ con.setRequestProperty("charset", "utf-8");
+ con.setRequestProperty("Content-Length", Integer.toString(postDataLength ));
+ con.setUseCaches(false);
+ con.setDoOutput(true);
+
+ try(DataOutputStream wr = new DataOutputStream(con.getOutputStream())) {
+ wr.write( postData );
+ }
+
+ int responseCode = con.getResponseCode();
+ if (responseCode == 200) {
+ return handleJsonResponse(con.getInputStream());
+ }else {
+ throw new Exception("Return code " + responseCode);
+ }
+ }catch (Exception e){
+ log.error("at doHttpCall");
+ }
+ return null;
+ }
+
+ private static Object getEnvironmentVariables(String envName, Object defaultValue) {
+ Object result = null;
+ String env = System.getenv(envName);
+ if(env == null){
+ result = defaultValue;
+ }else{
+ if(defaultValue instanceof Boolean){
+ result = Boolean.valueOf(env);
+ }else if(defaultValue instanceof Integer){
+ result = Integer.valueOf(env);
+ }else if(defaultValue instanceof Double){
+ result = Double.valueOf(env);
+ }else if(defaultValue instanceof Float){
+ result = Float.valueOf(env);
+ }else{
+ result = env;
+ }
+ }
+ return result;
+ }
+
+ private static Map handleJsonResponse(InputStream inputStream){
+ Map result = null;
+ try{
+ BufferedReader in = new BufferedReader(new InputStreamReader(inputStream));
+ String inputLine;
+ StringBuffer response = new StringBuffer();
+
+ while ((inputLine = in.readLine()) != null) {
+ response.append(inputLine);
+ }
+ in.close();
+
+ String jsonResponse = response.toString();
+ ObjectMapper objectMapper = new ObjectMapper();
+ result = objectMapper.readValue(jsonResponse, new TypeReference