forked from nsqio/go-nsq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
doc.go
88 lines (68 loc) · 2.33 KB
/
doc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/*
Package nsq is the official Go package for NSQ (http://nsq.io/).
It provides high-level Consumer and Producer types as well as low-level
functions to communicate over the NSQ protocol.
Consumer
Consuming messages from NSQ can be done by creating an instance of a Consumer and supplying it a handler.
package main
import (
"log"
"os/signal"
"github.com/nsqio/go-nsq"
)
type myMessageHandler struct {}
// HandleMessage implements the Handler interface.
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
if len(m.Body) == 0 {
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
// In this case, a message with an empty body is simply ignored/discarded.
return nil
}
// do whatever actual message processing is desired
err := processMessage(m.Body)
// Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
return err
}
func main() {
// Instantiate a consumer that will subscribe to the provided channel.
config := nsq.NewConfig()
consumer, err := nsq.NewConsumer("topic", "channel", config)
if err != nil {
log.Fatal(err)
}
// Set the Handler for messages received by this Consumer. Can be called multiple times.
// See also AddConcurrentHandlers.
consumer.AddHandler(&myMessageHandler{})
// Use nsqlookupd to discover nsqd instances.
// See also ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
err = consumer.ConnectToNSQLookupd("localhost:4161")
if err != nil {
log.Fatal(err)
}
// wait for signal to exit
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
// Gracefully stop the consumer.
consumer.Stop()
}
Producer
Producing messages can be done by creating an instance of a Producer.
// Instantiate a producer.
config := nsq.NewConfig()
producer, err := nsq.NewProducer("127.0.0.1:4150", config)
if err != nil {
log.Fatal(err)
}
messageBody := []byte("hello")
topicName := "topic"
// Synchronously publish a single message to the specified topic.
// Messages can also be sent asynchronously and/or in batches.
err = producer.Publish(topicName, messageBody)
if err != nil {
log.Fatal(err)
}
// Gracefully stop the producer when appropriate (e.g. before shutting down the service)
producer.Stop()
*/
package nsq