Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dep 1080 #1

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
521 changes: 521 additions & 0 deletions clientlibrary/checkpoint/pgcheckpoint/checkpointer.go

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions clientlibrary/checkpoint/pgcheckpoint/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package pgcheckpoint

import "github.com/vmware/vmware-go-kcl/clientlibrary/config"

// Configuration groups the settings declared for the pgcheckpoint package:
// the shared Kinesis client configuration, a Connector, and the schema/table
// naming options.
type Configuration struct {
// Client is the Kinesis client configuration from the config package.
Client config.KinesisClientConfiguration
// Connector is a type declared elsewhere in this package.
// NOTE(review): presumably supplies the database connection — confirm.
Connector Connector

// SchemaName names the schema to use.
// NOTE(review): presumably the database schema that holds the table — confirm against checkpointer.go.
SchemaName string

// TableName is the name of the table for managing the Kinesis stream.
// NOTE(review): the earlier wording called this a "dynamo db table" that defaults to
// ApplicationName; that text appears to be copied from the DynamoDB configuration.
// Confirm which default (if any) this package actually applies.
TableName string

// WithoutCreateTable is a boolean switch.
// NOTE(review): presumably skips automatic table creation when true — confirm against checkpointer.go.
WithoutCreateTable bool
}
1 change: 1 addition & 0 deletions clientlibrary/checkpoint/pgcheckpoint/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package pgcheckpoint
12 changes: 12 additions & 0 deletions clientlibrary/checkpoint/pgcheckpoint/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package pgcheckpoint

import "time"

// Checkpoint describes one checkpoint entry used by the pgcheckpoint package.
// NOTE(review): presumably one entry per shard in the checkpoint table — confirm against checkpointer.go.
type Checkpoint struct {
// ShardID is the ID of the shard this entry refers to.
ShardID string
// StreamName is the name of the stream.
StreamName string
// SequenceNumber is stored as a string.
// NOTE(review): presumably the last checkpointed sequence number — confirm.
SequenceNumber string
// LeaseOwner — presumably the worker currently holding the lease; verify against the checkpointer.
LeaseOwner string
// ParentID — presumably the ID of the parent shard; verify against the checkpointer.
ParentID string
// LeaseTimeout is a pointer and may therefore be nil.
// NOTE(review): presumably nil means no lease is currently held — confirm.
LeaseTimeout *time.Time
}
181 changes: 181 additions & 0 deletions clientlibrary/config/compat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package config

import (
creds "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/vmware/vmware-go-kcl/clientlibrary/metrics"
"github.com/vmware/vmware-go-kcl/logger"
)

