Skip to content

Commit

Permalink
Merge pull request #31 from bgokden/refactor
Browse files Browse the repository at this point in the history
Stable release
  • Loading branch information
bgokden authored Mar 9, 2021
2 parents b56ce5b + 53d3ee9 commit 8c91ddc
Show file tree
Hide file tree
Showing 16 changed files with 511 additions and 122 deletions.
19 changes: 15 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
# build stage
#FROM golang:1.15.6-alpine AS build-env
FROM golang:1.16.0-alpine AS build-env
RUN apk add --no-cache git
#FROM golang:1.16.0-alpine3.12 AS build-env
FROM golang:1.16.0-buster AS build-env
RUN apt-get update && apt-get install -y git bash curl build-essential

ENV USER_ID 0

COPY jemalloc.sh .
RUN bash jemalloc.sh

WORKDIR /src/veri
COPY . /src/veri

RUN go mod tidy
RUN go mod download
RUN go mod verify
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags='-w -s -extldflags "-static"' -a -o veri
RUN GOOS=linux GOARCH=amd64 go build -ldflags='-w -s -extldflags "-static"' -tags=jemalloc -a -o veri

# final stage
FROM gcr.io/distroless/static@sha256:c6d5981545ce1406d33e61434c61e9452dad93ecd8397c41e89036ef977a88f4
# FROM gcr.io/distroless/static@sha256:c6d5981545ce1406d33e61434c61e9452dad93ecd8397c41e89036ef977a88f4
# FROM gcr.io/distroless/base-debian10
FROM debian:buster-slim
RUN apt-get update && apt-get install -y libjemalloc-dev
WORKDIR /app
COPY --from=build-env /src/veri/veri /app/
ENTRYPOINT ["/app/veri"]
Expand Down
7 changes: 3 additions & 4 deletions data/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package data

import (
"encoding/hex"
"log"
"sort"
"time"

Expand Down Expand Up @@ -33,15 +32,15 @@ func NewAggrator(config *pb.SearchConfig, grouped bool, context *pb.SearchContex
Context: context,
ScoreFunc: GetVectorComparisonFunction(config.ScoreFuncName),
}
a.DeDuplicationMap = cache.New(5*time.Minute, 10*time.Minute)
a.DeDuplicationMap = cache.New(5*time.Minute, 1*time.Minute)
return a
}

