-
Notifications
You must be signed in to change notification settings - Fork 25
/
main.go
126 lines (105 loc) · 3.2 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package main
import (
"context"
"flag"
"fmt"
"os"
"runtime"
"strings"
"cloud.google.com/go/pubsub"
)
// Command-line flags. The pointers are registered at package initialisation;
// the values they point to are only meaningful after flag.Parse has run.
var (
	// debug enables the diagnostic output written by debugf.
	debug = flag.Bool("debug", false, "Enable debug logging")
	// help makes main print the usage text and return without doing any work.
	help = flag.Bool("help", false, "Display usage information")
	// version makes main print versionString() and return without doing any work.
	version = flag.Bool("version", false, "Display version information")
)
// CommitHash and Revision identify the build and are reported by
// versionString. They are set during building; a binary built without that
// step keeps the "<not set>" placeholders.
// NOTE(review): presumably injected by the linker (e.g. -ldflags "-X ..."),
// which would require them to stay package-level string variables with these
// exact names — confirm against the build script.
var (
	// CommitHash is the source commit the binary was built from.
	CommitHash = "<not set>"
	// Revision is the build revision shown in the version banner.
	Revision = "<not set>"
)
// Topics describes the PubSub topics of one project and their subscriptions.
// Each key is a topic ID; the value holds the IDs of the subscriptions to
// create on that topic. An empty slice means the topic gets no subscriptions.
type Topics map[string][]string
// versionString returns the one-line version banner: the build revision, the
// commit hash in parentheses, and the version of the Go runtime in use.
func versionString() string {
	const banner = "pubsubc - build %s (%s) running on %s"
	goVersion := runtime.Version()
	return fmt.Sprintf(banner, Revision, CommitHash, goVersion)
}
// debugf writes a formatted diagnostic line to stdout, followed by a newline.
// It is a no-op unless the -debug flag is set.
func debugf(format string, params ...interface{}) {
	if !*debug {
		return
	}
	fmt.Printf(format+"\n", params...)
}
// fatalf writes a formatted error message to stderr, prefixed with the
// program name (os.Args[0]) and terminated by a newline, then exits the
// process with status 1. It never returns.
//
// format and params follow the fmt.Printf conventions.
func fatalf(format string, params ...interface{}) {
	// Format the message on its own and pass the program name as an argument
	// rather than splicing it into the format string: a '%' in the executable
	// path would otherwise be interpreted as a formatting verb.
	msg := fmt.Sprintf(format, params...)
	fmt.Fprintf(os.Stderr, "%s: %s\n", os.Args[0], msg)
	os.Exit(1)
}
// create connects to the PubSub service for projectID and creates every topic
// in topics together with the subscriptions listed for it.
//
// It stops at the first failure and returns an error naming the project,
// topic or subscription involved; anything created before that point is left
// in place. The client is closed before create returns.
func create(ctx context.Context, projectID string, topics Topics) error {
	psClient, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("Unable to create client to project %q: %s", projectID, err)
	}
	defer psClient.Close()
	debugf("Client connected with project ID %q", projectID)

	for id, subIDs := range topics {
		debugf(" Creating topic %q", id)
		created, topicErr := psClient.CreateTopic(ctx, id)
		if topicErr != nil {
			return fmt.Errorf("Unable to create topic %q for project %q: %s", id, projectID, topicErr)
		}

		for _, subID := range subIDs {
			debugf(" Creating subscription %q", subID)
			cfg := pubsub.SubscriptionConfig{Topic: created}
			if _, subErr := psClient.CreateSubscription(ctx, subID, cfg); subErr != nil {
				return fmt.Errorf("Unable to create subscription %q on topic %q for project %q: %s", subID, id, projectID, subErr)
			}
		}
	}

	return nil
}
// main parses the command-line flags and then, for each consecutively
// numbered PUBSUB_PROJECT<n> environment variable (n = 1, 2, ...), creates
// the topics and subscriptions that variable describes.
//
// Each variable is a comma-separated list whose first element is the project
// ID and whose remaining elements are topic definitions of the form
// "topicID[:subscriptionID...]", for example:
//
//	PUBSUB_PROJECT1="project1,topic1,topic2:subscription1"
//
// Processing stops at the first missing (or empty) variable. If even
// PUBSUB_PROJECT1 is missing, the usage text is printed and the process exits
// with status 1. Any malformed variable or creation error is fatal.
func main() {
	// Install the custom usage text before parsing, so it is also what the
	// flag package prints when parsing itself fails (unknown flag, -h).
	flag.Usage = func() {
		fmt.Printf(`Usage: env PUBSUB_PROJECT1="project1,topic1,topic2:subscription1" %s`+"\n", os.Args[0])
		flag.PrintDefaults()
	}
	flag.Parse()

	if *help {
		flag.Usage()
		return
	}

	if *version {
		fmt.Println(versionString())
		return
	}

	ctx := context.Background()

	// Cycle over the numbered PUBSUB_PROJECT environment variables.
	for i := 1; ; i++ {
		// Fetch the environment variable. If it doesn't exist, break out.
		currentEnv := fmt.Sprintf("PUBSUB_PROJECT%d", i)
		env := os.Getenv(currentEnv)
		if env == "" {
			// If this is the first environment variable, print the usage info.
			if i == 1 {
				flag.Usage()
				os.Exit(1)
			}
			break
		}

		// Separate the projectID from the topic definitions.
		parts := strings.Split(env, ",")
		if len(parts) < 2 {
			fatalf("%s: Expected at least 1 topic to be defined", currentEnv)
		}

		// Separate the topicID from the subscription IDs.
		topics := make(Topics, len(parts)-1)
		for _, part := range parts[1:] {
			topicParts := strings.Split(part, ":")
			topics[topicParts[0]] = topicParts[1:]
		}

		// Create the project and all its topics and subscriptions.
		if err := create(ctx, parts[0], topics); err != nil {
			// Pass the error as an argument, never as the format string:
			// its text may legitimately contain '%' characters.
			fatalf("%s", err)
		}
	}
}