sink: support cloud storage sink (#927)
wk989898 authored Feb 13, 2025
1 parent 78a3f9b commit da834db
Showing 28 changed files with 5,596 additions and 4 deletions.
198 changes: 198 additions & 0 deletions downstreamadapter/sink/cloudstorage.go
@@ -0,0 +1,198 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sink

import (
	"context"
	"math"
	"net/url"
	"strings"
	"sync/atomic"

	"github.com/pingcap/errors"
	"github.com/pingcap/log"
	"github.com/pingcap/ticdc/downstreamadapter/sink/helper"
	"github.com/pingcap/ticdc/downstreamadapter/worker"
	"github.com/pingcap/ticdc/pkg/common"
	commonType "github.com/pingcap/ticdc/pkg/common"
	commonEvent "github.com/pingcap/ticdc/pkg/common/event"
	"github.com/pingcap/ticdc/pkg/config"
	"github.com/pingcap/ticdc/pkg/metrics"
	"github.com/pingcap/ticdc/pkg/sink/cloudstorage"
	"github.com/pingcap/ticdc/pkg/sink/util"
	"github.com/pingcap/tidb/br/pkg/storage"
	putil "github.com/pingcap/tiflow/pkg/util"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
)

// CloudStorageSink sends events to cloud storage systems.
// Messages are encoded in the configured protocol and then sent to the defragmenter.
// The data flow is as follows: data -> encodingWorkers -> defragmenter -> dmlWorkers -> external storage.
// The defragmenter reorders the out-of-order encoded messages and sends them
// to individual dmlWorkers.
// The dmlWorkers write the encoded messages to external storage in parallel across different tables.
type CloudStorageSink struct {
	changefeedID         commonType.ChangeFeedID
	scheme               string
	outputRawChangeEvent bool

	// dmlWorker and ddlWorker are the workers that write events to external storage.
	dmlWorker *worker.CloudStorageDMLWorker
	ddlWorker *worker.CloudStorageDDLWorker

	statistics *metrics.Statistics

	isNormal uint32
}
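
// As an illustration of the defragmenting step described above, here is a
// minimal sketch, assuming each encoded message carries a monotonically
// increasing sequence number (the real implementation lives in
// pkg/sink/cloudstorage and differs in detail):
//
//	type message struct {
//		seq     uint64
//		payload []byte
//	}
//
//	// defragment releases buffered messages in sequence order, so the
//	// dmlWorkers always observe a contiguous stream.
//	func defragment(in <-chan message, out chan<- message) {
//		next := uint64(1)
//		pending := make(map[uint64]message)
//		for msg := range in {
//			pending[msg.seq] = msg
//			for {
//				m, ok := pending[next]
//				if !ok {
//					break
//				}
//				delete(pending, next)
//				out <- m
//				next++
//			}
//		}
//		close(out)
//	}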

func verifyCloudStorageSink(ctx context.Context, changefeedID common.ChangeFeedID, sinkURI *url.URL, sinkConfig *config.SinkConfig) error {
	var (
		protocol config.Protocol
		storage  storage.ExternalStorage
		err      error
	)
	cfg := cloudstorage.NewConfig()
	if err = cfg.Apply(ctx, sinkURI, sinkConfig); err != nil {
		return err
	}
	if protocol, err = helper.GetProtocol(putil.GetOrZero(sinkConfig.Protocol)); err != nil {
		return err
	}
	if _, err = util.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, math.MaxInt); err != nil {
		return err
	}
	if storage, err = helper.GetExternalStorageFromURI(ctx, sinkURI.String()); err != nil {
		return err
	}
	storage.Close()
	return nil
}
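
// For example, a caller could verify an S3 URI like this (a hedged sketch:
// the protocol value and config wiring are illustrative, mirroring the
// commented-out lines in cloudstorage_test.go):
//
//	uri, _ := url.Parse("s3://bucket/prefix?protocol=csv")
//	protocol := "csv"
//	sinkConfig := &config.SinkConfig{Protocol: &protocol}
//	err := verifyCloudStorageSink(ctx, changefeedID, uri, sinkConfig)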

func newCloudStorageSink(
	ctx context.Context, changefeedID common.ChangeFeedID, sinkURI *url.URL, sinkConfig *config.SinkConfig,
	cleanupJobs []func(), /* only for test */
) (*CloudStorageSink, error) {
	// Create the cloud storage config, then apply the params of sinkURI to it.
	cfg := cloudstorage.NewConfig()
	err := cfg.Apply(ctx, sinkURI, sinkConfig)
	if err != nil {
		return nil, err
	}
	// Fetch the protocol from the replica config defined by the changefeed config file.
	protocol, err := helper.GetProtocol(
		putil.GetOrZero(sinkConfig.Protocol),
	)
	if err != nil {
		return nil, errors.Trace(err)
	}
	// Get the cloud storage file extension according to the specific protocol.
	ext := helper.GetFileExtension(protocol)
	// The last param, maxMsgBytes, mainly limits the size of a single message for
	// batch protocols in the MQ scenario. In the cloud storage sink, we just set it to max int.
	encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, math.MaxInt)
	if err != nil {
		return nil, errors.Trace(err)
	}
	storage, err := helper.GetExternalStorageFromURI(ctx, sinkURI.String())
	if err != nil {
		return nil, err
	}
	s := &CloudStorageSink{
		changefeedID:         changefeedID,
		scheme:               strings.ToLower(sinkURI.Scheme),
		outputRawChangeEvent: sinkConfig.CloudStorageConfig.GetOutputRawChangeEvent(),
		statistics:           metrics.NewStatistics(changefeedID, "CloudStorageSink"),
	}

	s.dmlWorker, err = worker.NewCloudStorageDMLWorker(changefeedID, storage, cfg, encoderConfig, ext, s.statistics)
	if err != nil {
		return nil, err
	}
	s.ddlWorker = worker.NewCloudStorageDDLWorker(changefeedID, sinkURI, cfg, cleanupJobs, storage, s.statistics)
	return s, nil
}
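
// A sink URI accepted by this constructor might look like the following
// (illustrative values; worker-count, flush-interval, and file-size follow
// TiCDC's documented cloud storage sink options, and protocol selects the
// encoder):
//
//	s3://bucket/prefix?protocol=csv&worker-count=16&flush-interval=2s&file-size=67108864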

func (s *CloudStorageSink) SinkType() common.SinkType {
	return common.CloudStorageSinkType
}

func (s *CloudStorageSink) Run(ctx context.Context) error {
	eg, ctx := errgroup.WithContext(ctx)

	eg.Go(func() error {
		return s.dmlWorker.Run(ctx)
	})

	eg.Go(func() error {
		return s.ddlWorker.Run(ctx)
	})

	return eg.Wait()
}

func (s *CloudStorageSink) IsNormal() bool {
	return atomic.LoadUint32(&s.isNormal) == 1
}

func (s *CloudStorageSink) AddDMLEvent(event *commonEvent.DMLEvent) {
	s.dmlWorker.AddDMLEvent(event)
}

func (s *CloudStorageSink) PassBlockEvent(event commonEvent.BlockEvent) {
	event.PostFlush()
}

func (s *CloudStorageSink) WriteBlockEvent(event commonEvent.BlockEvent) error {
	switch e := event.(type) {
	case *commonEvent.DDLEvent:
		if e.TiDBOnly {
			// Run the callback directly and return.
			e.PostFlush()
			return nil
		}
		err := s.ddlWorker.WriteBlockEvent(e)
		if err != nil {
			atomic.StoreUint32(&s.isNormal, 0)
			return errors.Trace(err)
		}
	case *commonEvent.SyncPointEvent:
		log.Error("CloudStorageSink doesn't support Sync Point Event",
			zap.String("namespace", s.changefeedID.Namespace()),
			zap.String("changefeed", s.changefeedID.Name()),
			zap.Any("event", event))
	default:
		log.Error("CloudStorageSink doesn't support this type of block event",
			zap.String("namespace", s.changefeedID.Namespace()),
			zap.String("changefeed", s.changefeedID.Name()),
			zap.Any("eventType", event.GetType()))
	}
	return nil
}

func (s *CloudStorageSink) AddCheckpointTs(ts uint64) {
	s.ddlWorker.AddCheckpointTs(ts)
}

func (s *CloudStorageSink) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) {
	s.ddlWorker.SetTableSchemaStore(tableSchemaStore)
}

func (s *CloudStorageSink) Close(_ bool) {
	s.dmlWorker.Close()
	s.ddlWorker.Close()
	if s.statistics != nil {
		s.statistics.Close()
	}
}
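
For orientation, a hedged sketch of how a caller might drive this sink once constructed; the event slice and checkpoint value are illustrative, and in practice the changefeed's sink manager owns this loop:

	func driveSink(ctx context.Context, s *CloudStorageSink, events []*commonEvent.DMLEvent) {
		// Run blocks until ctx is cancelled or a worker fails, so start it in its own goroutine.
		go func() { _ = s.Run(ctx) }()
		for _, e := range events {
			s.AddDMLEvent(e)
		}
		// Advancing the checkpoint lets the DDL worker flush metadata for downstream consumers.
		s.AddCheckpointTs(42)
		s.Close(false)
	}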
33 changes: 33 additions & 0 deletions downstreamadapter/sink/cloudstorage_test.go
@@ -0,0 +1,33 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sink

import (
	"context"

	"github.com/pingcap/ticdc/pkg/common"
)

func newCloudStorageSinkForTest() (*CloudStorageSink, error) {
	ctx := context.Background()
	changefeedID := common.NewChangefeedID4Test("test", "test")
	// csvProtocol := "csv-protocol"
	// sinkConfig := &config.SinkConfig{Protocol: &csvProtocol}

	sink := &CloudStorageSink{
		changefeedID: changefeedID,
	}
	go sink.Run(ctx)
	return sink, nil
}
70 changes: 70 additions & 0 deletions downstreamadapter/sink/helper/helper.go
@@ -14,13 +14,83 @@
package helper

import (
	"context"
	"net/url"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws/client"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/pingcap/ticdc/pkg/config"
	cerror "github.com/pingcap/ticdc/pkg/errors"
	"github.com/pingcap/tidb/br/pkg/storage"
	"github.com/pingcap/tiflow/pkg/errors"
)

// GetExternalStorageFromURI creates a new storage.ExternalStorage from a uri.
func GetExternalStorageFromURI(
	ctx context.Context, uri string,
) (storage.ExternalStorage, error) {
	return GetExternalStorage(ctx, uri, nil, DefaultS3Retryer())
}

// GetExternalStorage creates a new storage.ExternalStorage based on the uri and options.
func GetExternalStorage(
	ctx context.Context, uri string,
	opts *storage.BackendOptions,
	retryer request.Retryer,
) (storage.ExternalStorage, error) {
	backEnd, err := storage.ParseBackend(uri, opts)
	if err != nil {
		return nil, errors.Trace(err)
	}

	ret, err := storage.New(ctx, backEnd, &storage.ExternalStorageOptions{
		SendCredentials: false,
		S3Retryer:       retryer,
	})
	if err != nil {
		retErr := errors.ErrFailToCreateExternalStorage.Wrap(errors.Trace(err))
		return nil, retErr.GenWithStackByArgs("creating ExternalStorage for s3")
	}

	// Check the connection and ignore the returned bool value, since we don't care if the file exists.
	_, err = ret.FileExists(ctx, "test")
	if err != nil {
		retErr := errors.ErrFailToCreateExternalStorage.Wrap(errors.Trace(err))
		return nil, retErr.GenWithStackByArgs("creating ExternalStorage for s3")
	}
	return ret, nil
}

// retryerWithLog wraps the client.DefaultRetryer and logs when retrying.
type retryerWithLog struct {
	client.DefaultRetryer
}

// DefaultS3Retryer is the default s3 retryer; maybe this function
// should be extracted to another place.
func DefaultS3Retryer() request.Retryer {
	return retryerWithLog{
		DefaultRetryer: client.DefaultRetryer{
			NumMaxRetries:    3,
			MinRetryDelay:    1 * time.Second,
			MinThrottleDelay: 2 * time.Second,
		},
	}
}

// NewS3Retryer creates a new s3 retryer.
func NewS3Retryer(maxRetries int, minRetryDelay, minThrottleDelay time.Duration) request.Retryer {
	return retryerWithLog{
		DefaultRetryer: client.DefaultRetryer{
			NumMaxRetries:    maxRetries,
			MinRetryDelay:    minRetryDelay,
			MinThrottleDelay: minThrottleDelay,
		},
	}
}
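
// A retryer like this would typically also override ShouldRetry to log each
// retry before delegating to the embedded DefaultRetryer. A minimal sketch,
// assuming pingcap/log and zap are imported; not necessarily how this commit
// implements it:
//
//	func (rl retryerWithLog) ShouldRetry(r *request.Request) bool {
//		retry := rl.DefaultRetryer.ShouldRetry(r)
//		if retry {
//			log.Warn("s3 request will be retried", zap.Error(r.Error))
//		}
//		return retry
//	}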

// GetTopic returns the topic name from the sink URI.
func GetTopic(sinkURI *url.URL) (string, error) {
	topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool {
4 changes: 4 additions & 0 deletions downstreamadapter/sink/sink.go
@@ -50,6 +50,8 @@ func NewSink(ctx context.Context, config *config.ChangefeedConfig, changefeedID
		return newMySQLSink(ctx, changefeedID, 16, config, sinkURI)
	case sink.KafkaScheme, sink.KafkaSSLScheme:
		return newKafkaSink(ctx, changefeedID, sinkURI, config.SinkConfig)
	case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
		return newCloudStorageSink(ctx, changefeedID, sinkURI, config.SinkConfig, nil)
	case sink.BlackHoleScheme:
		return newBlackHoleSink()
	}
@@ -67,6 +69,8 @@ func VerifySink(ctx context.Context, config *config.ChangefeedConfig, changefeed
		return verifyMySQLSink(ctx, sinkURI, config)
	case sink.KafkaScheme, sink.KafkaSSLScheme:
		return verifyKafkaSink(ctx, changefeedID, sinkURI, config.SinkConfig)
	case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
		return verifyCloudStorageSink(ctx, changefeedID, sinkURI, config.SinkConfig)
	case sink.BlackHoleScheme:
		return nil
	}
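
With both switches extended, sink URIs using any of the cloud storage schemes are routed to the new sink. Hypothetical examples (bucket, container, and path names are placeholders):

	s3://bucket/prefix
	gcs://bucket/prefix        (gs:// is also accepted)
	azblob://container/prefix  (azure:// is also accepted)
	file:///data/changefeed
	noop://                    (no-op scheme, for tests)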