Skip to content

Commit

Permalink
Merge pull request #12 from redis-performance/geopolygon.ingest
Browse files Browse the repository at this point in the history
Initial support for polygon ingestion on redis
  • Loading branch information
filipecosta90 authored May 15, 2023
2 parents ffcbed4 + 412cdf5 commit 90e27fc
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 11 deletions.
18 changes: 17 additions & 1 deletion Readme.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@


![logo](./logo.png)

This repository contains a set of scripts and tools for running benchmarks on vanilla Redis GEO commands and RediSearch, a full-text search engine for Redis.

Expand Down Expand Up @@ -44,7 +44,10 @@ make
```



## Try it out

### GeoPoints
```bash
# get dataset
wget https://s3.us-east-2.amazonaws.com/redis.benchmarks.spec/datasets/geopoint/documents.json.bz2
Expand All @@ -56,3 +59,16 @@ wget -c https://github.com/redis-performance/geo-bench/releases/latest/download/
# load data
./geo-bench load
```

### GeoPolygons
```bash
# get dataset
wget https://s3.us-east-2.amazonaws.com/redis.benchmarks.spec/datasets/geoshape/polygons.json.bz2
bzip2 -d polygons.json.bz2

# get tool
wget -c https://github.com/redis-performance/geo-bench/releases/latest/download/geo-bench-$(uname -mrs | awk '{ print tolower($1) }')-$(dpkg --print-architecture).tar.gz -O - | tar -xz

