forked from stith/gorelp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwindow.go
132 lines (102 loc) · 2.15 KB
/
window.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package relp
import (
"sync"
"fmt"
"errors"
)
type Txn int32
type Window interface {
Add(Txn)
Try(Txn) error
Remove(Txn) error
Remaining() int
Outstanding() int
Size() int
HighestAcked() Txn
HighestSent() Txn
Close() error
}
type empty struct{}
type semaphore chan empty
type ArrayWindow struct {
size int
highestAcked Txn
highestSent Txn
mutex *sync.Mutex
outstanding map[Txn]bool
semaphore semaphore
}
func NewArrayWindow(size int) Window {
if size <= 0 {
panic("Size must be >= 1")
}
return &ArrayWindow{
size: size,
highestAcked: 0,
highestSent: 0,
mutex: &sync.Mutex{},
outstanding: make(map[Txn]bool),
semaphore: make(semaphore, size),
}
}
func (w *ArrayWindow) Add(txn Txn) {
// wait until there's a free slot
w.semaphore <- empty{}
// prepare to mutate
w.mutex.Lock()
defer w.mutex.Unlock()
if len(w.outstanding) > w.size {
panic("w.outstanding should be < size in Add")
}
// client code should not reuse or resend a given message ID
if _, present := w.outstanding[txn]; present {
panic(fmt.Sprint("txnid already sent! ", txn))
}
w.outstanding[txn] = true
// TODO: handle wraparound per spec at 999,999,999
if txn > w.highestSent {
w.highestSent = txn
}
}
func (w *ArrayWindow) Try(txn Txn) error {
panic("not impl")
}
func (w *ArrayWindow) Remove(txn Txn) error {
w.mutex.Lock()
defer w.mutex.Unlock()
if _, present := w.outstanding[txn]; !present {
return errors.New(fmt.Sprint("txn not presentin window ", txn))
}
if txn > w.highestAcked {
w.highestAcked = txn
}
delete(w.outstanding, txn)
// make space for another outstanding message
_ = <- w.semaphore
return nil
}
func (w *ArrayWindow) Size() int {
return w.size
}
func (w *ArrayWindow) Remaining() int {
return w.size - w.Outstanding()
}
func (w *ArrayWindow) _outstanding() int {
return len(w.outstanding)
}
func (w *ArrayWindow) Outstanding() int {
w.mutex.Lock()
res := w._outstanding()
w.mutex.Unlock()
return res
}
func (w *ArrayWindow) HighestAcked() Txn {
return w.highestAcked
}
func (w *ArrayWindow) HighestSent() Txn {
return w.highestSent
}
func (w *ArrayWindow) Close() error {
close(w.semaphore)
return nil
}