s3api: fix multipart Complete ETag matching and lower empty-upload log noise (#8264)
s3api: fix multipart part etag validation and reduce empty upload warning noise
This commit is contained in:
@@ -113,7 +113,9 @@ uploadLoop:
|
|||||||
// Only break if we've already read some data (chunkOffset > 0) or if this is truly EOF
|
// Only break if we've already read some data (chunkOffset > 0) or if this is truly EOF
|
||||||
if dataSize == 0 {
|
if dataSize == 0 {
|
||||||
if chunkOffset == 0 {
|
if chunkOffset == 0 {
|
||||||
glog.Warningf("UploadReaderInChunks: received 0 bytes on first read - creating empty file")
|
// Empty objects are valid for S3/HTTP uploads (e.g. zero-byte files).
|
||||||
|
// Keep this at verbose level to avoid warning noise in normal operation.
|
||||||
|
glog.V(4).Infof("UploadReaderInChunks: received 0 bytes on first read - creating empty file")
|
||||||
}
|
}
|
||||||
chunkBufferPool.Put(bytesBuffer)
|
chunkBufferPool.Put(bytesBuffer)
|
||||||
<-bytesBufferLimitChan
|
<-bytesBufferLimitChan
|
||||||
|
|||||||
@@ -243,17 +243,16 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, partETag := range completedPartsByNumber {
|
for _, partETag := range completedPartsByNumber {
|
||||||
partETag = strings.Trim(partETag, `"`)
|
match, invalid, normalizedPartETag, normalizedEntryETag := validateCompletePartETag(partETag, entry)
|
||||||
entryETag := hex.EncodeToString(entry.Attributes.GetMd5())
|
if invalid {
|
||||||
if partETag != "" && len(partETag) == 32 && entryETag != "" {
|
glog.Warningf("invalid complete etag %s, storedEtag %s", normalizedPartETag, normalizedEntryETag)
|
||||||
if entryETag != partETag {
|
|
||||||
glog.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
|
|
||||||
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagMismatch).Inc()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
glog.Warningf("invalid complete etag %s, partEtag %s", partETag, entryETag)
|
|
||||||
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagInvalid).Inc()
|
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagInvalid).Inc()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !match {
|
||||||
|
glog.Errorf("completeMultipartUpload %s ETag mismatch stored: %s part: %s", entry.Name, normalizedEntryETag, normalizedPartETag)
|
||||||
|
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagMismatch).Inc()
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
if len(entry.Chunks) == 0 && partNumber != maxPartNo {
|
if len(entry.Chunks) == 0 && partNumber != maxPartNo {
|
||||||
glog.Warningf("completeMultipartUpload %s empty chunks", entry.Name)
|
glog.Warningf("completeMultipartUpload %s empty chunks", entry.Name)
|
||||||
@@ -1001,3 +1000,17 @@ func getEtagFromEntry(entry *filer_pb.Entry) string {
|
|||||||
glog.V(4).Infof("getEtagFromEntry: fallback to filer.ETag for %s: %s, chunkCount: %d", entryName, etag, len(entry.Chunks))
|
glog.V(4).Infof("getEtagFromEntry: fallback to filer.ETag for %s: %s, chunkCount: %d", entryName, etag, len(entry.Chunks))
|
||||||
return "\"" + etag + "\""
|
return "\"" + etag + "\""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func validateCompletePartETag(partETag string, entry *filer_pb.Entry) (match bool, invalid bool, normalizedPartETag string, normalizedEntryETag string) {
|
||||||
|
normalizedPartETag = strings.Trim(strings.TrimSpace(partETag), `"`)
|
||||||
|
if normalizedPartETag == "" {
|
||||||
|
return false, true, normalizedPartETag, ""
|
||||||
|
}
|
||||||
|
|
||||||
|
normalizedEntryETag = strings.Trim(getEtagFromEntry(entry), `"`)
|
||||||
|
if normalizedEntryETag == "" {
|
||||||
|
return false, true, normalizedPartETag, normalizedEntryETag
|
||||||
|
}
|
||||||
|
|
||||||
|
return normalizedPartETag == normalizedEntryETag, false, normalizedPartETag, normalizedEntryETag
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,11 +1,14 @@
|
|||||||
package s3api
|
package s3api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/hex"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/service/s3"
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
@@ -132,3 +135,58 @@ func TestGetEntryNameAndDir(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestValidateCompletePartETag(t *testing.T) {
|
||||||
|
t.Run("matches_composite_etag_from_extended", func(t *testing.T) {
|
||||||
|
entry := &filer_pb.Entry{
|
||||||
|
Extended: map[string][]byte{
|
||||||
|
s3_constants.ExtETagKey: []byte("ea58527f14c6ae0dd53089966e44941b-2"),
|
||||||
|
},
|
||||||
|
Attributes: &filer_pb.FuseAttributes{},
|
||||||
|
}
|
||||||
|
match, invalid, part, stored := validateCompletePartETag(`"ea58527f14c6ae0dd53089966e44941b-2"`, entry)
|
||||||
|
assert.True(t, match)
|
||||||
|
assert.False(t, invalid)
|
||||||
|
assert.Equal(t, "ea58527f14c6ae0dd53089966e44941b-2", part)
|
||||||
|
assert.Equal(t, "ea58527f14c6ae0dd53089966e44941b-2", stored)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("matches_md5_from_attributes", func(t *testing.T) {
|
||||||
|
md5Bytes, err := hex.DecodeString("324b2665939fde5b8678d3a8b5c46970")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
entry := &filer_pb.Entry{
|
||||||
|
Attributes: &filer_pb.FuseAttributes{
|
||||||
|
Md5: md5Bytes,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
match, invalid, part, stored := validateCompletePartETag("324b2665939fde5b8678d3a8b5c46970", entry)
|
||||||
|
assert.True(t, match)
|
||||||
|
assert.False(t, invalid)
|
||||||
|
assert.Equal(t, "324b2665939fde5b8678d3a8b5c46970", part)
|
||||||
|
assert.Equal(t, "324b2665939fde5b8678d3a8b5c46970", stored)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("detects_mismatch", func(t *testing.T) {
|
||||||
|
entry := &filer_pb.Entry{
|
||||||
|
Extended: map[string][]byte{
|
||||||
|
s3_constants.ExtETagKey: []byte("67fdd2e302502ff9f9b606bc036e6892-2"),
|
||||||
|
},
|
||||||
|
Attributes: &filer_pb.FuseAttributes{},
|
||||||
|
}
|
||||||
|
match, invalid, _, _ := validateCompletePartETag("686f7d71bacdcd539dd4e17a0d7f1e5f-2", entry)
|
||||||
|
assert.False(t, match)
|
||||||
|
assert.False(t, invalid)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("flags_empty_client_etag_as_invalid", func(t *testing.T) {
|
||||||
|
entry := &filer_pb.Entry{
|
||||||
|
Extended: map[string][]byte{
|
||||||
|
s3_constants.ExtETagKey: []byte("67fdd2e302502ff9f9b606bc036e6892-2"),
|
||||||
|
},
|
||||||
|
Attributes: &filer_pb.FuseAttributes{},
|
||||||
|
}
|
||||||
|
match, invalid, _, _ := validateCompletePartETag(`""`, entry)
|
||||||
|
assert.False(t, match)
|
||||||
|
assert.True(t, invalid)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user