// KinesisClientLibConfiguration is the configuration for the Kinesis Client Library.
// It carries both the Kinesis client settings and the DynamoDB checkpointer settings in a single struct.
// Note: There is no need to configure a credential provider. Credentials can be obtained from InstanceProfile.
//
// Deprecated: Use KinesisClientConfiguration and the appropriate Checkpoint Config (e.g. DynamoCheckpointerConfiguration).
type KinesisClientLibConfiguration struct {
// ApplicationName is name of application. Kinesis allows multiple applications to consume the same stream.
ApplicationName string

// DynamoDBEndpoint is an optional endpoint URL that overrides the default generated endpoint for a DynamoDB client.
// If this is empty, the default generated endpoint will be used.
DynamoDBEndpoint string

// KinesisEndpoint is an optional endpoint URL that overrides the default generated endpoint for a Kinesis client.
// If this is empty, the default generated endpoint will be used.
KinesisEndpoint string

// KinesisCredentials is used to access Kinesis
KinesisCredentials *creds.Credentials

// DynamoDBCredentials is used to access DynamoDB
DynamoDBCredentials *creds.Credentials

// TableName is name of the dynamo db table for managing kinesis stream default to ApplicationName
TableName string

// StreamName is the name of Kinesis stream
StreamName string

// EnableEnhancedFanOutConsumer enables enhanced fan-out consumer
// See: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html
// Either consumer name or consumer ARN must be specified when Enhanced Fan-Out is enabled.
EnableEnhancedFanOutConsumer bool

// EnhancedFanOutConsumerName is the name of the enhanced fan-out consumer to create. If this isn't set the ApplicationName will be used.
EnhancedFanOutConsumerName string

// EnhancedFanOutConsumerARN is the ARN of an already created enhanced fan-out consumer, if this is set no automatic consumer creation will be attempted
EnhancedFanOutConsumerARN string

// WorkerID used to distinguish different workers/processes of a Kinesis application
WorkerID string

// InitialPositionInStream specifies the Position in the stream where a new application should start from
InitialPositionInStream InitialPositionInStream

// InitialPositionInStreamExtended provides actual AT_TIMESTAMP value
InitialPositionInStreamExtended InitialPositionInStreamExtended

// credentials to access Kinesis/Dynamo: https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/
// Note: No need to configure here. Use NewEnvCredentials for testing and EC2RoleProvider for production

// FailoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others)
FailoverTimeMillis int

// LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner.
LeaseRefreshPeriodMillis int

// MaxRecords Max records to read per Kinesis getRecords() call
MaxRecords int

// IdleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis
IdleTimeBetweenReadsInMillis int

// CallProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if
// GetRecords returned an empty record list.
CallProcessRecordsEvenForEmptyRecordList bool

// ParentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
ParentShardPollIntervalMillis int

// ShardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
ShardSyncIntervalMillis int

// CleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration)
CleanupTerminatedShardsBeforeExpiry bool

// kinesisClientConfig Client Configuration used by Kinesis client
// dynamoDBClientConfig Client Configuration used by DynamoDB client
// Note: we will use default client provided by AWS SDK

// TaskBackoffTimeMillis Backoff period when tasks encounter an exception
TaskBackoffTimeMillis int

// ValidateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers
ValidateSequenceNumberBeforeCheckpointing bool

// RegionName The region name for the service
RegionName string

// ShutdownGraceMillis The number of milliseconds before graceful shutdown terminates forcefully
ShutdownGraceMillis int

// Operation parameters

// Max leases this Worker can handle at a time
MaxLeasesForWorker int

// Max leases to steal at one time (for load balancing)
MaxLeasesToStealAtOneTime int

// Read capacity to provision when creating the lease table (dynamoDB).
InitialLeaseTableReadCapacity int

// Write capacity to provision when creating the lease table.
InitialLeaseTableWriteCapacity int

// Worker should skip syncing shards and leases at startup if leases are present
// This is useful for optimizing deployments to large fleets working on a stable stream.
SkipShardSyncAtWorkerInitializationIfLeasesExist bool

// Logger used to log message.
Logger logger.Logger

// MonitoringService publishes per worker-scoped metrics.
MonitoringService metrics.MonitoringService

// EnableLeaseStealing turns on lease stealing
EnableLeaseStealing bool

// LeaseStealingIntervalMillis The number of milliseconds between rebalance tasks
LeaseStealingIntervalMillis int

// LeaseStealingClaimTimeoutMillis The number of milliseconds to wait before another worker can acquire a claimed shard
LeaseStealingClaimTimeoutMillis int

// LeaseSyncingTimeIntervalMillis The number of milliseconds to wait before syncing with lease table (dynamoDB)
LeaseSyncingTimeIntervalMillis int
}

