Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Feb 7, 2024
1 parent f63de23 commit 62fc659
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 40 deletions.
2 changes: 1 addition & 1 deletion configurator/frontend/catalog/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@
"packageManager": "pnpm@8.2.0",
"engines": {
"pnpm": ">=3",
"node": ">=14 <18"
"node": ">=14 <20"
}
}
2 changes: 1 addition & 1 deletion configurator/frontend/main/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
"packageManager": "pnpm@8.2.0",
"engines": {
"pnpm": ">=3",
"node": ">=14 <18"
"node": ">=14 <20"
},
"browserslist": {
"production": [
Expand Down
2 changes: 1 addition & 1 deletion configurator/frontend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"packageManager": "pnpm@8.2.0",
"engines": {
"pnpm": ">=3",
"node": ">=14 <18"
"node": ">=14 <20"
},
"devDependencies": {
"@types/jest": "^26.0.24",
Expand Down
2 changes: 2 additions & 0 deletions server/airbyte/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ func (r *Runner) Check(airbyteSourceConfig interface{}) error {
case connectionStatusSucceed:
return nil
case connectionStatusFailed:
errMsg := Instance.BuildMsg("Error executing airbyte check:", resultParser.output, errWriter, nil)
logging.Error(errMsg)
return errors.New(resultParser.parsedRow.ConnectionStatus.Message)
default:
return fmt.Errorf("unknown airbyte connection status [%s]: %s", resultParser.parsedRow.ConnectionStatus.Status, resultParser.parsedRow.ConnectionStatus.Message)
Expand Down
60 changes: 47 additions & 13 deletions server/events/native_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ func TimedEventBuilder() interface{} {
return &TimedEvent{}
}

//NativeQueue is a event queue implementation by Jitsu
// NativeQueue is a event queue implementation by Jitsu
type NativeQueue struct {
namespace string
subsystem string
identifier string
queue queue.Queue

metricsReporter internal.MetricReporter
retriesBuffer chan *TimedEvent
closed chan struct{}
}

Expand All @@ -47,16 +48,18 @@ func NewNativeQueue(namespace, subsystem, identifier string, underlyingQueue que
subsystem: subsystem,
identifier: identifier,
metricsReporter: metricsReporter,
retriesBuffer: make(chan *TimedEvent, 10000),
closed: make(chan struct{}, 1),
}

safego.Run(nq.startMonitor)
safego.Run(nq.startAsync)
return nq, nil
}

func (q *NativeQueue) startMonitor() {
func (q *NativeQueue) startAsync() {
debugTicker := time.NewTicker(time.Minute * 10)
metricsTicker := time.NewTicker(time.Second * 60)
bufferTicker := time.NewTicker(time.Minute * 1)
for {
select {
case <-q.closed:
Expand All @@ -66,56 +69,87 @@ func (q *NativeQueue) startMonitor() {
case <-debugTicker.C:
size := q.queue.Size()
logging.Infof("[queue: %s_%s_%s] current size: %d", q.namespace, q.subsystem, q.identifier, size)
case <-bufferTicker.C:
count := 0
loop:
for {
select {
case te := <-q.retriesBuffer:
if te == nil {
break loop
}
if err := q.queue.Push(te); err != nil {
logSkippedEvent(te.Payload, fmt.Errorf("Error pushing event to the queue: %v", err))
}
count++
q.metricsReporter.EnqueuedEvent(q.subsystem, q.identifier)
default:
break loop
}
}
if count > 0 {
logging.Infof("[queue: %s_%s_%s] pushed %d events from retries buffer", q.namespace, q.subsystem, q.identifier, count)
}
}
}
}

func (q *NativeQueue) Consume(f map[string]interface{}, tokenID string) {
q.ConsumeTimed(f, timestamp.Now().UTC(), tokenID)
q.ConsumeTimed(f, timestamp.Now().UTC(), tokenID, 0)
}

func (q *NativeQueue) ConsumeTimed(payload map[string]interface{}, t time.Time, tokenID string) {
func (q *NativeQueue) ConsumeTimed(payload map[string]interface{}, t time.Time, tokenID string, retriesCount int) {
te := &TimedEvent{
Payload: payload,
DequeuedTime: t,
TokenID: tokenID,
RetriesCount: retriesCount,
}
if retriesCount > 0 {
select {
case <-q.closed:
return
case q.retriesBuffer <- te:
return
default:
//buffer is full. insert right away
}
}

if err := q.queue.Push(te); err != nil {
logSkippedEvent(payload, fmt.Errorf("Error pushing event to the queue: %v", err))
return
}

q.metricsReporter.EnqueuedEvent(q.subsystem, q.identifier)
}

func (q *NativeQueue) DequeueBlock() (Event, time.Time, string, error) {
func (q *NativeQueue) DequeueBlock() (Event, time.Time, string, int, error) {
ite, err := q.queue.Pop()
if err != nil {
if err == queue.ErrQueueClosed {
return nil, time.Time{}, "", ErrQueueClosed
return nil, time.Time{}, "", 0, ErrQueueClosed
}

return nil, time.Time{}, "", err
return nil, time.Time{}, "", 0, err
}

q.metricsReporter.DequeuedEvent(q.subsystem, q.identifier)

te, ok := ite.(*TimedEvent)
if !ok {
return nil, time.Time{}, "", fmt.Errorf("wrong type of event dto in queue. Expected: *TimedEvent, actual: %T (%s)", ite, ite)
return nil, time.Time{}, "", 0, fmt.Errorf("wrong type of event dto in queue. Expected: *TimedEvent, actual: %T (%s)", ite, ite)
}

return te.Payload, te.DequeuedTime, te.TokenID, nil
return te.Payload, te.DequeuedTime, te.TokenID, te.RetriesCount, nil
}

//Close closes underlying queue
// Close closes underlying queue
func (q *NativeQueue) Close() error {
select {
case <-q.closed:
return nil
default:
close(q.closed)
close(q.retriesBuffer)
return q.queue.Close()
}
}
15 changes: 8 additions & 7 deletions server/events/queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"github.com/jitsucom/jitsu/server/queue"
)

//TimedEvent is used for keeping events with time in queue
// TimedEvent is used for keeping events with time in queue
type TimedEvent struct {
Payload map[string]interface{}
DequeuedTime time.Time
TokenID string
RetriesCount int
}

type DummyQueue struct {
Expand All @@ -27,19 +28,19 @@ func (d *DummyQueue) Close() error {
func (d *DummyQueue) Consume(f map[string]interface{}, tokenID string) {
}

func (d *DummyQueue) ConsumeTimed(f map[string]interface{}, t time.Time, tokenID string) {
func (d *DummyQueue) ConsumeTimed(f map[string]interface{}, t time.Time, tokenID string, retriesCount int) {
}

func (d *DummyQueue) DequeueBlock() (Event, time.Time, string, error) {
return nil, time.Time{}, "", fmt.Errorf("DequeueBlock not supported on DummyQueue")
func (d *DummyQueue) DequeueBlock() (Event, time.Time, string, int, error) {
return nil, time.Time{}, "", 0, fmt.Errorf("DequeueBlock not supported on DummyQueue")
}

//Queue is an events queue. Possible implementations (dque, leveldbqueue, native)
// Queue is an events queue. Possible implementations (dque, leveldbqueue, native)
type Queue interface {
io.Closer
Consume(f map[string]interface{}, tokenID string)
ConsumeTimed(f map[string]interface{}, t time.Time, tokenID string)
DequeueBlock() (Event, time.Time, string, error)
ConsumeTimed(f map[string]interface{}, t time.Time, tokenID string, retriesCount int)
DequeueBlock() (Event, time.Time, string, int, error)
}

type QueueFactory struct {
Expand Down
22 changes: 14 additions & 8 deletions server/storages/streaming.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storages

import (
"fmt"
"github.com/jitsucom/jitsu/server/adapters"
"github.com/jitsucom/jitsu/server/appconfig"
"github.com/jitsucom/jitsu/server/errorj"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/jitsucom/jitsu/server/timestamp"
"github.com/jitsucom/jitsu/server/utils"
"go.uber.org/atomic"
"math"
"math/rand"
"time"
)
Expand Down Expand Up @@ -71,7 +73,7 @@ func (sw *StreamingWorker) start() {
break
}

fact, dequeuedTime, tokenID, err := sw.eventQueue.DequeueBlock()
fact, dequeuedTime, tokenID, retriesCount, err := sw.eventQueue.DequeueBlock()
if err != nil {
if err == events.ErrQueueClosed && sw.closed.Load() {
continue
Expand All @@ -83,7 +85,7 @@ func (sw *StreamingWorker) start() {

//dequeued event was from retry call and retry timeout hasn't come
if timestamp.Now().Before(dequeuedTime) {
sw.eventQueue.ConsumeTimed(fact, dequeuedTime, tokenID)
sw.eventQueue.ConsumeTimed(fact, dequeuedTime, tokenID, retriesCount)
continue
}
_, recognizedEvent := fact[schema.JitsuUserRecognizedEvent]
Expand Down Expand Up @@ -149,9 +151,11 @@ func (sw *StreamingWorker) start() {
WithProperty(errorj.DestinationType, sw.streamingStorage.Type())

var retryInfoInLog string
retry := IsConnectionError(err)
var delay time.Duration
retry := IsConnectionError(updateErr) && retriesCount < 4
if retry {
retryInfoInLog = "connection problem. event will be re-updated after 20 seconds\n"
delay = time.Duration(math.Pow10(retriesCount)) * time.Minute
retryInfoInLog = fmt.Sprintf("connection problem. event will be re-updated after %s. (retry: %d)\n", delay.String(), retriesCount)
}
if errorj.IsSystemError(err) {
logging.SystemErrorf("%+v\n%sorigin event: %s", err, retryInfoInLog, flattenObject.DebugString())
Expand All @@ -161,7 +165,7 @@ func (sw *StreamingWorker) start() {

if retry {
//retry
sw.eventQueue.ConsumeTimed(fact, timestamp.Now().Add(20*time.Second), tokenID)
sw.eventQueue.ConsumeTimed(fact, timestamp.Now().Add(delay), tokenID, retriesCount+1)
}
}
} else {
Expand All @@ -171,9 +175,11 @@ func (sw *StreamingWorker) start() {
WithProperty(errorj.DestinationType, sw.streamingStorage.Type())

var retryInfoInLog string
retry := IsConnectionError(err)
var delay time.Duration
retry := IsConnectionError(insertErr) && retriesCount < 4
if retry {
retryInfoInLog = "connection problem. event will be re-inserted after 20 seconds\n"
delay = time.Duration(math.Pow10(retriesCount)) * time.Minute
retryInfoInLog = fmt.Sprintf("connection problem. event will be re-inserted after %s. (retry: %d)\n", delay.String(), retriesCount)
}
if errorj.IsSystemError(err) {
logging.SystemErrorf("%+v\n%sorigin event: %s", err, retryInfoInLog, flattenObject.DebugString())
Expand All @@ -183,7 +189,7 @@ func (sw *StreamingWorker) start() {

if retry {
//retry
sw.eventQueue.ConsumeTimed(fact, timestamp.Now().Add(20*time.Second), tokenID)
sw.eventQueue.ConsumeTimed(fact, timestamp.Now().Add(delay), tokenID, retriesCount+1)
}
}
}
Expand Down
19 changes: 10 additions & 9 deletions server/storages/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@ func dryRun(payload events.Event, processor *schema.Processor, tableHelper *Tabl
}

func IsConnectionError(err error) bool {
return strings.Contains(err.Error(), "connection refused") ||
strings.Contains(err.Error(), "EOF") ||
strings.Contains(err.Error(), "write: broken pipe") ||
strings.Contains(err.Error(), "read-only transaction") ||
strings.Contains(err.Error(), "context deadline exceeded") ||
strings.Contains(err.Error(), "connection reset by peer") ||
strings.Contains(err.Error(), "timed out") ||
strings.Contains(err.Error(), "no such host")
s := err.Error()
return strings.Contains(s, "connection refused") ||
strings.Contains(s, "EOF") ||
strings.Contains(s, "write: broken pipe") ||
strings.Contains(s, "read-only transaction") ||
strings.Contains(s, "context deadline exceeded") ||
strings.Contains(s, "connection reset by peer") ||
strings.Contains(s, "timed out") ||
strings.Contains(s, "no such host")
}

// syncStoreImpl implements common behaviour used to storing chunk of pulled data to any storages with processing
Expand Down Expand Up @@ -81,7 +82,7 @@ func syncStoreImpl(storage Storage, overriddenDataSchema *schema.BatchHeader, ob
return nil
}

//cleanImpl implements common table cleaning
// cleanImpl implements common table cleaning
func cleanImpl(storage Storage, tableName string) error {
adapter, _ := storage.getAdapters()
return adapter.Truncate(tableName)
Expand Down

0 comments on commit 62fc659

Please sign in to comment.