Skip to content

Commit

Permalink
Add routing functionality for incoming http requests and improve Exce…
Browse files Browse the repository at this point in the history
…ption handling
  • Loading branch information
oxsean committed Nov 29, 2023
1 parent a53228d commit f232fb7
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
*/
package org.apache.dubbo.remoting.http12.netty4.h2;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http2.Http2StreamChannel;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpHeaders;
import org.apache.dubbo.remoting.http12.HttpMetadata;
Expand All @@ -31,66 +34,56 @@

import java.util.Set;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http2.Http2StreamChannel;

public class NettyHttp2ProtocolSelectorHandler extends SimpleChannelInboundHandler<HttpMetadata> {

private Http2ServerTransportListenerFactory defaultHttp2ServerTransportListenerFactory;

private final URL url;

private final FrameworkModel frameworkModel;
private Http2ServerTransportListenerFactory defaultHttp2ServerTransportListenerFactory;

public NettyHttp2ProtocolSelectorHandler(URL url, FrameworkModel frameworkModel) {
this.url = url;
this.frameworkModel = frameworkModel;
}

public NettyHttp2ProtocolSelectorHandler(
URL url,
FrameworkModel frameworkModel,
Http2ServerTransportListenerFactory defaultHttp2ServerTransportListenerFactory) {
URL url,
FrameworkModel frameworkModel,
Http2ServerTransportListenerFactory defaultHttp2ServerTransportListenerFactory) {
this.url = url;
this.frameworkModel = frameworkModel;
this.defaultHttp2ServerTransportListenerFactory = defaultHttp2ServerTransportListenerFactory;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpMetadata metadata) {
HttpHeaders headers = metadata.headers();
String contentType = headers.getFirst(HttpHeaderNames.CONTENT_TYPE.getName());
// 415
if (!StringUtils.hasText(contentType)) {
throw new UnsupportedMediaTypeException(contentType);
}
String contentType = metadata.headers().getFirst(HttpHeaderNames.CONTENT_TYPE.getName());
Http2ServerTransportListenerFactory factory = adaptHttp2ServerTransportListenerFactory(contentType);
if (factory == null) {
throw new UnsupportedMediaTypeException(contentType);
}
H2StreamChannel h2StreamChannel = new NettyH2StreamChannel((Http2StreamChannel) ctx.channel());
HttpWriteQueueHandler writeQueueHandler =
ctx.channel().parent().pipeline().get(HttpWriteQueueHandler.class);
ctx.channel().parent().pipeline().get(HttpWriteQueueHandler.class);
if (writeQueueHandler != null) {
HttpWriteQueue writeQueue = writeQueueHandler.getWriteQueue();
h2StreamChannel = new Http2WriteQueueChannel(h2StreamChannel, writeQueue);
}
ChannelPipeline pipeline = ctx.pipeline();
pipeline.addLast(
new NettyHttp2FrameHandler(h2StreamChannel, factory.newInstance(h2StreamChannel, url, frameworkModel)));
new NettyHttp2FrameHandler(h2StreamChannel, factory.newInstance(h2StreamChannel, url, frameworkModel)));
pipeline.remove(this);
ctx.fireChannelRead(metadata);
}

