Skip to content

Commit

Permalink
v1
Browse files Browse the repository at this point in the history
  • Loading branch information
oxsean committed Dec 18, 2023
1 parent 6118aa5 commit 1584e3b
Show file tree
Hide file tree
Showing 21 changed files with 460 additions and 1,156 deletions.
37 changes: 34 additions & 3 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,43 @@ root = true
[*]
charset = utf-8
end_of_line = lf
indent_size = 4
indent_style = space
tab_width = 4
max_line_length = 200
insert_final_newline = true
trim_trailing_whitespace = true

# 4 space indentation
[*.{java,xml}]
indent_style = space
indent_size = 4
[*.java]
ij_java_continuation_indent_size = 8
ij_java_align_multiline_deconstruction_list_components = false
ij_java_align_multiline_parameters = false
ij_java_call_parameters_new_line_after_left_paren = true
ij_java_call_parameters_wrap = normal
ij_java_keep_control_statement_in_one_line = false
ij_java_keep_first_column_comment = false
ij_java_keep_line_breaks = false
ij_java_method_call_chain_wrap = normal
ij_java_method_parameters_new_line_after_left_paren = true
ij_java_rparen_on_new_line_in_deconstruction_pattern = false
ij_java_wrap_first_method_in_call_chain = true
ij_java_wrap_long_lines = true
ij_java_class_count_to_use_import_on_demand = 999
ij_java_names_count_to_use_import_on_demand = 999
ij_java_insert_inner_class_imports = true
ij_java_for_brace_force = always
ij_java_if_brace_force = always

[*.json]
tab_width = 2

[*.{yml,yaml}]
indent_size = 2

[*.xml]
ij_xml_attribute_wrap = off
ij_xml_keep_blank_lines = 1

[pom.xml]
indent_size = 2
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
* Stream utils.
Expand Down Expand Up @@ -229,4 +230,13 @@ public static void skipUnusedStream(InputStream is) throws IOException {
is.skip(is.available());
}
}

public static void copy(InputStream in, OutputStream out) throws IOException {
byte[] buffer = new byte[4096];
int bytesRead;
while ((bytesRead = in.read(buffer)) != -1) {
out.write(buffer, 0, bytesRead);
}
out.flush();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static java.util.Collections.emptySet;
Expand Down Expand Up @@ -420,4 +422,41 @@ public static <T> Set<T> toTreeSet(Set<T> set) {
}
return set;
}

public static <T> Set<T> newHashSet(int expectedSize) {
return new HashSet<>(capacity(expectedSize));
}

public static <T> Set<T> newLinkedHashSet(int expectedSize) {
return new LinkedHashSet<>(capacity(expectedSize));
}

public static <T> Set<T> newConcurrentHashSet(int expectedSize) {
return Collections.newSetFromMap(newConcurrentHashMap(expectedSize));
}

public static <T, K> Map<K, T> newHashMap(int expectedSize) {
return new HashMap<>(capacity(expectedSize));
}

public static <T, K> Map<K, T> newLinkedHashMap(int expectedSize) {
return new LinkedHashMap<>(capacity(expectedSize));
}

public static <T, K> Map<K, T> newConcurrentHashMap(int expectedSize) {
return new ConcurrentHashMap<>(capacity(expectedSize));
}

