Skip to content

Commit

Permalink
feat:Kafka eventhub
Browse files Browse the repository at this point in the history
  • Loading branch information
vincenzo-ingenito committed Nov 26, 2024
1 parent eb11e5b commit 3f00ea9
Show file tree
Hide file tree
Showing 8 changed files with 589 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -1,45 +1,45 @@
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*
* Copyright (C) 2023 Ministero della Salute
*
* This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package it.finanze.sanita.fse2.ms.gtw.dispatcher.config.health;

import org.apache.kafka.clients.admin.AdminClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class KafkaHealthIndicator implements HealthIndicator {

@Autowired
private AdminClient client;

@Override
public Health health() {
Health health = null;
try {
client.listTopics().listings().get();
health = Health.up().build();
} catch (InterruptedException e) {
log.warn("Interrupted!", e);
health = Health.down(e).build();
// Restore interrupted state...
Thread.currentThread().interrupt();
} catch (Exception e) {
health = Health.down(e).build();
}
return health;
}
}
///*
// * SPDX-License-Identifier: AGPL-3.0-or-later
// *
// * Copyright (C) 2023 Ministero della Salute
// *
// * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
// *
// * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
// *
// * You should have received a copy of the GNU Affero General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
// */
//package it.finanze.sanita.fse2.ms.gtw.dispatcher.config.health;
//
//import org.apache.kafka.clients.admin.AdminClient;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.boot.actuate.health.Health;
//import org.springframework.boot.actuate.health.HealthIndicator;
//import org.springframework.stereotype.Component;
//
//import lombok.extern.slf4j.Slf4j;
//
//@Component
//@Slf4j
//public class KafkaHealthIndicator implements HealthIndicator {
//
// @Autowired
// private AdminClient client;
//
// @Override
// public Health health() {
// Health health = null;
// try {
// client.listTopics().listings().get();
// health = Health.up().build();
// } catch (InterruptedException e) {
// log.warn("Interrupted!", e);
// health = Health.down(e).build();
// // Restore interrupted state...
// Thread.currentThread().interrupt();
// } catch (Exception e) {
// health = Health.down(e).build();
// }
// return health;
// }
//}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ public Map<String, Object> producerConfigs() {
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, id + "-" + kafkaProducerPropCFG.getTransactionalId());
props.put(ProducerConfig.ACKS_CONFIG,kafkaProducerPropCFG.getAck());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,kafkaProducerPropCFG.getIdempotence());

props.put("sasl.login.callback.handler.class","it.finanze.sanita.fse2.ms.gtw.dispatcher.config.kafka.oauth2.OauthAuthenticateLoginCallbackHandler");
props.put("sasl.client.callback.handler.class","it.finanze.sanita.fse2.ms.gtw.dispatcher.config.kafka.oauth2.OauthAuthenticateValidatorCallbackHandler");


if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getProtocol())) {
props.put("security.protocol", kafkaPropCFG.getProtocol());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,17 @@ public class KafkaPropertiesCFG {
@Autowired
private ProfileUtility profileUtility;

@Bean
public AdminClient client() {
Properties configProperties = new Properties();
configProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
if(!profileUtility.isDevOrDockerProfile() && !profileUtility.isTestProfile()) {
configProperties.put("security.protocol", protocol);
configProperties.put("sasl.mechanism", mechanism);
configProperties.put("sasl.jaas.config", configJaas);
configProperties.put("ssl.truststore.location", trustoreLocation);
configProperties.put("ssl.truststore.password", String.valueOf(trustorePassword));
}
return AdminClient.create(configProperties);
}
// @Bean
// public AdminClient client() {
// Properties configProperties = new Properties();
// configProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
// if(!profileUtility.isDevOrDockerProfile() && !profileUtility.isTestProfile()) {
// configProperties.put("security.protocol", protocol);
// configProperties.put("sasl.mechanism", mechanism);
// configProperties.put("sasl.jaas.config", configJaas);
// configProperties.put("ssl.truststore.location", trustoreLocation);
// configProperties.put("ssl.truststore.password", String.valueOf(trustorePassword));
// }
// return AdminClient.create(configProperties);
// }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package it.finanze.sanita.fse2.ms.gtw.dispatcher.config.kafka.oauth2;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.util.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OauthAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {
private final Logger log = LoggerFactory.getLogger(OauthAuthenticateLoginCallbackHandler.class);
private Map<String, String> moduleOptions = null;
private boolean configured = false;

@Override
public void configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
throw new IllegalArgumentException(
String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
jaasConfigEntries.size()));
this.moduleOptions = Collections.unmodifiableMap((Map<String, String>) jaasConfigEntries.get(0).getOptions());
configured = true;
}

public boolean isConfigured(){
return this.configured;
}

@Override
public void close() {
}

@Override
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
if (!isConfigured())
throw new IllegalStateException("Callback handler not configured");
for (Callback callback : callbacks) {
if (callback instanceof OAuthBearerTokenCallback)
try {
handleCallback((OAuthBearerTokenCallback) callback);
} catch (KafkaException e) {
throw new IOException(e.getMessage(), e);
}
else
throw new UnsupportedCallbackException(callback);
}
}

private void handleCallback(OAuthBearerTokenCallback callback){
if (callback.token() != null)
throw new IllegalArgumentException("Callback had a token already");

log.info("Try to acquire token!");
// OauthBearerTokenJwt token = OauthHttpCalls.introspectBearer("YnJva2VyLWthZmthOmJyb2tlci1rYWZrYQ==");
OauthBearerTokenJwt token = OauthHttpCalls.login("broker-kafka");
log.info("Retrieved token..");
if(token == null){
throw new IllegalArgumentException("Null token returned from server");
}
callback.token(token);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package it.finanze.sanita.fse2.ms.gtw.dispatcher.config.kafka.oauth2;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerValidationResult;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class OauthAuthenticateValidatorCallbackHandler implements AuthenticateCallbackHandler {
private final Logger log = LoggerFactory.getLogger(OauthAuthenticateValidatorCallbackHandler.class);
private List<AppConfigurationEntry> jaasConfigEntries;
private Map<String, String> moduleOptions = null;
private boolean configured = false;
private Time time = Time.SYSTEM;

@Override
public void configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
throw new IllegalArgumentException(
String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
jaasConfigEntries.size()));
this.moduleOptions = Collections.unmodifiableMap((Map<String, String>) jaasConfigEntries.get(0).getOptions());
configured = true;
}

public boolean isConfigured(){
return this.configured;
}

@Override
public void close() {
}

@Override
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
if (!isConfigured())
throw new IllegalStateException("Callback handler not configured");
for (Callback callback : callbacks) {
if (callback instanceof OAuthBearerValidatorCallback)
try {
OAuthBearerValidatorCallback validationCallback = (OAuthBearerValidatorCallback) callback;
handleCallback(validationCallback);
} catch (KafkaException e) {
throw new IOException(e.getMessage(), e);
}
else
throw new UnsupportedCallbackException(callback);
}
}

private void handleCallback(OAuthBearerValidatorCallback callback){
String accessToken = callback.tokenValue();
if (accessToken == null)
throw new IllegalArgumentException("Callback missing required token value");

log.info("Trying to introspect Token!");
OauthBearerTokenJwt token = OauthHttpCalls.introspectBearer(accessToken);
log.info("Trying to introspected");

// Implement Check Expire Token..
long now = time.milliseconds();
if(now > token.expirationTime()){
OAuthBearerValidationResult.newFailure("Expired Token, needs refresh!");
}

log.info("Validated! token..");
callback.token(token);
}

}
Loading

0 comments on commit 3f00ea9

Please sign in to comment.