Add PriorityQueue function #50

Open · wants to merge 4 commits into main
23 changes: 23 additions & 0 deletions buffer.go
@@ -0,0 +1,23 @@
package rill

import (
    "github.com/destel/rill/internal/core"
    "github.com/destel/rill/internal/heapbuffer"
)

// PriorityQueue passes items from the input channel through a priority-queue buffer
// of the given capacity. Items carrying errors are emitted first; among the remaining
// buffered items, the one that ranks highest according to less is emitted next.
func PriorityQueue[A any](in <-chan Try[A], capacity int, less func(A, A) bool) <-chan Try[A] {
    buf := heapbuffer.New(capacity, func(item1, item2 Try[A]) bool {
        // Always prioritize errors
        if item1.Error != nil {
            return true
        }
        if item2.Error != nil {
            return false
        }

        // Invert the comparison to get max-heap behavior
        return !less(item1.Value, item2.Value)
    })

    return core.CustomBuffer[Try[A]](in, buf)
}
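For context, a minimal usage sketch of the new PriorityQueue (not part of this PR). It builds the Try channel by hand so that it relies only on the Try type and the function added here; the main package and the sample values are illustrative.

package main

import (
    "fmt"

    "github.com/destel/rill"
)

func main() {
    in := make(chan rill.Try[int])

    // Buffer up to 3 items. Among the items buffered at any moment, errors are
    // emitted first, then the item that ranks highest according to less.
    out := rill.PriorityQueue[int](in, 3, func(a, b int) bool { return a < b })

    go func() {
        defer close(in)
        for _, v := range []int{2, 7, 1, 5} {
            in <- rill.Try[int]{Value: v}
        }
    }()

    for item := range out {
        fmt.Println(item.Value)
    }
}

Note that the priority ordering only becomes visible when the consumer is slower than the producer, since only items that are actually buffered at emission time get reordered.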
87 changes: 87 additions & 0 deletions internal/core/buffer.go
@@ -0,0 +1,87 @@
package core

import (
    "time"
)

type iBuffer[A any] interface {
    Read() A
    Peek() A
    Write(A)
    IsEmpty() bool
    IsFull() bool
}

type iShrinkable interface {
    Shrink() bool
}

// CustomBuffer connects the input and output channels through the provided buffer.
// Values read from in are stored in buf and emitted on the returned channel in whatever
// order the buffer yields them. The output channel is closed once the input is closed
// and the buffer is drained.
func CustomBuffer[A any](in <-chan A, buf iBuffer[A]) <-chan A {
    out := make(chan A)

    go func() {
        defer close(out)

        // Shrinking logic:
        // If the buffer implements the iShrinkable interface, call its Shrink() method every 60 seconds.
        var shrinkChan <-chan time.Time
        var shrinkable iShrinkable

        if s, ok := buf.(iShrinkable); ok {
            shrinkTicker := time.NewTicker(60 * time.Second)
            defer shrinkTicker.Stop()

            shrinkChan = shrinkTicker.C
            shrinkable = s
        }

        // Main logic.
        // High-level idea: this is a simple state machine with 4 states, determined by 2 booleans:
        // - currentIn == nil (reading is disabled: the input is closed or the buffer is full)
        // - currentOut == nil (writing is disabled: the buffer is empty)
        inClosed := false

        for {
            currentIn := in
            currentOut := out

            // Disable reading from the input channel if it's closed or the buffer is full
            if inClosed || buf.IsFull() {
                currentIn = nil
            }

            // Disable writing to the output channel if the buffer is empty
            var peekedValue A
            if buf.IsEmpty() {
                currentOut = nil
            } else {
                peekedValue = buf.Peek()
            }

            // Exit if there is nothing to do
            if currentIn == nil && currentOut == nil {
                return
            }

            // Do a read or write operation, whichever is possible
            select {
            case v, ok := <-currentIn:
                if !ok {
                    inClosed = true
                    continue
                }
                buf.Write(v)

            case currentOut <- peekedValue:
                _ = buf.Read() // discard the peeked value

            case <-shrinkChan:
                _ = shrinkable.Shrink()
            }
        }
    }()

    return out
}
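To make the iBuffer contract above concrete, here is a minimal sketch (not part of this PR) of an unbounded FIFO buffer. Since the interface is unexported, such a type would have to live in the core package; the fifoBuffer name is illustrative.

package core

type fifoBuffer[A any] struct {
    items []A
}

func (b *fifoBuffer[A]) Read() A {
    v := b.items[0]
    b.items = b.items[1:]
    return v
}

func (b *fifoBuffer[A]) Peek() A       { return b.items[0] }
func (b *fifoBuffer[A]) Write(v A)     { b.items = append(b.items, v) }
func (b *fifoBuffer[A]) IsEmpty() bool { return len(b.items) == 0 }
func (b *fifoBuffer[A]) IsFull() bool  { return false } // never full, so CustomBuffer never stops reading

// Plugged in the same way as the heap-based buffer:
// out := CustomBuffer[int](in, &fifoBuffer[int]{})

With such a buffer, CustomBuffer degenerates into an ordinary unbounded FIFO between the two channels, which is a useful mental model: the heap-based buffer changes only the emission order, not the select loop.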
35 changes: 35 additions & 0 deletions internal/core/buffer_test.go
@@ -0,0 +1,35 @@
package core

import (
    "fmt"
    "testing"
    "time"

    "github.com/destel/rill/internal/heapbuffer"
    "github.com/destel/rill/internal/th"
)

func TestCustomBuffer(t *testing.T) {
    in := make(chan int)

    out := CustomBuffer[int](in, heapbuffer.New(5, func(item1, item2 int) bool {
        return item1 > item2
    }))

    // Fill the buffer to its capacity of 5
    in <- 2
    in <- 3
    in <- 6
    in <- 4
    in <- 5

    // The 6th send must not go through: the buffer is full and nothing reads from out yet
    if th.SendTimeout(in, 1*time.Second, 6) {
        t.Fatal("SendTimeout should have failed")
    }

    close(in)

    // Items come out in descending order, since the comparator makes the buffer behave like a max-heap
    for x := range out {
        fmt.Println(x)
    }
}
47 changes: 47 additions & 0 deletions internal/heapbuffer/buffer.go
@@ -0,0 +1,47 @@
package heapbuffer

type Buffer[T any] struct {
    heap     *Heap[T]
    capacity int
}

// New creates a priority queue buffer with the given capacity and less function.
// A non-zero capacity creates a fixed-size buffer; any write to a full buffer panics.
func New[T any](capacity int, less func(item1, item2 T) bool) *Buffer[T] {
    h := NewHeap[T](less)
    if capacity > 0 {
        h.Grow(capacity)
    }

    return &Buffer[T]{
        heap:     h,
        capacity: capacity,
    }
}

func (b *Buffer[T]) IsEmpty() bool {
    return b.heap.Len() == 0
}

func (b *Buffer[T]) IsFull() bool {
    if b.capacity <= 0 {
        return false
    }

    return b.heap.Len() >= b.capacity
}

func (b *Buffer[T]) Peek() T {
    return b.heap.Peek()
}

func (b *Buffer[T]) Read() T {
    return b.heap.Pop()
}

func (b *Buffer[T]) Write(v T) {
    if b.IsFull() {
        panic("heapbuffer: buffer is full")
    }
    b.heap.Push(v)
}
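A minimal sketch (not part of this PR), runnable only inside the rill module since the package is internal, showing the Buffer semantics directly: with this less function, Read always returns the smallest buffered value regardless of insertion order.

package main

import (
    "fmt"

    "github.com/destel/rill/internal/heapbuffer"
)

func main() {
    buf := heapbuffer.New(4, func(a, b int) bool { return a < b })

    buf.Write(3)
    buf.Write(1)
    buf.Write(2)

    for !buf.IsEmpty() {
        fmt.Println(buf.Read()) // prints 1, 2, 3
    }
}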
137 changes: 137 additions & 0 deletions internal/heapbuffer/heap.go
@@ -0,0 +1,137 @@
package heapbuffer

// Heap provides a generic min-heap implementation.
// The implementation is inspired by and follows similar patterns to Go's container/heap
// (https://golang.org/src/container/heap/heap.go), but uses generics instead of interfaces
// and adds features such as an OnIndexChange callback for index tracking.
// Core heap operations (up/down, remove, insert) follow standard textbook algorithms.
type Heap[T any] struct {
    data []T

    less          func(item1, item2 T) bool
    OnIndexChange func(T, int)
}

func NewHeap[T any](less func(item1, item2 T) bool) *Heap[T] {
    return &Heap[T]{less: less}
}

func (h *Heap[T]) SetData(data []T) {
    h.data = data

    // heapify
    n := h.Len()
    for i := n/2 - 1; i >= 0; i-- {
        h.down(i, n)
    }
}

func (h *Heap[T]) Len() int {
    return len(h.data)
}

// Grow grows the heap's capacity, if necessary, to guarantee space for another n items.
func (h *Heap[T]) Grow(n int) {
    targetCap := len(h.data) + n
    if targetCap > cap(h.data) {
        data := make([]T, len(h.data), targetCap)
        copy(data, h.data)
        h.data = data
    }
}

func (h *Heap[T]) Push(item T) {
    h.data = append(h.data, item)
    n := h.Len() - 1
    if h.OnIndexChange != nil {
        h.OnIndexChange(item, n)
    }
    h.up(n)
}

func (h *Heap[T]) Peek() T {
    return h.data[0]
}

func (h *Heap[T]) Pop() T {
    n := h.Len() - 1
    h.swap(0, n)
    h.down(0, n)
    return h.removeLast()
}

func (h *Heap[T]) Remove(i int) T {
    n := h.Len() - 1
    if i != n {
        h.swap(i, n)
        if !h.down(i, n) {
            h.up(i)
        }
    }

    return h.removeLast()
}

func (h *Heap[T]) Fix(i int) {
    if !h.down(i, h.Len()) {
        h.up(i)
    }
}

func (h *Heap[T]) up(j int) {
    for {
        i := (j - 1) / 2 // parent
        if i == j || !h.less(h.data[j], h.data[i]) {
            break
        }

        h.swap(i, j)
        j = i
    }
}

func (h *Heap[T]) down(i0, n int) bool {
    i := i0
    for {
        j1 := 2*i + 1
        if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
            break
        }
        j := j1 // left child
        if j2 := j1 + 1; j2 < n && h.less(h.data[j2], h.data[j1]) {
            j = j2 // = 2*i + 2 // right child
        }
        if !h.less(h.data[j], h.data[i]) {
            break
        }

        h.swap(i, j)
        i = j
    }
    return i > i0
}

func (h *Heap[T]) swap(i, j int) {
    h.data[i], h.data[j] = h.data[j], h.data[i]

    if h.OnIndexChange != nil {
        // This is the place where we become a bit less optimal than container/heap:
        // we do 2 function calls while container/heap does 1.
        h.OnIndexChange(h.data[i], i)
        h.OnIndexChange(h.data[j], j)
    }
}

func (h *Heap[T]) removeLast() T {
    var zero T

    n := h.Len() - 1
    res := h.data[n]
    h.data[n] = zero // for GC
    h.data = h.data[:n]

    if h.OnIndexChange != nil {
        h.OnIndexChange(res, -1) // for safety
    }
    return res
}
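Finally, a minimal sketch (not part of this PR) of the index-tracking feature mentioned in the Heap doc comment: OnIndexChange keeps each item's position up to date, so a priority change can be followed by Fix at the tracked index. The task type and the sample priorities are illustrative, and as above this only compiles inside the rill module.

package main

import (
    "fmt"

    "github.com/destel/rill/internal/heapbuffer"
)

type task struct {
    name     string
    priority int
    index    int // maintained by the heap via OnIndexChange
}

func main() {
    h := heapbuffer.NewHeap[*task](func(a, b *task) bool { return a.priority < b.priority })
    h.OnIndexChange = func(t *task, i int) { t.index = i }

    h.Push(&task{name: "a", priority: 3})
    h.Push(&task{name: "b", priority: 1})
    c := &task{name: "c", priority: 2}
    h.Push(c)

    // Raise the priority of "c" and restore the heap invariant at its tracked position.
    c.priority = 0
    h.Fix(c.index)

    fmt.Println(h.Pop().name) // "c"
}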