Skip to content
This repository was archived by the owner on Sep 30, 2024. It is now read-only.

Commit 9231f2c

Browse files
committed
ref(): moved files from internal replication folder
1 parent e2f2740 commit 9231f2c

File tree

9 files changed

+18
-25
lines changed

9 files changed

+18
-25
lines changed

internal/replication/filter.go renamed to filter.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package replication
1+
package pglogicalstream
22

33
import (
44
"bytes"
@@ -9,7 +9,6 @@ import (
99
"github.com/apache/arrow/go/v14/arrow/memory"
1010
"github.com/cloudquery/plugin-sdk/v4/scalar"
1111
"github.com/usedatabrew/pglogicalstream/internal/schemas"
12-
"github.com/usedatabrew/pglogicalstream/messages"
1312
"strings"
1413
)
1514

@@ -18,7 +17,7 @@ type ChangeFilter struct {
1817
schemaWhiteList string
1918
}
2019

21-
type Filtered func(change messages.Wal2JsonChanges)
20+
type Filtered func(change Wal2JsonChanges)
2221

2322
func NewChangeFilter(tableSchemas []schemas.DataTableSchema, schema string) ChangeFilter {
2423
tablesMap := map[string]*arrow.Schema{}
@@ -43,9 +42,9 @@ func (c ChangeFilter) FilterChange(lsn string, change []byte, OnFiltered Filtere
4342
}
4443

4544
for _, ch := range changes.Change {
46-
var filteredChanges = messages.Wal2JsonChanges{
45+
var filteredChanges = Wal2JsonChanges{
4746
Lsn: lsn,
48-
Changes: []messages.Wal2JsonChange{},
47+
Changes: []Wal2JsonChange{},
4948
}
5049
if ch.Schema != c.schemaWhiteList {
5150
continue
@@ -84,7 +83,7 @@ func (c ChangeFilter) FilterChange(lsn string, change []byte, OnFiltered Filtere
8483
scalar.AppendToBuilder(builder.Field(i), s)
8584
}
8685

87-
filteredChanges.Changes = append(filteredChanges.Changes, messages.Wal2JsonChange{
86+
filteredChanges.Changes = append(filteredChanges.Changes, Wal2JsonChange{
8887
Kind: ch.Kind,
8988
Schema: ch.Schema,
9089
Table: ch.Table,

internal/replication/README.MD

Lines changed: 0 additions & 5 deletions
This file was deleted.

logical_stream.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"github.com/jackc/pgx/v5/pgconn"
1515
"github.com/jackc/pgx/v5/pgproto3"
1616
"github.com/usedatabrew/pglogicalstream/internal/helpers"
17-
"github.com/usedatabrew/pglogicalstream/internal/replication"
1817
"github.com/usedatabrew/pglogicalstream/internal/schemas"
1918
"os"
2019
"strings"
@@ -38,7 +37,7 @@ type Stream struct {
3837
messages chan Wal2JsonChanges
3938
snapshotMessages chan Wal2JsonChanges
4039
snapshotName string
41-
changeFilter replication.ChangeFilter
40+
changeFilter ChangeFilter
4241
lsnrestart pglogrepl.LSN
4342
slotName string
4443
schema string
@@ -124,7 +123,7 @@ func NewPgStream(config Config, logger *log.Logger) (*Stream, error) {
124123
separateChanges: config.SeparateChanges,
125124
snapshotBatchSize: config.BatchSize,
126125
tableNames: tableNames,
127-
changeFilter: replication.NewChangeFilter(dataSchemas, config.DbSchema),
126+
changeFilter: NewChangeFilter(dataSchemas, config.DbSchema),
128127
logger: logger,
129128
}
130129

@@ -164,9 +163,9 @@ func NewPgStream(config Config, logger *log.Logger) (*Stream, error) {
164163
} else {
165164
if len(slotCheckResults) == 0 || len(slotCheckResults[0].Rows) == 0 {
166165
// here we create a new replication slot because there is no slot found
167-
var createSlotResult replication.CreateReplicationSlotResult
168-
createSlotResult, err = replication.CreateReplicationSlot(context.Background(), stream.pgConn, stream.slotName, outputPlugin,
169-
replication.CreateReplicationSlotOptions{Temporary: false,
166+
var createSlotResult CreateReplicationSlotResult
167+
createSlotResult, err = CreateReplicationSlot(context.Background(), stream.pgConn, stream.slotName, outputPlugin,
168+
CreateReplicationSlotOptions{Temporary: false,
170169
SnapshotAction: "export",
171170
})
172171
if err != nil {
@@ -304,7 +303,7 @@ func (s *Stream) streamMessagesAsync() {
304303
}
305304
}
306305
func (s *Stream) processSnapshot() {
307-
snapshotter, err := replication.NewSnapshotter(s.dbConfig, s.snapshotName)
306+
snapshotter, err := NewSnapshotter(s.dbConfig, s.snapshotName)
308307
if err != nil {
309308
s.logger.Errorf("Failed to create database snapshot: %", err.Error())
310309
s.cleanUpOnFailure()
@@ -419,7 +418,7 @@ func (s *Stream) LrMessageC() chan Wal2JsonChanges {
419418
// cleanUpOnFailure drops replication slot and publication if database snapshotting was failed for any reason
420419
func (s *Stream) cleanUpOnFailure() {
421420
s.logger.Warn("Cleaning up resources on accident.", "replication-slot", s.slotName)
422-
err := replication.DropReplicationSlot(context.Background(), s.pgConn, s.slotName, replication.DropReplicationSlotOptions{Wait: true})
421+
err := DropReplicationSlot(context.Background(), s.pgConn, s.slotName, DropReplicationSlotOptions{Wait: true})
423422
if err != nil {
424423
s.logger.Errorf("Failed to drop replication slot: %s", err.Error())
425424
}

internal/replication/message.go renamed to message.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package replication
1+
package pglogicalstream
22

33
import (
44
"bytes"

internal/replication/message_test.go renamed to message_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package replication
1+
package pglogicalstream
22

33
import (
44
"encoding/binary"

internal/replication/pglogrepl.go renamed to pglogrepl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package replication
1+
package pglogicalstream
22

33
// Package pglogrepl implements PostgreSQL logical replication client functionality.
44
//

internal/replication/pglogrepl_test.go renamed to pglogrepl_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package replication
1+
package pglogicalstream
22

33
import (
44
"context"

internal/replication/snapshotter.go renamed to snapshotter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package replication
1+
package pglogicalstream
22

33
import (
44
"context"

internal/replication/wal_changes_message.go renamed to wal_changes_message.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package replication
1+
package pglogicalstream
22

33
type WallMessage struct {
44
Change []struct {

0 commit comments

Comments
 (0)