Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[type:feat] add Logging-Kafka Plugin #5645

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
cf7bb1e
[type:feat]add kafka logging e2e test
jakiuncle Aug 26, 2024
1815d72
Merge remote-tracking branch 'origin/master'
jakiuncle Aug 29, 2024
3dec35e
Merge branch 'apache:master' into master
jakiuncle Aug 31, 2024
aca9d87
[type:feat]add kafka logging e2e test
jakiuncle Aug 31, 2024
5c5d28e
Merge remote-tracking branch 'origin/master'
jakiuncle Aug 31, 2024
98a71ae
Merge branch 'master' into master
Aias00 Aug 31, 2024
8513fe1
[type:feat]add kafka logging e2e test
jakiuncle Sep 1, 2024
060ebdb
Merge remote-tracking branch 'origin/master'
jakiuncle Sep 1, 2024
1d21de7
[type:feat]add kafka logging e2e test
jakiuncle Sep 1, 2024
c696c6c
[type:feat]add kafka logging e2e test
jakiuncle Sep 1, 2024
1f63829
[type:feat]add kafka logging e2e test
jakiuncle Sep 1, 2024
e8a2bd4
[type:feat]add kafka logging e2e test
jakiuncle Sep 1, 2024
4a5252c
[type:feat]add kafka logging e2e test
jakiuncle Sep 1, 2024
3c4d9bb
Merge branch 'master' into master
Aias00 Sep 2, 2024
9b077b0
[type:feat]add kafka logging e2e test
jakiuncle Sep 2, 2024
d34cd08
Merge remote-tracking branch 'origin/master'
jakiuncle Sep 2, 2024
5078620
[type:feat]add kafka logging e2e test
jakiuncle Sep 2, 2024
56b5392
[type:feat]add kafka logging e2e test
jakiuncle Sep 2, 2024
779cf79
[type:feat]add kafka logging e2e test
jakiuncle Sep 2, 2024
63037c6
Merge branch 'master' into master
Aias00 Sep 3, 2024
bb57d83
Merge branch 'master' into master
Aias00 Sep 3, 2024
cdf7baf
Merge branch 'master' into master
847850277 Sep 4, 2024
c928a90
Merge branch 'master' into master
Aias00 Sep 5, 2024
73ac165
Merge branch 'master' into master
Aias00 Sep 5, 2024
beaee5e
Merge branch 'master' into master
Aias00 Sep 7, 2024
93a8952
[type:feat]add kafka logging e2e test
jakiuncle Sep 7, 2024
37a03ed
Merge remote-tracking branch 'origin/master'
jakiuncle Sep 7, 2024
744ddf9
[type:feat]add kafka logging e2e test
jakiuncle Sep 8, 2024
04fd14d
Merge branch 'master' into master
Aias00 Sep 9, 2024
91806ca
Merge branch 'master' into master
Aias00 Sep 13, 2024
0c265de
[type:feat]add kafka logging e2e test
jakiuncle Sep 15, 2024
2e7e9e8
Merge branch 'master' of https://github.com/jakiuncle/shenyu
jakiuncle Sep 15, 2024
38e6a24
Merge branch 'master' into master
Aias00 Sep 17, 2024
3b00573
[type:feat]add kafka logging e2e test
jakiuncle Sep 17, 2024
4d0cbfb
Merge branch 'master' into master
Aias00 Sep 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ for sync in ${SYNC_ARRAY[@]}; do

kubectl apply -f "${PRGDIR}"/shenyu-rocketmq.yml

kubectl apply -f "${PRGDIR}"/shenyu-kafka.yml

sleep 30s
echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml shenyu-examples-springcloud.yml"
# shellcheck disable=SC2199
Expand Down Expand Up @@ -75,6 +77,8 @@ for sync in ${SYNC_ARRAY[@]}; do
kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml
kubectl delete -f "${PRGDIR}"/shenyu-examples-http.yml
kubectl delete -f "${PRGDIR}"/shenyu-rocketmq.yml
kubectl delete -f "${PRGDIR}"/shenyu-kafka.yml

# shellcheck disable=SC2199
# shellcheck disable=SC2076
if [[ "${MIDDLEWARE_SYNC_ARRAY[@]}" =~ "${sync}" ]]; then
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: apps/v1
kind: Deployment
metadata:
name: zookeeper
namespace: default
labels:
app: zookeeper
spec:
replicas: 1
selector:
matchLabels:
app: zookeeper
template:
metadata:
labels:
app: zookeeper
spec:
containers:
- name: zookeeper
image: wurstmeister/zookeeper:3.4.6
ports:
- containerPort: 2181