public static int capacity(int expectedSize) {
if (expectedSize < 3) {
if (expectedSize < 0) {
throw new IllegalArgumentException("expectedSize cannot be negative but was: " + expectedSize);
}
return expectedSize + 1;
}
if (expectedSize < 1 << (Integer.SIZE - 2)) {
return (int) (expectedSize / 0.75F + 1.0F);
}
return Integer.MAX_VALUE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.http12;

public interface HttpExchange {

HttpRequest getRequest();

HttpResponse getResponse();

void setRequest(HttpRequest request);

void setResponse(HttpResponse response);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.http12;

import org.apache.dubbo.common.extension.ExtensionScope;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.remoting.http12.exception.HttpStatusException;

@SPI(scope = ExtensionScope.FRAMEWORK)
public interface HttpFilter {

void doFilter(HttpExchange exchange) throws HttpStatusException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.http12;

public interface HttpRequest extends RequestMetadata {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.http12;

public interface HttpResponse {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,104 +16,42 @@
*/
package org.apache.dubbo.remoting.http12.netty4.h1;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadpool.ThreadPool;
import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpHeaders;
import org.apache.dubbo.remoting.http12.exception.UnsupportedMediaTypeException;
import org.apache.dubbo.remoting.http12.h1.Http1Request;
import org.apache.dubbo.remoting.http12.h1.Http1ServerChannelObserver;
import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListener;
import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListenerFactory;
import org.apache.dubbo.remoting.http12.message.codec.CodecUtils;
import org.apache.dubbo.rpc.model.FrameworkModel;

import java.util.concurrent.Executor;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class NettyHttp1ConnectionHandler extends SimpleChannelInboundHandler<Http1Request> {

private Http1ServerTransportListenerFactory http1ServerTransportListenerFactory;
private final URL url;

private final FrameworkModel frameworkModel;

private final URL url;
private final Http1ServerTransportListenerFactory http1ServerTransportListenerFactory;

private final Executor executor;

private final CodecUtils codecUtils;

private Http1ServerChannelObserver errorResponseObserver;

public NettyHttp1ConnectionHandler(URL url, FrameworkModel frameworkModel) {
public NettyHttp1ConnectionHandler(URL url, FrameworkModel frameworkModel, Http1ServerTransportListenerFactory http1ServerTransportListenerFactory) {
this.url = url;
this.frameworkModel = frameworkModel;
this.executor = url.getOrDefaultFrameworkModel()
.getExtensionLoader(ThreadPool.class)
.getAdaptiveExtension()
.getExecutor(url);
this.codecUtils = frameworkModel.getBeanFactory().getBean(CodecUtils.class);
}

public NettyHttp1ConnectionHandler(
URL url,
FrameworkModel frameworkModel,
Http1ServerTransportListenerFactory http1ServerTransportListenerFactory) {
this.url = url;
this.frameworkModel = frameworkModel;
this.executor = url.getOrDefaultFrameworkModel()
.getExtensionLoader(ThreadPool.class)
.getAdaptiveExtension()
.getExecutor(url);
this.codecUtils = frameworkModel.getBeanFactory().getBean(CodecUtils.class);
this.http1ServerTransportListenerFactory = http1ServerTransportListenerFactory;
}

public void setHttp1ServerTransportListenerFactory(
Http1ServerTransportListenerFactory http1ServerTransportListenerFactory) {
this.http1ServerTransportListenerFactory = http1ServerTransportListenerFactory;
executor = url.getOrDefaultFrameworkModel().getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
}

/**
* process h1 request
*/
protected void channelRead0(ChannelHandlerContext ctx, Http1Request http1Request) {
// process h1 request
Http1ServerTransportListener http1TransportListener = initTransportListenerIfNecessary(ctx, http1Request);
initErrorResponseObserver(ctx, http1Request);
Http1ServerTransportListener http1TransportListener = http1ServerTransportListenerFactory.newInstance(new NettyHttp1Channel(ctx.channel()), url, frameworkModel);
executor.execute(() -> {
try {
http1TransportListener.onMetadata(http1Request);
http1TransportListener.onData(http1Request);
} catch (Exception e) {
errorResponseObserver.onError(e);
}
http1TransportListener.onMetadata(http1Request);
http1TransportListener.onData(http1Request);
});
}

private Http1ServerTransportListener initTransportListenerIfNecessary(
ChannelHandlerContext ctx, Http1Request http1Request) {
// each h1 request create http1TransportListener instance
Http1ServerTransportListenerFactory http1ServerTransportListenerFactory =
this.http1ServerTransportListenerFactory;
Assert.notNull(http1ServerTransportListenerFactory, "http1ServerTransportListenerFactory must be not null.");
Http1ServerTransportListener http1TransportListener = http1ServerTransportListenerFactory.newInstance(
new NettyHttp1Channel(ctx.channel()), url, frameworkModel);

HttpHeaders headers = http1Request.headers();
String contentType = headers.getFirst(HttpHeaderNames.CONTENT_TYPE.getName());
if (!StringUtils.hasText(contentType)) {
throw new UnsupportedMediaTypeException(contentType);
}
// check ContentType
codecUtils.determineHttpMessageDecoder(frameworkModel, headers.getContentType(), url);
return http1TransportListener;
}

private void initErrorResponseObserver(ChannelHandlerContext ctx, Http1Request request) {
this.errorResponseObserver = new Http1ServerChannelObserver(new NettyHttp1Channel(ctx.channel()));
this.errorResponseObserver.setResponseEncoder(
codecUtils.determineHttpMessageEncoder(frameworkModel, request.headers(), url));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.dubbo.remoting.http12.netty4.h2;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpHeaders;
import org.apache.dubbo.remoting.http12.HttpMetadata;
Expand All @@ -38,21 +37,13 @@

public class NettyHttp2ProtocolSelectorHandler extends SimpleChannelInboundHandler<HttpMetadata> {

private Http2ServerTransportListenerFactory defaultHttp2ServerTransportListenerFactory;

private final URL url;

private final FrameworkModel frameworkModel;

public NettyHttp2ProtocolSelectorHandler(URL url, FrameworkModel frameworkModel) {
this.url = url;
this.frameworkModel = frameworkModel;
}
private final Http2ServerTransportListenerFactory defaultHttp2ServerTransportListenerFactory;

public NettyHttp2ProtocolSelectorHandler(
URL url,
FrameworkModel frameworkModel,
Http2ServerTransportListenerFactory defaultHttp2ServerTransportListenerFactory) {
public NettyHttp2ProtocolSelectorHandler(URL url, FrameworkModel frameworkModel, Http2ServerTransportListenerFactory defaultHttp2ServerTransportListenerFactory) {
this.url = url;
this.frameworkModel = frameworkModel;
this.defaultHttp2ServerTransportListenerFactory = defaultHttp2ServerTransportListenerFactory;
Expand All @@ -62,11 +53,7 @@ public NettyHttp2ProtocolSelectorHandler(
protected void channelRead0(ChannelHandlerContext ctx, HttpMetadata metadata) {
HttpHeaders headers = metadata.headers();
String contentType = headers.getFirst(HttpHeaderNames.CONTENT_TYPE.getName());
// 415
if (!StringUtils.hasText(contentType)) {
throw new UnsupportedMediaTypeException(contentType);
}
Http2ServerTransportListenerFactory factory = adaptHttp2ServerTransportListenerFactory(contentType);
Http2ServerTransportListenerFactory factory = determineHttp2ServerTransportListenerFactory(contentType);
if (factory == null) {
throw new UnsupportedMediaTypeException(contentType);
}
Expand All @@ -84,7 +71,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpMetadata metadata) {
ctx.fireChannelRead(metadata);
}

private Http2ServerTransportListenerFactory adaptHttp2ServerTransportListenerFactory(String contentType) {
private Http2ServerTransportListenerFactory determineHttp2ServerTransportListenerFactory(String contentType) {
Set<Http2ServerTransportListenerFactory> http2ServerTransportListenerFactories = frameworkModel
.getExtensionLoader(Http2ServerTransportListenerFactory.class)
.getSupportedExtensionInstances();
Expand Down
Loading

0 comments on commit 1584e3b

Please sign in to comment.