diff --git a/internal/storage/blobstorage/gcs/blob_storage.go b/internal/storage/blobstorage/gcs/blob_storage.go index a3a66162..5b1528cd 100644 --- a/internal/storage/blobstorage/gcs/blob_storage.go +++ b/internal/storage/blobstorage/gcs/blob_storage.go @@ -22,6 +22,7 @@ import ( "github.com/coinbase/chainstorage/internal/utils/fxparams" "github.com/coinbase/chainstorage/internal/utils/instrument" "github.com/coinbase/chainstorage/internal/utils/log" + "github.com/coinbase/chainstorage/protos/coinbase/c3/common" api "github.com/coinbase/chainstorage/protos/coinbase/chainstorage" ) @@ -44,6 +45,7 @@ type ( presignedUrlExpiration time.Duration blobStorageMetrics *blobStorageMetrics instrumentUpload instrument.InstrumentWithResult[string] + instrumentUploadRaw instrument.InstrumentWithResult[string] instrumentDownload instrument.InstrumentWithResult[*api.Block] } @@ -101,81 +103,111 @@ func New(params BlobStorageParams) (internal.BlobStorage, error) { presignedUrlExpiration: params.Config.GCP.PresignedUrlExpiration, blobStorageMetrics: blobStorageMetrics, instrumentUpload: instrument.NewWithResult[string](metrics, "upload"), + instrumentUploadRaw: instrument.NewWithResult[string](metrics, "upload_raw"), instrumentDownload: instrument.NewWithResult[*api.Block](metrics, "download"), }, nil } +func (s *blobStorageImpl) getObjectKey(blockchain common.Blockchain, sidechain api.SideChain, network common.Network, metadata *api.BlockMetadata, compression api.Compression) (string, error) { + var key string + var err error + blockchainNetwork := fmt.Sprintf("%s/%s", blockchain, network) + tagHeightHash := fmt.Sprintf("%d/%d/%s", metadata.Tag, metadata.Height, metadata.Hash) + if s.config.Chain.Sidechain != api.SideChain_SIDECHAIN_NONE { + key = fmt.Sprintf( + "%s/%s/%s", blockchainNetwork, sidechain, tagHeightHash, + ) + } else { + key = fmt.Sprintf( + "%s/%s", blockchainNetwork, tagHeightHash, + ) + } + key, err = storage_utils.GetObjectKey(key, compression) + if err != nil { 
+ return "", xerrors.Errorf("failed to get object key: %w", err) + } + return key, nil +} + +func (s *blobStorageImpl) uploadRaw(ctx context.Context, rawBlockData *internal.RawBlockData) (string, error) { + key, err := s.getObjectKey(rawBlockData.Blockchain, rawBlockData.SideChain, rawBlockData.Network, rawBlockData.BlockMetadata, rawBlockData.BlockDataCompression) + if err != nil { + return "", err + } + + // #nosec G401 + h := md5.New() + size, err := h.Write(rawBlockData.BlockData) + if err != nil { + return "", xerrors.Errorf("failed to compute checksum: %w", err) + } + + checksum := h.Sum(nil) + + object := s.client.Bucket(s.bucket).Object(key) + w := object.NewWriter(ctx) + finalizer := finalizer.WithCloser(w) + defer finalizer.Finalize() + + _, err = w.Write(rawBlockData.BlockData) + if err != nil { + return "", xerrors.Errorf("failed to upload block data: %w", err) + } + err = finalizer.Close() + if err != nil { + return "", xerrors.Errorf("failed to upload block data: %w", err) + } + + attrs := w.Attrs() + if !bytes.Equal(checksum, attrs.MD5) { + return "", xerrors.Errorf("uploaded block md5 checksum %x is different from expected %x", attrs.MD5, checksum) + } + + // a workaround to use timer + s.blobStorageMetrics.blobUploadedSize.Record(time.Duration(size) * time.Millisecond) + + return key, nil +} + +func (s *blobStorageImpl) UploadRaw(ctx context.Context, rawBlockData *internal.RawBlockData) (string, error) { + return s.instrumentUploadRaw.Instrument(ctx, func(ctx context.Context) (string, error) { + defer s.logDuration("upload", time.Now()) + + // Skip the upload if the block itself is skipped. 
+ if rawBlockData.BlockMetadata.Skipped { + return "", nil + } + + return s.uploadRaw(ctx, rawBlockData) + }) +} + func (s *blobStorageImpl) Upload(ctx context.Context, block *api.Block, compression api.Compression) (string, error) { return s.instrumentUpload.Instrument(ctx, func(ctx context.Context) (string, error) { - var key string defer s.logDuration("upload", time.Now()) // Skip the upload if the block itself is skipped. if block.Metadata.Skipped { return "", nil } - data, err := proto.Marshal(block) if err != nil { return "", xerrors.Errorf("failed to marshal block: %w", err) } - blockchainNetwork := fmt.Sprintf("%s/%s", block.Blockchain, block.Network) - tagHeightHash := fmt.Sprintf("%d/%d/%s", block.Metadata.Tag, block.Metadata.Height, block.Metadata.Hash) - if s.config.Chain.Sidechain != api.SideChain_SIDECHAIN_NONE { - key = fmt.Sprintf( - "%s/%s/%s", blockchainNetwork, block.SideChain, tagHeightHash, - ) - } else { - key = fmt.Sprintf( - "%s/%s", blockchainNetwork, tagHeightHash, - ) - } - data, err = storage_utils.Compress(data, compression) if err != nil { return "", xerrors.Errorf("failed to compress data with type %v: %w", compression.String(), err) } - key, err = storage_utils.GetObjectKey(key, compression) - if err != nil { - return "", xerrors.Errorf("failed to get object key: %w", err) - } - - // #nosec G401 - h := md5.New() - size, err := h.Write(data) - if err != nil { - return "", xerrors.Errorf("failed to compute checksum: %w", err) - } - - checksum := h.Sum(nil) - - object := s.client.Bucket(s.bucket).Object(key) - w := object.NewWriter(ctx) - finalizer := finalizer.WithCloser(w) - defer finalizer.Finalize() - - _, err = w.Write(data) - if err != nil { - return "", xerrors.Errorf("failed to upload block data: %w", err) - } - err = finalizer.Close() - if err != nil { - return "", xerrors.Errorf("failed to upload block data: %w", err) - } - - attrs := w.Attrs() - if err != nil { - return "", xerrors.Errorf("failed to load attributes for 
uploaded block data: %w", err) - } - if !bytes.Equal(checksum, attrs.MD5) { - return "", xerrors.Errorf("uploaded block md5 checksum %x is different from expected %x", attrs.MD5, checksum) - } - - // a workaround to use timer - s.blobStorageMetrics.blobUploadedSize.Record(time.Duration(size) * time.Millisecond) - return key, nil + return s.uploadRaw(ctx, &internal.RawBlockData{ + Blockchain: block.Blockchain, + SideChain: block.SideChain, + Network: block.Network, + BlockMetadata: block.Metadata, + BlockData: data, + BlockDataCompression: compression, + }) }) } diff --git a/internal/storage/blobstorage/internal/blobstorage.go b/internal/storage/blobstorage/internal/blobstorage.go index a2dd6fb2..c85ec7de 100644 --- a/internal/storage/blobstorage/internal/blobstorage.go +++ b/internal/storage/blobstorage/internal/blobstorage.go @@ -8,12 +8,23 @@ import ( "github.com/coinbase/chainstorage/internal/config" "github.com/coinbase/chainstorage/internal/utils/fxparams" + "github.com/coinbase/chainstorage/protos/coinbase/c3/common" api "github.com/coinbase/chainstorage/protos/coinbase/chainstorage" ) type ( + RawBlockData struct { + Blockchain common.Blockchain + SideChain api.SideChain + Network common.Network + BlockMetadata *api.BlockMetadata + BlockData []byte + BlockDataCompression api.Compression + } + BlobStorage interface { Upload(ctx context.Context, block *api.Block, compression api.Compression) (string, error) + UploadRaw(ctx context.Context, rawBlockData *RawBlockData) (string, error) Download(ctx context.Context, metadata *api.BlockMetadata) (*api.Block, error) PreSign(ctx context.Context, objectKey string) (string, error) } diff --git a/internal/storage/blobstorage/mocks/mocks.go b/internal/storage/blobstorage/mocks/mocks.go index 6a20177e..04561adc 100644 --- a/internal/storage/blobstorage/mocks/mocks.go +++ b/internal/storage/blobstorage/mocks/mocks.go @@ -13,6 +13,7 @@ import ( context "context" reflect "reflect" + internal 
"github.com/coinbase/chainstorage/internal/storage/blobstorage/internal" chainstorage "github.com/coinbase/chainstorage/protos/coinbase/chainstorage" gomock "go.uber.org/mock/gomock" ) @@ -84,3 +85,18 @@ func (mr *MockBlobStorageMockRecorder) Upload(arg0, arg1, arg2 any) *gomock.Call mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Upload", reflect.TypeOf((*MockBlobStorage)(nil).Upload), arg0, arg1, arg2) } + +// UploadRaw mocks base method. +func (m *MockBlobStorage) UploadRaw(arg0 context.Context, arg1 *internal.RawBlockData) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UploadRaw", arg0, arg1) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UploadRaw indicates an expected call of UploadRaw. +func (mr *MockBlobStorageMockRecorder) UploadRaw(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UploadRaw", reflect.TypeOf((*MockBlobStorage)(nil).UploadRaw), arg0, arg1) +} diff --git a/internal/storage/blobstorage/module.go b/internal/storage/blobstorage/module.go index 5e0ccf7f..4b7d1ce9 100644 --- a/internal/storage/blobstorage/module.go +++ b/internal/storage/blobstorage/module.go @@ -12,6 +12,7 @@ type ( BlobStorage = internal.BlobStorage BlobStorageFactory = internal.BlobStorageFactory BlobStorageFactoryParams = internal.BlobStorageFactoryParams + RawBlockData = internal.RawBlockData ) var Module = fx.Options( diff --git a/internal/storage/blobstorage/s3/blob_storage.go b/internal/storage/blobstorage/s3/blob_storage.go index 17376f3c..c9cea0d0 100644 --- a/internal/storage/blobstorage/s3/blob_storage.go +++ b/internal/storage/blobstorage/s3/blob_storage.go @@ -27,6 +27,7 @@ import ( "github.com/coinbase/chainstorage/internal/utils/fxparams" "github.com/coinbase/chainstorage/internal/utils/instrument" "github.com/coinbase/chainstorage/internal/utils/log" + "github.com/coinbase/chainstorage/protos/coinbase/c3/common" api 
"github.com/coinbase/chainstorage/protos/coinbase/chainstorage" ) @@ -44,15 +45,16 @@ type ( } blobStorageImpl struct { - logger *zap.Logger - config *config.Config - bucket string - client s3.Client - downloader s3.Downloader - uploader s3.Uploader - blobStorageMetrics *blobStorageMetrics - instrumentUpload instrument.InstrumentWithResult[string] - instrumentDownload instrument.InstrumentWithResult[*api.Block] + logger *zap.Logger + config *config.Config + bucket string + client s3.Client + downloader s3.Downloader + uploader s3.Uploader + blobStorageMetrics *blobStorageMetrics + instrumentUpload instrument.InstrumentWithResult[string] + instrumentUploadRaw instrument.InstrumentWithResult[string] + instrumentDownload instrument.InstrumentWithResult[*api.Block] } blobStorageMetrics struct { @@ -86,15 +88,16 @@ func New(params BlobStorageParams) (internal.BlobStorage, error) { "storage_type": "s3", }) return &blobStorageImpl{ - logger: log.WithPackage(params.Logger), - config: params.Config, - bucket: params.Config.AWS.Bucket, - client: params.Client, - downloader: params.Downloader, - uploader: params.Uploader, - blobStorageMetrics: newBlobStorageMetrics(metrics), - instrumentUpload: instrument.NewWithResult[string](metrics, "upload"), - instrumentDownload: instrument.NewWithResult[*api.Block](metrics, "download"), + logger: log.WithPackage(params.Logger), + config: params.Config, + bucket: params.Config.AWS.Bucket, + client: params.Client, + downloader: params.Downloader, + uploader: params.Uploader, + blobStorageMetrics: newBlobStorageMetrics(metrics), + instrumentUpload: instrument.NewWithResult[string](metrics, "upload"), + instrumentUploadRaw: instrument.NewWithResult[string](metrics, "upload_raw"), + instrumentDownload: instrument.NewWithResult[*api.Block](metrics, "download"), }, nil } @@ -105,9 +108,72 @@ func newBlobStorageMetrics(scope tally.Scope) *blobStorageMetrics { } } +func (s *blobStorageImpl) getObjectKey(blockchain common.Blockchain, sidechain 
api.SideChain, network common.Network, metadata *api.BlockMetadata, compression api.Compression) (string, error) { + var key string + var err error + blockchainNetwork := fmt.Sprintf("%s/%s", blockchain, network) + tagHeightHash := fmt.Sprintf("%d/%d/%s", metadata.Tag, metadata.Height, metadata.Hash) + if s.config.Chain.Sidechain != api.SideChain_SIDECHAIN_NONE { + key = fmt.Sprintf( + "%s/%s/%s", blockchainNetwork, sidechain, tagHeightHash, + ) + } else { + key = fmt.Sprintf( + "%s/%s", blockchainNetwork, tagHeightHash, + ) + } + key, err = storage_utils.GetObjectKey(key, compression) + if err != nil { + return "", xerrors.Errorf("failed to get object key: %w", err) + } + return key, nil +} + +func (s *blobStorageImpl) uploadRaw(ctx context.Context, rawBlockData *internal.RawBlockData) (string, error) { + key, err := s.getObjectKey(rawBlockData.Blockchain, rawBlockData.SideChain, rawBlockData.Network, rawBlockData.BlockMetadata, rawBlockData.BlockDataCompression) + if err != nil { + return "", err + } + + // #nosec G401 + h := md5.New() + size, err := h.Write(rawBlockData.BlockData) + if err != nil { + return "", xerrors.Errorf("failed to compute checksum: %w", err) + } + + checksum := base64.StdEncoding.EncodeToString(h.Sum(nil)) + + if _, err := s.uploader.UploadWithContext(ctx, &s3manager.UploadInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(key), + Body: bytes.NewReader(rawBlockData.BlockData), + ContentMD5: aws.String(checksum), + ACL: aws.String(bucketOwnerFullControl), + }); err != nil { + return "", xerrors.Errorf("failed to upload to s3: %w", err) + } + + // a workaround to use timer + s.blobStorageMetrics.blobUploadedSize.Record(time.Duration(size) * time.Millisecond) + + return key, nil +} + +func (s *blobStorageImpl) UploadRaw(ctx context.Context, rawBlockData *internal.RawBlockData) (string, error) { + return s.instrumentUploadRaw.Instrument(ctx, func(ctx context.Context) (string, error) { + defer s.logDuration("upload", time.Now()) + + // 
Skip the upload if the block itself is skipped. + if rawBlockData.BlockMetadata.Skipped { + return "", nil + } + return s.uploadRaw(ctx, rawBlockData) + }) +} + func (s *blobStorageImpl) Upload(ctx context.Context, block *api.Block, compression api.Compression) (string, error) { return s.instrumentUpload.Instrument(ctx, func(ctx context.Context) (string, error) { - var key string defer s.logDuration("upload", time.Now()) // Skip the upload if the block itself is skipped. @@ -120,50 +186,18 @@ func (s *blobStorageImpl) Upload(ctx context.Context, block *api.Block, compress return "", xerrors.Errorf("failed to marshal block: %w", err) } - blockchainNetwork := fmt.Sprintf("%s/%s", block.Blockchain, block.Network) - tagHeightHash := fmt.Sprintf("%d/%d/%s", block.Metadata.Tag, block.Metadata.Height, block.Metadata.Hash) - if s.config.Chain.Sidechain != api.SideChain_SIDECHAIN_NONE { - key = fmt.Sprintf( - "%s/%s/%s", blockchainNetwork, block.SideChain, tagHeightHash, - ) - } else { - key = fmt.Sprintf( - "%s/%s", blockchainNetwork, tagHeightHash, - ) - } - data, err = storage_utils.Compress(data, compression) if err != nil { return "", xerrors.Errorf("failed to compress data with type %v: %w", compression.String(), err) } - key, err = storage_utils.GetObjectKey(key, compression) - if err != nil { - return "", xerrors.Errorf("failed to get object key: %w", err) - } - - // #nosec G401 - h := md5.New() - size, err := h.Write(data) - if err != nil { - return "", xerrors.Errorf("failed to compute checksum: %w", err) - } - - checksum := base64.StdEncoding.EncodeToString(h.Sum(nil)) - - if _, err := s.uploader.UploadWithContext(ctx, &s3manager.UploadInput{ - Bucket: aws.String(s.bucket), - Key: aws.String(key), - Body: bytes.NewReader(data), - ContentMD5: aws.String(checksum), - ACL: aws.String(bucketOwnerFullControl), - }); err != nil { - return "", xerrors.Errorf("failed to upload to s3: %w", err) - } - - // a workaround to use timer - 
s.blobStorageMetrics.blobUploadedSize.Record(time.Duration(size) * time.Millisecond) - - return key, nil + return s.uploadRaw(ctx, &internal.RawBlockData{ + Blockchain: block.Blockchain, + SideChain: block.SideChain, + Network: block.Network, + BlockMetadata: block.Metadata, + BlockData: data, + BlockDataCompression: compression, + }) }) } diff --git a/internal/workflow/activity/replicator.go b/internal/workflow/activity/replicator.go index 37613c80..aea3aa69 100644 --- a/internal/workflow/activity/replicator.go +++ b/internal/workflow/activity/replicator.go @@ -2,29 +2,50 @@ package activity import ( "context" + "io" + "net/http" + "time" "go.temporal.io/sdk/workflow" "go.uber.org/fx" "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/xerrors" + "google.golang.org/protobuf/proto" + tracehttp "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http" "github.com/coinbase/chainstorage/internal/cadence" + "github.com/coinbase/chainstorage/internal/config" "github.com/coinbase/chainstorage/internal/gateway" "github.com/coinbase/chainstorage/internal/storage/blobstorage" "github.com/coinbase/chainstorage/internal/storage/blobstorage/downloader" "github.com/coinbase/chainstorage/internal/storage/metastorage" + storage_utils "github.com/coinbase/chainstorage/internal/storage/utils" + "github.com/coinbase/chainstorage/internal/utils/finalizer" "github.com/coinbase/chainstorage/internal/utils/fxparams" + "github.com/coinbase/chainstorage/internal/utils/retry" api "github.com/coinbase/chainstorage/protos/coinbase/chainstorage" ) +const ( + timeout = time.Second * 30 +) + +var ( + ErrDownloadFailure = xerrors.New("download failure") +) + type ( Replicator struct { baseActivity + config *config.Config + logger *zap.Logger client gateway.Client blockDownloader downloader.BlockDownloader metaStorage metastorage.MetaStorage blobStorage blobstorage.BlobStorage + retry retry.RetryWithResult[[]byte] + httpClient *http.Client } ReplicatorParams struct { @@ -52,12 +73,22 @@ type ( 
) func NewReplicator(params ReplicatorParams) *Replicator { + httpClient := &http.Client{ + Timeout: timeout, + } + httpClient = tracehttp.WrapClient(httpClient, tracehttp.RTWithResourceNamer(func(req *http.Request) string { + return "/workflow/activity/replicator" + })) a := &Replicator{ baseActivity: newBaseActivity(ActivityReplicator, params.Runtime), + config: params.Config, + logger: params.Logger, client: params.Client, blockDownloader: params.BlockDownloader, metaStorage: params.MetaStorage, blobStorage: params.BlobStorage, + httpClient: httpClient, + retry: retry.NewWithResult[[]byte](retry.WithLogger(params.Logger)), } a.register(a.execute) return a @@ -69,6 +100,111 @@ func (a *Replicator) Execute(ctx workflow.Context, request *ReplicatorRequest) ( return &response, err } +func (a *Replicator) downloadBlockData(ctx context.Context, url string) ([]byte, error) { + return a.retry.Retry(ctx, func(ctx context.Context) ([]byte, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, xerrors.Errorf("failed to create download request: %w", err) + } + + httpResp, err := a.httpClient.Do(req) + if err != nil { + return nil, retry.Retryable(xerrors.Errorf("failed to download block file: %w", err)) + } + + finalizer := finalizer.WithCloser(httpResp.Body) + defer finalizer.Finalize() + + if statusCode := httpResp.StatusCode; statusCode != http.StatusOK { + if statusCode == http.StatusRequestTimeout || + statusCode == http.StatusTooManyRequests || + statusCode >= http.StatusInternalServerError { + return nil, retry.Retryable(xerrors.Errorf("received %d status code: %w", statusCode, ErrDownloadFailure)) + } else { + return nil, xerrors.Errorf("received non-retryable %d status code: %w", statusCode, ErrDownloadFailure) + } + } + + bodyBytes, err := io.ReadAll(httpResp.Body) + if err != nil { + return nil, retry.Retryable(xerrors.Errorf("failed to read body: %w", err)) + } + return bodyBytes, finalizer.Close() + }) 
+}
+func (a *Replicator) prepareRawBlockData(ctx context.Context, blockFile *api.BlockFile, compression api.Compression) (*blobstorage.RawBlockData, error) {
+	bodyBytes, err := a.downloadBlockData(ctx, blockFile.FileUrl)
+	if err != nil {
+		return nil, err
+	}
+	var rawBytes []byte
+	var compressedBytes []byte
+	switch blockFile.Compression {
+	case api.Compression_NONE:
+		rawBytes = bodyBytes
+		if compression == api.Compression_GZIP {
+			compressedBytes, err = storage_utils.Compress(rawBytes, compression)
+			if err != nil {
+				return nil, xerrors.Errorf("failed to compress block data with type %v: %w", compression.String(), err)
+			}
+		}
+	case api.Compression_GZIP:
+		compressedBytes = bodyBytes
+		if compression == api.Compression_NONE {
+			rawBytes, err = storage_utils.Decompress(compressedBytes, blockFile.Compression) // decompress the downloaded bytes, not the still-nil rawBytes
+			if err != nil {
+				return nil, xerrors.Errorf("failed to decompress block data with type %v: %w", blockFile.Compression.String(), err)
+			}
+		}
+	default:
+		return nil, xerrors.Errorf("unknown block file compression type %v", blockFile.Compression.String())
+	}
+	metadata := &api.BlockMetadata{
+		Tag:          blockFile.Tag,
+		Hash:         blockFile.Hash,
+		ParentHash:   blockFile.ParentHash,
+		Height:       blockFile.Height,
+		ParentHeight: blockFile.ParentHeight,
+		Skipped:      blockFile.Skipped,
+		Timestamp:    blockFile.BlockTimestamp,
+	}
+	// if block file is coming from old chainstorage api, the block timestamp is not set
+	// we need to extract it from the block data
+	// TODO remove this after the api upgrade
+	if metadata.Timestamp == nil || (metadata.Timestamp.Nanos == 0 && metadata.Timestamp.Seconds == 0) {
+		block := new(api.Block)
+		rawBytes := rawBytes
+		if len(rawBytes) == 0 {
+			rawBytes, err = storage_utils.Decompress(bodyBytes, blockFile.Compression)
+			if err != nil {
+				return nil, xerrors.Errorf("failed to decompress block data with type %v: %w", blockFile.Compression.String(), err)
+			}
+		}
+
+		if err := proto.Unmarshal(rawBytes, block); err != nil {
+			return nil, 
xerrors.Errorf("failed to unmarshal file contents: %w", err)
+		}
+		metadata.Timestamp = block.Metadata.Timestamp // propagate the recovered timestamp into the metadata that is persisted; assigning blockFile.BlockTimestamp here would be lost
+	}
+	rawBlockData := &blobstorage.RawBlockData{
+		Blockchain:           a.config.Chain.Blockchain,
+		Network:              a.config.Chain.Network,
+		SideChain:            a.config.Chain.Sidechain,
+		BlockMetadata:        metadata,
+		BlockDataCompression: compression,
+	}
+	switch compression {
+	case api.Compression_NONE:
+		rawBlockData.BlockData = rawBytes
+		return rawBlockData, nil
+	case api.Compression_GZIP:
+		rawBlockData.BlockData = compressedBytes
+		return rawBlockData, nil
+	default:
+		return nil, xerrors.Errorf("unknown compression type %v", compression.String())
+	}
+}
+
 func (a *Replicator) execute(ctx context.Context, request *ReplicatorRequest) (*ReplicatorResponse, error) {
 	if err := a.validateRequest(request); err != nil {
 		return nil, err
@@ -99,12 +235,16 @@ func (a *Replicator) execute(ctx context.Context, request *ReplicatorRequest) (*
 				zap.Uint64("height", blockFile.Height),
 				zap.String("hash", blockFile.Hash),
 			)
-			block, err := a.blockDownloader.Download(errgroupCtx, blockFile)
+			rawBlockData, err := a.prepareRawBlockData(errgroupCtx, blockFile, request.Compression)
+			if err != nil {
+				return xerrors.Errorf("failed to prepare raw block data: %w", err)
+			}
+			objectKeyMain, err := a.blobStorage.UploadRaw(errgroupCtx, rawBlockData)
 			if err != nil {
-				return xerrors.Errorf("failed download block file from %s: %w", blockFile.GetFileUrl(), err)
+				return xerrors.Errorf("failed to upload raw block file: %w", err)
 			}
-			_, err = a.blobStorage.Upload(errgroupCtx, block, request.Compression)
-			blockMetas[i] = block.Metadata
+			blockMetas[i] = rawBlockData.BlockMetadata
+			blockMetas[i].ObjectKeyMain = objectKeyMain
 			return err
 		})
 	}