Skip to content

Commit

Permalink
Create server support https (#3152)
Browse files Browse the repository at this point in the history
* add the https server in the receiver verticle

* add the https server support in the receiver verticle factory

* midway-backup

* update the Vert.x https server config

* fix the format for Main.java

* modify the yaml file to update the keystore secret

* update the secret name

* add the port support for https

* Instead of using magic number, we use env var for TLS ingress port

* update the pem key

* update the name

* fix the failed tests

* fix the format issue

* update the condition check

* Update data-plane/config/broker/500-receiver.yaml

Co-authored-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* export the Ingress_TLS_port

* provide config for channel and sink receiver

* increase the code coverage

* add the coverage tests (not completed yet)

* update the name of the secret volumn for sink, broker and channel.

* update the mountedVolume path name

* update the Java unit test

* change back the image path to use variable

* add "readonly" to the broker secret volume

* fix the format and remove redundant code

* Change the name of the secret volume

* Remove unused code

* add the https options in the verticle tracing test

* Use final static variable

* format fix

* fix formatting issue

* Fix the comments: remove redundant code

* change the final static variables

* format fix: remove the new lines

* format fixing

---------

Co-authored-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
Leo6Leo and pierDipi authored Jun 22, 2023
1 parent 93b0ecb commit 5a236cb
Show file tree
Hide file tree
Showing 15 changed files with 625 additions and 440 deletions.
18 changes: 18 additions & 0 deletions data-plane/config/broker/500-receiver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,19 @@ spec:
- mountPath: /etc/tracing
name: config-tracing
readOnly: true
- mountPath: /etc/receiver-tls-secret
name: broker-receiver-tls-secret
readOnly: true
ports:
- containerPort: 9090
name: http-metrics
protocol: TCP
- containerPort: 8080
name: http
protocol: TCP
- containerPort: 8443
name: https
protocol: TCP
env:
- name: SERVICE_NAME
value: "kafka-broker-receiver"
Expand All @@ -92,6 +98,8 @@ spec:
fieldPath: metadata.namespace
- name: INGRESS_PORT
value: "8080"
- name: INGRESS_TLS_PORT
value: "8443"
- name: PRODUCER_CONFIG_FILE_PATH
value: /etc/config/config-kafka-broker-producer.properties
- name: HTTPSERVER_CONFIG_FILE_PATH
Expand Down Expand Up @@ -169,6 +177,12 @@ spec:
- name: config-tracing
configMap:
name: config-tracing
- name: broker-receiver-tls-secret
secret:
secretName: kafka-broker-ingress-server-tls
optional: true


restartPolicy: Always
---

Expand All @@ -190,6 +204,10 @@ spec:
port: 80
protocol: TCP
targetPort: 8080
- name: https
port: 443
protocol: TCP
targetPort: 8443
- name: http-container
port: 8080
protocol: TCP
Expand Down
16 changes: 16 additions & 0 deletions data-plane/config/channel/500-receiver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,19 @@ spec:
- mountPath: /etc/tracing
name: config-tracing
readOnly: true
- mountPath: /etc/receiver-tls-secret
name: channel-receiver-tls-secret
readOnly: true
ports:
- containerPort: 9090
name: http-metrics
protocol: TCP
- containerPort: 8080
name: http
protocol: TCP
- containerPort: 8443
name: https
protocol: TCP
env:
- name: SERVICE_NAME
value: "kafka-channel-receiver"
Expand All @@ -92,6 +98,8 @@ spec:
fieldPath: metadata.namespace
- name: INGRESS_PORT
value: "8080"
- name: INGRESS_TLS_PORT
value: "8443"
- name: PRODUCER_CONFIG_FILE_PATH
value: /etc/config/config-kafka-channel-producer.properties
- name: HTTPSERVER_CONFIG_FILE_PATH
Expand Down Expand Up @@ -169,6 +177,10 @@ spec:
- name: config-tracing
configMap:
name: config-tracing
- name: channel-receiver-tls-secret
secret:
secretName: kafka-channel-ingress-server-tls
optional: true
restartPolicy: Always
---

Expand All @@ -190,6 +202,10 @@ spec:
port: 80
protocol: TCP
targetPort: 8080
- name: https
port: 443
protocol: TCP
targetPort: 8443
- name: http-container
port: 8080
protocol: TCP
Expand Down
16 changes: 16 additions & 0 deletions data-plane/config/sink/500-receiver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,19 @@ spec:
- mountPath: /etc/tracing
name: config-tracing
readOnly: true
- mountPath: /etc/receiver-tls-secret
name: sink-receiver-tls-secret
readOnly: true
ports:
- containerPort: 9090
name: http-metrics
protocol: TCP
- containerPort: 8080
name: http
protocol: TCP
- containerPort: 8443
name: https
protocol: TCP
env:
- name: SERVICE_NAME
value: "kafka-sink-receiver"
Expand All @@ -92,6 +98,8 @@ spec:
fieldPath: metadata.namespace
- name: INGRESS_PORT
value: "8080"
- name: INGRESS_TLS_PORT
value: "8443"
- name: PRODUCER_CONFIG_FILE_PATH
value: /etc/config/config-kafka-sink-producer.properties
- name: HTTPSERVER_CONFIG_FILE_PATH
Expand Down Expand Up @@ -169,6 +177,10 @@ spec:
- name: config-tracing
configMap:
name: config-tracing
- name: sink-receiver-tls-secret
secret:
secretName: kafka-sink-ingress-server-tls
optional: true
restartPolicy: Always
---

Expand All @@ -194,6 +206,10 @@ spec:
port: 8080
protocol: TCP
targetPort: 8080
- name: https
port: 443
protocol: TCP
targetPort: 8443
- name: http-metrics
port: 9090
protocol: TCP
Expand Down
2 changes: 2 additions & 0 deletions data-plane/profiler/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ export WAIT_STARTUP_SECONDS="8"
export SERVICE_NAME="kafka-broker-receiver"
export SERVICE_NAMESPACE="knative-eventing"
export INGRESS_PORT="8080"
export INGRESS_TLS_PORT="8443"
export METRICS_PORT="9098"
export INSTANCE_ID="receiver"

Expand All @@ -116,6 +117,7 @@ receiver_pid=$!
export SERVICE_NAME="kafka-broker-dispatcher"
export SERVICE_NAMESPACE="knative-eventing"
export INGRESS_PORT="8080"
export INGRESS_TLS_PORT="8443"
export METRICS_PORT="9099"
export INSTANCE_ID="dispatcher"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,15 @@
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.function.Function;
import java.io.File;

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;
import static dev.knative.eventing.kafka.broker.receiver.impl.handler.ControlPlaneProbeRequestUtil.PROBE_HASH_HEADER_NAME;
Expand All @@ -47,73 +51,117 @@
/**
* This verticle is responsible for implementing the logic of the receiver.
* <p>
* The receiver is the component responsible for mapping incoming {@link io.cloudevents.CloudEvent} requests to specific Kafka topics.
* The receiver is the component responsible for mapping incoming
* {@link io.cloudevents.CloudEvent} requests to specific Kafka topics.
* In order to do so, this component:
* <ul>
* <li>Starts an {@link HttpServer} listening for incoming events</li>
* <li>Starts a {@link ResourcesReconciler}, listen on the event bus for reconciliation events and keeps track of the {@link dev.knative.eventing.kafka.broker.contract.DataPlaneContract.Ingress} objects and their {@code path => (topic, producer)} mapping</li>
* <li>Implements a request handler that invokes a series of {@code preHandlers} (which are assumed to complete synchronously) and then a final {@link IngressRequestHandler} to publish the record to Kafka</li>
* <li>Starts two {@link HttpServer}, one with http, and one with https,
* listening for incoming events</li>
* <li>Starts a {@link ResourcesReconciler}, listen on the event bus for
* reconciliation events and keeps track of the
* {@link dev.knative.eventing.kafka.broker.contract.DataPlaneContract.Ingress}
* objects and their {@code path => (topic, producer)} mapping</li>
* <li>Implements a request handler that invokes a series of {@code preHandlers}
* (which are assumed to complete synchronously) and then a final
* {@link IngressRequestHandler} to publish the record to Kafka</li>
* </ul>
*/
public class ReceiverVerticle extends AbstractVerticle implements Handler<HttpServerRequest> {

private static final Logger logger = LoggerFactory.getLogger(ReceiverVerticle.class);
private static final String SECRET_VOLUME_PATH = "/etc/receiver-secret-volume";
private static final String TLS_KEY_FILE_PATH = SECRET_VOLUME_PATH + "/tls.key";
private static final String TLS_CRT_FILE_PATH = SECRET_VOLUME_PATH + "/tls.crt";

private final HttpServerOptions httpServerOptions;
private final HttpServerOptions httpsServerOptions;
private final Function<Vertx, IngressProducerReconcilableStore> ingressProducerStoreFactory;
private final IngressRequestHandler ingressRequestHandler;
private final ReceiverEnv env;

private HttpServer server;
private HttpServer httpServer;
private HttpServer httpsServer;
private MessageConsumer<Object> messageConsumer;
private IngressProducerReconcilableStore ingressProducerStore;

public ReceiverVerticle(final ReceiverEnv env,
final HttpServerOptions httpServerOptions,
final Function<Vertx, IngressProducerReconcilableStore> ingressProducerStoreFactory,
final IngressRequestHandler ingressRequestHandler) {
final HttpServerOptions httpServerOptions,
final HttpServerOptions httpsServerOptions,
final Function<Vertx, IngressProducerReconcilableStore> ingressProducerStoreFactory,
final IngressRequestHandler ingressRequestHandler) {
Objects.requireNonNull(env);
Objects.requireNonNull(httpServerOptions);
Objects.requireNonNull(httpsServerOptions);
Objects.requireNonNull(ingressProducerStoreFactory);
Objects.requireNonNull(ingressRequestHandler);

this.env = env;
this.httpServerOptions = httpServerOptions != null ? httpServerOptions : new HttpServerOptions();
this.httpsServerOptions = httpsServerOptions;
this.ingressProducerStoreFactory = ingressProducerStoreFactory;
this.ingressRequestHandler = ingressRequestHandler;
}


@Override
public void start(final Promise<Void> startPromise) {
this.ingressProducerStore = this.ingressProducerStoreFactory.apply(vertx);
this.messageConsumer = ResourcesReconciler
.builder()
.watchIngress(IngressReconcilerListener.all(this.ingressProducerStore, this.ingressRequestHandler))
.buildAndListen(vertx);

this.server = vertx.createHttpServer(httpServerOptions);
.builder()
.watchIngress(IngressReconcilerListener.all(this.ingressProducerStore, this.ingressRequestHandler))
.buildAndListen(vertx);

this.httpServer = vertx.createHttpServer(this.httpServerOptions);

// check whether the secret volume is mounted
File secretVolume = new File(SECRET_VOLUME_PATH);
if (secretVolume.exists()) {
// The secret volume is mounted, we should start the https server
// check whether the tls.key and tls.crt files exist
File tlsKeyFile = new File(TLS_KEY_FILE_PATH);
File tlsCrtFile = new File(TLS_CRT_FILE_PATH);

if (tlsKeyFile.exists() && tlsCrtFile.exists() && httpsServerOptions != null) {
PemKeyCertOptions keyCertOptions = new PemKeyCertOptions()
.setKeyPath(TLS_KEY_FILE_PATH)
.setCertPath(TLS_CRT_FILE_PATH);
this.httpsServerOptions
.setSsl(true)
.setPemKeyCertOptions(keyCertOptions);

this.httpsServer = vertx.createHttpServer(this.httpsServerOptions);
}
}

final var handler = new ProbeHandler(
env.getLivenessProbePath(),
env.getReadinessProbePath(),
new MethodNotAllowedHandler(this)
);

this.server.requestHandler(handler)
.exceptionHandler(startPromise::tryFail)
.listen(httpServerOptions.getPort(), httpServerOptions.getHost())
.<Void>mapEmpty()
.onComplete(startPromise);
env.getLivenessProbePath(),
env.getReadinessProbePath(),
new MethodNotAllowedHandler(this));

if (this.httpsServer != null) {
CompositeFuture.all(
this.httpServer.requestHandler(handler)
.exceptionHandler(startPromise::tryFail)
.listen(this.httpServerOptions.getPort(), this.httpServerOptions.getHost()),

this.httpsServer.requestHandler(handler)
.exceptionHandler(startPromise::tryFail)
.listen(this.httpsServerOptions.getPort(), this.httpsServerOptions.getHost()))
.<Void>mapEmpty().onComplete(startPromise);
} else {
this.httpServer.requestHandler(handler)
.exceptionHandler(startPromise::tryFail)
.listen(this.httpServerOptions.getPort(), this.httpServerOptions.getHost())
.<Void>mapEmpty().onComplete(startPromise);
}
}

@Override
public void stop(Promise<Void> stopPromise) {
CompositeFuture.all(
server.close().mapEmpty(),
messageConsumer.unregister()
)
.<Void>mapEmpty()
.onComplete(stopPromise);
(this.httpServer != null ? this.httpServer.close().mapEmpty() : Future.succeededFuture()),
(this.httpsServer != null ? this.httpsServer.close().mapEmpty() : Future.succeededFuture()),
(this.messageConsumer != null ? this.messageConsumer.unregister() : Future.succeededFuture())).<Void>mapEmpty()
.onComplete(stopPromise);
}

@Override
Expand All @@ -126,17 +174,16 @@ public void handle(HttpServerRequest request) {
if (producer == null) {
request.response().setStatusCode(NOT_FOUND.code()).end();
logger.warn("Resource not found {} {} {}",
keyValue("path", request.path()),
keyValue("host", request.host()),
keyValue("hostHeader", request.getHeader("Host"))
);
keyValue("path", request.path()),
keyValue("host", request.host()),
keyValue("hostHeader", request.getHeader("Host")));
return;
}

if (isControlPlaneProbeRequest(request)) {
request.response()
.putHeader(PROBE_HASH_HEADER_NAME, request.getHeader(PROBE_HASH_HEADER_NAME))
.setStatusCode(OK.code()).end();
.putHeader(PROBE_HASH_HEADER_NAME, request.getHeader(PROBE_HASH_HEADER_NAME))
.setStatusCode(OK.code()).end();
return;
}

Expand Down
Loading

0 comments on commit 5a236cb

Please sign in to comment.