From d6d92ef11b71ada3fdfc03d7d1f2a114e32127b5 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 5 Apr 2024 11:42:11 -0400 Subject: [PATCH] [libbeat] Delete proxy queue (#38570) Delete the proxy queue, a prototype written to reduce memory use in the old shipper project. Recent improvements to the memory queue (https://github.com/elastic/beats/pull/37795, https://github.com/elastic/beats/pull/38166) added support for the same early-free mechanisms as the proxy queue, so it is now redundant. The proxy queue was never used or exposed in a public release, so there are no compatibility concerns. (This is pre-cleanup for adding early-encoding support, to avoid implementing new functionality in a queue that is no longer used.) --- libbeat/outputs/shipper/shipper.go | 10 +- libbeat/publisher/queue/proxy/README.md | 68 -- libbeat/publisher/queue/proxy/batch.go | 110 --- libbeat/publisher/queue/proxy/broker.go | 286 ------ .../publisher/queue/proxy/diagrams/broker.d2 | 54 -- .../publisher/queue/proxy/diagrams/broker.svg | 847 ------------------ libbeat/publisher/queue/proxy/internal_api.go | 41 - libbeat/publisher/queue/proxy/produce.go | 81 -- libbeat/publisher/queue/proxy/queue_test.go | 232 ----- 9 files changed, 7 insertions(+), 1722 deletions(-) delete mode 100644 libbeat/publisher/queue/proxy/README.md delete mode 100644 libbeat/publisher/queue/proxy/batch.go delete mode 100644 libbeat/publisher/queue/proxy/broker.go delete mode 100644 libbeat/publisher/queue/proxy/diagrams/broker.d2 delete mode 100644 libbeat/publisher/queue/proxy/diagrams/broker.svg delete mode 100644 libbeat/publisher/queue/proxy/internal_api.go delete mode 100644 libbeat/publisher/queue/proxy/produce.go delete mode 100644 libbeat/publisher/queue/proxy/queue_test.go diff --git a/libbeat/outputs/shipper/shipper.go b/libbeat/outputs/shipper/shipper.go index fe19a36b31d2..83955a80f4ce 100644 --- a/libbeat/outputs/shipper/shipper.go +++ b/libbeat/outputs/shipper/shipper.go @@ -25,7 +25,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher" - proxyqueue "github.com/elastic/beats/v7/libbeat/publisher/queue/proxy" + "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" "github.com/elastic/elastic-agent-shipper-client/pkg/helpers" sc "github.com/elastic/elastic-agent-shipper-client/pkg/proto" @@ -111,8 +111,12 @@ func makeShipper( return outputs.Group{ Clients: []outputs.Client{swb}, Retry: config.MaxRetries, - QueueFactory: proxyqueue.FactoryForSettings( - proxyqueue.Settings{BatchSize: config.BulkMaxSize}), + QueueFactory: memqueue.FactoryForSettings( + memqueue.Settings{ + Events: config.BulkMaxSize * 2, + MaxGetRequest: config.BulkMaxSize, + FlushTimeout: 0, + }), }, nil } diff --git a/libbeat/publisher/queue/proxy/README.md b/libbeat/publisher/queue/proxy/README.md deleted file mode 100644 index 0ff611fb2389..000000000000 --- a/libbeat/publisher/queue/proxy/README.md +++ /dev/null @@ -1,68 +0,0 @@ -# Beats Proxy Queue - -The proxy queue is an implementation of the [beats Queue interface](https://github.com/elastic/beats/blob/main/libbeat/publisher/queue/queue.go) meant to work with the Shipper output. The Shipper output is unique because rather than sending events to a remote server it sends them to the Shipper, a local process that has its own queue where it stages events for delivery to their true destination upstream. This means that if the Shipper output is used with a conventional queue, events will remain queued in both Beats _and_ the shipper until they receive upstream acknowledgment, potentially doubling the memory needed for a given set of events. - -The solution to this is the proxy queue: from the perspective of the Beats pipeline, it behaves like a normal (albeit small) queue, but its buffer is immediately cleared on being read, and it provides a hook in its event batches for the output to free its contents once sent, while still preserving metadata so that inputs that require end-to-end acknowledgment of their events can receive the acknowledgments later, after the Shipper confirms upstream ingestion. - -## Limitations - -Some features present in other queues are unimplemented or ignored by the proxy queue since they are unneeded when ingesting via the Shipper output: - -- `queue.EntryID`: a `Publish` call to a normal queue returns an `EntryID`, a unique integer that is incremented with each event. This data is only used internally in the Shipper to track acknowledgments, and is unused by Beats. -- Producer cancel: When a `queue.Producer` (the API interface for adding data to a queue) is cancelled, the memory queue attempts to remove from its buffer any events sent by that producer that have not yet been consumed. This feature is only ever used during Beats shutdown, and since the proxy queue in particular never accumulates events itself but instead stores them in the Shipper's queue, it has no mechanism to cancel most outstanding events. -- Requested batch size: The queue interface reads event batches by specifying the desired number of events, which the queue will try to satisfy. Because batches from the proxy queue are being sent to a local process rather than over a network, there is less performance sensitivity to the batch size. Because the proxy queue optimizes its buffer by using it to directly store the batch contents, we can get simpler and more consistent performance by accumulating up to a maximum size and then sending that immediately when a batch is requested. Therefore the proxy queue has its own configurable target batch size, and ignores the parameter given by the consumer. -- Metrics: The proxy queue implements the usual queue metrics for the Beats pipeline, however it doesn't implement the `Metrics()` call, as that is only used by the Shipper (and its contents would be mostly meaningless in the proxy case since events are not allowed to accumulate). - -## Implementation - -The proxy queue is loosely based on the implementation of the memory queue, but with many simplifications enabled by its more limited scope. It has three control channels, `getChan`, `pushChan`, and `doneChan`, all unbuffered. Its internal state can only be changed by sending requests to those channels (or closing the channel in the case of `doneChan`), or by closing the done channel on batches it has returned. - -### The pipeline - -Here is the event data flow through the proxy queue, in the context of the Beats pipeline: - -![The proxy queue in context](diagrams/broker.svg) - -An input adds an event to the proxy queue by creating a `queue.Producer` via the queue's API and calling its `Publish` function. If the producer was created with an acknowledgment callback, then a pointer to the producer will be included in its event metadata so later stages of the pipeline can notify it when ingestion is complete. - -The producer passes an incoming event on to the queue by sending a `pushRequest` to the queue's `pushChan`. The request includes the event, the producer (if acknowledgment is required), a channel on which to receive the response (boolean indicating success or failure), and a flag indicating whether a full queue should block the request until there is room or return immediately with failure. `pushChan` is unbuffered, and any request sent through it is guaranteed to receive a response. If the request's `canBlock` flag is false, that response is guaranteed not to block. If `canBlock` is true, the response is guaranteed to be success unless the queue has been closed. - -On the other side of the queue, a worker routine (`queueReader`) requests batches from the queue via its `Get` function, which sends a `getRequest` to the queue's `getChan`. A `getRequest` always blocks until there is data to read or until the queue is closed; as with `pushRequest`, once it is accepted it always returns a response. If the request is successful, the response will be a `proxyqueue.batch` (implementing the `queue.Batch` interface). The `queueReader`'s job is to collect batches from the queue and wrap them in a `publisher.Batch` interface (concrete type `ttlBatch`) that tracks retry metadata used in the final stages of the pipeline. - -The wrapped batches generated by the `queueReader` are received by the `eventConsumer`, which is the worker that distributes pipeline batches among the output workers via their shared input channel, and handles retries for output workers that encounter errors. - -Only an output worker can complete the life cycle of a batch. In the proxy queue this happens in two stages: when the batch is successfully sent to the Shipper, its `FreeEntries` function is called, which clears the internal reference to the events -- once these are sent, they are no longer needed since they are already enqueued in the Shipper. Then, when the Shipper confirms (via its `PersistedIndex` API, see the Shipper repository for details) that all events from the batch have been processed, the batch's `Done` function is called, which closes the batch's internal channel, `doneChan`. - -Finally, the queue's broker routine monitors the `doneChan` of the oldest outstanding batch; when it is closed, the broker invokes the appropriate acknowledgment callbacks and advances to the next oldest batch. - -### Acknowledgment tracking - -As with other queues, acknowledgments of batches must be globally synchronized by the queue broker, since the pipeline API requires that acknowledgments are sent to producers in the same order the events were generated (out-of-order acknowledgments can cause data loss). The acknowledgments required by any one batch are stored within the batch itself (in the `producerACKs` helper object). The queue broker maintains an ordered linked list of all batches awaiting acknowledgment, and the `select` call in its main loop checks the oldest outstanding batch, calling the appropriate callbacks as it advances. - -### The broker loop - -All internal control logic is handled in the run loop `broker.run()` in `broker.go`. Its state is stored in these fields: - -```go - queuedEntries []queueEntry - blockedRequests blockedRequests - outstandingBatches batchList -``` - -- `queuedEntries` is a list of the events (and producers, if appropriate) currently stored by the queue. Its length is at most `batchSize`. -- `blockedRequests` is a linked list of pending `pushRequest`s from producers that could not be immediately handled because the queue was full. Each one contains a response channel, and the originating producer is listening on that channel waiting for space in the queue. When space is available, events in these requests will be added to `queuedEntries` and the result will be sent to their response channels. -- `outstandingBatches` is a linked list of batches that have been consumed from this queue but not yet acknowledged. It is in the same order as the batches were originally created, so the first entry in the list is always the oldest batch awaiting acknowledgment. - -The core loop calls `select` across up to four channels: - -- `putChan` accepts requests to add entries to the queue. If the queue is already full (`len(queuedEntries) == batchSize`), the request is either added to `blockedRequests` or returns with immediate failure (depending on the value of `canBlock`). Otherwise, the new entry is added to `queuedEntries` to be included in the next batch. -- `getChan` is enabled only if `queuedEntries` isn't empty (otherwise there would be nothing to return). In that case, a new batch is created with the contents of `queuedEntries`, and metadata for any required future acknowledgments is computed (so that acknowledgment data can persist after the events themselves are freed). -- `outstandingBatches.nextDoneChan()` returns the acknowledgment channel for the oldest outstanding batch; if a read on this channel goes through, it means the channel was closed and the batch has been acknowledged, so the producer and pipeline callbacks are invoked and we advance to the next outstanding batch. -- `doneChan` indicates closure of the queue. In this case we reject any remaining requests in `blockedRequests` and return. (We do not do anything with `outstandingBatches`, since batches that are still unacknowledged at this point should be considered dropped, so we do not want producers to believe they have sent successfully.) - -## Possible improvements - -The proxy queue is designed to minimize memory use while respecting the established API for the Beats pipeline. However, its inability to buffer incoming events means that raw latency may increase in some scenarios. If benchmarks show that the proxy queue is a CPU or latency bottleneck, there are some natural improvements that would likely yield significant improvements: - -- The proxy queue currently buffers at most one batch at a time. Buffering a small constant number of batches instead would potentially block the inputs less often, leading to steadier throughput. -- Unlike the other queues, the proxy queue handles acknowledgments on its main work loop. This may increase latency of control signals if it is given acknowledgment callbacks that perform significant work. In that case, we could add a standalone acknowledgment routine similar to the other queues, so slow acknowledgments do not delay the core control logic. \ No newline at end of file diff --git a/libbeat/publisher/queue/proxy/batch.go b/libbeat/publisher/queue/proxy/batch.go deleted file mode 100644 index 1747af527ecd..000000000000 --- a/libbeat/publisher/queue/proxy/batch.go +++ /dev/null @@ -1,110 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package proxyqueue - -type batch struct { - entries []queueEntry - - // Original number of entries (persists even if entries are freed). - originalEntryCount int - - producerACKs []producerACKData - - // When a batch is acknowledged, doneChan is closed to tell - // the queue to call the appropriate producer and metrics callbacks. - doneChan chan struct{} - - // Batches are collected in linked lists to preserve the order of - // acknowledgments. This field should only be used by batchList. - next *batch -} - -type batchList struct { - first *batch - last *batch -} - -// producerACKData tracks the number of events that need to be acknowledged -// from a single batch targeting a single producer. -type producerACKData struct { - producer *producer - count int -} - -func (b *batch) Count() int { - return b.originalEntryCount -} - -func (b *batch) Entry(i int) interface{} { - return b.entries[i].event -} - -func (b *batch) FreeEntries() { - b.entries = nil -} - -func (b *batch) Done() { - close(b.doneChan) -} - -func acksForEntries(entries []queueEntry) []producerACKData { - results := []producerACKData{} - // We traverse the list back to front, so we can coalesce multiple events - // into a single entry in the ACK data. - for i := len(entries) - 1; i >= 0; i-- { - entry := entries[i] - if producer := entry.producer; producer != nil { - if producer.producedCount > producer.consumedCount { - results = append(results, producerACKData{ - producer: producer, - count: int(producer.producedCount - producer.consumedCount), - }) - producer.consumedCount = producer.producedCount - } - } - } - return results -} - -func (l *batchList) add(b *batch) { - b.next = nil // Should be unneeded but let's be cautious - if l.last != nil { - l.last.next = b - } else { - l.first = b - } - l.last = b -} - -func (l *batchList) remove() *batch { - result := l.first - if l.first != nil { - l.first = l.first.next - if l.first == nil { - l.last = nil - } - } - return result -} - -func (l *batchList) nextDoneChan() chan struct{} { - if l.first != nil { - return l.first.doneChan - } - return nil -} diff --git a/libbeat/publisher/queue/proxy/broker.go b/libbeat/publisher/queue/proxy/broker.go deleted file mode 100644 index 832739cc26d9..000000000000 --- a/libbeat/publisher/queue/proxy/broker.go +++ /dev/null @@ -1,286 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package proxyqueue - -import ( - "io" - "sync" - - "github.com/elastic/beats/v7/libbeat/publisher/queue" - "github.com/elastic/elastic-agent-libs/logp" -) - -type broker struct { - doneChan chan struct{} - - logger *logp.Logger - - // The maximum number of events in any pending batch - batchSize int - - /////////////////////////// - // api channels - - // Producers send queue entries to pushChan to add them to the next batch. - pushChan chan *pushRequest - - // Consumers send requests to getChan to read entries from the queue. - getChan chan getRequest - - // A callback that should be invoked when ACKs are processed. - // This is used to forward notifications back to the pipeline observer, - // which updates the beats registry if needed. This callback is included - // in batches created by the proxy queue, so they can invoke it when they - // receive a Done call. - ackCallback func(eventCount int) - - // Internal state for the broker's run loop. - queuedEntries []queueEntry - blockedRequests blockedRequests - outstandingBatches batchList - - // wait group for worker shutdown - wg sync.WaitGroup -} - -type Settings struct { - BatchSize int -} - -type queueEntry struct { - event interface{} - - // The producer that generated this event, or nil if this producer does - // not require ack callbacks. - producer *producer -} - -type blockedRequest struct { - next *blockedRequest - request *pushRequest -} - -// linked list helper to store an ordered list of blocked requests -type blockedRequests struct { - first *blockedRequest - last *blockedRequest -} - -const QueueType = "proxy" - -// FactoryForSettings is a simple wrapper around NewQueue so a concrete -// Settings object can be wrapped in a queue-agnostic interface for -// later use by the pipeline. -func FactoryForSettings(settings Settings) queue.QueueFactory { - return func( - logger *logp.Logger, - ackCallback func(eventCount int), - inputQueueSize int, - ) (queue.Queue, error) { - return NewQueue(logger, ackCallback, settings), nil - } -} - -// NewQueue creates a new broker based in-memory queue holding up to sz number of events. -// If waitOnClose is set to true, the broker will block on Close, until all internal -// workers handling incoming messages and ACKs have been shut down. -func NewQueue( - logger *logp.Logger, - ackCallback func(eventCount int), - settings Settings, -) *broker { - if logger == nil { - logger = logp.NewLogger("proxyqueue") - } - - b := &broker{ - doneChan: make(chan struct{}), - logger: logger, - batchSize: settings.BatchSize, - - // broker API channels - pushChan: make(chan *pushRequest), - getChan: make(chan getRequest), - - ackCallback: ackCallback, - } - - b.wg.Add(1) - go func() { - defer b.wg.Done() - b.run() - }() - - return b -} - -func (b *broker) Close() error { - close(b.doneChan) - b.wg.Wait() - return nil -} - -func (b *broker) QueueType() string { - return QueueType -} - -func (b *broker) BufferConfig() queue.BufferConfig { - return queue.BufferConfig{} -} - -func (b *broker) Producer(cfg queue.ProducerConfig) queue.Producer { - return newProducer(b, cfg.ACK) -} - -func (b *broker) Get(_ int) (queue.Batch, error) { - // The response channel needs a buffer size of 1 to guarantee that the - // broker routine will not block when sending the response. - responseChan := make(chan *batch, 1) - select { - case <-b.doneChan: - return nil, io.EOF - case b.getChan <- getRequest{responseChan: responseChan}: - } - - // if request has been sent, we are guaranteed a response - return <-responseChan, nil -} - -// Metrics returns an empty response because the proxy queue -// doesn't accumulate batches; for the real metadata, use either the -// Beats pipeline metrics, or the queue metrics in the shipper, which -// is where pending events are really queued when the proxy queue is -// in use. -func (b *broker) Metrics() (queue.Metrics, error) { - return queue.Metrics{}, nil -} - -func (b *broker) run() { - for { - var getChan chan getRequest - // Get requests are enabled if the current pending batch is nonempty. - if len(b.queuedEntries) > 0 { - getChan = b.getChan - } - - select { - case <-b.doneChan: - // The queue is closing, reject any requests that were blocked - // waiting for space in the queue. - blocked := b.blockedRequests - for req := blocked.next(); req != nil; req = blocked.next() { - req.responseChan <- false - } - return - - case req := <-b.pushChan: // producer pushing new event - b.handlePushRequest(req) - - case req := <-getChan: // consumer asking for next batch - b.handleGetRequest(req) - - case <-b.outstandingBatches.nextDoneChan(): - ackedBatch := b.outstandingBatches.remove() - // Notify any listening producers - for _, ack := range ackedBatch.producerACKs { - ack.producer.ackHandler(ack.count) - } - // Notify the pipeline's metrics reporter - //nolint:typecheck // this nil check is ok - if b.ackCallback != nil { - b.ackCallback(ackedBatch.originalEntryCount) - } - } - } -} - -func (b *broker) handlePushRequest(req *pushRequest) { - if len(b.queuedEntries) < b.batchSize { - b.queuedEntries = append(b.queuedEntries, - queueEntry{event: req.event, producer: req.producer}) - if req.producer != nil { - req.producer.producedCount++ - } - req.responseChan <- true - } else if req.canBlock { - // If there isn't room for the event, but the producer wants - // to block until there is, add it to the queue. - b.blockedRequests.add(req) - } else { - // The pending batch is full, the producer doesn't want to - // block, so return immediate failure. - req.responseChan <- false - } -} - -func (b *broker) handleGetRequest(req getRequest) { - acks := acksForEntries(b.queuedEntries) - - newBatch := &batch{ - entries: b.queuedEntries, - originalEntryCount: len(b.queuedEntries), - producerACKs: acks, - doneChan: make(chan struct{}), - } - b.outstandingBatches.add(newBatch) - req.responseChan <- newBatch - - // Unblock any pending requests we can fit into the new batch. - entries := []queueEntry{} - for len(entries) < b.batchSize { - req := b.blockedRequests.next() - if req == nil { - // No more blocked requests - break - } - - entries = append(entries, - queueEntry{event: req.event, producer: req.producer}) - if req.producer != nil { - req.producer.producedCount++ - } - req.responseChan <- true - } - - // Reset the pending entries - b.queuedEntries = entries -} - -// Adds a new request to the end of the current list. -func (b *blockedRequests) add(request *pushRequest) { - blockedReq := &blockedRequest{request: request} - if b.first == nil { - b.first = blockedReq - } else { - b.last.next = blockedReq - } - b.last = blockedReq -} - -// Removes the oldest request from the list and returns it. -func (b *blockedRequests) next() *pushRequest { - var result *pushRequest - if b.first != nil { - result = b.first.request - b.first = b.first.next - if b.first == nil { - b.last = nil - } - } - return result -} diff --git a/libbeat/publisher/queue/proxy/diagrams/broker.d2 b/libbeat/publisher/queue/proxy/diagrams/broker.d2 deleted file mode 100644 index 7b2f1cccb327..000000000000 --- a/libbeat/publisher/queue/proxy/diagrams/broker.d2 +++ /dev/null @@ -1,54 +0,0 @@ -# A diagram of the Beats pipeline and the proxy queue's interaction -# with it. -# To regenerate the image after changing this file, run: -# d2 broker.d2 broker.svg -# To live-edit this file with immediate regeneration of the diagram, run: -# d2 --watch broker.d2 broker.svg - -Input - -producer { - ackHandler -} - -queue: Proxy Queue (broker) { - pushChan - getChan -} - -Input -> producer: Publish -producer.ackHandler -> Input: Event acknowledgment -producer -> queue.pushChan: pushRequest -queue.pushChan -> producer: result - -queueReader { - explanation: |md - `queueReader` is a worker that reads raw batches (satisfying the - `queue.Batch` interface, in this case via `proxyqueue.batch`) - from the queue, wraps them in a `publisher.Batch` interface - (`ttlBatch`) to support pipeline operations like retry/error - handling, and hands them off to `eventConsumer` which - distributes them to output workers. - | -} - -queueReader -> queue.getChan: getRequest -queue.getChan -> queueReader: proxyqueue\.batch - -eventConsumer -> queueReader: queueReaderRequest -queueReader -> eventConsumer: ttlBatch - -eventConsumer { - explanation: |md - `eventConsumer` is a worker that distributes event batches to - (potentially) multiple output workers. When a batch fails, the - output sends it back to `eventConsumer` for redistribution. - | -} - -out1: Output Worker -out2: ... - -eventConsumer -> out1: Publish(ttlBatch) -eventConsumer -> out2 - diff --git a/libbeat/publisher/queue/proxy/diagrams/broker.svg b/libbeat/publisher/queue/proxy/diagrams/broker.svg deleted file mode 100644 index 9e89cf655505..000000000000 --- a/libbeat/publisher/queue/proxy/diagrams/broker.svg +++ /dev/null @@ -1,847 +0,0 @@ - -InputproducerProxy Queue (broker)queueReadereventConsumerOutput Worker...ackHandlerpushChangetChan

queueReader is a worker that reads raw batches (satisfying the
-queue.Batch interface, in this case via proxyqueue.batch)
-from the queue, wraps them in a publisher.Batch interface
-(ttlBatch) to support pipeline operations like retry/error
-handling, and hands them off to eventConsumer which
-distributes them to output workers.

-

eventConsumer is a worker that distributes event batches to
-(potentially) multiple output workers. When a batch fails, the
-output sends it back to eventConsumer for redistribution.

-
PublishEvent acknowledgmentpushRequestresultgetRequestproxyqueue.batchqueueReaderRequestttlBatchPublish(ttlBatch) - - - - - - - - - - -
diff --git a/libbeat/publisher/queue/proxy/internal_api.go b/libbeat/publisher/queue/proxy/internal_api.go deleted file mode 100644 index 8e7e972ac219..000000000000 --- a/libbeat/publisher/queue/proxy/internal_api.go +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package proxyqueue - -// producer -> broker API - -type pushRequest struct { - event interface{} - producer *producer - - // After receiving a request, the broker will respond on this channel - // with whether the new entry was accepted or not. - responseChan chan bool - - // If canBlock is true, then the broker will store this request until - // either the request can be accepted or the queue itself is closed. - // Otherwise it will immediately reject the requst if there is no - // space in the pending buffer. - canBlock bool -} - -// consumer -> broker API - -type getRequest struct { - responseChan chan *batch // channel to send response to -} diff --git a/libbeat/publisher/queue/proxy/produce.go b/libbeat/publisher/queue/proxy/produce.go deleted file mode 100644 index 87a01450e546..000000000000 --- a/libbeat/publisher/queue/proxy/produce.go +++ /dev/null @@ -1,81 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package proxyqueue - -import ( - "github.com/elastic/beats/v7/libbeat/publisher/queue" -) - -type producer struct { - broker *broker - cancelled bool - // If ackHandler is nil then this producer does not listen to acks. - ackHandler func(count int) - - // producedCount and consumedCount are used to assemble batches and - // should only be accessed by the broker's main loop. - producedCount uint64 - consumedCount uint64 -} - -func newProducer(b *broker, ackHandler func(count int)) queue.Producer { - return &producer{ - broker: b, - ackHandler: ackHandler} -} - -func (p *producer) makePushRequest(event interface{}, canBlock bool) *pushRequest { - req := &pushRequest{ - event: event, - responseChan: make(chan bool, 1), - canBlock: canBlock, - } - if p.ackHandler != nil { - req.producer = p - } - return req -} - -func (p *producer) Publish(event interface{}) (queue.EntryID, bool) { - if p.cancelled { - return 0, false - } - return 0, p.publish(p.makePushRequest(event, true)) -} - -func (p *producer) TryPublish(event interface{}) (queue.EntryID, bool) { - if p.cancelled { - return 0, false - } - return 0, p.publish(p.makePushRequest(event, false)) -} - -func (p *producer) Cancel() int { - p.cancelled = true - return 0 -} - -func (p *producer) publish(req *pushRequest) bool { - select { - case p.broker.pushChan <- req: - return <-req.responseChan - case <-p.broker.doneChan: - // The queue is shutting down - return false - } -} diff --git a/libbeat/publisher/queue/proxy/queue_test.go b/libbeat/publisher/queue/proxy/queue_test.go deleted file mode 100644 index 437216e2d7a5..000000000000 --- a/libbeat/publisher/queue/proxy/queue_test.go +++ /dev/null @@ -1,232 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package proxyqueue - -import ( - "fmt" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/elastic/elastic-agent-libs/logp" - - "github.com/elastic/beats/v7/libbeat/publisher/queue" -) - -// Because acknowledgments are partially asynchronous (acknowledging -// a batch notifies the queue, which then notifies the original producer -// callback), we can't make a fully deterministic test for ACK counts -// since in principle it depends on the scheduler. -// Nevertheless, in practice the latency should be very low. testACKListener -// is a helper object to track ACK state while allowing for timeouts when -// some propagation delay is unavoidable. -type testACKListener struct { - sync.Mutex - - ackedCount int - - // If not enough ACKs have been received yet, waitForTotalACKs sets - // waiting to true and listens on updateChan. - // If waiting is set when the ACK callback is called, then it sends - // on updateChan to wake up waitForTotalACKs. - waiting bool - updateChan chan struct{} -} - -func TestBasicEventFlow(t *testing.T) { - logger := logp.NewLogger("proxy-queue-tests") - - // Create a proxy queue where each batch is at most 2 events - testQueue := NewQueue(logger, nil, Settings{BatchSize: 2}) - defer testQueue.Close() - - listener := newTestACKListener() - producer := testQueue.Producer(queue.ProducerConfig{ - ACK: listener.ACK, - }) - // Try to publish 3 events, only the first two should succeed until we read a batch - _, success := producer.TryPublish(1) - assert.True(t, success) - _, success = producer.TryPublish(2) - assert.True(t, success) - _, success = producer.TryPublish(3) - assert.False(t, success, "Current batch should only fit two events") - - batch, err := testQueue.Get(0) - assert.NoError(t, err, "Should be able to read a batch") - assert.Equal(t, 0, listener.ackedCount, "No batches have been acked yet") - batch.Done() - assert.NoError(t, listener.waitForTotalACKs(2, time.Second)) - - // Make sure that reading an event unblocked the queue - _, success = producer.TryPublish(4) - assert.True(t, success, "Queue should accept incoming event") -} - -func TestBlockedProducers(t *testing.T) { - logger := logp.NewLogger("proxy-queue-tests") - - // Create a proxy queue where each batch is at most 2 events - testQueue := NewQueue(logger, nil, Settings{BatchSize: 2}) - defer testQueue.Close() - - listener := newTestACKListener() - - // Create many producer goroutines and send an event through each - // one. Only two events can be in the queue at any one time, so - // the rest of the producers will block until we read enough batches - // from the queue. - const PRODUCER_COUNT = 10 - for i := 0; i < PRODUCER_COUNT; i++ { - go func(producerID int) { - producer := testQueue.Producer(queue.ProducerConfig{ - ACK: listener.ACK, - }) - producer.Publish(producerID) - }(i) - } - - consumedEventCount := 0 - batches := []queue.Batch{} - // First, read all the events. We should be able to do this successfully - // even before any have been acknowledged. - for consumedEventCount < PRODUCER_COUNT { - batch, err := testQueue.Get(0) - assert.NoError(t, err) - consumedEventCount += batch.Count() - batches = append(batches, batch) - } - - assert.Equal(t, 0, listener.ackedCount, "No batches have been acked yet") - for _, batch := range batches { - batch.Done() - } - assert.NoError(t, listener.waitForTotalACKs(PRODUCER_COUNT, time.Second)) -} - -func TestOutOfOrderACK(t *testing.T) { - logger := logp.NewLogger("proxy-queue-tests") - - // Create a proxy queue where each batch is at most 2 events - testQueue := NewQueue(logger, nil, Settings{BatchSize: 2}) - defer testQueue.Close() - - listener := newTestACKListener() - producer := testQueue.Producer(queue.ProducerConfig{ - ACK: listener.ACK, - }) - - const BATCH_COUNT = 10 - batches := []queue.Batch{} - for i := 0; i < BATCH_COUNT; i++ { - // Publish two events - _, success := producer.Publish(0) - assert.True(t, success, "Publish should succeed") - _, success = producer.Publish(0) - assert.True(t, success, "Publish should succeed") - - // Consume a batch, which should contain the events we just published - batch, err := testQueue.Get(0) - assert.NoError(t, err) - batch.FreeEntries() - assert.Equal(t, 2, batch.Count()) - - batches = append(batches, batch) - } - - // Acknowledge all except the first batch - for _, batch := range batches[1:] { - batch.Done() - } - // Make sure that no ACKs come in even if we wait a bit - err := listener.waitForTotalACKs(1, 50*time.Millisecond) - assert.Error(t, err, "No ACK callbacks should have been called yet") - - // ACKing the first batch should unblock all the rest - batches[0].Done() - assert.NoError(t, listener.waitForTotalACKs(BATCH_COUNT*2, time.Second)) -} - -func TestWriteAfterClose(t *testing.T) { - logger := logp.NewLogger("proxy-queue-tests") - - testQueue := NewQueue(logger, nil, Settings{BatchSize: 2}) - producer := testQueue.Producer(queue.ProducerConfig{}) - testQueue.Close() - - // Make sure Publish fails instead of blocking - _, success := producer.Publish(1) - assert.False(t, success, "Publish should fail since queue is closed") -} - -func newTestACKListener() *testACKListener { - return &testACKListener{ - updateChan: make(chan struct{}, 1), - } -} - -// ACK should be provided to the queue producer. It can be safely called from -// multiple goroutines. -func (l *testACKListener) ACK(count int) { - l.Lock() - l.ackedCount += count - if l.waiting { - // If waitFortotalACKs is waiting on something, wake it up so it can retry. - l.waiting = false - l.updateChan <- struct{}{} - } - l.Unlock() -} - -// flush should be called on timeout, to clear updateChan if needed. -func (l *testACKListener) flush() { - l.Lock() - select { - case <-l.updateChan: - default: - } - l.waiting = false - l.Unlock() -} - -// waitForTotalACKs waits until the specified number of total ACKs have been -// received, or the timeout interval is exceeded. It should only be called -// from a single goroutine at once. -func (l *testACKListener) waitForTotalACKs(targetCount int, timeout time.Duration) error { - timeoutChan := time.After(timeout) - for { - l.Lock() - if l.ackedCount >= targetCount { - l.Unlock() - return nil - } - // Not enough ACKs have been sent yet, so we have to wait. - l.waiting = true - l.Unlock() - select { - case <-l.updateChan: - // New ACKs came in, retry - continue - case <-timeoutChan: - l.flush() - return fmt.Errorf("timed out waiting for acknowledgments: have %d, wanted %d", l.ackedCount, targetCount) - } - } -}