// KinesisClientConfiguration extracts the Kinesis client settings from the
// combined configuration. Each listed field is copied unchanged; the
// DynamoDB-specific fields (DynamoDBEndpoint, DynamoDBCredentials, TableName,
// InitialLeaseTableReadCapacity and InitialLeaseTableWriteCapacity) are not
// part of the result.
func (c KinesisClientLibConfiguration) KinesisClientConfiguration() KinesisClientConfiguration {
	var kc KinesisClientConfiguration

	// Application, endpoint and stream identity.
	kc.ApplicationName = c.ApplicationName
	kc.KinesisEndpoint = c.KinesisEndpoint
	kc.KinesisCredentials = c.KinesisCredentials
	kc.StreamName = c.StreamName
	kc.RegionName = c.RegionName
	kc.WorkerID = c.WorkerID

	// Enhanced fan-out consumer settings.
	kc.EnableEnhancedFanOutConsumer = c.EnableEnhancedFanOutConsumer
	kc.EnhancedFanOutConsumerName = c.EnhancedFanOutConsumerName
	kc.EnhancedFanOutConsumerARN = c.EnhancedFanOutConsumerARN

	// Starting position in the stream.
	kc.InitialPositionInStream = c.InitialPositionInStream
	kc.InitialPositionInStreamExtended = c.InitialPositionInStreamExtended

	// Record fetching and processing.
	kc.MaxRecords = c.MaxRecords
	kc.IdleTimeBetweenReadsInMillis = c.IdleTimeBetweenReadsInMillis
	kc.CallProcessRecordsEvenForEmptyRecordList = c.CallProcessRecordsEvenForEmptyRecordList
	kc.ValidateSequenceNumberBeforeCheckpointing = c.ValidateSequenceNumberBeforeCheckpointing
	kc.TaskBackoffTimeMillis = c.TaskBackoffTimeMillis
	kc.ShutdownGraceMillis = c.ShutdownGraceMillis

	// Lease timing and shard synchronisation.
	kc.FailoverTimeMillis = c.FailoverTimeMillis
	kc.LeaseRefreshPeriodMillis = c.LeaseRefreshPeriodMillis
	kc.ParentShardPollIntervalMillis = c.ParentShardPollIntervalMillis
	kc.ShardSyncIntervalMillis = c.ShardSyncIntervalMillis
	kc.LeaseSyncingTimeIntervalMillis = c.LeaseSyncingTimeIntervalMillis
	kc.CleanupTerminatedShardsBeforeExpiry = c.CleanupTerminatedShardsBeforeExpiry
	kc.SkipShardSyncAtWorkerInitializationIfLeasesExist = c.SkipShardSyncAtWorkerInitializationIfLeasesExist

	// Lease limits and lease stealing.
	kc.MaxLeasesForWorker = c.MaxLeasesForWorker
	kc.MaxLeasesToStealAtOneTime = c.MaxLeasesToStealAtOneTime
	kc.EnableLeaseStealing = c.EnableLeaseStealing
	kc.LeaseStealingIntervalMillis = c.LeaseStealingIntervalMillis
	kc.LeaseStealingClaimTimeoutMillis = c.LeaseStealingClaimTimeoutMillis

	// Logging and metrics.
	kc.Logger = c.Logger
	kc.MonitoringService = c.MonitoringService

	return kc
}

// DynamoCheckpointerConfiguration extracts the DynamoDB checkpointer settings
// (table name, provisioned read/write capacity, endpoint and credentials)
// from the combined configuration. Each field is copied unchanged.
func (c KinesisClientLibConfiguration) DynamoCheckpointerConfiguration() DynamoCheckpointerConfiguration {
	var dc DynamoCheckpointerConfiguration

	// How to reach DynamoDB.
	dc.DynamoDBEndpoint = c.DynamoDBEndpoint
	dc.DynamoDBCredentials = c.DynamoDBCredentials

	// Lease table name and provisioning.
	dc.TableName = c.TableName
	dc.InitialLeaseTableReadCapacity = c.InitialLeaseTableReadCapacity
	dc.InitialLeaseTableWriteCapacity = c.InitialLeaseTableWriteCapacity

	return dc
}
101 changes: 101 additions & 0 deletions clientlibrary/config/compat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package config

import (
"testing"

creds "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/stretchr/testify/assert"
"github.com/vmware/vmware-go-kcl/clientlibrary/metrics"
"github.com/vmware/vmware-go-kcl/logger"
)