func (a *Aggregator) IsNewScoredBetter(old, new float64) bool {
if old == new {
/* if old == new {
// This is a possible edge case.
log.Printf("Same Score: %v\n", old)
}
} */
if a.Config.HigherIsBetter {
if new > old {
return true
Expand Down
18 changes: 10 additions & 8 deletions data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewData(config *pb.DataConfig, dataPath string) (*Data, error) {
dt := &Data{
Config: config,
}
log.Printf("Create Data\n")
// log.Printf("Create Data\n")
dt.DBPath = path.Join(dataPath, config.Name)
dt.InitData()
return dt, nil
Expand All @@ -58,7 +58,7 @@ func NewPreData(config *pb.DataConfig, dataPath string) *Data {
dt := &Data{
Config: config,
}
log.Printf("Pre Create Data %v\n", dt.Config)
// log.Printf("Pre Create Data %v\n", dt.Config)
dt.DBPath = path.Join(dataPath, config.Name)
return dt
}
Expand Down Expand Up @@ -131,8 +131,9 @@ func (dt *Data) Run() error {

// Process runs through keys and calculates statistics
func (dt *Data) Process(force bool) error {
if dt.Dirty || getCurrentTime()-dt.Timestamp >= 10000 || force {
log.Printf("Running Process (forced: %v)\n", force)
// log.Printf("Try Running Process (forced: %v) current: %v timestamp: %v diff: %v\n", force, getCurrentTime(), dt.Timestamp, getCurrentTime()-dt.Timestamp)
if getCurrentTime()-dt.Timestamp >= 60 || force {
// log.Printf("Running Process (forced: %v)\n", force)
n := uint64(0)
distance := 0.0
maxDistance := 0.0
Expand Down Expand Up @@ -181,15 +182,16 @@ func (dt *Data) Process(force bool) error {
dt.MaxDistance = maxDistance
dt.N = n
dt.Timestamp = getCurrentTime()
dt.SyncAll()
}
dt.Timestamp = getCurrentTime() // update always
// dt.Timestamp = getCurrentTime() // update always
dt.Dirty = false
return nil
}

// GetDataInfo out of data
func (dt *Data) GetDataInfo() *pb.DataInfo {
log.Printf("Data: %v\n", dt)
// log.Printf("Data: %v\n", dt)
return &pb.DataInfo{
Avg: dt.Avg,
N: dt.N,
Expand All @@ -205,11 +207,11 @@ func (dt *Data) GetDataInfo() *pb.DataInfo {
}

// AddSource adds a source
func (dt *Data) AddSource(dataSource DataSource) {
func (dt *Data) AddSource(dataSource DataSource) error {
if dt.Sources == nil {
dt.InitData()
}
dt.Sources.Set(dataSource.GetID(), dataSource, cache.DefaultExpiration)
return dt.Sources.Add(dataSource.GetID(), dataSource, cache.DefaultExpiration)
}

func (dt *Data) GetID() string {
Expand Down
18 changes: 14 additions & 4 deletions data/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,26 @@ func GetDefaultConfig(name string) *pb.DataConfig {
TargetN: 1000,
TargetUtilization: 0.4,
NoTarget: false,
ReplicationOnInsert: 2,
ReplicationOnInsert: 1,
EnforceReplicationOnInsert: true,
Retention: 0,
}
}

func GetRetention(retention uint64) time.Duration {
if retention == 0 {
return time.Duration(14*24) * time.Hour
}
return time.Duration(retention) * time.Second
}

func (dts *Dataset) Get(name string) (*Data, error) {
item, ok := dts.DataList.Get(name)
if !ok {
return dts.GetOrCreateIfNotExists(GetDefaultConfig(name))
}
if data, ok := item.(*Data); ok {
dts.DataList.IncrementExpiration(name, time.Duration(data.GetConfig().Retention)*time.Second)
dts.DataList.IncrementExpiration(name, GetRetention(data.GetConfig().Retention))
return data, nil
}
return nil, errors.Errorf("Data %v is currupt", name)
Expand Down Expand Up @@ -99,17 +106,20 @@ func (dts *Dataset) GetOrCreateIfNotExists(config *pb.DataConfig) (*Data, error)

func (dts *Dataset) CreateIfNotExists(config *pb.DataConfig) error {
preData := NewPreData(config, dts.DataPath)
retention := time.Duration(config.Retention) * time.Second
log.Printf("Data %v Retention: %v\n", config.Name, retention)
retention := GetRetention(config.Retention)
// log.Printf("Data %v Retention: %v Version: %v\n", config.Name, retention, config.Version)
err := dts.DataList.Add(config.Name, preData, retention)
if err == nil {
go dts.SaveIndex()
return preData.InitData()
}
// log.Printf("Data %v Error: %v\n", config.Name, err.Error())
if err.Error() == fmt.Sprintf("Item %s already exists", config.Name) {
// log.Printf("Data %v Config.Version: %v\n", config.Name, config.Version)
data, err := dts.GetNoOp(config.Name)
if err != nil {
if config.Version > data.Config.Version {
log.Printf("Update Data %v Config.Version: %v\n", config.Name, config.Version)
data.Config = config
}
}
Expand Down
24 changes: 21 additions & 3 deletions data/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package data

import (
"errors"
"log"
"time"

pb "github.com/bgokden/veri/veriservice"
Expand All @@ -15,6 +16,7 @@ func (dt *Data) Insert(datum *pb.Datum, config *pb.InsertConfig) error {
}
var ttlDuration *time.Duration
if config != nil && config.GetTTL() > 0 {
// log.Printf("Insert Datum with ttl config: %v\n", config.GetTTL())
d := time.Duration(config.GetTTL()) * time.Second
ttlDuration = &d
}
Expand All @@ -28,7 +30,7 @@ func (dt *Data) Insert(datum *pb.Datum, config *pb.InsertConfig) error {
}
err = dt.DB.Update(func(txn *badger.Txn) error {
if ttlDuration != nil {
// log.Printf("Insert Datum: %v ttl: %v\n", datum, ttlDuration)
// log.Printf("Insert Datum with ttl: %v\n", ttlDuration)
e := badger.NewEntry(keyByte, valueByte).WithTTL(*ttlDuration)
return txn.SetEntry(e)
}
Expand All @@ -39,15 +41,31 @@ func (dt *Data) Insert(datum *pb.Datum, config *pb.InsertConfig) error {
return err
}
dt.Dirty = true
if dt.Config.EnforceReplicationOnInsert && config.Count >= uint64(dt.Config.ReplicationOnInsert) {
if config == nil {
config = &pb.InsertConfig{
TTL: 0,
Count: 0,
}
}
counter := uint32(1)
if dt.Config.EnforceReplicationOnInsert && config.Count == 0 {
sourceList := dt.Sources.Items()
config.Count++
// log.Printf("Sending Insert with config.Count: %v ttl: %v\n", config.Count, config.TTL)
for _, sourceItem := range sourceList {
source := sourceItem.Object.(DataSource)
err := source.Insert(datum, config)
if err != nil {
return err
log.Printf("Sending Insert error %v\n", err.Error())
} else {
counter++
}
if counter >= dt.Config.ReplicationOnInsert {
break
}
}
if counter < dt.Config.ReplicationOnInsert {
return errors.New("Replicas is less then Replication Config")
}
}
return nil
Expand Down
24 changes: 14 additions & 10 deletions data/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ func DefaultSearchConfig() *pb.SearchConfig {
}
}

func EncodeSearchConfig(p *pb.SearchConfig) []byte {
marshalled, _ := json.Marshal(p)
// EncodeSearchConfig serializes a SearchConfig to JSON. The config is
// deep-copied first and its Uuid field cleared on the copy, so two
// configs that differ only in the per-request Uuid encode to identical
// bytes (presumably so the result can serve as a cache/deduplication
// key — confirm against callers). The caller's config is not mutated.
// On a marshal failure the returned slice is nil.
func EncodeSearchConfig(sc *pb.SearchConfig) []byte {
	var config pb.SearchConfig
	if err := copier.Copy(&config, sc); err != nil {
		// Previously ignored; log so a copy failure is visible.
		// An empty copy still yields a deterministic encoding.
		log.Printf("SearchConfig copy error: %v\n", err)
	}
	config.Uuid = ""
	marshalled, err := json.Marshal(&config)
	if err != nil {
		// Previously discarded with `_`; surface it instead of
		// silently returning nil bytes.
		log.Printf("SearchConfig marshal error: %v\n", err)
	}
	log.Printf("SearchConfig Encoded: %v\n", string(marshalled))
	return marshalled
}
Expand Down Expand Up @@ -245,7 +248,7 @@ func (dt *Data) Search(datum *pb.Datum, config *pb.SearchConfig) *Collector {
// db.NewStreamAt(readTs) for managed mode.

// -- Optional settings
stream.NumGo = 4 // Set number of goroutines to use for iteration.
stream.NumGo = 16 // Set number of goroutines to use for iteration.
stream.Prefix = nil // Leave nil for iteration over the whole DB.
stream.LogPrefix = "Badger.Streaming" // For identifying stream logs. Outputs to Logger.

Expand Down Expand Up @@ -329,6 +332,7 @@ func (dt *Data) AggregatedSearch(datum *pb.Datum, scoredDatumStreamOutput chan<-
for _, sourceItem := range sourceList {
source := sourceItem.Object.(DataSource)
queryWaitGroup.Add(1)
// log.Printf("Search Source %v\n", source.GetID())
go source.StreamSearch(datum, scoredDatumStream, &queryWaitGroup, config)
}
go func() {
Expand All @@ -343,20 +347,20 @@ func (dt *Data) AggregatedSearch(datum *pb.Datum, scoredDatumStreamOutput chan<-
case scoredDatum := <-scoredDatumStream:
temp.Insert(scoredDatum)
case <-waitChannel:
log.Printf("AggregatedSearch: all data finished")
// log.Printf("AggregatedSearch: all data finished")
close(scoredDatumStream)
for scoredDatum := range scoredDatumStream {
temp.Insert(scoredDatum)
}
dataAvailable = false
break
case <-timeLimit:
log.Printf("timeout")
// log.Printf("timeout")
dataAvailable = false
break
}
}
log.Printf("search collected data\n")
// log.Printf("search collected data\n")
// Search End
result := temp.Result()
resultCopy := CloneResult(result)
Expand All @@ -369,7 +373,7 @@ func (dt *Data) AggregatedSearch(datum *pb.Datum, scoredDatumStreamOutput chan<-
if config.CacheDuration > 0 {
cacheDuration := time.Duration(config.CacheDuration) * time.Second
dt.QueryCache.Set(queryKey, resultCopy, cacheDuration)
log.Printf("AggregatedSearch: finished. Set Cache Duration: %v\n", cacheDuration)
// log.Printf("AggregatedSearch: finished. Set Cache Duration: %v\n", cacheDuration)
}
return nil
}
Expand Down Expand Up @@ -413,20 +417,20 @@ func (dt *Data) MultiAggregatedSearch(datumList []*pb.Datum, config *pb.SearchCo
case scoredDatum := <-scoredDatumStream:
temp.Insert(scoredDatum)
case <-waitChannel:
log.Printf("MultiAggregatedSearch: all data finished")
// log.Printf("MultiAggregatedSearch: all data finished")
close(scoredDatumStream)
for scoredDatum := range scoredDatumStream {
temp.Insert(scoredDatum)
}
dataAvailable = false
break
case <-timeLimit:
log.Printf("timeout")
// log.Printf("timeout")
dataAvailable = false
break
}
}
// Search End
log.Printf("MultiAggregatedSearch: finished")
// log.Printf("MultiAggregatedSearch: finished")
return temp.Result(), nil
}
Loading

0 comments on commit 8c91ddc

Please sign in to comment.