-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
130 lines (104 loc) · 2.48 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package main
import (
"context"
"fmt"
"math/rand"
"runtime"
"sync"
"time"
)
// subscriber is a single subscription to a topic, as created by subscribe.
type subscriber struct {
// name identifies the subscription; unsubscribe matches on it.
name string
// ch carries published messages to whoever reads the subscription.
ch chan int
// topic is the topic the subscription was registered under.
topic string
// close is the unsubscribe signal; tee checks it before sending on ch.
close chan struct{}
}
var (
	// subscribers maps a topic to the subscriptions registered for it.
	subscribers = make(map[string][]subscriber)

	// mtx guards every access to subscribers.
	mtx sync.Mutex

	// letterRunes is the alphabet randStringRunes draws from.
	letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
)
// randStringRunes returns a string of n runes, each picked from letterRunes
// with the package-level math/rand generator.
func randStringRunes(n int) string {
	picked := make([]rune, n)
	alphabet := len(letterRunes)
	for pos := 0; pos < n; pos++ {
		picked[pos] = letterRunes[rand.Intn(alphabet)]
	}
	return string(picked)
}
// subscribe registers a new subscription for topic and returns it.
//
// The subscription gets a random five-letter name, an unbuffered message
// channel and a close channel with room for one value. Registration happens
// while holding mtx.
func subscribe(topic string) subscriber {
	mtx.Lock()
	defer mtx.Unlock()

	var created subscriber
	created.name = randStringRunes(5)
	created.ch = make(chan int)
	created.topic = topic
	created.close = make(chan struct{}, 1)

	registered := append(subscribers[topic], created)
	subscribers[topic] = registered
	return created
}
// tee fans input out to every subscriber in outputs.
//
// Each subscriber gets its own goroutine, so tee returns immediately and a
// slow or absent reader cannot block the caller. A goroutine ends when the
// value has been handed to the reader, when the subscriber's close channel
// signals that it was unsubscribed, or when ctx is done.
//
// When the subscription turns out to be closed, a "closed" line is printed
// and the value is dropped.
func tee(ctx context.Context, input int, outputs []subscriber) {
	for _, output := range outputs {
		go func(output subscriber) {
			reportClosed := func() {
				fmt.Printf("channe-%s-%s is closed, you can't send message into it.\n", output.topic, output.name)
			}

			// Fast path: the subscriber is already gone.
			select {
			case <-output.close:
				reportClosed()
				return
			default:
			}

			// Resolve the cancellation channel before installing the recover
			// below, so that a misuse such as a nil ctx still fails loudly.
			done := ctx.Done()

			// unsubscribe closes output.ch. That can happen after the check
			// above has passed, or while this goroutine is parked on the
			// send; in both cases the send panics. From here on the send is
			// the only operation that can panic, so treat a panic as "the
			// subscription was closed" instead of crashing the process.
			defer func() {
				if recover() != nil {
					reportClosed()
				}
			}()

			select {
			case output.ch <- input:
			case <-output.close:
				// Unsubscribed while waiting for a reader.
				reportClosed()
			case <-done:
			}
		}(output)
	}
}
// publish delivers msg to every subscription currently registered for topic.
//
// The subscription list is read while holding mtx and handed to tee. Nothing
// happens when the topic is unknown or has no subscriptions.
func publish(ctx context.Context, topic string, msg int) {
	mtx.Lock()
	defer mtx.Unlock()

	targets := subscribers[topic]
	if len(targets) == 0 {
		return
	}
	tee(ctx, msg, targets)
}
// unsubscribe removes sub from topic and shuts its channels down.
//
// The subscription is looked up by name among those registered for topic.
// If the topic is unknown or holds no subscription with that name,
// unsubscribe does nothing, so calling it again for a subscription that was
// already removed is harmless.
//
// The close channel is closed rather than sent a single value: a closed
// channel is observed by every goroutine that checks it, now or later,
// whereas a single value is consumed by the first checker and the next
// sender would go on to send on the closed message channel. The signal is
// raised before the message channel is closed so that a sender checking the
// signal first never reaches the closed message channel.
//
// NOTE: closing the message channel ends readers that range over it, but a
// sender that is already blocked sending on it at that moment panics;
// senders must be prepared for that.
func unsubscribe(topic string, sub subscriber) {
	mtx.Lock()
	defer mtx.Unlock()

	subs := subscribers[topic]
	for i := range subs {
		if subs[i].name != sub.name {
			continue
		}

		target := subs[i]
		close(target.close)
		close(target.ch)

		// Remove element i in place and clear the vacated tail slot so the
		// backing array does not keep the removed channels reachable.
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = subscriber{}
		subscribers[topic] = subs[:last]
		return
	}
}
// printChannel prints every value received on sub.ch, one line per value,
// and returns once the channel has been closed and drained.
func printChannel(sub subscriber) {
	for {
		data, open := <-sub.ch
		if !open {
			return
		}
		fmt.Printf("subscriber-%s-%s: %d\n", sub.topic, sub.name, data)
	}
}
// main is a small demonstration of the pub/sub helpers: it creates two
// subscriptions on "name-1" and one on "name-2", prints whatever they
// receive, publishes a few values, removes the "name-2" subscription and
// finally reports how many goroutines are still running.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	first := subscribe("name-1")
	second := subscribe("name-1")
	other := subscribe("name-2")

	// One printing goroutine per subscription.
	for _, sub := range []subscriber{first, second, other} {
		go printChannel(sub)
	}

	publish(ctx, "name-1", 1)
	publish(ctx, "name-1", 2)

	// This value may not be printed: the unsubscribe on the next line can
	// close the subscription before the value is delivered.
	publish(ctx, "name-2", 3)
	unsubscribe("name-2", other)
	publish(ctx, "name-2", 333) // not printed: nobody is subscribed any more

	// Alternative ordering in which 3 is printed and 333 is not:
	//publish(ctx, "name-2", 3)
	//time.Sleep(500 * time.Millisecond)
	//unsubscribe("name-2", s3)
	//publish(ctx, "name-2", 333)

	// Give the background goroutines time to settle before counting them.
	const settle = 5 * time.Second
	time.Sleep(settle)
	fmt.Printf("expected 3 goroutine, got goroutine: %d\n", runtime.NumGoroutine())
}