// TestConfigurationCompatibility populates every field of the combined
// KinesisClientLibConfiguration and checks that its two conversion methods
// return the expected split configurations.
func TestConfigurationCompatibility(t *testing.T) {
	// legacy has all fields set so that a field dropped by either conversion
	// shows up as an inequality below.
	legacy := KinesisClientLibConfiguration{
		ApplicationName:                 "application",
		KinesisEndpoint:                 "kinesis.us-west-1.amazonaws.com",
		KinesisCredentials:              &creds.Credentials{},
		StreamName:                      "arn:aws:kinesis:us-east-1:12345678:stream/some-stream",
		EnableEnhancedFanOutConsumer:    true,
		EnhancedFanOutConsumerName:      "foobar",
		EnhancedFanOutConsumerARN:       "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737",
		WorkerID:                        "2345678",
		InitialPositionInStream:         1,
		InitialPositionInStreamExtended: InitialPositionInStreamExtended{Position: 1},
		FailoverTimeMillis:              10000,
		LeaseRefreshPeriodMillis:        100000,
		MaxRecords:                      500,
		IdleTimeBetweenReadsInMillis:    1000,
		CallProcessRecordsEvenForEmptyRecordList:         true,
		ParentShardPollIntervalMillis:                    100,
		ShardSyncIntervalMillis:                          100,
		CleanupTerminatedShardsBeforeExpiry:              true,
		TaskBackoffTimeMillis:                            100,
		ValidateSequenceNumberBeforeCheckpointing:        true,
		RegionName:                                       "us-east-1",
		ShutdownGraceMillis:                              100,
		MaxLeasesForWorker:                               100,
		MaxLeasesToStealAtOneTime:                        10,
		SkipShardSyncAtWorkerInitializationIfLeasesExist: true,
		Logger:                          logger.GetDefaultLogger(),
		MonitoringService:               metrics.NoopMonitoringService{},
		EnableLeaseStealing:             true,
		LeaseStealingIntervalMillis:     1000,
		LeaseStealingClaimTimeoutMillis: 2000,
		LeaseSyncingTimeIntervalMillis:  2000,
		TableName:                       "checkpoint",
		InitialLeaseTableReadCapacity:   4,
		InitialLeaseTableWriteCapacity:  1,
		DynamoDBEndpoint:                "dynamodb.us-east-1.amazonaws.com",
		DynamoDBCredentials:             &creds.Credentials{},
	}

	// The Kinesis half of the split.
	t.Run("base", func(t *testing.T) {
		got := legacy.KinesisClientConfiguration()
		want := KinesisClientConfiguration{
			ApplicationName:                 "application",
			KinesisEndpoint:                 "kinesis.us-west-1.amazonaws.com",
			KinesisCredentials:              &creds.Credentials{},
			StreamName:                      "arn:aws:kinesis:us-east-1:12345678:stream/some-stream",
			EnableEnhancedFanOutConsumer:    true,
			EnhancedFanOutConsumerName:      "foobar",
			EnhancedFanOutConsumerARN:       "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737",
			WorkerID:                        "2345678",
			InitialPositionInStream:         1,
			InitialPositionInStreamExtended: InitialPositionInStreamExtended{Position: 1},
			FailoverTimeMillis:              10000,
			LeaseRefreshPeriodMillis:        100000,
			MaxRecords:                      500,
			IdleTimeBetweenReadsInMillis:    1000,
			CallProcessRecordsEvenForEmptyRecordList:         true,
			ParentShardPollIntervalMillis:                    100,
			ShardSyncIntervalMillis:                          100,
			CleanupTerminatedShardsBeforeExpiry:              true,
			TaskBackoffTimeMillis:                            100,
			ValidateSequenceNumberBeforeCheckpointing:        true,
			RegionName:                                       "us-east-1",
			ShutdownGraceMillis:                              100,
			MaxLeasesForWorker:                               100,
			MaxLeasesToStealAtOneTime:                        10,
			SkipShardSyncAtWorkerInitializationIfLeasesExist: true,
			Logger:                          logger.GetDefaultLogger(),
			MonitoringService:               metrics.NoopMonitoringService{},
			EnableLeaseStealing:             true,
			LeaseStealingIntervalMillis:     1000,
			LeaseStealingClaimTimeoutMillis: 2000,
			LeaseSyncingTimeIntervalMillis:  2000,
		}

		assert.Equal(t, want, got)
	})

	// The DynamoDB checkpointer half of the split.
	t.Run("dynamo", func(t *testing.T) {
		got := legacy.DynamoCheckpointerConfiguration()
		want := DynamoCheckpointerConfiguration{
			TableName:                      "checkpoint",
			InitialLeaseTableReadCapacity:  4,
			InitialLeaseTableWriteCapacity: 1,
			DynamoDBEndpoint:               "dynamodb.us-east-1.amazonaws.com",
			DynamoDBCredentials:            &creds.Credentials{},
		}
		assert.Equal(t, want, got)
	})
}
40 changes: 20 additions & 20 deletions clientlibrary/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,28 +156,34 @@ type (
Timestamp *time.Time `type:"Timestamp" timestampFormat:"unix"`
}

