Skip to content

Commit

Permalink
Properly closing serial & database connections
Browse files Browse the repository at this point in the history
  • Loading branch information
bclswl0827 committed Oct 2, 2023
1 parent ae2217a commit 6391a60
Show file tree
Hide file tree
Showing 21 changed files with 175 additions and 108 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v2.1.9p
v2.1.10p
4 changes: 2 additions & 2 deletions build/assets/config.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"station_settings": {
"uuid": "a373e39c-8e15-44ae-a1ad-6fb622bc49e6",
"name": "测试测站",
"name": "Test Station",
"latitude": 1.0,
"longitude": 1.0,
"altitude": 0
Expand Down Expand Up @@ -59,6 +59,6 @@
"enable": false,
"path": "/data/miniseed",
"station": "TEST",
"network": "CN"
"network": "XX"
}
}
27 changes: 20 additions & 7 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package main
import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"

"github.com/fatih/color"

Expand Down Expand Up @@ -32,7 +35,7 @@ func parseCommandLine(conf *config.Conf) error {
return err
}

logger.Print("main", "Observer daemon initialized", color.FgMagenta)
logger.Print("main", "main daemon has initialized", color.FgMagenta, false)
return nil
}

Expand All @@ -58,22 +61,23 @@ func main() {
publisher.Initialize(&conf, &status)

// Register features
featureOptions := &feature.FeatureOptions{
Config: &conf,
Status: &status,
}
features := []feature.Feature{
&ntpclient.NTPClient{},
&geophone.Geophone{},
&archiver.Archiver{},
&miniseed.MiniSEED{},
}
featureOptions := &feature.FeatureOptions{
Config: &conf,
Status: &status,
}
featureWaitGroup := new(sync.WaitGroup)
for _, s := range features {
go s.Start(featureOptions)
go s.Run(featureOptions, featureWaitGroup)
}

// Start HTTP server
server.ServerDaemon(
go server.StartDaemon(
conf.Server.Host,
conf.Server.Port,
&app.ServerOptions{
Expand All @@ -84,4 +88,13 @@ func main() {
FeatureOptions: featureOptions,
CORS: conf.Server.CORS,
})

// Receive interrupt signals
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh

// Wait for all features to stop
logger.Print("main", "main daemon is shutting down", color.FgMagenta, true)
featureWaitGroup.Wait()
}
8 changes: 4 additions & 4 deletions feature/archiver/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ import (
)

func (a *Archiver) OnStart(options *feature.FeatureOptions, v ...any) {
logger.Print(MODULE, text.Concat(v...), color.FgMagenta)
logger.Print(MODULE, text.Concat(v...), color.FgMagenta, false)
}

func (a *Archiver) OnStop(options *feature.FeatureOptions, v ...any) {
logger.Print(MODULE, text.Concat(v...), color.FgBlue)
logger.Print(MODULE, text.Concat(v...), color.FgBlue, false)
}

func (a *Archiver) OnReady(options *feature.FeatureOptions, v ...any) {
logger.Print(MODULE, "1 message has been archived", color.FgGreen)
logger.Print(MODULE, "1 message has been archived", color.FgGreen, false)
}

func (a *Archiver) OnError(options *feature.FeatureOptions, err error) {
postgres.Close(options.Database)
logger.Print(MODULE, err, color.FgRed)
logger.Print(MODULE, err, color.FgRed, false)
}
83 changes: 52 additions & 31 deletions feature/archiver/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,27 @@ package archiver
import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"

"github.com/bclswl0827/observer/driver/postgres"
"github.com/bclswl0827/observer/feature"
"github.com/bclswl0827/observer/publisher"
"github.com/bclswl0827/observer/utils/logger"
"github.com/fatih/color"
)

func (a *Archiver) Start(options *feature.FeatureOptions) {
func (a *Archiver) Run(options *feature.FeatureOptions, waitGroup *sync.WaitGroup) {
if !options.Config.Archiver.Enable {
a.OnStop(options, "service is disabled")
return
} else {
waitGroup.Add(1)
defer waitGroup.Done()
}

// Connect to PostgreSQL
a.OnStart(options, "service has started")
pdb, err := postgres.Open(
options.Config.Archiver.Host,
Expand All @@ -28,6 +37,7 @@ func (a *Archiver) Start(options *feature.FeatureOptions) {
os.Exit(1)
}

// Initialize PostgreSQL
err = postgres.Init(pdb)
if err != nil {
a.OnError(options, err)
Expand All @@ -36,40 +46,51 @@ func (a *Archiver) Start(options *feature.FeatureOptions) {
options.Database = pdb

// Archive when new message arrived
publisher.Subscribe(
&options.Status.Geophone,
func(gp *publisher.Geophone) error {
var (
ts = gp.TS
ehz = gp.EHZ
ehe = gp.EHE
ehn = gp.EHN
)
err := postgres.Insert(pdb, ts, ehz, ehe, ehn)
if err != nil {
a.OnError(options, err)
postgres.Close(pdb)

// Reconnect to PostgreSQL
pdb, err := postgres.Open(
options.Config.Archiver.Host,
options.Config.Archiver.Port,
options.Config.Archiver.Username,
options.Config.Archiver.Password,
options.Config.Archiver.Database,
go func() {
publisher.Subscribe(
&options.Status.Geophone,
func(gp *publisher.Geophone) error {
var (
ts = gp.TS
ehz = gp.EHZ
ehe = gp.EHE
ehn = gp.EHN
)
err := postgres.Insert(pdb, ts, ehz, ehe, ehn)
if err != nil {
a.OnError(options, err)
return err
postgres.Close(pdb)

// Reconnect to PostgreSQL
pdb, err := postgres.Open(
options.Config.Archiver.Host,
options.Config.Archiver.Port,
options.Config.Archiver.Username,
options.Config.Archiver.Password,
options.Config.Archiver.Database,
)
if err != nil {
a.OnError(options, err)
return err
}
options.Database = pdb
}
options.Database = pdb
}

a.OnReady(options)
return nil
},
)
a.OnReady(options)
return nil
},
)

err = fmt.Errorf("service exited with an error")
a.OnError(options, err)
}()

// Receive interrupt signals
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

err = fmt.Errorf("service exited with a error")
a.OnError(options, err)
// Wait for interrupt signals
<-sigCh
logger.Print(MODULE, "closing database connection", color.FgBlue, true)
postgres.Close(pdb)
}
10 changes: 5 additions & 5 deletions feature/geophone/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
)

func (g *Geophone) OnStart(options *feature.FeatureOptions, v ...any) {
logger.Print(MODULE, text.Concat(v...), color.FgMagenta)
logger.Print(MODULE, text.Concat(v...), color.FgMagenta, false)
}

func (g *Geophone) OnStop(options *feature.FeatureOptions, v ...any) {
logger.Print(MODULE, text.Concat(v...), color.FgBlue)
logger.Print(MODULE, text.Concat(v...), color.FgBlue, true)
}

func (g *Geophone) OnReady(options *feature.FeatureOptions, v ...any) {
Expand Down Expand Up @@ -74,14 +74,14 @@ func (g *Geophone) OnReady(options *feature.FeatureOptions, v ...any) {
options.Status.Buffer.EHZ = []int32{}
options.Status.Buffer.EHE = []int32{}
options.Status.Buffer.EHN = []int32{}
logger.Print(MODULE, "1 full packet received", color.FgGreen)
logger.Print(MODULE, "1 full packet received", color.FgGreen, false)
}
} else {
logger.Print(MODULE, "waiting for time alignment", color.FgYellow)
logger.Print(MODULE, "waiting for time alignment", color.FgYellow, false)
}
}

func (g *Geophone) OnError(options *feature.FeatureOptions, err error) {
options.Status.System.Errors++
logger.Print(MODULE, err, color.FgRed)
logger.Print(MODULE, err, color.FgRed, false)
}
92 changes: 58 additions & 34 deletions feature/geophone/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,71 +3,95 @@ package geophone
import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/bclswl0827/observer/driver/serial"
"github.com/bclswl0827/observer/feature"
"github.com/bclswl0827/observer/utils/duration"
)

func (g *Geophone) Start(options *feature.FeatureOptions) {
func (g *Geophone) Run(options *feature.FeatureOptions, waitGroup *sync.WaitGroup) {
var (
device = options.Config.Serial.Device
baud = options.Config.Serial.Baud
packetLen = options.Config.Serial.Packet
)

// Increase wait group counter
waitGroup.Add(1)
defer waitGroup.Done()

// Open serial port
port, err := serial.Open(device, baud)
if err != nil {
g.OnError(options, err)
os.Exit(1)
}
defer serial.Close(port)

var packet Packet
g.OnStart(options, "service has started")

if options.Config.Geophone.EHZ.Compensation ||
options.Config.Geophone.EHE.Compensation ||
options.Config.Geophone.EHN.Compensation {
g.OnStart(options, "compensation is in beta")
}
go func() {
// Initialize geophone packet
var packet Packet
g.OnStart(options, "service has started")

lastRead := time.Now().UTC()
for {
err := g.Read(port, options.Config, &packet, packetLen)
if err != nil {
serial.Close(port)
g.OnError(options, err)
// FIXME: Compensation is in beta
if options.Config.Geophone.EHZ.Compensation ||
options.Config.Geophone.EHE.Compensation ||
options.Config.Geophone.EHN.Compensation {
g.OnStart(options, "compensation is in beta")
}

port, err = serial.Open(device, baud)
lastRead := time.Now().UTC()
for {
// Read from serial port by channel packet length
err := g.Read(port, options.Config, &packet, packetLen)
if err != nil {
serial.Close(port)
g.OnError(options, err)
os.Exit(1)

// Reopen serial port
port, err = serial.Open(device, baud)
if err != nil {
g.OnError(options, err)
os.Exit(1)
}

// Reset device after reopen
err = g.Reset(port)
if err != nil {
g.OnError(options, err)
}

lastRead = time.Now().UTC()
continue
} else {
g.OnReady(options, packet)
}

err = g.Reset(port)
if err != nil {
// Reset device if reached TIMEOUT_THRESHOLD
if duration.Difference(time.Now().UTC(), lastRead) >= TIMEOUT_THRESHOLD {
err := fmt.Errorf("reset due to unusual gap")
g.OnError(options, err)

err = g.Reset(port)
if err != nil {
g.OnError(options, err)
}
}

lastRead = time.Now().UTC()
continue
} else {
g.OnReady(options, packet)
}
}()

// Reset device if reached TIMEOUT_THRESHOLD
if duration.Difference(time.Now().UTC(), lastRead) >= TIMEOUT_THRESHOLD {
err := fmt.Errorf("reset due to unusual gap")
g.OnError(options, err)
// Receive interrupt signals
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

err = g.Reset(port)
if err != nil {
g.OnError(options, err)
}
}

lastRead = time.Now().UTC()
}
// Wait for interrupt signals
<-sigCh
g.OnStop(options, "closing serial connection")
serial.Close(port)
}
Loading

0 comments on commit 6391a60

Please sign in to comment.