Skip to content
This repository was archived by the owner on Sep 30, 2024. It is now read-only.

Commit f1c4a3e

Browse files
committed
chore(): removed arrow schema from project
1 parent f946e87 commit f1c4a3e

File tree

4 files changed

+120
-127
lines changed

4 files changed

+120
-127
lines changed

filter.go

Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,22 @@ import (
44
"bytes"
55
"encoding/json"
66
"fmt"
7-
"github.com/apache/arrow/go/v14/arrow"
8-
"github.com/apache/arrow/go/v14/arrow/array"
9-
"github.com/apache/arrow/go/v14/arrow/memory"
10-
"github.com/cloudquery/plugin-sdk/v4/scalar"
11-
"github.com/usedatabrew/pglogicalstream/internal/schemas"
127
"strings"
8+
9+
"github.com/usedatabrew/pglogicalstream/internal/schemas"
1310
)
1411

1512
type ChangeFilter struct {
16-
tablesWhiteList map[string]*arrow.Schema
13+
tablesWhiteList map[string]bool
1714
schemaWhiteList string
1815
}
1916

2017
type Filtered func(change Wal2JsonChanges)
2118

2219
func NewChangeFilter(tableSchemas []schemas.DataTableSchema, schema string) ChangeFilter {
23-
tablesMap := map[string]*arrow.Schema{}
20+
tablesMap := map[string]bool{}
2421
for _, table := range tableSchemas {
25-
tablesMap[strings.Split(table.TableName, ".")[1]] = table.Schema
22+
tablesMap[strings.Split(table.TableName, ".")[1]] = true
2623
}
2724

2825
return ChangeFilter{
@@ -43,51 +40,34 @@ func (c ChangeFilter) FilterChange(lsn string, change []byte, OnFiltered Filtere
4340

4441
for _, ch := range changes.Change {
4542
var filteredChanges = Wal2JsonChanges{
46-
Lsn: lsn,
43+
Lsn: &lsn,
4744
Changes: []Wal2JsonChange{},
4845
}
4946
if ch.Schema != c.schemaWhiteList {
5047
continue
5148
}
5249

5350
var (
54-
arrowTableSchema *arrow.Schema
55-
tableExist bool
51+
tableExist bool
5652
)
5753

58-
if arrowTableSchema, tableExist = c.tablesWhiteList[ch.Table]; !tableExist {
54+
if _, tableExist = c.tablesWhiteList[ch.Table]; !tableExist {
5955
continue
6056
}
6157

62-
builder := array.NewRecordBuilder(memory.DefaultAllocator, arrowTableSchema)
63-
changesMap := map[string]interface{}{}
6458
if ch.Kind == "delete" {
6559
for i, changedValue := range ch.Oldkeys.Keyvalues {
66-
changesMap[ch.Oldkeys.Keynames[i]] = changedValue
67-
}
68-
} else {
69-
for i, changedValue := range ch.Columnvalues {
70-
changesMap[ch.Columnnames[i]] = changedValue
60+
ch.Columnvalues[i] = changedValue
7161
}
7262
}
7363

74-
arrowSchema := c.tablesWhiteList[ch.Table]
75-
for i, arrowField := range arrowSchema.Fields() {
76-
fieldName := arrowField.Name
77-
value := changesMap[fieldName]
78-
s := scalar.NewScalar(arrowSchema.Field(i).Type)
79-
if err := s.Set(value); err != nil {
80-
panic(fmt.Errorf("error setting value for column %s: %w", arrowField.Name, err))
81-
}
82-
83-
scalar.AppendToBuilder(builder.Field(i), s)
84-
}
85-
8664
filteredChanges.Changes = append(filteredChanges.Changes, Wal2JsonChange{
87-
Kind: ch.Kind,
88-
Schema: ch.Schema,
89-
Table: ch.Table,
90-
Row: builder.NewRecord(),
65+
Kind: ch.Kind,
66+
Schema: ch.Schema,
67+
Table: ch.Table,
68+
ColumnNames: ch.Columnnames,
69+
ColumnTypes: ch.Columntypes,
70+
ColumnValues: ch.Columnvalues,
9171
})
9272

9373
OnFiltered(filteredChanges)

logical_stream.go

Lines changed: 84 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,14 @@ package pglogicalstream
33
import (
44
"context"
55
"crypto/tls"
6+
"database/sql"
67
"fmt"
78
"os"
89
"strings"
910
"time"
1011

11-
"github.com/apache/arrow/go/v14/arrow"
12-
"github.com/apache/arrow/go/v14/arrow/array"
13-
"github.com/apache/arrow/go/v14/arrow/memory"
1412
"github.com/charmbracelet/log"
15-
"github.com/cloudquery/plugin-sdk/v4/scalar"
1613
"github.com/jackc/pglogrepl"
17-
"github.com/jackc/pgx/v5"
1814
"github.com/jackc/pgx/v5/pgconn"
1915
"github.com/jackc/pgx/v5/pgproto3"
2016
"github.com/usedatabrew/pglogicalstream/internal/helpers"
@@ -40,7 +36,6 @@ type Stream struct {
4036
lsnrestart pglogrepl.LSN
4137
slotName string
4238
schema string
43-
tableSchemas []schemas.DataTableSchema
4439
tableNames []string
4540
separateChanges bool
4641
snapshotBatchSize int
@@ -86,19 +81,6 @@ func NewPgStream(config Config, logger *log.Logger) (*Stream, error) {
8681
var dataSchemas []schemas.DataTableSchema
8782
for _, table := range config.DbTablesSchema {
8883
tableNames = append(tableNames, strings.Split(table.Table, ".")[1])
89-
var dts schemas.DataTableSchema
90-
dts.TableName = table.Table
91-
var arrowSchemaFields []arrow.Field
92-
for _, col := range table.Columns {
93-
arrowSchemaFields = append(arrowSchemaFields, arrow.Field{
94-
Name: col.Name,
95-
Type: helpers.MapPlainTypeToArrow(col.DatabrewType),
96-
Nullable: col.Nullable,
97-
Metadata: arrow.Metadata{},
98-
})
99-
}
100-
dts.Schema = arrow.NewSchema(arrowSchemaFields, nil)
101-
dataSchemas = append(dataSchemas, dts)
10284
}
10385

10486
stream := &Stream{
@@ -108,7 +90,6 @@ func NewPgStream(config Config, logger *log.Logger) (*Stream, error) {
10890
snapshotMessages: make(chan Wal2JsonChanges, 100),
10991
slotName: config.ReplicationSlotName,
11092
schema: config.DbSchema,
111-
tableSchemas: dataSchemas,
11293
snapshotMemorySafetyFactor: config.SnapshotMemorySafetyFactor,
11394
separateChanges: config.SeparateChanges,
11495
snapshotBatchSize: config.BatchSize,
@@ -154,7 +135,7 @@ func NewPgStream(config Config, logger *log.Logger) (*Stream, error) {
154135
if len(slotCheckResults) == 0 || len(slotCheckResults[0].Rows) == 0 {
155136
// here we create a new replication slot because there is no slot found
156137
var createSlotResult CreateReplicationSlotResult
157-
createSlotResult, err = CreateReplicationSlot(context.Background(), stream.pgConn, stream.slotName, outputPlugin,
138+
createSlotResult, err = CreateReplicationSlot(context.Background(), stream.pgConn, stream.slotName, "wal2json",
158139
CreateReplicationSlotOptions{Temporary: false,
159140
SnapshotAction: "export",
160141
})
@@ -309,71 +290,111 @@ func (s *Stream) processSnapshot() {
309290
snapshotter.CloseConn()
310291
}()
311292

312-
for _, table := range s.tableSchemas {
313-
s.logger.Info("Processing database snapshot", "schema", s.schema, "table", table)
314-
315-
var offset = 0
293+
for _, table := range s.tableNames {
294+
log.Printf("Processing snapshot for a table %s.%s", s.schema, table)
316295

317-
pk, err := s.getPrimaryKeyColumn(table.TableName)
318-
if err != nil {
319-
s.logger.Fatalf("Failed to resolve pk %s", err.Error())
320-
}
296+
var (
297+
avgRowSizeBytes sql.NullInt64
298+
offset = int(0)
299+
)
300+
// extract only the name of the table
301+
rawTableName := strings.Split(table, ".")[1]
302+
avgRowSizeBytes = snapshotter.FindAvgRowSize(rawTableName)
303+
fmt.Println(avgRowSizeBytes, offset, "AVG SIZES")
321304

322-
s.logger.Info("Query snapshot", "batch-size", s.snapshotBatchSize)
323-
builder := array.NewRecordBuilder(memory.DefaultAllocator, table.Schema)
305+
batchSize := snapshotter.CalculateBatchSize(helpers.GetAvailableMemory(), uint64(avgRowSizeBytes.Int64))
306+
fmt.Println("Query with batch size", batchSize, "Available memory: ", helpers.GetAvailableMemory(), "Avg row size: ", avgRowSizeBytes.Int64)
324307

325-
colNames := make([]string, 0, len(table.Schema.Fields()))
308+
for {
309+
var snapshotRows *sql.Rows
310+
if snapshotRows, err = snapshotter.QuerySnapshotData(table, batchSize, offset); err != nil {
311+
log.Fatalf("Can't query snapshot data %v", err)
312+
}
326313

327-
for _, col := range table.Schema.Fields() {
328-
colNames = append(colNames, pgx.Identifier{col.Name}.Sanitize())
329-
}
314+
columnTypes, err := snapshotRows.ColumnTypes()
315+
var columnTypesString = make([]string, len(columnTypes))
316+
columnNames, err := snapshotRows.Columns()
317+
for i, _ := range columnNames {
318+
columnTypesString[i] = columnTypes[i].DatabaseTypeName()
319+
}
330320

331-
for {
332-
var snapshotRows pgx.Rows
333-
s.logger.Info("Query snapshot: ", "table", table.TableName, "columns", colNames, "batch-size", s.snapshotBatchSize, "offset", offset)
334-
if snapshotRows, err = snapshotter.QuerySnapshotData(table.TableName, colNames, pk, s.snapshotBatchSize, offset); err != nil {
335-
s.logger.Errorf("Failed to query snapshot data %s", err.Error())
336-
s.cleanUpOnFailure()
337-
os.Exit(1)
321+
if err != nil {
322+
panic(err)
338323
}
339324

325+
count := len(columnTypes)
340326
var rowsCount = 0
341327
for snapshotRows.Next() {
342328
rowsCount += 1
329+
scanArgs := make([]interface{}, count)
330+
for i, v := range columnTypes {
331+
switch v.DatabaseTypeName() {
332+
case "VARCHAR", "TEXT", "UUID", "TIMESTAMP":
333+
scanArgs[i] = new(sql.NullString)
334+
break
335+
case "BOOL":
336+
scanArgs[i] = new(sql.NullBool)
337+
break
338+
case "INT4":
339+
scanArgs[i] = new(sql.NullInt64)
340+
break
341+
default:
342+
scanArgs[i] = new(sql.NullString)
343+
}
344+
}
345+
346+
err := snapshotRows.Scan(scanArgs...)
343347

344-
values, err := snapshotRows.Values()
345348
if err != nil {
346349
panic(err)
347350
}
348351

349-
for i, v := range values {
350-
s := scalar.NewScalar(table.Schema.Field(i).Type)
351-
if err := s.Set(v); err != nil {
352-
panic(err)
352+
var columnValues = make([]interface{}, len(columnTypes))
353+
for i, _ := range columnTypes {
354+
if z, ok := (scanArgs[i]).(*sql.NullBool); ok {
355+
columnValues[i] = z.Bool
356+
continue
357+
}
358+
if z, ok := (scanArgs[i]).(*sql.NullString); ok {
359+
columnValues[i] = z.String
360+
continue
361+
}
362+
if z, ok := (scanArgs[i]).(*sql.NullInt64); ok {
363+
columnValues[i] = z.Int64
364+
continue
365+
}
366+
if z, ok := (scanArgs[i]).(*sql.NullFloat64); ok {
367+
columnValues[i] = z.Float64
368+
continue
369+
}
370+
if z, ok := (scanArgs[i]).(*sql.NullInt32); ok {
371+
columnValues[i] = z.Int32
372+
continue
353373
}
354374

355-
scalar.AppendToBuilder(builder.Field(i), s)
375+
columnValues[i] = scanArgs[i]
356376
}
357-
var snapshotChanges = Wal2JsonChanges{
358-
Lsn: "",
359-
Changes: []Wal2JsonChange{
360-
{
361-
Kind: "insert",
362-
Schema: s.schema,
363-
Table: strings.Split(table.TableName, ".")[1],
364-
Row: builder.NewRecord(),
365-
},
366-
},
377+
378+
var snapshotChanges []Wal2JsonChange
379+
snapshotChanges = append(snapshotChanges, Wal2JsonChange{
380+
Kind: "insert",
381+
Schema: s.schema,
382+
Table: table,
383+
ColumnNames: columnNames,
384+
ColumnValues: columnValues,
385+
})
386+
var lsn *string
387+
snapshotChangePacket := Wal2JsonChanges{
388+
Lsn: lsn,
389+
Changes: snapshotChanges,
367390
}
368391

369-
s.snapshotMessages <- snapshotChanges
392+
s.snapshotMessages <- snapshotChangePacket
370393
}
371394

372-
snapshotRows.Close()
373-
374-
offset += s.snapshotBatchSize
395+
offset += batchSize
375396

376-
if s.snapshotBatchSize != rowsCount {
397+
if batchSize != rowsCount {
377398
break
378399
}
379400
}

0 commit comments

Comments
 (0)