Skip to content

Commit

Permalink
feat: avoid compressing block data in replicator (#99)
Browse files Browse the repository at this point in the history
Avoid compressing block data by reusing the downloaded bytes.
  • Loading branch information
bestmike007 authored Mar 27, 2024
1 parent 1be7f22 commit ae8e89f
Show file tree
Hide file tree
Showing 6 changed files with 350 additions and 116 deletions.
138 changes: 85 additions & 53 deletions internal/storage/blobstorage/gcs/blob_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/coinbase/chainstorage/internal/utils/fxparams"
"github.com/coinbase/chainstorage/internal/utils/instrument"
"github.com/coinbase/chainstorage/internal/utils/log"
"github.com/coinbase/chainstorage/protos/coinbase/c3/common"
api "github.com/coinbase/chainstorage/protos/coinbase/chainstorage"
)

Expand All @@ -44,6 +45,7 @@ type (
presignedUrlExpiration time.Duration
blobStorageMetrics *blobStorageMetrics
instrumentUpload instrument.InstrumentWithResult[string]
instrumentUploadRaw instrument.InstrumentWithResult[string]
instrumentDownload instrument.InstrumentWithResult[*api.Block]
}

Expand Down Expand Up @@ -101,81 +103,111 @@ func New(params BlobStorageParams) (internal.BlobStorage, error) {
presignedUrlExpiration: params.Config.GCP.PresignedUrlExpiration,
blobStorageMetrics: blobStorageMetrics,
instrumentUpload: instrument.NewWithResult[string](metrics, "upload"),
instrumentUploadRaw: instrument.NewWithResult[string](metrics, "upload_raw"),
instrumentDownload: instrument.NewWithResult[*api.Block](metrics, "download"),
}, nil
}

// getObjectKey builds the object key under which a block is stored.
//
// The key has the shape "<blockchain>/<network>/<tag>/<height>/<hash>", with
// the sidechain inserted after the network when the storage is configured for
// a sidechain. The result is then passed through storage_utils.GetObjectKey
// together with the compression type to obtain the final key.
//
// It returns an error if storage_utils.GetObjectKey fails.
func (s *blobStorageImpl) getObjectKey(blockchain common.Blockchain, sidechain api.SideChain, network common.Network, metadata *api.BlockMetadata, compression api.Compression) (string, error) {
	chainPath := fmt.Sprintf("%s/%s", blockchain, network)
	blockPath := fmt.Sprintf("%d/%d/%s", metadata.Tag, metadata.Height, metadata.Hash)

	// Default to the main-chain layout and switch to the sidechain layout only
	// when this storage instance is configured with a sidechain.
	baseKey := fmt.Sprintf(
		"%s/%s", chainPath, blockPath,
	)
	if s.config.Chain.Sidechain != api.SideChain_SIDECHAIN_NONE {
		baseKey = fmt.Sprintf(
			"%s/%s/%s", chainPath, sidechain, blockPath,
		)
	}

	objectKey, err := storage_utils.GetObjectKey(baseKey, compression)
	if err != nil {
		return "", xerrors.Errorf("failed to get object key: %w", err)
	}
	return objectKey, nil
}

// uploadRaw writes rawBlockData.BlockData to the bucket without re-encoding it
// and returns the object key it was stored under.
//
// The bytes are uploaded as-is; BlockDataCompression is only used to derive
// the object key. After the writer is closed, the MD5 reported in the
// writer's attributes is compared against a locally computed MD5, and a
// mismatch is returned as an error. On success the payload size is recorded
// in the blobUploadedSize metric.
func (s *blobStorageImpl) uploadRaw(ctx context.Context, rawBlockData *internal.RawBlockData) (string, error) {
	objectKey, err := s.getObjectKey(rawBlockData.Blockchain, rawBlockData.SideChain, rawBlockData.Network, rawBlockData.BlockMetadata, rawBlockData.BlockDataCompression)
	if err != nil {
		return "", err
	}

	// Compute the expected checksum locally before touching the bucket.
	// #nosec G401
	hasher := md5.New()
	written, err := hasher.Write(rawBlockData.BlockData)
	if err != nil {
		return "", xerrors.Errorf("failed to compute checksum: %w", err)
	}
	expectedMD5 := hasher.Sum(nil)

	writer := s.client.Bucket(s.bucket).Object(objectKey).NewWriter(ctx)
	closer := finalizer.WithCloser(writer)
	defer closer.Finalize()

	if _, err := writer.Write(rawBlockData.BlockData); err != nil {
		return "", xerrors.Errorf("failed to upload block data: %w", err)
	}
	// The writer is closed explicitly here, before its attributes are read.
	if err := closer.Close(); err != nil {
		return "", xerrors.Errorf("failed to upload block data: %w", err)
	}

	if uploadedMD5 := writer.Attrs().MD5; !bytes.Equal(expectedMD5, uploadedMD5) {
		return "", xerrors.Errorf("uploaded block md5 checksum %x is different from expected %x", uploadedMD5, expectedMD5)
	}

	// a workaround to use timer: the byte count is recorded as a duration in
	// milliseconds.
	s.blobStorageMetrics.blobUploadedSize.Record(time.Duration(written) * time.Millisecond)

	return objectKey, nil
}

