Skip to content

Commit

Permalink
Merge branch 'main' into fix_qat
Browse files Browse the repository at this point in the history
Signed-off-by: He Jie Xu <hejie.xu@intel.com>
  • Loading branch information
soulxu committed Jun 14, 2024
2 parents a3770a3 + 73bf655 commit 355e2fb
Show file tree
Hide file tree
Showing 21 changed files with 853 additions and 960 deletions.
4 changes: 2 additions & 2 deletions .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ build:docs-ci --action_env=DOCS_RST_CHECK=1 --host_action_env=DOCS_RST_CHECK=1
build --incompatible_config_setting_private_default_visibility
build --incompatible_enforce_config_setting_visibility

test --bes_upload_mode=nowait_for_upload_complete
test --bes_timeout=30s
test --test_verbose_timeout_warnings
test --experimental_ui_max_stdouterr_bytes=3048576 #default 1048576

# Allow tags to influence execution requirements
common --experimental_allow_tags_propagation
Expand Down Expand Up @@ -511,6 +510,7 @@ build:rbe-google --config=cache-google

build:rbe-google-bes --bes_backend=grpcs://buildeventservice.googleapis.com
build:rbe-google-bes --bes_results_url=https://source.cloud.google.com/results/invocations/
build:rbe-google-bes --bes_upload_mode=fully_async

# RBE (Engflow mobile)
build:rbe-engflow --google_default_credentials=false
Expand Down
11 changes: 11 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ bug_fixes:
change: |
Added one option to disable the response body buffering for mirror request. Also introduced a 32MB cap for the response
buffer, which can be changed by the runtime flag ``http.async_response_buffer_limit`` based on the product needs.
- area: ext_authz
change: |
Validate http service path_prefix
:ref:`path_prefix <envoy_v3_api_field_extensions.filters.http.ext_authz.v3.HttpService.path_prefix>`,
Validate http service path_prefix configuration must start with ``/``.
removed_config_or_runtime:
# *Normally occurs at the end of the* :ref:`deprecation period <deprecated>`
Expand Down Expand Up @@ -204,9 +209,15 @@ removed_config_or_runtime:
- area: http
change: |
Removed ``envoy.reloadable_features.http_allow_partial_urls_in_referer`` runtime flag and legacy code paths.
- area: oauth
change: |
Removed ``envoy.reloadable_features.oauth_make_token_cookie_httponly`` runtime flag and legacy code paths.
- area: http
change: |
Removed ``envoy.reloadable_features.lowercase_scheme`` runtime flag and legacy code paths.
- area: oauth
change: |
Removed ``envoy.reloadable_features.hmac_base64_encoding_only`` runtime flag and legacy code paths.
- area: upstream
change: |
Removed ``envoy.reloadable_features.convert_legacy_lb_config`` runtime flag and legacy code paths.
Expand Down
37 changes: 28 additions & 9 deletions contrib/generic_proxy/filters/network/source/codecs/dubbo/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ class DubboRequest : public Request {
ASSERT(inner_metadata_ != nullptr);
ASSERT(inner_metadata_->hasContext());
ASSERT(inner_metadata_->hasRequest());

uint32_t frame_flags = FrameFlags::FLAG_END_STREAM; // Dubbo message only has one frame.
if (!inner_metadata_->context().isTwoWay()) {
frame_flags |= FrameFlags::FLAG_ONE_WAY;
}
if (inner_metadata_->context().heartbeat()) {
frame_flags |= FrameFlags::FLAG_HEARTBEAT;
}

stream_frame_flags_ = {static_cast<uint64_t>(inner_metadata_->requestId()), frame_flags};
}

// Request
Expand All @@ -43,9 +53,10 @@ class DubboRequest : public Request {
// StreamFrame
FrameFlags frameFlags() const override { return stream_frame_flags_; }

FrameFlags stream_frame_flags_;

Common::Dubbo::MessageMetadataSharedPtr inner_metadata_;

private:
FrameFlags stream_frame_flags_;
};