private Http2ServerTransportListenerFactory adaptHttp2ServerTransportListenerFactory(String contentType) {
Set<Http2ServerTransportListenerFactory> http2ServerTransportListenerFactories = frameworkModel
if (contentType != null) {
Set<Http2ServerTransportListenerFactory> http2ServerTransportListenerFactories = frameworkModel
.getExtensionLoader(Http2ServerTransportListenerFactory.class)
.getSupportedExtensionInstances();
for (Http2ServerTransportListenerFactory factory : http2ServerTransportListenerFactories) {
if (factory.supportContentType(contentType)) {
return factory;
for (Http2ServerTransportListenerFactory factory : http2ServerTransportListenerFactories) {
if (factory.supportContentType(contentType)) {
return factory;
}
}
}
return defaultHttp2ServerTransportListenerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
import org.apache.dubbo.rpc.protocol.tri.TripleProtocol;
import org.apache.dubbo.rpc.protocol.tri.route.PathRouter;
import org.apache.dubbo.rpc.protocol.tri.route.RouteResult;
import org.apache.dubbo.rpc.protocol.tri.stream.StreamUtils;
import org.apache.dubbo.rpc.service.ServiceDescriptorInternalCache;
import org.apache.dubbo.rpc.stub.StubSuppliers;
Expand Down Expand Up @@ -83,6 +85,8 @@ public abstract class AbstractServerTransportListener<HEADER extends RequestMeta

private final List<HeaderFilter> headerFilters;

private final List<PathRouter> pathRouters;

private HttpMessageCodec httpMessageCodec;

private Invoker<?> invoker;
Expand All @@ -99,8 +103,6 @@ public abstract class AbstractServerTransportListener<HEADER extends RequestMeta

private Executor executor;

private boolean hasStub;

private HttpMessageListener httpMessageListener;

public AbstractServerTransportListener(FrameworkModel frameworkModel, URL url, HttpChannel httpChannel) {
Expand All @@ -111,6 +113,8 @@ public AbstractServerTransportListener(FrameworkModel frameworkModel, URL url, H
frameworkModel.getExtensionLoader(PathResolver.class).getDefaultExtension();
this.headerFilters =
frameworkModel.getExtensionLoader(HeaderFilter.class).getActivateExtension(url, HEADER_FILTER_KEY);
this.pathRouters =
frameworkModel.getExtensionLoader(PathRouter.class).getActivateExtensions();
}

protected Executor initializeExecutor(HEADER metadata) {
Expand Down Expand Up @@ -144,34 +148,34 @@ public void onMetadata(HEADER metadata) {

protected void doOnMetadata(HEADER metadata) {
onPrepareMetadata(metadata);
this.httpMetadata = metadata;

String method = metadata.method();
String path = metadata.path();
HttpHeaders headers = metadata.headers();
// 1.check necessary header
String contentType = headers.getFirst(HttpHeaderNames.CONTENT_TYPE.getName());
if (contentType == null) {
throw new UnsupportedMediaTypeException(
"'" + HttpHeaderNames.CONTENT_TYPE.getName() + "' must be not null.");
}

// 2. check service
String[] parts = path.split("/");
if (parts.length != 3) {
throw new IllegalPathException(path);
RouteResult result = null;
for (PathRouter router : pathRouters) {
result = router.route(method, path, contentType, headers);
if (result != null) {
break;
}
}
String serviceName = parts[1];
this.hasStub = pathResolver.hasNativeStub(path);
this.invoker = getInvoker(metadata, serviceName);
if (invoker == null) {
throw new UnimplementedException(serviceName);
if (result == null) {
throw new HttpStatusException(404, "Invoker not found");
}
HttpMessageCodec httpMessageCodec = determineHttpMessageCodec(contentType);
HttpMessageCodec httpMessageCodec = determineHttpMessageCodec(result.getContentType());
if (httpMessageCodec == null) {
throw new UnsupportedMediaTypeException(contentType);
}

this.httpMetadata = metadata;
this.invoker = result.getInvoker();
this.httpMessageCodec = httpMessageCodec;
setServiceDescriptor(findServiceDescriptor(invoker, serviceName, hasStub));
setServiceDescriptor(result.getServiceDescriptor());
setMethodDescriptor(result.getMethodDescriptor());
setHttpMessageListener(newHttpMessageListener());

onMetadataCompletion(metadata);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.rpc.protocol.tri.route;

public interface PathMappingRegistry {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.rpc.protocol.tri.route;

import org.apache.dubbo.remoting.http12.HttpHeaders;

public interface PathRouter {

RouteResult route(String method, String path, String contentType, HttpHeaders headers);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.rpc.protocol.tri.route;

import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ServiceDescriptor;

/**
* Route result
*
* @since 1.0
*/
public final class RouteResult {
/**
* The invoker.
*/
private final Invoker<?> invoker;

private final ServiceDescriptor serviceDescriptor;

private final MethodDescriptor methodDescriptor;

/**
* The content type.
*/
private final String contentType;

/**
* The response content type.
*/
private final String responseContentType;

public RouteResult(Invoker<?> invoker, ServiceDescriptor serviceDescriptor, MethodDescriptor methodDescriptor, String contentType, String responseContentType) {
this.invoker = invoker;
this.serviceDescriptor = serviceDescriptor;
this.methodDescriptor = methodDescriptor;
this.contentType = contentType;
this.responseContentType = responseContentType;
}

public Invoker<?> getInvoker() {
return invoker;
}

public ServiceDescriptor getServiceDescriptor() {
return serviceDescriptor;
}

public MethodDescriptor getMethodDescriptor() {
return methodDescriptor;
}

public String getContentType() {
return contentType;
}

public String getResponseContentType() {
return responseContentType;
}
}

0 comments on commit f232fb7

Please sign in to comment.