// Configuration for the Kinesis Client Library.
// Note: There is no need to configure credential provider. Credential can be get from InstanceProfile.
KinesisClientLibConfiguration struct {
// ApplicationName is name of application. Kinesis allows multiple applications to consume the same stream.
ApplicationName string

DynamoCheckpointerConfiguration struct {
// DynamoDBEndpoint is an optional endpoint URL that overrides the default generated endpoint for a DynamoDB client.
// If this is empty, the default generated endpoint will be used.
DynamoDBEndpoint string

// KinesisEndpoint is an optional endpoint URL that overrides the default generated endpoint for a Kinesis client.
// If this is empty, the default generated endpoint will be used.
KinesisEndpoint string
// Read capacity to provision when creating the lease table (dynamoDB).
InitialLeaseTableReadCapacity int

// KinesisCredentials is used to access Kinesis
KinesisCredentials *creds.Credentials
// Write capacity to provision when creating the lease table.
InitialLeaseTableWriteCapacity int

// DynamoDBCredentials is used to access DynamoDB
DynamoDBCredentials *creds.Credentials

// TableName is name of the dynamo db table for managing kinesis stream default to ApplicationName
TableName string
}

KinesisClientConfiguration struct {
// ApplicationName is name of application. Kinesis allows multiple applications to consume the same stream.
ApplicationName string

// KinesisEndpoint is an optional endpoint URL that overrides the default generated endpoint for a Kinesis client.
// If this is empty, the default generated endpoint will be used.
KinesisEndpoint string

// KinesisCredentials is used to access Kinesis
KinesisCredentials *creds.Credentials

// StreamName is the name of Kinesis stream
StreamName string
Expand Down Expand Up @@ -227,6 +233,9 @@ type (
// ShardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
ShardSyncIntervalMillis int

// LeaseSyncingTimeInterval The number of milliseconds to wait before syncing with lease table (dynamoDB)
LeaseSyncingTimeIntervalMillis int

// CleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration)
CleanupTerminatedShardsBeforeExpiry bool

Expand Down Expand Up @@ -254,12 +263,6 @@ type (
// Max leases to steal at one time (for load balancing)
MaxLeasesToStealAtOneTime int

// Read capacity to provision when creating the lease table (dynamoDB).
InitialLeaseTableReadCapacity int

// Write capacity to provision when creating the lease table.
InitialLeaseTableWriteCapacity int

// Worker should skip syncing shards and leases at startup if leases are present
// This is useful for optimizing deployments to large fleets working on a stable stream.
SkipShardSyncAtWorkerInitializationIfLeasesExist bool
Expand All @@ -278,9 +281,6 @@ type (

// LeaseStealingClaimTimeoutMillis The number of milliseconds to wait before another worker can acquire a claimed shard
LeaseStealingClaimTimeoutMillis int

// LeaseSyncingTimeInterval The number of milliseconds to wait before syncing with lease table (dynamoDB)
LeaseSyncingTimeIntervalMillis int
}
)

Expand Down
Loading