Skip to content

Commit 6c0ab51

Browse files
thomascubetekton-robot
authored andcommitted
Fix S3 multipart upload if logs are bigger than S3_MULTI_PART_SIZE
Also skip s3.UploadPart call if part is empty Fixes #719
1 parent f7e8f2f commit 6c0ab51

File tree

2 files changed

+20
-9
lines changed

2 files changed

+20
-9
lines changed

pkg/api/server/v1alpha2/log/s3.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func (s3s *s3Stream) ReadFrom(r io.Reader) (int64, error) {
161161

162162
size := s3s.partSize + n
163163
if size >= s3s.multiPartSize {
164-
err = s3s.uploadMultiPart(&s3s.buffer, s3s.partNumber, n)
164+
err = s3s.uploadMultiPart(&s3s.buffer, s3s.partNumber, size)
165165
if err != nil {
166166
return 0, err
167167
}
@@ -175,6 +175,10 @@ func (s3s *s3Stream) ReadFrom(r io.Reader) (int64, error) {
175175
}
176176

177177
func (s3s *s3Stream) uploadMultiPart(reader io.Reader, partNumber int32, partSize int64) error {
178+
if partSize == 0 {
179+
return nil
180+
}
181+
178182
part, err := s3s.client.UploadPart(s3s.ctx, &s3.UploadPartInput{
179183
UploadId: &s3s.uploadID,
180184
Bucket: &s3s.bucket,

pkg/api/server/v1alpha2/log/s3_test.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,16 @@ func (m *mockS3Client) GetObject(ctx context.Context, params *s3.GetObjectInput,
5050

5151
func (m *mockS3Client) UploadPart(ctx context.Context, params *s3.UploadPartInput, optFns ...func(*s3.Options)) (*s3.UploadPartOutput, error) { //nolint:revive
5252
buffer := bytes.Buffer{}
53-
_, err := buffer.ReadFrom(params.Body)
53+
bufLen, err := buffer.ReadFrom(params.Body)
5454
if err != nil {
5555
m.t.Errorf("error uploading part: %d", params.PartNumber)
5656
}
57+
if *params.ContentLength != bufLen {
58+
m.t.Errorf("ContentLength doesn't match buffer length: got %d, expected %d", *params.ContentLength, bufLen)
59+
}
60+
if *params.ContentLength == 0 {
61+
m.t.Errorf("ContentLength must be > 0: got %d", *params.ContentLength)
62+
}
5763
m.body = append(m.body, buffer.Bytes()...)
5864
e := strconv.Itoa(int(m.partNumber))
5965
return &s3.UploadPartOutput{ETag: &e}, nil
@@ -104,19 +110,20 @@ func TestS3Stream_WriteTo(t *testing.T) {
104110
}
105111

106112
func TestS3Stream_ReadFrom(t *testing.T) {
107-
want := "test body of multi-part upload"
113+
want := "test body of multi-part upload 40 bytes"
108114
const DefaultBufferSize = 10
109115
c := &server.Config{
110116
S3_BUCKET_NAME: "test-bucket",
111117
}
112118
filePath := "test"
113119
s := &s3Stream{
114-
config: c,
115-
bucket: c.S3_BUCKET_NAME,
116-
key: filePath,
117-
size: c.LOGS_BUFFER_SIZE,
118-
buffer: bytes.Buffer{},
119-
partNumber: 1,
120+
config: c,
121+
bucket: c.S3_BUCKET_NAME,
122+
key: filePath,
123+
size: c.LOGS_BUFFER_SIZE,
124+
multiPartSize: 20,
125+
buffer: bytes.Buffer{},
126+
partNumber: 1,
120127
client: &mockS3Client{
121128
t: t,
122129
bucket: c.S3_BUCKET_NAME,

0 commit comments

Comments
 (0)