Skip to content
This repository has been archived by the owner on Jun 27, 2024. It is now read-only.

Commit

Permalink
Merge pull request #61 from roadrunner-server/feature/priority_queue_…
Browse files Browse the repository at this point in the history
…remove

feature(priority queue): `Remove` method with monotonic stack
  • Loading branch information
rustatian authored Feb 15, 2023
2 parents 88e0d05 + 1fb3576 commit 1e145c5
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 75 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ jobs:
strategy:
fail-fast: false
matrix:
php: ["8.1"]
go: ["1.19"]
php: ["8.2"]
go: ["1.20"]
os: ["ubuntu-latest"]
steps:
- name: Set up Go ${{ matrix.go }}
Expand Down
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2020 Spiral Scout
Copyright (c) 2023 Spiral Scout

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
43 changes: 43 additions & 0 deletions priority_queue/binary_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ import (
type Item interface {
// Priority returns the Item's priority to sort
Priority() int64
// ID of the item
ID() string
}

type BinHeap[T Item] struct {
items []T
st *stack
// find a way to use pointer to the raw data
len uint64
maxLen uint64
Expand All @@ -26,6 +29,7 @@ type BinHeap[T Item] struct {
func NewBinHeap[T Item](maxLen uint64) *BinHeap[T] {
return &BinHeap[T]{
items: make([]T, 0, 1000),
st: newStack(),
len: 0,
maxLen: maxLen,
cond: sync.Cond{L: &sync.Mutex{}},
Expand Down Expand Up @@ -75,6 +79,45 @@ func (bh *BinHeap[T]) fixDown(curr, end int) {
}
}

// Remove removes all elements with the provided ID and returns the slice with them
func (bh *BinHeap[T]) Remove(id string) []T {
bh.cond.L.Lock()
defer bh.cond.L.Unlock()

out := make([]T, 0, 10)

for i := 0; i < len(bh.items); i++ {
if bh.items[i].ID() == id {
out = append(out, bh.items[i])
bh.st.add(i)
}
}

/*
for i:=0; i<len(ids); i++ {
interval := ids[i]
interval[0] - beginning of the interval
interval[1] - end of the interval
bh.items = append(bh.items[:interval[0]], bh.items[interval[1]+1:]...)
}
*/

ids := bh.st.indices()
adjusment := 0
for i := 0; i < len(ids); i++ {
start := ids[i][0] - adjusment
end := ids[i][1] - adjusment

bh.items = append(bh.items[:start], bh.items[end+1:]...)
adjusment += end - start + 1
}

atomic.StoreUint64(&bh.len, uint64(len(bh.items)))
bh.st.clear()

return out
}

// PeekPriority returns the highest priority
func (bh *BinHeap[T]) PeekPriority() int64 {
bh.cond.L.Lock()
Expand Down
198 changes: 126 additions & 72 deletions priority_queue/binary_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,20 @@ import (
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type Test struct {
priority int64
id string
}

func NewTest(priority int64) Test {
func NewTest(priority int64, id string) Test {
return Test{
priority: priority,
id: id,
}
}

Expand All @@ -31,7 +34,7 @@ func (t Test) Context() ([]byte, error) {
}

func (t Test) ID() string {
return "none"
return t.id
}

func (t Test) Priority() int64 {
Expand All @@ -40,17 +43,17 @@ func (t Test) Priority() int64 {

func TestBinHeap_Init(t *testing.T) {
a := []Item{
NewTest(2),
NewTest(23),
NewTest(33),
NewTest(44),
NewTest(1),
NewTest(2),
NewTest(2),
NewTest(2),
NewTest(4),
NewTest(6),
NewTest(99),
NewTest(2, uuid.NewString()),
NewTest(23, uuid.NewString()),
NewTest(33, uuid.NewString()),
NewTest(44, uuid.NewString()),
NewTest(1, uuid.NewString()),
NewTest(2, uuid.NewString()),
NewTest(2, uuid.NewString()),
NewTest(2, uuid.NewString()),
NewTest(4, uuid.NewString()),
NewTest(6, uuid.NewString()),
NewTest(99, uuid.NewString()),
}

bh := NewBinHeap[Item](12)
Expand All @@ -59,43 +62,44 @@ func TestBinHeap_Init(t *testing.T) {
bh.Insert(a[i])
}

expected := []Item{
NewTest(1),
NewTest(2),
NewTest(2),
NewTest(2),
NewTest(2),
NewTest(4),
NewTest(6),
NewTest(23),
NewTest(33),
NewTest(44),
NewTest(99),
expected := []int64{
1,
2,
2,
2,
2,
4,
6,
23,
33,
44,
99,
}

res := make([]Item, 0, 12)
res := make([]int64, 0, 12)

for i := 0; i < 11; i++ {
item := bh.ExtractMin()
res = append(res, item)
item.Priority()
res = append(res, item.Priority())
}

require.Equal(t, expected, res)
}

func TestBinHeap_MaxLen(t *testing.T) {
a := []Item{
NewTest(2),
NewTest(23),
NewTest(33),
NewTest(44),
NewTest(1),
NewTest(2),
NewTest(2),
NewTest(2),
NewTest(4),
NewTest(6),
NewTest(99),
NewTest(2, uuid.NewString()),
NewTest(23, uuid.NewString()),
NewTest(33, uuid.NewString()),
NewTest(44, uuid.NewString()),
NewTest(1, uuid.NewString()),
NewTest(2, uuid.NewString()),
NewTest(2, uuid.NewString()),
NewTest(2, uuid.NewString()),
NewTest(4, uuid.NewString()),
NewTest(6, uuid.NewString()),
NewTest(99, uuid.NewString()),
}

bh := NewBinHeap[Item](1)
Expand Down Expand Up @@ -171,7 +175,7 @@ func TestNewPriorityQueue(t *testing.T) {
case <-stopCh:
return
default:
pq.Insert(NewTest(rand.Int63())) //nolint:gosec
pq.Insert(NewTest(rand.Int63(), uuid.NewString())) //nolint:gosec
atomic.AddUint64(&insertsPerSec, 1)
}
}
Expand All @@ -186,17 +190,17 @@ func TestNewPriorityQueue(t *testing.T) {

func TestNewItemWithTimeout(t *testing.T) {
a := []Item{
NewTest(5),
NewTest(23),
NewTest(33),
NewTest(44),
NewTest(5),
NewTest(5),
NewTest(6),
NewTest(7),
NewTest(8),
NewTest(6),
NewTest(99),
NewTest(5, uuid.NewString()),
NewTest(23, uuid.NewString()),
NewTest(33, uuid.NewString()),
NewTest(44, uuid.NewString()),
NewTest(5, uuid.NewString()),
NewTest(5, uuid.NewString()),
NewTest(6, uuid.NewString()),
NewTest(7, uuid.NewString()),
NewTest(8, uuid.NewString()),
NewTest(6, uuid.NewString()),
NewTest(99, uuid.NewString()),
}

/*
Expand All @@ -217,17 +221,17 @@ func TestNewItemWithTimeout(t *testing.T) {

func TestItemPeek(t *testing.T) {
a := []Item{
NewTest(5),
NewTest(23),
NewTest(33),
NewTest(44),
NewTest(5),
NewTest(5),
NewTest(6),
NewTest(7),
NewTest(8),
NewTest(6),
NewTest(99),
NewTest(5, uuid.NewString()),
NewTest(23, uuid.NewString()),
NewTest(33, uuid.NewString()),
NewTest(44, uuid.NewString()),
NewTest(5, uuid.NewString()),
NewTest(5, uuid.NewString()),
NewTest(6, uuid.NewString()),
NewTest(7, uuid.NewString()),
NewTest(8, uuid.NewString()),
NewTest(6, uuid.NewString()),
NewTest(99, uuid.NewString()),
}

/*
Expand All @@ -251,17 +255,17 @@ func TestItemPeek(t *testing.T) {

func TestItemPeekConcurrent(t *testing.T) {
a := []Item{
NewTest(5),
NewTest(23),
NewTest(33),
NewTest(44),
NewTest(5),
NewTest(5),
NewTest(6),
NewTest(7),
NewTest(8),
NewTest(6),
NewTest(99),
NewTest(5, uuid.NewString()),
NewTest(23, uuid.NewString()),
NewTest(33, uuid.NewString()),
NewTest(44, uuid.NewString()),
NewTest(5, uuid.NewString()),
NewTest(5, uuid.NewString()),
NewTest(6, uuid.NewString()),
NewTest(7, uuid.NewString()),
NewTest(8, uuid.NewString()),
NewTest(6, uuid.NewString()),
NewTest(99, uuid.NewString()),
}

/*
Expand Down Expand Up @@ -294,3 +298,53 @@ func TestItemPeekConcurrent(t *testing.T) {

wg.Wait()
}

func TestBinHeap_Remove(t *testing.T) {
a := []Item{
NewTest(2, "1"),
NewTest(5, "1"),
NewTest(99, "1"),
NewTest(4, "6"),
NewTest(6, "7"),
NewTest(23, "2"),
NewTest(2, "1"),
NewTest(2, "1"),
NewTest(33, "3"),
NewTest(44, "4"),
NewTest(2, "1"),
}

bh := NewBinHeap[Item](12)

for i := 0; i < len(a); i++ {
bh.Insert(a[i])
}

expected := []Item{
NewTest(4, "6"),
NewTest(6, "7"),
NewTest(23, "2"),
NewTest(33, "3"),
NewTest(44, "4"),
}

out := bh.Remove("1")
if len(out) != 6 {
t.Fatal("should be 5")
}

for i := 0; i < len(out); i++ {
if out[i].ID() != "1" {
t.Fatal("id is not 1")
}
}

res := make([]Item, 0, 12)

for i := 0; i < 5; i++ {
item := bh.ExtractMin()
res = append(res, item)
}

require.Equal(t, expected, res)
}
Loading

0 comments on commit 1e145c5

Please sign in to comment.