# load data
./geo-bench load --input-type geoshape --input polygons.json
```
12 changes: 12 additions & 0 deletions cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const REDIS_TYPE_GEO = "redis-geo"
const REDIS_TYPE_GENERIC = "redis"
const REDIS_TYPE_JSON = "redisearch-json"
const REDIS_TYPE_HASH = "redisearch-hash"
// INPUT_TYPE_GEOPOINT is the input-type value naming point datasets; it
// matches the default value of the load command's "input-type" flag.
const INPUT_TYPE_GEOPOINT = "geopoint"
// INPUT_TYPE_GEOSHAPE is the input-type value naming polygon (shape)
// datasets; it matches the "geoshape" value the load command checks for.
const INPUT_TYPE_GEOSHAPE = "geoshape"
const REDIS_DEFAULT_IDX_NAME = "idx"
const REDIS_GEO_DEFAULT_KEYNAME = "key"
const REDIS_IDX_NAME_PROPERTY = "redisearch.index.name"
Expand All @@ -24,10 +26,20 @@ type GeoPoint struct {
LatLon []float64 `json:"location"`
}

// GeoShape is the JSON layout of one input line of a geoshape dataset.
type GeoShape struct {
// Shape is the value of the document's "shape" key, kept as an opaque
// string (presumably a WKT polygon — TODO confirm against the dataset).
Shape string `json:"shape"`
}

// lineToLonLat decodes one JSON input line into a GeoPoint and returns the
// first two entries of its "location" array, in that order, as (lon, lat).
//
// The decode error is deliberately left uninspected, matching the existing
// behavior: a line that does not yield at least two "location" entries makes
// the indexing below panic.
func lineToLonLat(line string) (float64, float64) {
	point := GeoPoint{}
	_ = json.Unmarshal([]byte(line), &point)
	coords := point.LatLon
	return coords[0], coords[1]
}

// lineToPolygon decodes one JSON input line into a GeoShape and returns the
// string stored under its "shape" key.
//
// It returns the empty string when the line cannot be decoded (malformed
// JSON, or a "shape" value that is not a string), so that a partially
// populated value is never handed to the caller. A well-formed line that
// simply has no "shape" key also yields the empty string.
func lineToPolygon(line string) string {
	var geo GeoShape
	if err := json.Unmarshal([]byte(line), &geo); err != nil {
		// The signature has no error channel; an empty shape is the
		// explicit "nothing usable on this line" result.
		return ""
	}
	return geo.Shape
}
105 changes: 97 additions & 8 deletions cmd/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"context"
"fmt"
hdrhistogram "github.com/HdrHistogram/hdrhistogram-go"
"github.com/rueian/rueidis"
"github.com/redis/rueidis"
"github.com/spf13/cobra"
"io"
"log"
Expand All @@ -34,6 +34,7 @@ var loadCmd = &cobra.Command{
indexSearchName, _ := cmd.Flags().GetString(REDIS_IDX_NAME_PROPERTY)
db, _ := cmd.Flags().GetString("db")
input, _ := cmd.Flags().GetString("input")
inputType, _ := cmd.Flags().GetString("input-type")
uri, _ := cmd.Flags().GetString("uri")
concurrency, _ := cmd.Flags().GetInt("concurrency")
requests, _ := cmd.Flags().GetInt("requests")
Expand Down Expand Up @@ -69,9 +70,13 @@ var loadCmd = &cobra.Command{
defer file.Close()

scanner := bufio.NewScanner(file)
buf := make([]byte, 512*1024*1024)
scanner.Buffer(buf, 512*1024*1024)

n := 0
for scanner.Scan() {
workQueue <- scanner.Text()
finalInputLine := scanner.Text()
workQueue <- finalInputLine
n = n + 1
if n >= nDatapoints {
break
Expand All @@ -82,8 +87,14 @@ var loadCmd = &cobra.Command{
close(workQueue)
}()

var geopoints uint64
setupStage(uri, db, indexSearch, indexSearchName)
var geoCommands uint64
if strings.Compare(inputType, "geoshape") == 0 {
setupStageGeoShape(uri, db, indexSearch, indexSearchName)
// geopoint
} else {
setupStageGeoPoint(uri, db, indexSearch, indexSearchName)
}

// listen for C-c
controlC := make(chan os.Signal, 1)
signal.Notify(controlC, os.Interrupt)
Expand All @@ -92,7 +103,14 @@ var loadCmd = &cobra.Command{
start := time.Now()
// Now read them all off, concurrently.
for i := 0; i < concurrency; i++ {
go loadWorker(uri, workQueue, complete, &geopoints, datapointsChan, uint64(nDatapoints), db, redisGeoKeyname)
// geoshape
if strings.Compare(inputType, "geoshape") == 0 {
go loadWorkerGeoshape(uri, workQueue, complete, &geoCommands, datapointsChan, uint64(nDatapoints), db, redisGeoKeyname)
// geopoint
} else {
go loadWorkerGeopoint(uri, workQueue, complete, &geoCommands, datapointsChan, uint64(nDatapoints), db, redisGeoKeyname)
}

// delay the creation 1ms for each additional client
time.Sleep(time.Millisecond * 1)
}
Expand All @@ -114,7 +132,7 @@ var loadCmd = &cobra.Command{
fmt.Printf("Latency summary (msec):\n")
fmt.Printf(" %9s %9s %9s %9s\n", "avg", "p50", "p95", "p99")
fmt.Printf(" %9.3f %9.3f %9.3f %9.3f\n", avgMs, p50IngestionMs, p95IngestionMs, p99IngestionMs)
fmt.Println(fmt.Sprintf("Finished inserting %d geo points", geopoints))
fmt.Println(fmt.Sprintf("Finished inserting %d geo points", geoCommands))
},
}

Expand All @@ -133,7 +151,38 @@ func validateDB(db string) {
}
}

func setupStage(uri, db string, indexSearch bool, indexName string) {
// setupStageGeoShape runs the one-off setup commands needed before polygon
// (geoshape) ingestion.
//
// It connects to the server at uri and, depending on db:
//   - REDIS_TYPE_JSON: when indexSearch is true, creates index indexName over
//     JSON documents with "$.shape" (aliased "shape") as a GEOMETRY field;
//   - REDIS_TYPE_HASH: when indexSearch is true, creates index indexName over
//     hashes with "shape" as a GEOMETRY field;
//   - any other value: terminates the process, since no geoshape setup is
//     defined for it.
//
// Any connection or command error terminates the process via log.Fatal.
func setupStageGeoShape(uri, db string, indexSearch bool, indexName string) {
	c, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{uri},
	})
	// Bail out before touching the client if the connection could not be
	// established; using it (even just deferring Close) would otherwise
	// dereference a nil client instead of reporting the real error.
	if err != nil {
		log.Fatalf("Received error on setup stage: '%s'. Exiting...", err.Error())
	}
	defer c.Close()

	ctx := context.Background()
	log.Printf("Starting setup stage for %s DB. Sending setup commands...\n", db)
	switch db {
	case REDIS_TYPE_JSON:
		if indexSearch {
			log.Printf("Creating redisearch index named %s.\n", indexName)
			err = c.Do(ctx, c.B().FtCreate().Index(indexName).OnJson().Schema().FieldName("$.shape").As("shape").Geometry().Build()).Error()
		} else {
			log.Printf("Skipping the creation of redisearch index %s.\n", indexName)
		}
	case REDIS_TYPE_HASH:
		if indexSearch {
			log.Printf("Creating redisearch index named %s.\n", indexName)
			err = c.Do(ctx, c.B().FtCreate().Index(indexName).OnHash().Schema().FieldName("shape").Geometry().Build()).Error()
		} else {
			log.Printf("Skipping the creation of redisearch index %s.\n", indexName)
		}
	default:
		log.Fatal("DB was not recognized. Exiting...")
	}
	if err != nil {
		log.Fatalf("Received error on setup stage: '%s'. Exiting...", err.Error())
	}
	log.Printf("Finished setup stage for %s DB\n", db)
}

func setupStageGeoPoint(uri, db string, indexSearch bool, indexName string) {
c, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{uri},
})
Expand Down Expand Up @@ -244,6 +293,7 @@ func init() {
rootCmd.AddCommand(loadCmd)
loadCmd.Flags().StringP("db", "", REDIS_TYPE_GEO, fmt.Sprintf("Database to load the data to. One of %s", strings.Join([]string{REDIS_TYPE_GEO, REDIS_TYPE_JSON, REDIS_TYPE_HASH}, ",")))
loadCmd.Flags().StringP("input", "i", "documents.json", "Input json file")
loadCmd.Flags().StringP("input-type", "", "geopoint", "Input type. One of 'geopoint' or 'geoshape")
loadCmd.Flags().IntP("concurrency", "c", 50, "Concurrency")
loadCmd.Flags().IntP("requests", "n", -1, "Requests. If -1 then it will use all input datapoints")
loadCmd.Flags().StringP("uri", "u", "localhost:6379", "Server URI")
Expand Down Expand Up @@ -283,7 +333,7 @@ func LineCounter(r io.Reader) (int, error) {
return count, nil
}

func loadWorker(uri string, queue chan string, complete chan bool, ops *uint64, datapointsChan chan datapoint, totalDatapoints uint64, db string, redisGeoKeyname string) {
func loadWorkerGeopoint(uri string, queue chan string, complete chan bool, ops *uint64, datapointsChan chan datapoint, totalDatapoints uint64, db string, redisGeoKeyname string) {
c, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{uri},
})
Expand Down Expand Up @@ -324,3 +374,42 @@ func loadWorker(uri string, queue chan string, complete chan bool, ops *uint64,
// Let the main process know we're done.
complete <- true
}

// loadWorkerGeoshape is one ingestion worker for polygon (geoshape) input.
//
// It opens its own client to uri, then reads JSON lines from queue until the
// channel is closed or the shared counter ops reaches totalDatapoints. For
// every line it reserves the next sequence number from ops, uses that number
// (in decimal) as the key, and stores the line's shape:
//   - db == REDIS_TYPE_JSON: JSON.SET key $ {"shape":"<polygon>"}
//   - any other db (including REDIS_TYPE_HASH): HSET key shape <polygon>
//
// One datapoint per command is sent on datapointsChan, carrying whether the
// command succeeded and its latency in microseconds. When the worker stops it
// signals on complete. redisGeoKeyname is accepted for signature parity with
// the geopoint worker and is not used here.
//
// A failure to create the client panics.
func loadWorkerGeoshape(uri string, queue chan string, complete chan bool, ops *uint64, datapointsChan chan datapoint, totalDatapoints uint64, db string, redisGeoKeyname string) {
	c, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{uri},
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()
	ctx := context.Background()
	for line := range queue {
		// Reserve the sequence number in a single atomic step. Separate
		// load/add/load calls let two workers observe the same value, which
		// produced duplicate keys and let the counter run past the limit.
		opsVal, ok := claimGeoshapeSlot(ops, totalDatapoints)
		if !ok {
			break
		}
		polygon := lineToPolygon(line)
		memberS := fmt.Sprintf("%d", opsVal)

		startT := time.Now()
		switch db {
		case REDIS_TYPE_JSON:
			// NOTE(review): polygon is interpolated without JSON escaping;
			// a shape containing '"' or '\' would yield an invalid document.
			err = c.Do(ctx, c.B().JsonSet().Key(memberS).Path("$").Value(fmt.Sprintf("{\"shape\":\"%s\"}", polygon)).Build()).Error()
		case REDIS_TYPE_HASH:
			fallthrough
		default:
			err = c.Do(ctx, c.B().Hset().Key(memberS).FieldValue().FieldValue("shape", polygon).Build()).Error()
		}
		duration := time.Since(startT)

		datapointsChan <- datapoint{err == nil, duration.Microseconds(), 0}
	}
	// Let the main process know we're done.
	complete <- true
}

// claimGeoshapeSlot atomically increments *ops and returns the new value, or
// returns false without modifying *ops once it has reached limit.
func claimGeoshapeSlot(ops *uint64, limit uint64) (uint64, bool) {
	for {
		cur := atomic.LoadUint64(ops)
		if cur >= limit {
			return 0, false
		}
		// Retry when another worker advanced the counter in between.
		if atomic.CompareAndSwapUint64(ops, cur, cur+1) {
			return cur + 1, true
		}
	}
}
2 changes: 1 addition & 1 deletion cmd/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"context"
"fmt"
hdrhistogram "github.com/HdrHistogram/hdrhistogram-go"
"github.com/rueian/rueidis"
"github.com/redis/rueidis"
"github.com/spf13/cobra"
"log"
"math"
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ go 1.19

require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2
github.com/rueian/rueidis v0.0.89
github.com/redis/rueidis v0.0.89
github.com/rueian/rueidis v0.0.100
github.com/spf13/cobra v1.6.1
)

require (
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
)

replace github.com/redis/rueidis => github.com/filipecosta90/rueidis v0.0.0-20230514183601-25fb0c71c8a5
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/filipecosta90/rueidis v0.0.0-20230514183601-25fb0c71c8a5 h1:WFw8XV5/hvcix6ej8FsG7TWbFPCHKf/KOUt7nxGIh0s=
github.com/filipecosta90/rueidis v0.0.0-20230514183601-25fb0c71c8a5/go.mod h1:yxbpgX+VYNxCvdE0KEQXDeUFcF2hB2Oz/TJiaqFxoEU=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
Expand All @@ -24,6 +26,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rueian/rueidis v0.0.89 h1:Q2TbuNXMJ2d2NegQ47uOoGOGPZLQwRuL0oX/dAlCh6k=
github.com/rueian/rueidis v0.0.89/go.mod h1:LiKWMM/QnILwRfDZIhSIXi4vQqZ/UZy4+/aNkSCt8XA=
github.com/rueian/rueidis v0.0.100 h1:22yp/+8YHuWc/vcrp8bkjeE7baD3vygoh2gZ2+xu1KQ=
github.com/rueian/rueidis v0.0.100/go.mod h1:ivvsRYRtAUcf9OnheuKc5Gpa8IebrkLT1P45Lr2jlXE=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA=
github.com/spf13/cobra v1.6.1/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUqzrY=
Expand Down
Binary file added logo.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 90e27fc

Please sign in to comment.