This repository was archived by the owner on Dec 11, 2024. It is now read-only.

Feature/postgres sink #3

Merged · 3 commits · Nov 28, 2023
2 changes: 1 addition & 1 deletion Dockerfile
@@ -11,7 +11,7 @@ RUN apt-get update && apt-get install -y \
RUN go get gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
RUN go mod tidy

-RUN cd cmd && go build astro/cmd
+RUN cd cmd && go build blink/cmd

ENV GOMAXPROCS=2
CMD ["./cmd/cmd"]
8 changes: 4 additions & 4 deletions README.md
@@ -1,15 +1,15 @@
-# <p align="center"> DataBrew - Astro ETL </p>
+# <p align="center"> DataBrew - Blink </p>
### <p align="center"> OpenSource data processing engine to build event-driven systems </p>
<p align="center">
<img src="./images/preview.png" width="150px" alt="Project social preview">
</p>


-Asto ETL gives you the ability to create truly flexible data pipelines to implement even-driven architecture, database replication and data lakes
+Blink ETL gives you the ability to create truly flexible data pipelines to implement even-driven architecture, database replication and data lakes

## Getting started
-Before we start you have to install Astro on your local machine
+Before we start you have to install Blink on your local machine

```shell
-brew install @databrew/astro
+brew install @databrew/blink
```
4 changes: 2 additions & 2 deletions cmd/main.go
@@ -1,8 +1,8 @@
package main

import (
"astro/public/server"
"astro/public/stream"
"blink/public/server"
"blink/public/stream"
"os"
)

8 changes: 4 additions & 4 deletions config/config.go
@@ -1,10 +1,10 @@
package config

import (
"astro/internal/processors"
"astro/internal/schema"
"astro/internal/sinks"
"astro/internal/sources"
"blink/internal/processors"
"blink/internal/schema"
"blink/internal/sinks"
"blink/internal/sources"
)

type Configuration struct {
81 changes: 37 additions & 44 deletions config/example.yaml
@@ -6,62 +6,57 @@ service:
  host: http://137.135.132.105:2379
  reload_on_restart: false
  stream_schema:
-    - stream: flights
+    - stream: taxi_rides
      columns:
-        - name: flight_id
+        - name: log_id
          databrewType: Int32
          nativeConnectorType: integer
          pk: true
          nullable: false
-        - name: flight_number
-          databrewType: Int32
-          nativeConnectorType: integer
-          pk: false
-          nullable: true
-        - name: departure_airport
+        - name: timestamp
          databrewType: String
-          nativeConnectorType: character varying
-          pk: false
-          nullable: true
-        - name: arrival_airport
-          databrewType: String
-          nativeConnectorType: character varying
+          nativeConnectorType: varchar
          pk: false
          nullable: true
-        - name: departure_date
-          databrewType: Date32
-          nativeConnectorType: date
+        - name: driver_id
+          databrewType: Int32
+          nativeConnectorType: numeric
          pk: false
-          nullable: true
-        - name: arrival_date
-          databrewType: Date32
-          nativeConnectorType: date
+          nullable: false
+        - name: passenger_id
+          databrewType: Int32
+          nativeConnectorType: numeric
          pk: false
-          nullable: true
-        - name: departure_time
+          nullable: false
+        - name: start_location
          databrewType: String
-          nativeConnectorType: character varying
+          nativeConnectorType: varchar
          pk: false
-          nullable: true
-        - name: arrival_time
+          nullable: false
+        - name: end_location
          databrewType: String
-          nativeConnectorType: character varying
+          nativeConnectorType: varchar
          pk: false
-          nullable: true
-        - name: flight_duration
+          nullable: false
+        - name: fare_amount
          databrewType: Float64
-          nativeConnectorType: double precision
+          nativeConnectorType: numeric
          pk: false
-          nullable: true
-        - name: flight_status
+          nullable: false
+        - name: payment_method
          databrewType: String
          nativeConnectorType: character varying
          pk: false
-          nullable: true
-        - name: id
+          nullable: false
+        - name: distance_traveled
+          databrewType: Float64
+          nativeConnectorType: numeric
+          pk: false
+          nullable: false
+        - name: duration
          databrewType: Int32
          nativeConnectorType: integer
-          pk: true
+          pk: false
          nullable: false

source:
@@ -91,13 +86,11 @@ processors:
      stream_name: "flights"

sink:
-  driver: stdout
+  driver: postgres
  config:
-    brokers:
-      - "pkc-ygz0p.norwayeast.azure.confluent.cloud:9092"
-    sasl: true
-    sasl_user: 2BUOXTEG5AKL4IEC
-    sasl_password: stm1jZufR0njS4hVHnYgX4zrziADac/BZUGv2qh7Z0RBU3alrNTbxdMDob0p0aLg
-    sasl_mechanism: PLAIN
-    bind_topic_to_stream: true
-    topic_name: llsobgnp-mangust
+    host: databrew-testing-instance.postgres.database.azure.com
+    user: postgres
+    password: Lorem123
+    port: 5432
+    schema: public
+    database: mocks_manual_target
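
For reference, the new sink block carries exactly the pieces a standard Postgres connection string needs. Below is a minimal sketch of how a sink could assemble and open that connection with pgx; the plugin implementation itself is not part of this diff, so the Config shape and the choice of pgx are assumptions that simply mirror the YAML keys:

```go
package postgres

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

// Config mirrors the sink.config keys in example.yaml.
// Hypothetical shape: the actual postgres sink plugin is not shown in this PR.
type Config struct {
	Host     string `yaml:"host"`
	User     string `yaml:"user"`
	Password string `yaml:"password"`
	Port     int    `yaml:"port"`
	Schema   string `yaml:"schema"`
	Database string `yaml:"database"`
}

// connect builds a DSN from the config and opens a single pgx connection.
func connect(ctx context.Context, c Config) (*pgx.Conn, error) {
	// search_path pins unqualified table names to the configured schema.
	dsn := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?search_path=%s",
		c.User, c.Password, c.Host, c.Port, c.Database, c.Schema)
	return pgx.Connect(ctx, dsn)
}
```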
2 changes: 1 addition & 1 deletion fly.toml
@@ -1,4 +1,4 @@
-# fly.toml app configuration file generated for databrew-astro on 2023-11-12T01:12:09+01:00
+# fly.toml app configuration file generated for databrew-blink on 2023-11-12T01:12:09+01:00
#
# See https://fly.io/docs/reference/configuration/ for information about how to use this file.
#
4 changes: 2 additions & 2 deletions go.mod
@@ -1,4 +1,4 @@
-module astro
+module blink

go 1.21.1

@@ -21,7 +21,7 @@ require (
github.com/mariomac/gostream v0.8.1
github.com/prometheus/client_golang v1.11.1
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
-github.com/usedatabrew/pglogicalstream v0.0.18
+github.com/usedatabrew/pglogicalstream v0.0.20
go.etcd.io/etcd/client/v3 v3.5.10
go.mongodb.org/mongo-driver v1.13.0
gopkg.in/yaml.v3 v3.0.1
4 changes: 2 additions & 2 deletions go.sum
@@ -222,8 +222,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
-github.com/usedatabrew/pglogicalstream v0.0.18 h1:0FZYG2il+ARUQggRBM+3v9LEdTIuzmAIhsKNVuoqWKo=
-github.com/usedatabrew/pglogicalstream v0.0.18/go.mod h1:VmPhp8W+MSHR2sIPdwQxGBzMVR2zTzUxEbiMrjH+5eU=
+github.com/usedatabrew/pglogicalstream v0.0.20 h1:QfdSO9X8p9sqDoh/LUbzrGtoW5E5QSKL7YSt8rNQtso=
+github.com/usedatabrew/pglogicalstream v0.0.20/go.mod h1:VmPhp8W+MSHR2sIPdwQxGBzMVR2zTzUxEbiMrjH+5eU=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
2 changes: 1 addition & 1 deletion internal/logger/logger.go
@@ -19,7 +19,7 @@ func GetInstance() *log.Logger {
ReportCaller: true,
ReportTimestamp: true,
TimeFormat: time.DateTime,
Prefix: "Astro",
Prefix: "blink",
})
})

4 changes: 2 additions & 2 deletions internal/message/arrow_type_helper.go
@@ -26,8 +26,8 @@ func inferArrowType(value interface{}) arrow.DataType {
}
}

-// getValue extracts the value at the specified row index from a column using reflection
-func getValue(column arrow.Array, rowIndex int) interface{} {
+// GetValue extracts the value at the specified row index from a column using reflection
+func GetValue(column arrow.Array, rowIndex int) interface{} {
switch column.(type) {
case *array.Int8:
return column.(*array.Int8).Value(rowIndex)
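
Exporting getValue as GetValue makes it reusable from other packages in the module (it still lives under internal/, so only blink's own packages can import it). A quick usage sketch against the arrow-go v14 API the repo already depends on; the Int32 column here is a made-up sample, and it assumes the type switch covers *array.Int32 as the surrounding cases suggest:

```go
package main

import (
	"fmt"

	"blink/internal/message"

	"github.com/apache/arrow/go/v14/arrow/array"
	"github.com/apache/arrow/go/v14/arrow/memory"
)

func main() {
	// Build a single Int32 column and read row 0 back as interface{}.
	b := array.NewInt32Builder(memory.DefaultAllocator)
	defer b.Release()
	b.Append(42)

	col := b.NewArray()
	defer col.Release()

	fmt.Println(message.GetValue(col, 0)) // prints: 42
}
```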
2 changes: 1 addition & 1 deletion internal/message/message.go
@@ -68,7 +68,7 @@ func (m *Message) SetNewField(name string, value interface{}, fieldType arrow.Da
}
} else {
s = scalar.NewScalar(field.Type)
-if err := s.Set(getValue(m.Data.Column(i), 0)); err != nil {
+if err := s.Set(GetValue(m.Data.Column(i), 0)); err != nil {
panic(err)
}
}
8 changes: 4 additions & 4 deletions internal/metrics/influx/plugin.go
@@ -82,7 +82,7 @@ func (p *Plugin) IncrementSourceErrCounter() {
func (p *Plugin) flushMetrics() {
t := time.Now()

-point := influxdb3.NewPointWithMeasurement("astro_data").
+point := influxdb3.NewPointWithMeasurement("blink_data").
SetTag("group", p.groupName).
SetTag("pipeline", strconv.Itoa(p.pipelineId)).
SetField("sent_messages", p.sentCounter.Count()).
@@ -92,7 +92,7 @@ func (p *Plugin) flushMetrics() {
panic(err)
}

-point = influxdb3.NewPointWithMeasurement("astro_data").
+point = influxdb3.NewPointWithMeasurement("blink_data").
SetTag("group", p.groupName).
SetTag("pipeline", strconv.Itoa(p.pipelineId)).
SetField("received_messages", p.receivedCounter.Count()).
@@ -102,7 +102,7 @@ func (p *Plugin) flushMetrics() {
panic(err)
}

-point = influxdb3.NewPointWithMeasurement("astro_data").
+point = influxdb3.NewPointWithMeasurement("blink_data").
SetTag("group", p.groupName).
SetTag("pipeline", strconv.Itoa(p.pipelineId)).
SetField("sink_errors", p.sinkErrorsCounter.Count()).
@@ -112,7 +112,7 @@ func (p *Plugin) flushMetrics() {
panic(err)
}

-point = influxdb3.NewPointWithMeasurement("astro_data").
+point = influxdb3.NewPointWithMeasurement("blink_data").
SetTag("group", p.groupName).
SetTag("pipeline", strconv.Itoa(p.pipelineId)).
SetField("source_errors", p.sourceErrorsCounter.Count()).
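
A side note on flushMetrics: all four counters target the same blink_data measurement with identical tags, so they could ride on one point instead of four separate builds. A sketch of that consolidation, using only the chained calls already visible in this diff (the actual write call is collapsed above, so it is deliberately left out):

```go
// Hypothetical consolidation: one point, four fields, same tags.
point := influxdb3.NewPointWithMeasurement("blink_data").
	SetTag("group", p.groupName).
	SetTag("pipeline", strconv.Itoa(p.pipelineId)).
	SetField("sent_messages", p.sentCounter.Count()).
	SetField("received_messages", p.receivedCounter.Count()).
	SetField("sink_errors", p.sinkErrorsCounter.Count()).
	SetField("source_errors", p.sourceErrorsCounter.Count())
// ...then written once via the same client write call the plugin already uses.
```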
6 changes: 3 additions & 3 deletions internal/processors/openai/plugin.go
@@ -1,9 +1,9 @@
package openai

import (
"astro/internal/message"
"astro/internal/schema"
"astro/internal/stream_context"
"blink/internal/message"
"blink/internal/schema"
"blink/internal/stream_context"
"context"
"github.com/apache/arrow/go/v14/arrow"
)
4 changes: 2 additions & 2 deletions internal/processors/processor.go
@@ -1,8 +1,8 @@
package processors

import (
"astro/internal/message"
"astro/internal/schema"
"blink/internal/message"
"blink/internal/schema"
"context"
)

6 changes: 3 additions & 3 deletions internal/service_registry/registry.go
@@ -1,8 +1,8 @@
package service_registry

import (
"astro/config"
"astro/internal/stream_context"
"blink/config"
"blink/internal/stream_context"
"context"
"fmt"
"github.com/charmbracelet/log"
@@ -56,7 +56,7 @@ func (r *Registry) Start() {
}

_, err = r.etcdClient.Put(context.Background(), fmt.Sprintf(serviceKeyTemplate, r.pipelineId), string(r.state), clientv3.WithLease(leaseResp.ID))
r.logger.Info("Ping registry with", "state", r.state)
r.logger.Info("State update", "state", r.state)
if err != nil {
r.logger.Errorf("Failed to set the key into registry %s", err.Error())
return
8 changes: 4 additions & 4 deletions internal/sinks/kafka/plugin.go
@@ -1,9 +1,9 @@
package kafka

import (
"astro/internal/message"
"astro/internal/schema"
"astro/internal/sinks"
"blink/internal/message"
"blink/internal/schema"
"blink/internal/sinks"
"context"
"github.com/charmbracelet/log"
gokafka "github.com/confluentinc/confluent-kafka-go/kafka"
@@ -30,7 +30,7 @@ func NewKafkaSinkPlugin(config Config, schema []schema.StreamSchema) sinks.DataS
func (s *SinkPlugin) Connect(ctx context.Context) error {
p, err := gokafka.NewProducer(&gokafka.ConfigMap{
"bootstrap.servers": strings.Join(s.writerConfig.Brokers, ","),
"client.id": "astro-writer",
"client.id": "blink-writer",
"acks": "all",
"security.protocol": "SASL_SSL",
"go.batch.producer": true,