class DubboResponse : public Response {
Expand All @@ -56,6 +67,16 @@ class DubboResponse : public Response {
ASSERT(inner_metadata_->hasContext());
ASSERT(inner_metadata_->hasResponse());
refreshStatus();

uint32_t frame_flags = FrameFlags::FLAG_END_STREAM; // Dubbo message only has one frame.
if (!inner_metadata_->context().isTwoWay()) {
frame_flags |= FrameFlags::FLAG_ONE_WAY;
}
if (inner_metadata_->context().heartbeat()) {
frame_flags |= FrameFlags::FLAG_HEARTBEAT;
}

stream_frame_flags_ = {static_cast<uint64_t>(inner_metadata_->requestId()), frame_flags};
}

void refreshStatus();
Expand All @@ -67,10 +88,11 @@ class DubboResponse : public Response {
// StreamFrame
FrameFlags frameFlags() const override { return stream_frame_flags_; }

FrameFlags stream_frame_flags_;

StreamStatus status_;
Common::Dubbo::MessageMetadataSharedPtr inner_metadata_;

private:
FrameFlags stream_frame_flags_;
};

class DubboCodecBase : public Logger::Loggable<Logger::Id::connection> {
Expand Down Expand Up @@ -136,12 +158,9 @@ class DubboDecoderBase : public DubboCodecBase, public CodecType {
return Common::Dubbo::DecodeStatus::Success;
}

auto message = std::make_unique<DecoderMessageType>(metadata_);
message->stream_frame_flags_ = {{static_cast<uint64_t>(metadata_->requestId()),
!metadata_->context().isTwoWay(), false,
metadata_->context().heartbeat()},
true};
auto message = std::make_unique<DecoderMessageType>(std::move(metadata_));
metadata_.reset();

callback_->onDecodingSuccess(std::move(message));

return Common::Dubbo::DecodeStatus::Success;
Expand Down
14 changes: 11 additions & 3 deletions contrib/generic_proxy/filters/network/source/codecs/http1/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class HttpRequestFrame : public HttpHeaderFrame<StreamRequest> {
HttpRequestFrame(Http::RequestHeaderMapPtr request, bool end_stream)
: request_(std::move(request)) {
ASSERT(request_ != nullptr);
frame_flags_ = {StreamFlags{}, end_stream};
frame_flags_ = {0, end_stream ? FrameFlags::FLAG_END_STREAM : FrameFlags::FLAG_EMPTY};
}

absl::string_view host() const override { return request_->getHostValue(); }
Expand All @@ -80,7 +80,15 @@ class HttpResponseFrame : public HttpHeaderFrame<StreamResponse> {
const bool drain_close = Envoy::StringUtil::caseFindToken(
response_->getConnectionValue(), ",", Http::Headers::get().ConnectionValues.Close);

frame_flags_ = {StreamFlags{0, false, drain_close, false}, end_stream};
uint32_t flags = 0;
if (end_stream) {
flags |= FrameFlags::FLAG_END_STREAM;
}
if (drain_close) {
flags |= FrameFlags::FLAG_DRAIN_CLOSE;
}

frame_flags_ = {0, flags};
}

StreamStatus status() const override {
Expand All @@ -101,7 +109,7 @@ class HttpResponseFrame : public HttpHeaderFrame<StreamResponse> {
class HttpRawBodyFrame : public CommonFrame {
public:
HttpRawBodyFrame(Envoy::Buffer::Instance& buffer, bool end_stream)
: frame_flags_({StreamFlags{}, end_stream}) {
: frame_flags_(0, end_stream ? FrameFlags::FLAG_END_STREAM : FrameFlags::FLAG_EMPTY) {
buffer_.move(buffer);
}
FrameFlags frameFlags() const override { return frame_flags_; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ class KafkaRequestFrame : public GenericProxy::StreamRequest {
if (request_ == nullptr) {
return FrameFlags{};
}
return FrameFlags{
StreamFlags{static_cast<uint64_t>(request_->request_header_.correlation_id_)}};
return FrameFlags{static_cast<uint64_t>(request_->request_header_.correlation_id_)};
}

absl::string_view protocol() const override { return "kafka"; }
Expand All @@ -46,7 +45,7 @@ class KafkaResponseFrame : public GenericProxy::StreamResponse {
if (response_ == nullptr) {
return FrameFlags{};
}
return FrameFlags{StreamFlags{static_cast<uint64_t>(response_->metadata_.correlation_id_)}};
return FrameFlags{static_cast<uint64_t>(response_->metadata_.correlation_id_)};
}

absl::string_view protocol() const override { return "kafka"; }
Expand Down
116 changes: 41 additions & 75 deletions contrib/generic_proxy/filters/network/source/interface/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,92 +17,35 @@ namespace Extensions {
namespace NetworkFilters {
namespace GenericProxy {

/**
* Stream flags from request or response to control the behavior of the
* generic proxy filter. This is mainly used as part of FrameFlags.
* All these flags could be ignored for the simple ping-pong use case.
*/
class StreamFlags {
public:
StreamFlags(uint64_t stream_id = 0, bool one_way_stream = false, bool drain_close = false,
bool is_heartbeat = false)
: stream_id_(stream_id), one_way_stream_(one_way_stream), drain_close_(drain_close),
is_heartbeat_(is_heartbeat) {}

/**
* @return the stream id of the request or response. This is used to match the
* downstream request with the upstream response.
* NOTE: In most cases, the stream id is not needed and will be ignored completely.
* The stream id is only used when we can't match the downstream request
* with the upstream response by the active stream instance self directly.
* For example, when the multiple downstream requests are multiplexed into one
* upstream connection.
*/
uint64_t streamId() const { return stream_id_; }

/**
* @return whether the stream is one way stream. If request is one way stream, the
* generic proxy filter will not wait for the response from the upstream.
*/
bool oneWayStream() const { return one_way_stream_; }

/**
* @return whether the downstream/upstream connection should be drained after
* current active stream are finished.
*/
bool drainClose() const { return drain_close_; }

/**
* @return whether the current request/response is a heartbeat request/response.
* NOTE: It would be better to handle heartbeat request/response by another L4
* filter. Then the generic proxy filter can be used for the simple ping-pong
* use case.
*/
bool isHeartbeat() const { return is_heartbeat_; }

private:
uint64_t stream_id_{0};

bool one_way_stream_{false};
bool drain_close_{false};
bool is_heartbeat_{false};
};

/**
* Flags of stream frame. This is used to control the behavior of the generic proxy filter.
* All these flags could be ignored for the simple ping-pong use case.
*/
class FrameFlags {
public:
static constexpr uint32_t FLAG_EMPTY = 0x0000;
static constexpr uint32_t FLAG_END_STREAM = 0x0001;
static constexpr uint32_t FLAG_ONE_WAY = 0x0002;
static constexpr uint32_t FLAG_DRAIN_CLOSE = 0x0004;
static constexpr uint32_t FLAG_HEARTBEAT = 0x0008;

/**
* Construct FrameFlags with stream flags and end stream flag. The stream flags MUST be
* same for all frames of the same stream.
* @param stream_flags StreamFlags of the stream.
* @param end_stream whether the current frame is the last frame of the request or response.
* @param stream_id the stream id of the request or response.
* @param flags flags of the current frame. Only the flags that defined in FrameFlags
* could be used. Multiple flags could be combined by bitwise OR.
* @param frame_tags frame tags of the current frame. The meaning of the frame tags is
* application protocol specific.
*/
FrameFlags(StreamFlags stream_flags = StreamFlags(), bool end_stream = true,
uint32_t frame_tags = 0)
: stream_flags_(stream_flags), frame_tags_(frame_tags), end_stream_(end_stream) {}
FrameFlags(uint64_t stream_id = 0, uint32_t flags = FLAG_END_STREAM, uint32_t frame_tags = 0)
: stream_id_(stream_id), flags_(flags), frame_tags_(frame_tags) {}

/**
* Get flags of stream that the frame belongs to. The flags MUST be same for all frames of the
* same stream. Copy semantics is used because the flags are lightweight (only 16 bytes for now).
* @return StreamFlags of the stream.
* @return the stream id of the request or response. All frames of the same stream
* MUST have the same stream id.
*/
StreamFlags streamFlags() const { return stream_flags_; }

/**
* @return the stream id of the request or response.
*/
uint64_t streamId() const { return stream_flags_.streamId(); }

/**
* @return whether the current frame is the last frame of the request or response.
*/
bool endStream() const { return end_stream_; }
uint64_t streamId() const { return stream_id_; }

/**
* @return frame tags of the current frame. The meaning of the frame tags is application
Expand All @@ -114,12 +57,35 @@ class FrameFlags {
*/
uint32_t frameTags() const { return frame_tags_; }

private:
StreamFlags stream_flags_{};
/**
* @return whether the current frame is the last frame of the request or response.
*/
bool endStream() const { return flags_ & FLAG_END_STREAM; }

/**
* @return whether the downstream/upstream connection should be drained after
* current active stream are finished.
* NOTE: Only the response header frame's drainClose() flag will be used.
*/
bool drainClose() const { return flags_ & FLAG_DRAIN_CLOSE; }

/**
* @return whether the stream is one way stream. If request is one way stream, the
* generic proxy filter will not wait for the response from the upstream.
* NOTE: Only the request header frame's oneWayStream() flag will be used.
*/
bool oneWayStream() const { return flags_ & FLAG_ONE_WAY; }

/**
* @return whether the current request/response is a heartbeat request/response.
* NOTE: Only the header frame's isHeartbeat() flag will be used.
*/
bool isHeartbeat() const { return flags_ & FLAG_HEARTBEAT; }

private:
uint64_t stream_id_{};
uint32_t flags_{};
uint32_t frame_tags_{};
// Default to true for backward compatibility.
bool end_stream_{true};
};

/**
Expand Down
Loading

0 comments on commit 355e2fb

Please sign in to comment.