// UploadRaw stores pre-serialized block data and returns its object key.
//
// The call is wrapped by the "upload_raw" instrumentation and its duration is
// logged. If the block metadata marks the block as skipped, nothing is
// uploaded and an empty key is returned with a nil error.
func (s *blobStorageImpl) UploadRaw(ctx context.Context, rawBlockData *internal.RawBlockData) (string, error) {
	upload := func(ctx context.Context) (string, error) {
		defer s.logDuration("upload", time.Now())

		// Skip the upload if the block itself is skipped.
		if rawBlockData.BlockMetadata.Skipped {
			return "", nil
		}

		return s.uploadRaw(ctx, rawBlockData)
	}

	return s.instrumentUploadRaw.Instrument(ctx, upload)
}

// Upload serializes block, compresses it with the requested compression and
// stores the result in the bucket, returning the object key.
//
// The call is wrapped by the "upload" instrumentation and its duration is
// logged. If the block metadata marks the block as skipped, nothing is
// uploaded and an empty key is returned with a nil error.
//
// Key derivation, checksum verification, the actual write and the size metric
// are delegated to uploadRaw so that Upload and UploadRaw share one upload
// path instead of duplicating it.
//
// It returns an error if marshaling, compression, key derivation, the write
// itself, or the post-upload MD5 verification fails.
func (s *blobStorageImpl) Upload(ctx context.Context, block *api.Block, compression api.Compression) (string, error) {
	return s.instrumentUpload.Instrument(ctx, func(ctx context.Context) (string, error) {
		defer s.logDuration("upload", time.Now())

		// Skip the upload if the block itself is skipped.
		if block.Metadata.Skipped {
			return "", nil
		}

		data, err := proto.Marshal(block)
		if err != nil {
			return "", xerrors.Errorf("failed to marshal block: %w", err)
		}

		data, err = storage_utils.Compress(data, compression)
		if err != nil {
			return "", xerrors.Errorf("failed to compress data with type %v: %w", compression.String(), err)
		}

		// The payload is already compressed at this point, so hand it to the
		// raw upload path together with the compression type used, which
		// determines the object key.
		return s.uploadRaw(ctx, &internal.RawBlockData{
			Blockchain:           block.Blockchain,
			SideChain:            block.SideChain,
			Network:              block.Network,
			BlockMetadata:        block.Metadata,
			BlockData:            data,
			BlockDataCompression: compression,
		})
	})
}

Expand Down
11 changes: 11 additions & 0 deletions internal/storage/blobstorage/internal/blobstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,23 @@ import (

"github.com/coinbase/chainstorage/internal/config"
"github.com/coinbase/chainstorage/internal/utils/fxparams"
"github.com/coinbase/chainstorage/protos/coinbase/c3/common"
api "github.com/coinbase/chainstorage/protos/coinbase/chainstorage"
)

type (
// RawBlockData bundles an already-serialized block payload with the
// identifiers needed to store it. It is the input to BlobStorage.UploadRaw.
RawBlockData struct {
// Blockchain identifies the chain of the block.
Blockchain common.Blockchain
// SideChain identifies the sidechain of the block, if any.
SideChain api.SideChain
// Network identifies the network of the block.
Network common.Network
// BlockMetadata is the metadata of the block the payload belongs to.
BlockMetadata *api.BlockMetadata
// BlockData is the serialized block payload.
// NOTE(review): presumably already compressed according to
// BlockDataCompression — confirm against the callers that build this value.
BlockData []byte
// BlockDataCompression is the compression type associated with BlockData.
BlockDataCompression api.Compression
}

// BlobStorage is the interface for storing and retrieving block data.
BlobStorage interface {
// Upload stores the given block using the requested compression and returns
// a string identifying the stored object.
// NOTE(review): presumably the object key — confirm per implementation.
Upload(ctx context.Context, block *api.Block, compression api.Compression) (string, error)
// UploadRaw stores block data that has already been serialized, described
// by rawBlockData, and returns a string identifying the stored object.
UploadRaw(ctx context.Context, rawBlockData *RawBlockData) (string, error)
// Download returns the block described by metadata.
Download(ctx context.Context, metadata *api.BlockMetadata) (*api.Block, error)
// PreSign returns a string derived from objectKey.
// NOTE(review): presumably a pre-signed URL for the object — confirm
// against the implementations.
PreSign(ctx context.Context, objectKey string) (string, error)
}
Expand Down
16 changes: 16 additions & 0 deletions internal/storage/blobstorage/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/storage/blobstorage/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type (
BlobStorage = internal.BlobStorage
BlobStorageFactory = internal.BlobStorageFactory
BlobStorageFactoryParams = internal.BlobStorageFactoryParams
RawBlockData = internal.RawBlockData
)

var Module = fx.Options(
Expand Down
Loading

0 comments on commit ae8e89f

Please sign in to comment.