Skip to content

Commit e9e7de0

Browse files
authored
do not close already closed channel (#11)
1 parent a9ffe31 commit e9e7de0

File tree

1 file changed

+9
-5
lines changed

1 file changed

+9
-5
lines changed

opwindow.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,10 @@ func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error {
6767
item, ok := q.m[id]
6868
if ok {
6969
if len(item.OpSet.set) >= q.width {
70-
close(item.IsFull)
70+
if !item.IsFullClosed {
71+
close(item.IsFull)
72+
item.IsFullClosed = true
73+
}
7174
q.mu.Unlock()
7275
return ErrQueueSaturatedWidth
7376
}
@@ -163,8 +166,9 @@ func (q *OpWindow) Dequeue(ctx context.Context) (*OpSet, error) {
163166
}
164167

165168
type queueItem struct {
166-
ID ID
167-
ProcessAt time.Time
168-
OpSet *OpSet
169-
IsFull chan struct{}
169+
ID ID
170+
ProcessAt time.Time
171+
OpSet *OpSet
172+
IsFull chan struct{}
173+
IsFullClosed bool
170174
}

0 commit comments

Comments
 (0)