Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework with a better implementation #5

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: go

go:
- 1.5.1
- 1.x

notifications:
email:
Expand All @@ -10,11 +10,17 @@ notifications:
on_success: change
on_failure: change

env:
global:
- GO111MODULE=on
#COVERALLS_TOKEN
- secure: "Sem13/2A1jUUMrhgC5w8oPaVVKU0iPs7LZ1QqlHjSpCX03WcEvhwUloXoFtKqMjoe1hHRplUE5rO/gquQa+t34i9PSVp00qiLO5eec4M0rJJ/Uz1PG8BEfAlEaiuSZTQ+JM4pNvldc2gbmYQ+dz7Ans5lA0Hv5j5L7A9///O+IJ/3Ob6yhG9YGXx/ohrF0US6PkHwl6+8Vu9iMOLBpnxeV1pRPny5yjVbtvTmAEnQc6ZMLMX0LgwcIuTJIvS8JTJgqyO6vygh0ZxEzmCB1gqf4h4ZkxTn7wxlY9UaH2P7F0agInZLP+Pt6Opg/YvIVreh9gOGy1u6utN8CGkUstkz9W8zuPpxV0PGdhhR7nyWoj1072qPYTk/m/ED5flKsmlW8tfq0HbgcpumWoZov/UoECgWltT9nQb8XVHXMQApaNnhh/EThlC0/noNfehtJsT/zNEK6fSZK+ZHOqyxalViZsEznTbo7RzpP1qD59+dBWx544PCAghSemLg+XYoEJALn0SocSVpYiW+F1cHhxL4c0imMhHGzzk/id5ZAJ3RLFzOzH8Ker1Ssp8TkCqlHoksv/IkkEW0IQ3O7vfrNPjBCcI2foAPfdv3EivfuI3L5JPgiteQ8LyZfzCO2xTGnCqUoBBcIiM6471aaQtADdnZMcwW3tAzdgrSgXrTWGcobI="

install:
- ./bin/travis/install.sh
- ./bin/travis/install

script:
- ./bin/travis/test_coverage.sh
- ./bin/travis/test-coverage

after_success:
- ./bin/coveralls/push.sh
- ./bin/coveralls/push
10 changes: 1 addition & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
# timequeue
timequeue provides a TimeQueue type that releases arbitrary messages at given
timequeue provides a TimeQueue type that releases arbitrary Messages at given
time.Times.