apiVersion: v1
kind: Service
metadata:
name: zookeeper
namespace: default
labels:
app: zookeeper
spec:
ports:
- port: 2181
name: client
selector:
app: zookeeper

apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka
namespace: default
labels:
app: kafka
spec:
replicas: 1
selector:
matchLabels:
app: kafka
template:
metadata:
labels:
app: kafka
spec:
containers:
- name: kafka
image: wurstmeister/kafka:2.12-2.1.1
env:
- name: KAFKA_ADVERTISED_LISTENERS
value: PLAINTEXT://kafka:9092
- name: KAFKA_ZOOKEEPER_CONNECT
value: zookeeper:2181
ports:
- containerPort: 9092

apiVersion: v1
kind: Service
metadata:
name: kafka
namespace: default
labels:
app: kafka
spec:
ports:
- port: 9092
name: client
protocol: TCP
targetPort: 9092
nodePort: 31877

selector:
app: kafka
5 changes: 5 additions & 0 deletions shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,10 @@
<artifactId>rocketmq-client</artifactId>
<version>4.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import com.google.common.collect.Lists;
import io.restassured.http.Method;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
Expand All @@ -35,8 +39,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.shenyu.e2e.engine.scenario.function.HttpCheckers.exists;
import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newConditions;
Expand Down Expand Up @@ -75,6 +84,67 @@ private ShenYuScenarioSpec testDivideHello() {
.build();
}

private ShenYuScenarioSpec testKafkaHello(){
return ShenYuScenarioSpec.builder()
.name("testKafkaHello")
.beforeEachSpec(
ShenYuBeforeEachSpec.builder()
.addSelectorAndRule(
newSelectorBuilder("selector",Plugin.LOGGING_KAFKA)
.name("2")
.matchMode(MatchMode.OR)
.conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.STARTS_WITH, "/http"))
.build(),
newRuleBuilder("rule")
.name("2")
.matchMode(MatchMode.OR)
.conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.STARTS_WITH, "/http"))
.build()
)
.checker(exists(TEST))
.build()
)
.caseSpec(
ShenYuCaseSpec.builder()
.add(request -> {
AtomicBoolean isLog = new AtomicBoolean(false);
try {
Thread.sleep(1000 * 30);
request.request(Method.GET, "/http/order/findById?id=23");
Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"127.0.0.1:31877");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));
AtomicReference<Boolean> keepCosuming = new AtomicReference<>(true);
Instant start = Instant.now();
while(keepCosuming.get()){
if (Duration.between(start, Instant.now()).toMillis() > 60000) {
keepCosuming.set(false);
}
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record ->{
String message = record.value();
if (message.contains("/http/order/findById?id=23")) {
isLog.set(true);
keepCosuming.set(false);
}
});
}
Assertions.assertTrue(isLog.get());
} catch (InterruptedException e) {
LOG.info("isLog.get():{}", isLog.get());
LOG.error("error", e);
throw new RuntimeException(e);
}
}).build()
).build();
}

private ShenYuScenarioSpec testRocketMQHello() {
return ShenYuScenarioSpec.builder()
.name("testRocketMQHello")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ void setup(final AdminClient adminClient, final GatewayClient gatewayClient) thr
formData.add("config", "{\"topic\":\"shenyu-access-logging\", \"namesrvAddr\": \"rocketmq-dialevoneid:9876\",\"producerGroup\":\"shenyu-plugin-logging-rocketmq\"}");
adminClient.changePluginStatus("29", formData);
WaitDataSync.waitGatewayPluginUse(gatewayClient, "org.apache.shenyu.plugin.logging.rocketmq");
LOG.info("start loggingKafka plugin");
formData.add("id","33");
formData.add("name","loggingKafka");
formData.add("enabled","true");
formData.add("role","Logging");
formData.add("sort","180");
formData.add("config","{\"topic\":\"shenyu-access-logging\",\"namesrvAddr\":\"kafka:9092\",\"sampleRate\":\"1\",\"maxResponseBody\":524288,\"maxRequestBody\":524288,\"compressAlg\":\"none\"}");
adminClient.changePluginStatus("33",formData);
WaitDataSync.waitGatewayPluginUse(gatewayClient,"org.apache.shenyu.plugin.logging.kafka");

}

@ShenYuScenario(provider = DividePluginCases.class)
Expand Down
Loading