diff --git a/front-end/package.json b/front-end/package.json index 2e912c83..08a03ea5 100644 --- a/front-end/package.json +++ b/front-end/package.json @@ -49,6 +49,7 @@ "mockjs": "1.0.1-beta3", "normalize.css": "7.0.0", "nprogress": "0.2.0", + "qs": "^6.12.1", "showdown": "1.9.1", "sortablejs": "1.7.0", "vue": "2.6.0", diff --git a/front-end/src/utils/request.js b/front-end/src/utils/request.js index 1a55e734..5a21e670 100644 --- a/front-end/src/utils/request.js +++ b/front-end/src/utils/request.js @@ -21,11 +21,15 @@ import { getEnvironment } from '@/utils/environment' import { getTenant } from '@/utils/tenant' import router from '../router' import { getCsrfToken } from '@/utils/csrfToken' +import qs from "qs"; // create an axios instance const service = axios.create({ baseURL: process.env.BASE_API, // api 的 base_url - timeout: 60000 // request timeout + timeout: 60000, // request timeout + paramsSerializer: function(params) { + return qs.stringify(params, { arrayFormat: 'repeat' }) + } }) // request interceptor diff --git a/src/main/java/org/apache/pulsar/manager/service/PulsarAdminService.java b/src/main/java/org/apache/pulsar/manager/service/PulsarAdminService.java index 0afdb6f4..da8fe52f 100644 --- a/src/main/java/org/apache/pulsar/manager/service/PulsarAdminService.java +++ b/src/main/java/org/apache/pulsar/manager/service/PulsarAdminService.java @@ -24,7 +24,8 @@ import org.apache.pulsar.client.admin.Topics; public interface PulsarAdminService { - PulsarAdmin getPulsarAdmin(String url); + PulsarAdmin getPulsarAdmin(String url, String env, String token); + BrokerStats brokerStats(String url, String env); BrokerStats brokerStats(String url); Clusters clusters(String url); Clusters clusters(String url, String token); diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java index ae0f0836..ab85bbe8 100644 --- a/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java +++ b/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java @@ -54,6 +54,7 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; +import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; @Service @@ -122,24 +123,24 @@ private void scheduleCollectStats() { List<EnvironmentEntity> environmentEntities = environmentsRepository.getAllEnvironments(); Map<Pair<String, String>, String> collectStatsServiceUrls = new HashMap<>(); for (EnvironmentEntity env : environmentEntities) { - String serviceUrl = checkServiceUrl(null, env.getBroker()); + String brokerUrl = env.getBroker(); Map<String, Object> clusterObject = - clustersService.getClustersList(0, 0, serviceUrl, (c) -> serviceUrl); + clustersService.getClustersList(0, 0, brokerUrl, (c) -> brokerUrl); List<HashMap<String, Object>> clusterLists = (List<HashMap<String, Object>>) clusterObject.get("data"); clusterLists.forEach((clusterMap) -> { String cluster = (String) clusterMap.get("cluster"); Pair<String, String> envCluster = Pair.of(env.getName(), cluster); + log.debug(envCluster.toString()); + String serviceUrlTls = (String) clusterMap.get("serviceUrlTls"); - tlsEnabled = tlsEnabled && StringUtils.isNotBlank(serviceUrlTls); - String webServiceUrl = tlsEnabled ? serviceUrlTls : (String) clusterMap.get("serviceUrl"); + String serviceUrl = (String) clusterMap.get("serviceUrl"); + + String webServiceUrl = StringUtils.isNotBlank(serviceUrlTls) ? serviceUrlTls : serviceUrl; if (webServiceUrl.contains(",")) { String[] webServiceUrlList = webServiceUrl.split(","); for (String url : webServiceUrlList) { - // making sure the protocol is appended in case the env was added without the protocol - if (!tlsEnabled && !url.contains("http://")) { - url = (tlsEnabled ? "https://" : "http://") + url; - } + try { Brokers brokers = pulsarAdminService.brokers(url); brokers.healthcheck(); @@ -150,14 +151,10 @@ private void scheduleCollectStats() { } } } - collectStatsServiceUrls.put(envCluster, webServiceUrl); + log.info("Start collecting stats from env {} / cluster {} @ {}", envCluster.getLeft(), envCluster.getRight(), serviceUrl); + collectStatsToDB(unixTime, envCluster.getLeft(), envCluster.getRight(), webServiceUrl); }); } - collectStatsServiceUrls.forEach((envCluster, serviceUrl) -> { - log.info("Start collecting stats from env {} / cluster {} @ {}", - envCluster.getLeft(), envCluster.getRight(), serviceUrl); - collectStatsToDB(unixTime, envCluster.getLeft(), envCluster.getRight(), serviceUrl); - }); log.info("Start clearing stats from broker"); clearStats(unixTime, clearStatsInterval / 1000); @@ -168,18 +165,21 @@ public void collectStatsToDB(long unixTime, String env, String cluster, String s List<HashMap<String, Object>> brokerLists = (List<HashMap<String, Object>>) brokerObject.get("data"); brokerLists.forEach((brokerMap) -> { // returns [Broker Hostname]:[Broker non Tls port] - String tempBroker = (String) brokerMap.get("broker"); - //default to http - String broker = "http://" + tempBroker; - // if tls enabled the protocol and port is extracted from service url - if (tlsEnabled && tempBroker.contains(":")) { - String brokerHost = tempBroker.substring(0, tempBroker.indexOf(":")); - UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(serviceUrl); - broker = builder.host(brokerHost).toUriString(); - } + String broker = (String) brokerMap.get("broker"); + log.info("processing broker: {}", broker); + + // use web service url scheme to replace host part with broker + UriComponents serviceURI = UriComponentsBuilder.fromHttpUrl(serviceUrl).build(); + UriComponentsBuilder builder = UriComponentsBuilder.newInstance() + .scheme(serviceURI.getScheme()) + .host(broker.split(":")[0]) + .port(serviceURI.getPort()); + String finalBroker = builder.toUriString(); + JsonObject result; try { - result = pulsarAdminService.brokerStats(broker).getTopics(); + log.info("Start collecting stats from broker {}", finalBroker); + result = pulsarAdminService.brokerStats(finalBroker, env).getTopics(); } catch(PulsarAdminException e) { log.error("Failed to get broker metrics.", e); return; @@ -197,7 +197,7 @@ public void collectStatsToDB(long unixTime, String env, String cluster, String s String[] topicPath = this.parseTopic(topic); topicStatsEntity.setEnvironment(env); topicStatsEntity.setCluster(cluster); - topicStatsEntity.setBroker(tempBroker); + topicStatsEntity.setBroker(finalBroker); topicStatsEntity.setTenant(topicPath[0]); topicStatsEntity.setNamespace(topicPath[1]); topicStatsEntity.setBundle(bundle); @@ -309,10 +309,6 @@ public static String checkServiceUrl(String serviceUrl, String requestHost) { if (serviceUrl == null || serviceUrl.length() <= 0) { serviceUrl = requestHost; } - - if (!serviceUrl.startsWith("http")) { - serviceUrl = "http://" + serviceUrl; - } return serviceUrl; } diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java index c75e267b..17c87e3d 100644 --- a/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java +++ b/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java @@ -114,9 +114,6 @@ private String pickOneServiceUrl(String webServiceUrl) { String[] webServiceUrlList = webServiceUrl.split(","); int index = ThreadLocalRandom.current().nextInt(0, webServiceUrlList.length); String url = webServiceUrlList[index]; - if (!url.contains("http://")) { - url = "http://" + url; - } log.info("pick web url:{}", url); return url; } @@ -141,7 +138,8 @@ private String getServiceUrl(String environment, String cluster, int numReloads) throw new RuntimeException( "No cluster '" + cluster + "' found in environment '" + environment + "'"); } - return tlsEnabled && StringUtils.isNotBlank(clusterData.getServiceUrlTls()) ? clusterData.getServiceUrlTls() : clusterData.getServiceUrl(); + + return StringUtils.isNotBlank(clusterData.getServiceUrlTls()) ? clusterData.getServiceUrlTls() : clusterData.getServiceUrl(); } @Scheduled( diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/PulsarAdminServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/PulsarAdminServiceImpl.java index c269dfa4..da9ad9f9 100644 --- a/src/main/java/org/apache/pulsar/manager/service/impl/PulsarAdminServiceImpl.java +++ b/src/main/java/org/apache/pulsar/manager/service/impl/PulsarAdminServiceImpl.java @@ -79,19 +79,25 @@ public void destroy() { pulsarAdmins.values().forEach(value -> value.close()); } - public synchronized PulsarAdmin getPulsarAdmin(String url) { - if (!pulsarAdmins.containsKey(url)) { - pulsarAdmins.put(url, this.createPulsarAdmin(url, null)); - } - return pulsarAdmins.get(url); + + public PulsarAdmin getPulsarAdmin(String url) { + return this.createPulsarAdmin(url, null, null); } public PulsarAdmin getPulsarAdmin(String url, String token) { - return this.createPulsarAdmin(url, token); + return this.createPulsarAdmin(url, null, token); + } + + public PulsarAdmin getPulsarAdmin(String url, String env, String token) { + return this.createPulsarAdmin(url, env, token); } public BrokerStats brokerStats(String url) { - return getPulsarAdmin(url).brokerStats(); + return getPulsarAdmin(url, null, null).brokerStats(); + } + + public BrokerStats brokerStats(String url, String env) { + return getPulsarAdmin(url, env, null).brokerStats(); } public Clusters clusters(String url) { @@ -149,24 +155,23 @@ public Map<String, String> getAuthHeader(String url) { return result; } - private String getEnvironmentToken(String url) { + private String getEnvironmentToken(String url, String env) { Optional<EnvironmentEntity> optionalEnvironmentEntity = environmentsRepository.findByBroker(url); if (optionalEnvironmentEntity.isPresent()) { return optionalEnvironmentEntity.get().getToken(); } - String environment = environmentCacheService.getEnvironment(url); - Optional<EnvironmentEntity> environmentEntityOptional = environmentsRepository.findByName(environment); + Optional<EnvironmentEntity> environmentEntityOptional = environmentsRepository.findByName(env); return environmentEntityOptional.map(EnvironmentEntity::getToken).orElse(null); } - private PulsarAdmin createPulsarAdmin(String url, String token) { + private PulsarAdmin createPulsarAdmin(String url, String env, String token) { try { log.info("Create Pulsar Admin instance. url={}, authPlugin={}, authParams={}, tlsAllowInsecureConnection={}, tlsTrustCertsFilePath={}, tlsEnableHostnameVerification={}", url, authPlugin, authParams, tlsAllowInsecureConnection, tlsTrustCertsFilePath, tlsEnableHostnameVerification); PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder(); pulsarAdminBuilder.serviceHttpUrl(url); if (null == token) { - token = getEnvironmentToken(url); + token = getEnvironmentToken(url, env); } if (StringUtils.isNotBlank(token)) { pulsarAdminBuilder.authentication(AuthenticationFactory.token(token)); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 80828a28..1ae3a966 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -59,6 +59,11 @@ spring.datasource.initialization-mode=always #spring.datasource.username=postgres #spring.datasource.password=postgres +# hikari configuration +spring.datasource.hikari.connectionTimeout=10000 +spring.datasource.hikari.idleTimeout=60000 +spring.datasource.hikari.maxLifetime=300000 + # zuul config # https://cloud.spring.io/spring-cloud-static/Dalston.SR5/multi/multi__router_and_filter_zuul.html # By Default Zuul adds Authorization to be dropped headers list. Below we are manually setting it diff --git a/src/test/java/org/apache/pulsar/manager/service/BrokerStatsServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/BrokerStatsServiceImplTest.java index a15e3129..2aab6250 100644 --- a/src/test/java/org/apache/pulsar/manager/service/BrokerStatsServiceImplTest.java +++ b/src/test/java/org/apache/pulsar/manager/service/BrokerStatsServiceImplTest.java @@ -234,7 +234,7 @@ public void convertStatsToDbTest() throws Exception { brokersMap.put("data", brokersArray); Mockito.when(brokersService.getBrokersList(0,0, cluster, serviceUrl)) .thenReturn(brokersMap); - Mockito.when(pulsarAdminService.brokerStats(serviceUrl)).thenReturn(stats); + Mockito.when(pulsarAdminService.brokerStats(serviceUrl, environment)).thenReturn(stats); JsonObject data = new Gson().fromJson(testData, JsonObject.class); Mockito.when(stats.getTopics()) .thenReturn(data); @@ -310,7 +310,7 @@ public void findByMultiTenantOrMultiNamespace() throws Exception { brokersMap.put("data", brokersArray); Mockito.when(brokersService.getBrokersList(0,0, cluster, serviceUrl)) .thenReturn(brokersMap); - Mockito.when(pulsarAdminService.brokerStats(serviceUrl)).thenReturn(stats); + Mockito.when(pulsarAdminService.brokerStats(serviceUrl, environment)).thenReturn(stats); JsonObject data = new Gson().fromJson(testData, JsonObject.class); Mockito.when(stats.getTopics()) .thenReturn(data); diff --git a/src/test/java/org/apache/pulsar/manager/service/PulsarAdminServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/PulsarAdminServiceImplTest.java index a819e866..4600a08c 100644 --- a/src/test/java/org/apache/pulsar/manager/service/PulsarAdminServiceImplTest.java +++ b/src/test/java/org/apache/pulsar/manager/service/PulsarAdminServiceImplTest.java @@ -50,7 +50,7 @@ public void teardown() { @Test public void getPulsarAdminTest() { - String serviceUrl = pulsarAdminService.getPulsarAdmin("http://localhost:8080").getServiceUrl(); + String serviceUrl = pulsarAdminService.getPulsarAdmin("http://localhost:8080", null, null).getServiceUrl(); Assert.assertEquals("http://localhost:8080", serviceUrl); }