#### Status
[![Build Status](https://travis-ci.org/gogolfing/timequeue.svg?branch=master)](https://travis-ci.org/gogolfing/timequeue)
[![Coverage Status](https://coveralls.io/repos/github/gogolfing/timequeue/badge.svg?branch=master)](https://coveralls.io/github/gogolfing/timequeue?branch=master)
[![Go Report Card](https://goreportcard.com/badge/github.com/gogolfing/timequeue)](https://goreportcard.com/report/github.com/gogolfing/timequeue)

### Documentation and Usage
Full documentation and examples can be found at [![GoDoc](https://godoc.org/github.com/gogolfing/timequeue?status.svg)](https://godoc.org/github.com/gogolfing/timequeue)
37 changes: 0 additions & 37 deletions bin/bump_version.sh

This file was deleted.

File renamed without changes.
3 changes: 2 additions & 1 deletion bin/travis/install.sh → bin/travis/install
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@

go get github.com/axw/gocov/gocov
go get github.com/ericelsken/goveralls
go get -t ./...

go get ./...
File renamed without changes.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/gogolfing/timequeue

go 1.12
228 changes: 127 additions & 101 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,143 +2,169 @@ package timequeue

import (
"container/heap"
"fmt"
"time"
)

//sentinel value that says a Message is not in a messageHeap.
const notInIndex = -1
const (
//indexNotInHeap is a sentinel for a Message.index that indicates the Message
//is not a in a messageHeap.
indexNotInHeap = -1
)

//Message is a simple holder struct for a time.Time (the time the Message
//will be released from the queue) and a Data payload of type interface{}.
//
//A Message is not safe for modification from multiple go-routines.
//The Time field is used to calculate when the Message should be released from
//a TimeQueue, and thus changing its value while the Message is still referenced
//by a TimeQueue could have unknown side-effects.
//The Data field is never modified by a TimeQueue.
//Message is a container type that associates a Time with some
//arbitrary data.
//A Message is "released" from a TimeQueue as close to Time At as possible.
//
//It is up to client code to ensure that Data is always of the same underlying
//type if that is desired.
//Message zero values are not in a valid state. You should use NewMessage to create
//Message instances.
type Message struct {
time.Time
Data interface{}
//at is the Time at which to release this Message.
at time.Time

//data is any arbitrary data that you can put in a Message and retrieve when
//the Message is released.
data interface{}

//reference to the messageHeap that this Message is in. used for removal safety.
mh *messageHeap
//the index of this Message in mh. used to remove a Message from a messageHeap.
//messageHeap is the messageHeap that this Message is in.
//A nil value means that Message is not in a messageHeap.
*messageHeap

//index is the index at which this Message resides in messageHeap.
index int
}

//String returns the standard string representation of a struct.
func (m *Message) String() string {
return fmt.Sprintf("&timequeue.Message{%v %v}", m.Time, m.Data)
//NewMessage returns a Message with at and data set on their corresponding fields.
//
//You should use this function to create Messages instead of using a struct initializer.
func NewMessage(at time.Time, data interface{}) *Message {
return &Message{
at: at,
data: data,
messageHeap: nil,
index: indexNotInHeap,
}
}

//messageHeap is a heap.Interface implementation for Messages.
//The peekMessage(), pushMessage(), popMessage(), and removeMessage() methods
//should be used over Push() and Pop() because they provide logic for emprty heaps,
//nil Messages, and removal.
//A messageHeap has no guarantees for correctness if they are not used.
//messageHeap is not safe for use by multiple go-routines.
type messageHeap struct {
messages []*Message
//At returns the Time at which m is scheduled to be released.
func (m *Message) At() time.Time {
return m.at
}

//newMessageHeap creates a messageHeap with messages added to the heap.
//heap.Init() is called before the value is returned.
func newMessageHeap() *messageHeap {
mh := &messageHeap{
messages: []*Message{},
}
heap.Init(mh)
return mh
//Data returns the data associated with m.
//
//This will usually be used after receiving a Message from a TimeQueue in order
//to process the Message appropriately.
func (m *Message) Data() interface{} {
return m.data
}

//Len returns the number of Messages in the heap.
func (mh *messageHeap) Len() int {
return len(mh.messages)
//less returns whether or not m is "less than" other.
//This is used to determined the order in which Messages are released from a TimeQueue.
//
//It returns true if m.at is before other.at.
func (m *Message) less(other *Message) bool {
return m.at.Before(other.at)
}

//Less determines whether or not the Message at index i is less than that at index
//j.
//This is determined by the (message at i.Time).Before(message at j.Time).
func (mh *messageHeap) Less(i, j int) bool {
return mh.messages[i].Time.Before(mh.messages[j].Time)
//isHead returns whether or not m is at the head of a messageHeap, i.e. the next
//one to be released.
func (m *Message) isHead() bool {
return m.messageHeap != nil && m.index == 0
}

//Swap swaps the messages at indices i and j.
func (mh *messageHeap) Swap(i, j int) {
mh.messages[i], mh.messages[j] = mh.messages[j], mh.messages[i]
mh.messages[i].index = i
mh.messages[j].index = j
func (m *Message) withoutHeap() Message {
m.messageHeap = nil
m.index = indexNotInHeap
return *m
}

//Push is the heap.Interface Push method that adds value to the heap.
//Appends value to the internal slice.
func (mh *messageHeap) Push(value interface{}) {
mh.messages = append(mh.messages, value.(*Message))
//messageHeap is a slice of Messages with methods that satisfy the heap.Interface.
//
//messageHeaps can be used with the heap package to push and pop Messages ordered
//by Message.less.
//
//messageHeaps are not safe for use by multiple goroutines.
//
//We let Go manage how increasing size and capacity works when appending to a
//messageHeap.
type messageHeap []*Message

//Len is the heap.Interface implementation.
//It returns len(mh).
func (mh messageHeap) Len() int {
return len(mh)
}

//Pop is the heap.Interface Pop method that removes the "smallest" Message from the heap.
func (mh *messageHeap) Pop() interface{} {
n := len(mh.messages)
result := (mh.messages)[n-1]
mh.messages = (mh.messages)[0 : n-1]
return result
//Less is the heap.Interface implementation.
func (mh messageHeap) Less(i, j int) bool {
return mh[i].less(mh[j])
}

//peekMessage returns the "smallest" Message in the heap (without removal) or
//nil if the heap is empty.
func (mh *messageHeap) peekMessage() *Message {
if mh.Len() > 0 {
return mh.messages[0]
}
return nil
//Swap is the heap.Interface implementation.
func (mh messageHeap) Swap(i, j int) {
mh[i], mh[j] = mh[j], mh[i]
mh[i].index = i
mh[j].index = j
}

//pushMessageValues creates and adds a Message with values t and data in the
//appropriate index to mh.
//The created message is returned.
func (mh *messageHeap) pushMessageValues(t time.Time, data interface{}) *Message {
message := &Message{
Time: t,
Data: data,
index: mh.Len(),
mh: mh,
}
heap.Push(mh, message)
return message
//pushMessage is a helper that calls the heap.Push package function with mh and m.
func pushMessage(mh *messageHeap, m *Message) {
heap.Push(mh, m)
}

//popMessage returns the "smallest" Message in the heap (after removal) or nil
//if the heap is empty.
func (mh *messageHeap) popMessage() *Message {
if mh.Len() == 0 {
return nil
//Push is the heap.Interface implementation.
func (mh *messageHeap) Push(x interface{}) {
n := len(*mh)
m := x.(*Message)
m.messageHeap, m.index = mh, n
*mh = append(*mh, m)
}

//popMessage is a helper that calls the heap.Pop package function with mh.
func popMessage(mh *messageHeap) *Message {
return heap.Pop(mh).(*Message)
}

//Pop is the heap.Interface implementation.
func (mh *messageHeap) Pop() interface{} {
old := *mh
n := len(old)
m := old[n-1]
m.messageHeap, m.index = nil, indexNotInHeap
*mh = old[0 : n-1]
return m
}

//peek returns the next Message to be released, or nil if mh is empty.
func (mh *messageHeap) peek() *Message {
if mh.Len() > 0 {
return (*mh)[0]
}
result := heap.Pop(mh).(*Message)
beforeRemoval(result)
return result
return nil
}

//removeMessage removes the message from mh.
//If mh is empty, message is nil, or message is not in mh, then this is a nop
//and returns false.
//Returns true or false indicating whether or not message was actually removed
//from mh.
func (mh *messageHeap) removeMessage(message *Message) bool {
if mh.Len() == 0 || message == nil || message.index == notInIndex || message.mh != mh {
//remove attemps to remove m from mh.
//
//It returns true if m is actually stored in mh and was actually removed, false
//if m is not in mh.
func (mh *messageHeap) remove(m *Message) bool {
if m.messageHeap != mh {
return false
}
result := heap.Remove(mh, message.index).(*Message)
beforeRemoval(result)

heap.Remove(mh, m.index)
return true
}

//beforeRemoval sets the index and mh fields of message to indicate that it is
//no longer in a messageHeap.
func beforeRemoval(message *Message) {
message.index = notInIndex
message.mh = nil
func (mh *messageHeap) drain() []Message {
old := *mh

result := make([]Message, len(old))
for i, m := range old {
result[i] = m.withoutHeap()
}

*mh = old[0:0]

return result
}
Loading