diff --git a/debian/changelog b/debian/changelog index 665149e50..e03bebdb0 100644 --- a/debian/changelog +++ b/debian/changelog @@ -10,6 +10,7 @@ cm4all-beng-proxy (19.1) unstable; urgency=low * {http,nghttp2}/server: use optimized per-request memory allocator * access_log: send multiple log datagrams with sendmmsg() * translation/cache: reduce CPU and memory overhead + * brotli: offload to worker thread -- diff --git a/src/bp/Response.cxx b/src/bp/Response.cxx index 06fa32f83..3b8f019b5 100644 --- a/src/bp/Response.cxx +++ b/src/bp/Response.cxx @@ -42,6 +42,7 @@ #include "translation/Transformation.hxx" #include "translation/Service.hxx" #include "http/Address.hxx" +#include "thread/Pool.hxx" #include "co/Task.hxx" #include "uri/Relocate.hxx" #include "uri/Verify.hxx" @@ -205,7 +206,9 @@ Request::ApplyAutoCompress(HttpHeaders &response_headers, resource_tag, response_headers, response_body, "br", [this, &response_headers](auto &&i){ - return NewBrotliEncoderIstream(pool, std::move(i), + return NewBrotliEncoderIstream(pool, + thread_pool_get_queue(instance.event_loop), + std::move(i), {.text_mode = IsTextMimeType(response_headers)}); })) return; diff --git a/src/istream/BrotliEncoderIstream.cxx b/src/istream/BrotliEncoderIstream.cxx index 697b5db02..512a70516 100644 --- a/src/istream/BrotliEncoderIstream.cxx +++ b/src/istream/BrotliEncoderIstream.cxx @@ -3,27 +3,19 @@ // author: Max Kellermann #include "BrotliEncoderIstream.hxx" +#include "ThreadIstream.hxx" #include "UnusedPtr.hxx" -#include "New.hxx" -#include "FacadeIstream.hxx" -#include "util/DestructObserver.hxx" #include #include #include -static std::span -BrotliEncoderTakeOutput(BrotliEncoderState *state) noexcept -{ - std::size_t size; - const uint8_t *data = BrotliEncoderTakeOutput(state, &size); - return {reinterpret_cast(data), size}; -} - -class BrotliEncoderIstream final : public FacadeIstream, DestructAnchor { +class BrotliEncoderFilter final : public ThreadIstreamFilter { BrotliEncoderState *state = nullptr; + SliceFifoBuffer input, output; + /** * Pending output data from the encoder. Since this buffer * will be invalidated by the next encoder call, we need to @@ -34,74 +26,29 @@ class BrotliEncoderIstream final : public FacadeIstream, DestructAnchor { const BrotliEncoderMode mode; - /** - * Do we expect to get data from the encoder? That is, did we - * feed data into it without getting anything back yet? This - * is used to decide whether to flush. - */ - bool expected = false; - - bool had_input, had_output; + BrotliEncoderOperation operation = BROTLI_OPERATION_PROCESS; public: - BrotliEncoderIstream(struct pool &_pool, UnusedIstreamPtr _input, - BrotliEncoderIstreamParams params) noexcept - :FacadeIstream(_pool, std::move(_input)), - mode(params.text_mode ? BROTLI_MODE_TEXT : BROTLI_MODE_GENERIC) + explicit BrotliEncoderFilter(BrotliEncoderParams params) noexcept + :mode(params.text_mode ? BROTLI_MODE_TEXT : BROTLI_MODE_GENERIC) { } - ~BrotliEncoderIstream() noexcept override { + ~BrotliEncoderFilter() noexcept override { if (state != nullptr) BrotliEncoderDestroyInstance(state); } -private: - enum class WriteResult { - EMPTY, - BLOCKING, - CONSUMED_SOME, - CONSUMED_ALL, - CLOSED, - }; - - /** - * Submit data from #pending to our #IstreamHandler. - */ - WriteResult SubmitPending(const DestructObserver &destructed) noexcept; - - /** - * Submit data from the buffer to our #IstreamHandler. - */ - WriteResult SubmitEncoded() noexcept; - - /** - * Read from our input until we have submitted some bytes to our - * istream handler. - */ - void ForceRead() noexcept; - protected: void CreateEncoder() noexcept; - /* virtual methods from class Istream */ - - off_t _GetAvailable(bool partial) noexcept override { - return partial - ? pending.size() - : -1; - } - - void _Read() noexcept override; - - /* virtual methods from class IstreamHandler */ - std::size_t OnData(std::span src) noexcept override; - void OnEof() noexcept override; - void OnError(std::exception_ptr ep) noexcept override; + /* virtual methods from class ThreadIstreamFilter */ + void Run(ThreadIstreamInternal &i) override; + void PostRun(ThreadIstreamInternal &i) noexcept override; }; -void -BrotliEncoderIstream::CreateEncoder() noexcept +inline void +BrotliEncoderFilter::CreateEncoder() noexcept { assert(state == nullptr); @@ -115,242 +62,67 @@ BrotliEncoderIstream::CreateEncoder() noexcept BrotliEncoderSetParameter(state, BROTLI_PARAM_MODE, mode); } -inline BrotliEncoderIstream::WriteResult -BrotliEncoderIstream::SubmitPending(const DestructObserver &destructed) noexcept -{ - if (pending.empty()) - return WriteResult::EMPTY; - - had_output = true; - - size_t consumed = InvokeData(pending); - if (destructed) { - assert(consumed == 0); - return WriteResult::CLOSED; - } - - if (consumed == 0) - return WriteResult::BLOCKING; - - pending = pending.subspan(consumed); - return pending.empty() ? WriteResult::CONSUMED_ALL : WriteResult::CONSUMED_SOME; -} - -BrotliEncoderIstream::WriteResult -BrotliEncoderIstream::SubmitEncoded() noexcept -{ - assert(state != nullptr); - - const DestructObserver destructed{*this}; - - bool consumed_some = false; - - while (true) { - switch (const auto r = SubmitPending(destructed)) { - case WriteResult::EMPTY: - break; - - case WriteResult::CONSUMED_ALL: - consumed_some = true; - break; - - case WriteResult::BLOCKING: - return consumed_some - ? WriteResult::CONSUMED_SOME - : WriteResult::BLOCKING; - - case WriteResult::CONSUMED_SOME: - case WriteResult::CLOSED: - return r; - } - - pending = BrotliEncoderTakeOutput(state); - if (pending.empty()) { - if (HasInput()) - return consumed_some - ? WriteResult::CONSUMED_ALL - : WriteResult::EMPTY; - - size_t available_in = 0, available_out = 0; - const uint8_t *next_in = nullptr; - uint8_t *next_out = nullptr; - - if (!BrotliEncoderCompressStream(state, BROTLI_OPERATION_FINISH, - &available_in, &next_in, - &available_out, &next_out, - nullptr)) { - DestroyError(std::make_exception_ptr(std::runtime_error{"Brotli finish error"})); - return WriteResult::CLOSED; - } - - pending = BrotliEncoderTakeOutput(state); - if (pending.empty()) { - DestroyEof(); - return WriteResult::CLOSED; - } - } - - expected = false; - } -} - -inline void -BrotliEncoderIstream::ForceRead() noexcept -{ - assert(HasInput()); - - const DestructObserver destructed{*this}; - - had_output = false; - - do { - had_input = false; - input.Read(); - if (destructed) - return; - } while (HasInput() && had_input && !had_output); - - if (HasInput() && !had_output && expected) { - /* we didn't get any encoded data, even though the - encoder got raw data - to obey the Istream API, - flush the encoder */ - // TODO can we optimize this away? - - size_t available_in = 0, available_out = 0; - const uint8_t *next_in = nullptr; - uint8_t *next_out = nullptr; - - if (!BrotliEncoderCompressStream(state, BROTLI_OPERATION_FLUSH, - &available_in, &next_in, - &available_out, &next_out, - nullptr)) { - DestroyError(std::make_exception_ptr(std::runtime_error{"Brotli flush error"})); - return; - } - - SubmitEncoded(); - } -} - void -BrotliEncoderIstream::_Read() noexcept +BrotliEncoderFilter::Run(ThreadIstreamInternal &i) { - if (state != nullptr) { - switch (SubmitEncoded()) { - case WriteResult::EMPTY: - case WriteResult::CONSUMED_ALL: - /* the libbrotli output buffer is empty and is - ready for the next - BrotliEncoderCompressStream() call, so - let's obtain data from our input */ - assert(!BrotliEncoderHasMoreOutput(state)); - assert(pending.empty()); - assert(HasInput()); - break; - - case WriteResult::BLOCKING: - case WriteResult::CONSUMED_SOME: - /* our handler did not consume everything */ - assert(!pending.empty()); - return; - - case WriteResult::CLOSED: - /* bail out without touching anything */ - return; - } - } - - ForceRead(); -} - -std::size_t -BrotliEncoderIstream::OnData(const std::span src) noexcept -{ - assert(HasInput()); - - had_input = true; + using std::swap; if (state == nullptr) CreateEncoder(); - size_t available_in = src.size(); - const uint8_t *next_in = reinterpret_cast(src.data()); - - /* for the first iteration, pretend the encoder consumed some - input data */ - bool has_consumed_input = true; - - while (true) { - bool has_consumed_output; - - /* first submit pending data because the - BrotliEncoderCompressStream() call below would - invalidat the #pending buffer */ - switch (SubmitEncoded()) { - case WriteResult::EMPTY: - has_consumed_output = false; - break; + { + const std::scoped_lock lock{i.mutex}; + input.MoveFromAllowBothNull(i.input); - case WriteResult::CONSUMED_ALL: - has_consumed_output = true; - break; + if (!i.has_input && i.input.empty()) + operation = BROTLI_OPERATION_FINISH; - case WriteResult::BLOCKING: - case WriteResult::CONSUMED_SOME: - return src.size() - available_in; + if (!output.IsNull()) + i.output.MoveFromAllowNull(output); + else if (i.output.empty()) + swap(output, i.output); + } - case WriteResult::CLOSED: - return 0; - } + const auto r = input.Read(); + const auto w = output.Write(); - if (available_in == 0 || - (!has_consumed_input && !has_consumed_output)) - return src.size() - available_in; + std::size_t available_in = r.size(); + const uint8_t *next_in = reinterpret_cast(r.data()); - const size_t old_available_in = available_in; + std::size_t available_out = w.size(); + uint8_t *next_out = reinterpret_cast(w.data()); - size_t available_out = 0; - uint8_t *next_out = nullptr; + if (!BrotliEncoderCompressStream(state, operation, + &available_in, &next_in, + &available_out, &next_out, + nullptr)) + throw std::runtime_error{"Brotli error"}; - if (!BrotliEncoderCompressStream(state, BROTLI_OPERATION_PROCESS, - &available_in, &next_in, - &available_out, &next_out, - nullptr)) { - DestroyError(std::make_exception_ptr(std::runtime_error{"Brotli error"})); - return 0; - } + input.Consume(reinterpret_cast(next_in) - r.data()); + output.Append(reinterpret_cast(next_out) - w.data()); - has_consumed_input = available_in != old_available_in; + if (available_out == 0) + i.again = true; - expected = true; + { + const std::scoped_lock lock{i.mutex}; + i.output.MoveFromAllowSrcNull(output); + i.drained = output.empty(); } - - return src.size() - available_in; } void -BrotliEncoderIstream::OnEof() noexcept +BrotliEncoderFilter::PostRun(ThreadIstreamInternal &) noexcept { - ClearInput(); - - if (state == nullptr) - CreateEncoder(); - - SubmitEncoded(); -} - -void -BrotliEncoderIstream::OnError(std::exception_ptr ep) noexcept -{ - ClearInput(); - - DestroyError(std::move(ep)); + input.FreeIfEmpty(); + output.FreeIfEmpty(); } UnusedIstreamPtr -NewBrotliEncoderIstream(struct pool &pool, UnusedIstreamPtr input, - BrotliEncoderIstreamParams params) noexcept +NewBrotliEncoderIstream(struct pool &pool, ThreadQueue &queue, + UnusedIstreamPtr input, + BrotliEncoderParams params) noexcept { - return NewIstreamPtr(pool, std::move(input), params); - + return NewThreadIstream(pool, queue, std::move(input), + std::make_unique(params)); } diff --git a/src/istream/BrotliEncoderIstream.hxx b/src/istream/BrotliEncoderIstream.hxx index 2e8e91ac3..a34dfd536 100644 --- a/src/istream/BrotliEncoderIstream.hxx +++ b/src/istream/BrotliEncoderIstream.hxx @@ -6,8 +6,9 @@ struct pool; class UnusedIstreamPtr; +class ThreadQueue; -struct BrotliEncoderIstreamParams { +struct BrotliEncoderParams { /** * Set BROTLI_MODE_TEXT. */ @@ -18,5 +19,6 @@ struct BrotliEncoderIstreamParams { * An #Istream filter which compresses data on-the-fly with Brotli. */ UnusedIstreamPtr -NewBrotliEncoderIstream(struct pool &pool, UnusedIstreamPtr input, - BrotliEncoderIstreamParams params={}) noexcept; +NewBrotliEncoderIstream(struct pool &pool, ThreadQueue &queue, + UnusedIstreamPtr input, + BrotliEncoderParams params={}) noexcept; diff --git a/test/istream/TestBrotliEncoderIstream.cxx b/test/istream/TestBrotliEncoderIstream.cxx index 1d92c91b6..b87f1c2fb 100644 --- a/test/istream/TestBrotliEncoderIstream.cxx +++ b/test/istream/TestBrotliEncoderIstream.cxx @@ -6,6 +6,7 @@ #include "istream/BrotliEncoderIstream.hxx" #include "istream/istream_string.hxx" #include "istream/UnusedPtr.hxx" +#include "thread/Pool.hxx" #include @@ -36,20 +37,37 @@ BrotliDecompressString(std::string_view src) } class BrotliEncoderIstreamTestTraits { + mutable EventLoop *event_loop_ = nullptr; + public: static constexpr IstreamFilterTestOptions options{ .expected_result = "foobar", .transform_result = BrotliDecompressString, .enable_buckets = false, + .late_finish = true, }; + ~BrotliEncoderIstreamTestTraits() noexcept { + // invoke all pending ThreadJob::Done() calls + if (event_loop_ != nullptr) + event_loop_->Run(); + + thread_pool_stop(); + thread_pool_join(); + thread_pool_deinit(); + } + UnusedIstreamPtr CreateInput(struct pool &pool) const noexcept { return istream_string_new(pool, "foobar"); } - UnusedIstreamPtr CreateTest(EventLoop &, struct pool &pool, + UnusedIstreamPtr CreateTest(EventLoop &event_loop, struct pool &pool, UnusedIstreamPtr input) const noexcept { - return NewBrotliEncoderIstream(pool, std::move(input)); + event_loop_ = &event_loop; + + thread_pool_set_volatile(); + return NewBrotliEncoderIstream(pool, thread_pool_get_queue(event_loop), + std::move(input)); } };