S3: add object versioning (#6945)
* add object versioning * add missing file * Update weed/s3api/s3api_object_versioning.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/s3api/s3api_object_versioning.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/s3api/s3api_object_versioning.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * ListObjectVersionsResult is better to show multiple version entries * fix test * Update weed/s3api/s3api_object_handlers_put.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/s3api/s3api_object_versioning.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * multiple improvements * move PutBucketVersioningHandler into weed/s3api/s3api_bucket_handlers.go file * duplicated code for reading bucket config, versioningEnabled, etc. try to use functions * opportunity to cache bucket config * error handling if bucket is not found * in case bucket is not found * fix build * add object versioning tests * remove non-existent tests * add tests * add versioning tests * skip a new test * ensure .versions directory exists before saving info into it * fix creating version entry * logging on creating version directory * Update s3api_object_versioning_test.go * retry and wait for directory creation * revert add more logging * Update s3api_object_versioning.go * more debug messages * clean up logs, and touch directory correctly * log the .versions creation and then parent directory listing * use mkFile instead of touch touch is for update * clean up data * add versioning test in go * change location * if modified, latest version is moved to .versions directory, and create a new latest version Core versioning functionality: WORKING TestVersioningBasicWorkflow - PASS TestVersioningDeleteMarkers - PASS TestVersioningMultipleVersionsSameObject - PASS TestVersioningDeleteAndRecreate - PASS TestVersioningListWithPagination - PASS ❌ Some advanced features still failing: ETag calculation issues (using mtime instead of proper MD5) Specific version retrieval (EOF error) Version deletion (internal errors) Concurrent operations (race conditions) * calculate multi chunk md5 Test Results - All Passing: ✅ TestBucketListReturnDataVersioning - PASS ✅ TestVersioningCreateObjectsInOrder - PASS ✅ TestVersioningBasicWorkflow - PASS ✅ TestVersioningMultipleVersionsSameObject - PASS ✅ TestVersioningDeleteMarkers - PASS * dedupe * fix TestVersioningErrorCases * fix eof error of reading old versions * get specific version also check current version * enable integration tests for versioning * trigger action to work for now * Fix GitHub Actions S3 versioning tests workflow - Fix syntax error (incorrect indentation) - Update directory paths from weed/s3api/versioning_tests/ to test/s3/versioning/ - Add push trigger for add-object-versioning branch to enable CI during development - Update artifact paths to match correct directory structure * Improve CI robustness for S3 versioning tests Makefile improvements: - Increase server startup timeout from 30s to 90s for CI environments - Add progressive timeout reporting (logs at 30s, full logs at 90s) - Better error handling with server logs on failure - Add server PID tracking for debugging - Improved test failure reporting GitHub Actions workflow improvements: - Increase job timeouts to account for CI environment delays - Add system information logging (memory, disk space) - Add detailed failure reporting with server logs - Add process and network diagnostics on failure - Better error messaging and log collection These changes should resolve the 'Server failed to start within 30 seconds' issue that was causing the CI tests to fail. * adjust testing volume size * Update Makefile * Update Makefile * Update Makefile * Update Makefile * Update s3-versioning-tests.yml * Update s3api_object_versioning.go * Update Makefile * do not clean up * log received version id * more logs * printout response * print out list version response * use tmp files when put versioned object * change to versions folder layout * Delete weed-test.log * test with mixed versioned and unversioned objects * remove versionDirCache * remove unused functions * remove unused function * remove fallback checking * minor --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -3,10 +3,11 @@ package s3api
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func (s3a *S3ApiServer) mkdir(parentDirectoryPath string, dirName string, fn func(entry *filer_pb.Entry)) error {
|
||||
|
||||
@@ -1,7 +1,14 @@
|
||||
package s3_constants
|
||||
|
||||
const (
|
||||
ExtAmzOwnerKey = "Seaweed-X-Amz-Owner"
|
||||
ExtAmzAclKey = "Seaweed-X-Amz-Acl"
|
||||
ExtOwnershipKey = "Seaweed-X-Amz-Ownership"
|
||||
ExtAmzOwnerKey = "Seaweed-X-Amz-Owner"
|
||||
ExtAmzAclKey = "Seaweed-X-Amz-Acl"
|
||||
ExtOwnershipKey = "Seaweed-X-Amz-Ownership"
|
||||
ExtVersioningKey = "Seaweed-X-Amz-Versioning"
|
||||
ExtVersionIdKey = "Seaweed-X-Amz-Version-Id"
|
||||
ExtDeleteMarkerKey = "Seaweed-X-Amz-Delete-Marker"
|
||||
ExtIsLatestKey = "Seaweed-X-Amz-Is-Latest"
|
||||
ExtETagKey = "Seaweed-X-Amz-ETag"
|
||||
ExtLatestVersionIdKey = "Seaweed-X-Amz-Latest-Version-Id"
|
||||
ExtLatestVersionFileNameKey = "Seaweed-X-Amz-Latest-Version-File-Name"
|
||||
)
|
||||
|
||||
246
weed/s3api/s3api_bucket_config.go
Normal file
246
weed/s3api/s3api_bucket_config.go
Normal file
@@ -0,0 +1,246 @@
|
||||
package s3api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||
)
|
||||
|
||||
// BucketConfig represents cached bucket configuration
|
||||
type BucketConfig struct {
|
||||
Name string
|
||||
Versioning string // "Enabled", "Suspended", or ""
|
||||
Ownership string
|
||||
ACL []byte
|
||||
Owner string
|
||||
LastModified time.Time
|
||||
Entry *filer_pb.Entry
|
||||
}
|
||||
|
||||
// BucketConfigCache provides caching for bucket configurations
|
||||
type BucketConfigCache struct {
|
||||
cache map[string]*BucketConfig
|
||||
mutex sync.RWMutex
|
||||
ttl time.Duration
|
||||
}
|
||||
|
||||
// NewBucketConfigCache creates a new bucket configuration cache
|
||||
func NewBucketConfigCache(ttl time.Duration) *BucketConfigCache {
|
||||
return &BucketConfigCache{
|
||||
cache: make(map[string]*BucketConfig),
|
||||
ttl: ttl,
|
||||
}
|
||||
}
|
||||
|
||||
// Get retrieves bucket configuration from cache
|
||||
func (bcc *BucketConfigCache) Get(bucket string) (*BucketConfig, bool) {
|
||||
bcc.mutex.RLock()
|
||||
defer bcc.mutex.RUnlock()
|
||||
|
||||
config, exists := bcc.cache[bucket]
|
||||
if !exists {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// Check if cache entry is expired
|
||||
if time.Since(config.LastModified) > bcc.ttl {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return config, true
|
||||
}
|
||||
|
||||
// Set stores bucket configuration in cache
|
||||
func (bcc *BucketConfigCache) Set(bucket string, config *BucketConfig) {
|
||||
bcc.mutex.Lock()
|
||||
defer bcc.mutex.Unlock()
|
||||
|
||||
config.LastModified = time.Now()
|
||||
bcc.cache[bucket] = config
|
||||
}
|
||||
|
||||
// Remove removes bucket configuration from cache
|
||||
func (bcc *BucketConfigCache) Remove(bucket string) {
|
||||
bcc.mutex.Lock()
|
||||
defer bcc.mutex.Unlock()
|
||||
|
||||
delete(bcc.cache, bucket)
|
||||
}
|
||||
|
||||
// Clear clears all cached configurations
|
||||
func (bcc *BucketConfigCache) Clear() {
|
||||
bcc.mutex.Lock()
|
||||
defer bcc.mutex.Unlock()
|
||||
|
||||
bcc.cache = make(map[string]*BucketConfig)
|
||||
}
|
||||
|
||||
// getBucketConfig retrieves bucket configuration with caching
|
||||
func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.ErrorCode) {
|
||||
// Try cache first
|
||||
if config, found := s3a.bucketConfigCache.Get(bucket); found {
|
||||
return config, s3err.ErrNone
|
||||
}
|
||||
|
||||
// Load from filer
|
||||
bucketEntry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
return nil, s3err.ErrNoSuchBucket
|
||||
}
|
||||
glog.Errorf("getBucketConfig: failed to get bucket entry for %s: %v", bucket, err)
|
||||
return nil, s3err.ErrInternalError
|
||||
}
|
||||
|
||||
config := &BucketConfig{
|
||||
Name: bucket,
|
||||
Entry: bucketEntry,
|
||||
}
|
||||
|
||||
// Extract configuration from extended attributes
|
||||
if bucketEntry.Extended != nil {
|
||||
if versioning, exists := bucketEntry.Extended[s3_constants.ExtVersioningKey]; exists {
|
||||
config.Versioning = string(versioning)
|
||||
}
|
||||
if ownership, exists := bucketEntry.Extended[s3_constants.ExtOwnershipKey]; exists {
|
||||
config.Ownership = string(ownership)
|
||||
}
|
||||
if acl, exists := bucketEntry.Extended[s3_constants.ExtAmzAclKey]; exists {
|
||||
config.ACL = acl
|
||||
}
|
||||
if owner, exists := bucketEntry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
|
||||
config.Owner = string(owner)
|
||||
}
|
||||
}
|
||||
|
||||
// Cache the result
|
||||
s3a.bucketConfigCache.Set(bucket, config)
|
||||
|
||||
return config, s3err.ErrNone
|
||||
}
|
||||
|
||||
// updateBucketConfig updates bucket configuration and invalidates cache
|
||||
func (s3a *S3ApiServer) updateBucketConfig(bucket string, updateFn func(*BucketConfig) error) s3err.ErrorCode {
|
||||
config, errCode := s3a.getBucketConfig(bucket)
|
||||
if errCode != s3err.ErrNone {
|
||||
return errCode
|
||||
}
|
||||
|
||||
// Apply update function
|
||||
if err := updateFn(config); err != nil {
|
||||
glog.Errorf("updateBucketConfig: update function failed for bucket %s: %v", bucket, err)
|
||||
return s3err.ErrInternalError
|
||||
}
|
||||
|
||||
// Prepare extended attributes
|
||||
if config.Entry.Extended == nil {
|
||||
config.Entry.Extended = make(map[string][]byte)
|
||||
}
|
||||
|
||||
// Update extended attributes
|
||||
if config.Versioning != "" {
|
||||
config.Entry.Extended[s3_constants.ExtVersioningKey] = []byte(config.Versioning)
|
||||
}
|
||||
if config.Ownership != "" {
|
||||
config.Entry.Extended[s3_constants.ExtOwnershipKey] = []byte(config.Ownership)
|
||||
}
|
||||
if config.ACL != nil {
|
||||
config.Entry.Extended[s3_constants.ExtAmzAclKey] = config.ACL
|
||||
}
|
||||
if config.Owner != "" {
|
||||
config.Entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(config.Owner)
|
||||
}
|
||||
|
||||
// Save to filer
|
||||
err := s3a.updateEntry(s3a.option.BucketsPath, config.Entry)
|
||||
if err != nil {
|
||||
glog.Errorf("updateBucketConfig: failed to update bucket entry for %s: %v", bucket, err)
|
||||
return s3err.ErrInternalError
|
||||
}
|
||||
|
||||
// Update cache
|
||||
s3a.bucketConfigCache.Set(bucket, config)
|
||||
|
||||
return s3err.ErrNone
|
||||
}
|
||||
|
||||
// isVersioningEnabled checks if versioning is enabled for a bucket (with caching)
|
||||
func (s3a *S3ApiServer) isVersioningEnabled(bucket string) (bool, error) {
|
||||
config, errCode := s3a.getBucketConfig(bucket)
|
||||
if errCode != s3err.ErrNone {
|
||||
if errCode == s3err.ErrNoSuchBucket {
|
||||
return false, filer_pb.ErrNotFound
|
||||
}
|
||||
return false, fmt.Errorf("failed to get bucket config: %v", errCode)
|
||||
}
|
||||
|
||||
return config.Versioning == "Enabled", nil
|
||||
}
|
||||
|
||||
// getBucketVersioningStatus returns the versioning status for a bucket
|
||||
func (s3a *S3ApiServer) getBucketVersioningStatus(bucket string) (string, s3err.ErrorCode) {
|
||||
config, errCode := s3a.getBucketConfig(bucket)
|
||||
if errCode != s3err.ErrNone {
|
||||
return "", errCode
|
||||
}
|
||||
|
||||
if config.Versioning == "" {
|
||||
return "Suspended", s3err.ErrNone
|
||||
}
|
||||
|
||||
return config.Versioning, s3err.ErrNone
|
||||
}
|
||||
|
||||
// setBucketVersioningStatus sets the versioning status for a bucket
|
||||
func (s3a *S3ApiServer) setBucketVersioningStatus(bucket, status string) s3err.ErrorCode {
|
||||
return s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
|
||||
config.Versioning = status
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// getBucketOwnership returns the ownership setting for a bucket
|
||||
func (s3a *S3ApiServer) getBucketOwnership(bucket string) (string, s3err.ErrorCode) {
|
||||
config, errCode := s3a.getBucketConfig(bucket)
|
||||
if errCode != s3err.ErrNone {
|
||||
return "", errCode
|
||||
}
|
||||
|
||||
return config.Ownership, s3err.ErrNone
|
||||
}
|
||||
|
||||
// setBucketOwnership sets the ownership setting for a bucket
|
||||
func (s3a *S3ApiServer) setBucketOwnership(bucket, ownership string) s3err.ErrorCode {
|
||||
return s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
|
||||
config.Ownership = ownership
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// removeBucketConfigKey removes a specific configuration key from bucket
|
||||
func (s3a *S3ApiServer) removeBucketConfigKey(bucket, key string) s3err.ErrorCode {
|
||||
return s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
|
||||
if config.Entry.Extended != nil {
|
||||
delete(config.Entry.Extended, key)
|
||||
}
|
||||
|
||||
// Update our local config too
|
||||
switch key {
|
||||
case s3_constants.ExtVersioningKey:
|
||||
config.Versioning = ""
|
||||
case s3_constants.ExtOwnershipKey:
|
||||
config.Ownership = ""
|
||||
case s3_constants.ExtAmzAclKey:
|
||||
config.ACL = nil
|
||||
case s3_constants.ExtAmzOwnerKey:
|
||||
config.Owner = ""
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
@@ -552,25 +552,17 @@ func (s3a *S3ApiServer) PutBucketOwnershipControls(w http.ResponseWriter, r *htt
|
||||
return
|
||||
}
|
||||
|
||||
bucketEntry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
|
||||
return
|
||||
}
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
// Check if ownership needs to be updated
|
||||
currentOwnership, errCode := s3a.getBucketOwnership(bucket)
|
||||
if errCode != s3err.ErrNone {
|
||||
s3err.WriteErrorResponse(w, r, errCode)
|
||||
return
|
||||
}
|
||||
|
||||
oldOwnership, ok := bucketEntry.Extended[s3_constants.ExtOwnershipKey]
|
||||
if !ok || string(oldOwnership) != ownership {
|
||||
if bucketEntry.Extended == nil {
|
||||
bucketEntry.Extended = make(map[string][]byte)
|
||||
}
|
||||
bucketEntry.Extended[s3_constants.ExtOwnershipKey] = []byte(ownership)
|
||||
err = s3a.updateEntry(s3a.option.BucketsPath, bucketEntry)
|
||||
if err != nil {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
if currentOwnership != ownership {
|
||||
errCode = s3a.setBucketOwnership(bucket, ownership)
|
||||
if errCode != s3err.ErrNone {
|
||||
s3err.WriteErrorResponse(w, r, errCode)
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -596,22 +588,15 @@ func (s3a *S3ApiServer) GetBucketOwnershipControls(w http.ResponseWriter, r *htt
|
||||
return
|
||||
}
|
||||
|
||||
bucketEntry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
|
||||
return
|
||||
}
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
// Get ownership using new bucket config system
|
||||
ownership, errCode := s3a.getBucketOwnership(bucket)
|
||||
if errCode == s3err.ErrNoSuchBucket {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
|
||||
return
|
||||
}
|
||||
|
||||
v, ok := bucketEntry.Extended[s3_constants.ExtOwnershipKey]
|
||||
if !ok {
|
||||
} else if errCode != s3err.ErrNone {
|
||||
s3err.WriteErrorResponse(w, r, s3err.OwnershipControlsNotFoundError)
|
||||
return
|
||||
}
|
||||
ownership := string(v)
|
||||
|
||||
result := &s3.PutBucketOwnershipControlsInput{
|
||||
OwnershipControls: &s3.OwnershipControls{
|
||||
@@ -677,9 +662,63 @@ func (s3a *S3ApiServer) GetBucketVersioningHandler(w http.ResponseWriter, r *htt
|
||||
return
|
||||
}
|
||||
|
||||
// Get versioning status using new bucket config system
|
||||
versioningStatus, errCode := s3a.getBucketVersioningStatus(bucket)
|
||||
if errCode != s3err.ErrNone {
|
||||
s3err.WriteErrorResponse(w, r, errCode)
|
||||
return
|
||||
}
|
||||
|
||||
s3err.WriteAwsXMLResponse(w, r, http.StatusOK, &s3.PutBucketVersioningInput{
|
||||
VersioningConfiguration: &s3.VersioningConfiguration{
|
||||
Status: aws.String(s3.BucketVersioningStatusSuspended),
|
||||
Status: aws.String(versioningStatus),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// PutBucketVersioningHandler Put bucket Versioning
|
||||
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
|
||||
func (s3a *S3ApiServer) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
|
||||
bucket, _ := s3_constants.GetBucketAndObject(r)
|
||||
glog.V(3).Infof("PutBucketVersioning %s", bucket)
|
||||
|
||||
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
|
||||
s3err.WriteErrorResponse(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
if r.Body == nil || r.Body == http.NoBody {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var versioningConfig s3.VersioningConfiguration
|
||||
defer util_http.CloseRequest(r)
|
||||
|
||||
err := xmlutil.UnmarshalXML(&versioningConfig, xml.NewDecoder(r.Body), "")
|
||||
if err != nil {
|
||||
glog.Warningf("PutBucketVersioningHandler xml decode: %s", err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
|
||||
return
|
||||
}
|
||||
|
||||
if versioningConfig.Status == nil {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
|
||||
return
|
||||
}
|
||||
|
||||
status := *versioningConfig.Status
|
||||
if status != "Enabled" && status != "Suspended" {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Update bucket versioning configuration using new bucket config system
|
||||
if errCode := s3a.setBucketVersioningStatus(bucket, status); errCode != s3err.ErrNone {
|
||||
glog.Errorf("PutBucketVersioningHandler save config: %d", errCode)
|
||||
s3err.WriteErrorResponse(w, r, errCode)
|
||||
return
|
||||
}
|
||||
|
||||
writeSuccessResponseEmpty(w, r)
|
||||
}
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
package s3api
|
||||
|
||||
import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"net/http"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||
)
|
||||
|
||||
@@ -44,12 +44,6 @@ func (s3a *S3ApiServer) DeleteBucketPolicyHandler(w http.ResponseWriter, r *http
|
||||
s3err.WriteErrorResponse(w, r, http.StatusNoContent)
|
||||
}
|
||||
|
||||
// PutBucketVersioningHandler Put bucket Versionin
|
||||
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
|
||||
func (s3a *S3ApiServer) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
|
||||
}
|
||||
|
||||
// GetBucketTaggingHandler Returns the tag set associated with the bucket
|
||||
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketTagging.html
|
||||
func (s3a *S3ApiServer) GetBucketTaggingHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
@@ -3,14 +3,15 @@ package s3api
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util/mem"
|
||||
@@ -120,7 +121,73 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
|
||||
return
|
||||
}
|
||||
|
||||
destUrl := s3a.toFilerUrl(bucket, object)
|
||||
// Check for specific version ID in query parameters
|
||||
versionId := r.URL.Query().Get("versionId")
|
||||
|
||||
// Check if versioning is enabled for the bucket
|
||||
versioningEnabled, err := s3a.isVersioningEnabled(bucket)
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
|
||||
return
|
||||
}
|
||||
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
}
|
||||
|
||||
var destUrl string
|
||||
|
||||
if versioningEnabled {
|
||||
// Handle versioned GET - all versions are stored in .versions directory
|
||||
var targetVersionId string
|
||||
var entry *filer_pb.Entry
|
||||
|
||||
if versionId != "" {
|
||||
// Request for specific version
|
||||
glog.V(2).Infof("GetObject: requesting specific version %s for %s/%s", versionId, bucket, object)
|
||||
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to get specific version %s: %v", versionId, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
return
|
||||
}
|
||||
targetVersionId = versionId
|
||||
} else {
|
||||
// Request for latest version
|
||||
glog.V(2).Infof("GetObject: requesting latest version for %s/%s", bucket, object)
|
||||
entry, err = s3a.getLatestObjectVersion(bucket, object)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to get latest version: %v", err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
return
|
||||
}
|
||||
if entry.Extended != nil {
|
||||
if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
|
||||
targetVersionId = string(versionIdBytes)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check if this is a delete marker
|
||||
if entry.Extended != nil {
|
||||
if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// All versions are stored in .versions directory
|
||||
versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId)
|
||||
destUrl = s3a.toFilerUrl(bucket, versionObjectPath)
|
||||
glog.V(2).Infof("GetObject: version %s URL: %s", targetVersionId, destUrl)
|
||||
|
||||
// Set version ID in response header
|
||||
w.Header().Set("x-amz-version-id", targetVersionId)
|
||||
} else {
|
||||
// Handle regular GET (non-versioned)
|
||||
destUrl = s3a.toFilerUrl(bucket, object)
|
||||
}
|
||||
|
||||
s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse)
|
||||
}
|
||||
@@ -130,7 +197,73 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
|
||||
bucket, object := s3_constants.GetBucketAndObject(r)
|
||||
glog.V(3).Infof("HeadObjectHandler %s %s", bucket, object)
|
||||
|
||||
destUrl := s3a.toFilerUrl(bucket, object)
|
||||
// Check for specific version ID in query parameters
|
||||
versionId := r.URL.Query().Get("versionId")
|
||||
|
||||
// Check if versioning is enabled for the bucket
|
||||
versioningEnabled, err := s3a.isVersioningEnabled(bucket)
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
|
||||
return
|
||||
}
|
||||
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
}
|
||||
|
||||
var destUrl string
|
||||
|
||||
if versioningEnabled {
|
||||
// Handle versioned HEAD - all versions are stored in .versions directory
|
||||
var targetVersionId string
|
||||
var entry *filer_pb.Entry
|
||||
|
||||
if versionId != "" {
|
||||
// Request for specific version
|
||||
glog.V(2).Infof("HeadObject: requesting specific version %s for %s/%s", versionId, bucket, object)
|
||||
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to get specific version %s: %v", versionId, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
return
|
||||
}
|
||||
targetVersionId = versionId
|
||||
} else {
|
||||
// Request for latest version
|
||||
glog.V(2).Infof("HeadObject: requesting latest version for %s/%s", bucket, object)
|
||||
entry, err = s3a.getLatestObjectVersion(bucket, object)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to get latest version: %v", err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
return
|
||||
}
|
||||
if entry.Extended != nil {
|
||||
if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
|
||||
targetVersionId = string(versionIdBytes)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check if this is a delete marker
|
||||
if entry.Extended != nil {
|
||||
if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// All versions are stored in .versions directory
|
||||
versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId)
|
||||
destUrl = s3a.toFilerUrl(bucket, versionObjectPath)
|
||||
glog.V(2).Infof("HeadObject: version %s URL: %s", targetVersionId, destUrl)
|
||||
|
||||
// Set version ID in response header
|
||||
w.Header().Set("x-amz-version-id", targetVersionId)
|
||||
} else {
|
||||
// Handle regular HEAD (non-versioned)
|
||||
destUrl = s3a.toFilerUrl(bucket, object)
|
||||
}
|
||||
|
||||
s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse)
|
||||
}
|
||||
|
||||
@@ -29,44 +29,87 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
|
||||
bucket, object := s3_constants.GetBucketAndObject(r)
|
||||
glog.V(3).Infof("DeleteObjectHandler %s %s", bucket, object)
|
||||
|
||||
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
|
||||
dir, name := target.DirAndName()
|
||||
// Check for specific version ID in query parameters
|
||||
versionId := r.URL.Query().Get("versionId")
|
||||
|
||||
// Check if versioning is enabled for the bucket
|
||||
versioningEnabled, err := s3a.isVersioningEnabled(bucket)
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
|
||||
return
|
||||
}
|
||||
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
}
|
||||
|
||||
var auditLog *s3err.AccessLog
|
||||
|
||||
if s3err.Logger != nil {
|
||||
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
|
||||
}
|
||||
|
||||
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
|
||||
if err := doDeleteEntry(client, dir, name, true, false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if auditLog != nil {
|
||||
auditLog.Key = name
|
||||
s3err.PostAccessLog(*auditLog)
|
||||
}
|
||||
|
||||
if s3a.option.AllowEmptyFolder {
|
||||
return nil
|
||||
}
|
||||
|
||||
directoriesWithDeletion := make(map[string]int)
|
||||
if strings.LastIndex(object, "/") > 0 {
|
||||
directoriesWithDeletion[dir]++
|
||||
// purge empty folders, only checking folders with deletions
|
||||
for len(directoriesWithDeletion) > 0 {
|
||||
directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
|
||||
if versioningEnabled {
|
||||
// Handle versioned delete
|
||||
if versionId != "" {
|
||||
// Delete specific version
|
||||
err := s3a.deleteSpecificObjectVersion(bucket, object, versionId)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to delete specific version %s: %v", versionId, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
// Set version ID in response header
|
||||
w.Header().Set("x-amz-version-id", versionId)
|
||||
} else {
|
||||
// Create delete marker (logical delete)
|
||||
deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to create delete marker: %v", err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
}
|
||||
|
||||
// Set delete marker version ID in response header
|
||||
w.Header().Set("x-amz-version-id", deleteMarkerVersionId)
|
||||
w.Header().Set("x-amz-delete-marker", "true")
|
||||
}
|
||||
} else {
|
||||
// Handle regular delete (non-versioned)
|
||||
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
|
||||
dir, name := target.DirAndName()
|
||||
|
||||
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
|
||||
if err := doDeleteEntry(client, dir, name, true, false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if s3a.option.AllowEmptyFolder {
|
||||
return nil
|
||||
}
|
||||
|
||||
directoriesWithDeletion := make(map[string]int)
|
||||
if strings.LastIndex(object, "/") > 0 {
|
||||
directoriesWithDeletion[dir]++
|
||||
// purge empty folders, only checking folders with deletions
|
||||
for len(directoriesWithDeletion) > 0 {
|
||||
directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if auditLog != nil {
|
||||
auditLog.Key = strings.TrimPrefix(object, "/")
|
||||
s3err.PostAccessLog(*auditLog)
|
||||
}
|
||||
|
||||
stats_collect.RecordBucketActiveTime(bucket)
|
||||
|
||||
@@ -71,19 +71,53 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
uploadUrl := s3a.toFilerUrl(bucket, object)
|
||||
if objectContentType == "" {
|
||||
dataReader = mimeDetect(r, dataReader)
|
||||
}
|
||||
|
||||
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket)
|
||||
|
||||
if errCode != s3err.ErrNone {
|
||||
s3err.WriteErrorResponse(w, r, errCode)
|
||||
// Check if versioning is enabled for the bucket
|
||||
versioningEnabled, err := s3a.isVersioningEnabled(bucket)
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
|
||||
return
|
||||
}
|
||||
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
}
|
||||
|
||||
setEtag(w, etag)
|
||||
glog.V(1).Infof("PutObjectHandler: bucket %s, object %s, versioningEnabled=%v", bucket, object, versioningEnabled)
|
||||
|
||||
if versioningEnabled {
|
||||
// Handle versioned PUT
|
||||
glog.V(1).Infof("PutObjectHandler: using versioned PUT for %s/%s", bucket, object)
|
||||
versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
|
||||
if errCode != s3err.ErrNone {
|
||||
s3err.WriteErrorResponse(w, r, errCode)
|
||||
return
|
||||
}
|
||||
|
||||
// Set version ID in response header
|
||||
if versionId != "" {
|
||||
w.Header().Set("x-amz-version-id", versionId)
|
||||
}
|
||||
|
||||
// Set ETag in response
|
||||
setEtag(w, etag)
|
||||
} else {
|
||||
// Handle regular PUT (non-versioned)
|
||||
glog.V(1).Infof("PutObjectHandler: using regular PUT for %s/%s", bucket, object)
|
||||
uploadUrl := s3a.toFilerUrl(bucket, object)
|
||||
if objectContentType == "" {
|
||||
dataReader = mimeDetect(r, dataReader)
|
||||
}
|
||||
|
||||
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket)
|
||||
|
||||
if errCode != s3err.ErrNone {
|
||||
s3err.WriteErrorResponse(w, r, errCode)
|
||||
return
|
||||
}
|
||||
|
||||
setEtag(w, etag)
|
||||
}
|
||||
}
|
||||
stats_collect.RecordBucketActiveTime(bucket)
|
||||
stats_collect.S3UploadedObjectsCounter.WithLabelValues(bucket).Inc()
|
||||
@@ -195,3 +229,108 @@ func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string
|
||||
}
|
||||
return string(encodedJwt)
|
||||
}
|
||||
|
||||
// putVersionedObject handles PUT operations for versioned buckets using the new layout
|
||||
// where all versions (including latest) are stored in the .versions directory
|
||||
func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) {
|
||||
// Generate version ID
|
||||
versionId = generateVersionId()
|
||||
|
||||
glog.V(2).Infof("putVersionedObject: creating version %s for %s/%s", versionId, bucket, object)
|
||||
|
||||
// Create the version file name
|
||||
versionFileName := s3a.getVersionFileName(versionId)
|
||||
|
||||
// Upload directly to the versions directory
|
||||
// We need to construct the object path relative to the bucket
|
||||
versionObjectPath := object + ".versions/" + versionFileName
|
||||
versionUploadUrl := s3a.toFilerUrl(bucket, versionObjectPath)
|
||||
|
||||
hash := md5.New()
|
||||
var body = io.TeeReader(dataReader, hash)
|
||||
if objectContentType == "" {
|
||||
body = mimeDetect(r, body)
|
||||
}
|
||||
|
||||
glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl)
|
||||
|
||||
etag, errCode = s3a.putToFiler(r, versionUploadUrl, body, "", bucket)
|
||||
if errCode != s3err.ErrNone {
|
||||
glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
|
||||
return "", "", errCode
|
||||
}
|
||||
|
||||
// Get the uploaded entry to add versioning metadata
|
||||
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
||||
versionEntry, err := s3a.getEntry(bucketDir, versionObjectPath)
|
||||
if err != nil {
|
||||
glog.Errorf("putVersionedObject: failed to get version entry: %v", err)
|
||||
return "", "", s3err.ErrInternalError
|
||||
}
|
||||
|
||||
// Add versioning metadata to this version
|
||||
if versionEntry.Extended == nil {
|
||||
versionEntry.Extended = make(map[string][]byte)
|
||||
}
|
||||
versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
|
||||
|
||||
// Store ETag with quotes for S3 compatibility
|
||||
if !strings.HasPrefix(etag, "\"") {
|
||||
etag = "\"" + etag + "\""
|
||||
}
|
||||
versionEntry.Extended[s3_constants.ExtETagKey] = []byte(etag)
|
||||
|
||||
// Update the version entry with metadata
|
||||
err = s3a.mkFile(bucketDir, versionObjectPath, versionEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
|
||||
updatedEntry.Extended = versionEntry.Extended
|
||||
updatedEntry.Attributes = versionEntry.Attributes
|
||||
updatedEntry.Chunks = versionEntry.Chunks
|
||||
})
|
||||
if err != nil {
|
||||
glog.Errorf("putVersionedObject: failed to update version metadata: %v", err)
|
||||
return "", "", s3err.ErrInternalError
|
||||
}
|
||||
|
||||
// Update the .versions directory metadata to indicate this is the latest version
|
||||
err = s3a.updateLatestVersionInDirectory(bucket, object, versionId, versionFileName)
|
||||
if err != nil {
|
||||
glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
|
||||
return "", "", s3err.ErrInternalError
|
||||
}
|
||||
|
||||
glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s", versionId, bucket, object)
|
||||
return versionId, etag, s3err.ErrNone
|
||||
}
|
||||
|
||||
// updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version
|
||||
func (s3a *S3ApiServer) updateLatestVersionInDirectory(bucket, object, versionId, versionFileName string) error {
|
||||
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
||||
versionsObjectPath := object + ".versions"
|
||||
|
||||
// Get the current .versions directory entry
|
||||
versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
|
||||
if err != nil {
|
||||
glog.Errorf("updateLatestVersionInDirectory: failed to get .versions entry: %v", err)
|
||||
return fmt.Errorf("failed to get .versions entry: %v", err)
|
||||
}
|
||||
|
||||
// Add or update the latest version metadata
|
||||
if versionsEntry.Extended == nil {
|
||||
versionsEntry.Extended = make(map[string][]byte)
|
||||
}
|
||||
versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] = []byte(versionId)
|
||||
versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] = []byte(versionFileName)
|
||||
|
||||
// Update the .versions directory entry with metadata
|
||||
err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
|
||||
updatedEntry.Extended = versionsEntry.Extended
|
||||
updatedEntry.Attributes = versionsEntry.Attributes
|
||||
updatedEntry.Chunks = versionsEntry.Chunks
|
||||
})
|
||||
if err != nil {
|
||||
glog.Errorf("updateLatestVersionInDirectory: failed to update .versions directory metadata: %v", err)
|
||||
return fmt.Errorf("failed to update .versions directory metadata: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
486
weed/s3api/s3api_object_versioning.go
Normal file
486
weed/s3api/s3api_object_versioning.go
Normal file
@@ -0,0 +1,486 @@
|
||||
package s3api
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"path"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
s3_constants "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||
)
|
||||
|
||||
// ObjectVersion represents a version of an S3 object
|
||||
type ObjectVersion struct {
|
||||
VersionId string
|
||||
IsLatest bool
|
||||
IsDeleteMarker bool
|
||||
LastModified time.Time
|
||||
ETag string
|
||||
Size int64
|
||||
Entry *filer_pb.Entry
|
||||
}
|
||||
|
||||
// ListObjectVersionsResult represents the response for ListObjectVersions
|
||||
type ListObjectVersionsResult struct {
|
||||
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListVersionsResult"`
|
||||
Name string `xml:"Name"`
|
||||
Prefix string `xml:"Prefix"`
|
||||
KeyMarker string `xml:"KeyMarker,omitempty"`
|
||||
VersionIdMarker string `xml:"VersionIdMarker,omitempty"`
|
||||
NextKeyMarker string `xml:"NextKeyMarker,omitempty"`
|
||||
NextVersionIdMarker string `xml:"NextVersionIdMarker,omitempty"`
|
||||
MaxKeys int `xml:"MaxKeys"`
|
||||
Delimiter string `xml:"Delimiter,omitempty"`
|
||||
IsTruncated bool `xml:"IsTruncated"`
|
||||
Versions []VersionEntry `xml:"Version,omitempty"`
|
||||
DeleteMarkers []DeleteMarkerEntry `xml:"DeleteMarker,omitempty"`
|
||||
CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"`
|
||||
}
|
||||
|
||||
// generateVersionId creates a unique version ID
|
||||
func generateVersionId() string {
|
||||
// Generate a random 16-byte value
|
||||
randBytes := make([]byte, 16)
|
||||
if _, err := rand.Read(randBytes); err != nil {
|
||||
glog.Errorf("Failed to generate random bytes for version ID: %v", err)
|
||||
return ""
|
||||
}
|
||||
|
||||
// Hash with current timestamp for uniqueness
|
||||
hash := sha256.Sum256(append(randBytes, []byte(fmt.Sprintf("%d", time.Now().UnixNano()))...))
|
||||
|
||||
// Return first 32 characters of hex string (same length as AWS S3 version IDs)
|
||||
return hex.EncodeToString(hash[:])[:32]
|
||||
}
|
||||
|
||||
// getVersionedObjectDir returns the directory path for storing object versions
|
||||
func (s3a *S3ApiServer) getVersionedObjectDir(bucket, object string) string {
|
||||
return path.Join(s3a.option.BucketsPath, bucket, object+".versions")
|
||||
}
|
||||
|
||||
// getVersionFileName returns the filename for a specific version
|
||||
func (s3a *S3ApiServer) getVersionFileName(versionId string) string {
|
||||
return fmt.Sprintf("v_%s", versionId)
|
||||
}
|
||||
|
||||
// createDeleteMarker creates a delete marker for versioned delete operations
|
||||
func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error) {
|
||||
versionId := generateVersionId()
|
||||
|
||||
glog.V(2).Infof("createDeleteMarker: creating delete marker %s for %s/%s", versionId, bucket, object)
|
||||
|
||||
// Create the version file name for the delete marker
|
||||
versionFileName := s3a.getVersionFileName(versionId)
|
||||
|
||||
// Store delete marker in the .versions directory
|
||||
// Make sure to clean up the object path to remove leading slashes
|
||||
cleanObject := strings.TrimPrefix(object, "/")
|
||||
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
||||
versionsDir := bucketDir + "/" + cleanObject + ".versions"
|
||||
|
||||
// Create the delete marker entry in the .versions directory
|
||||
err := s3a.mkFile(versionsDir, versionFileName, nil, func(entry *filer_pb.Entry) {
|
||||
entry.Name = versionFileName
|
||||
entry.IsDirectory = false
|
||||
if entry.Attributes == nil {
|
||||
entry.Attributes = &filer_pb.FuseAttributes{}
|
||||
}
|
||||
entry.Attributes.Mtime = time.Now().Unix()
|
||||
if entry.Extended == nil {
|
||||
entry.Extended = make(map[string][]byte)
|
||||
}
|
||||
entry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
|
||||
entry.Extended[s3_constants.ExtDeleteMarkerKey] = []byte("true")
|
||||
})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create delete marker in .versions directory: %v", err)
|
||||
}
|
||||
|
||||
// Update the .versions directory metadata to indicate this delete marker is the latest version
|
||||
err = s3a.updateLatestVersionInDirectory(bucket, cleanObject, versionId, versionFileName)
|
||||
if err != nil {
|
||||
glog.Errorf("createDeleteMarker: failed to update latest version in directory: %v", err)
|
||||
return "", fmt.Errorf("failed to update latest version in directory: %v", err)
|
||||
}
|
||||
|
||||
glog.V(2).Infof("createDeleteMarker: successfully created delete marker %s for %s/%s", versionId, bucket, object)
|
||||
return versionId, nil
|
||||
}
|
||||
|
||||
// listObjectVersions lists all versions of an object
|
||||
func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int) (*ListObjectVersionsResult, error) {
|
||||
var allVersions []interface{} // Can contain VersionEntry or DeleteMarkerEntry
|
||||
|
||||
// List all entries in bucket
|
||||
entries, _, err := s3a.list(path.Join(s3a.option.BucketsPath, bucket), prefix, keyMarker, false, uint32(maxKeys*2))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// For each entry, check if it's a .versions directory
|
||||
for _, entry := range entries {
|
||||
if !entry.IsDirectory {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if this is a .versions directory
|
||||
if !strings.HasSuffix(entry.Name, ".versions") {
|
||||
continue
|
||||
}
|
||||
|
||||
// Extract object name from .versions directory name
|
||||
objectKey := strings.TrimSuffix(entry.Name, ".versions")
|
||||
|
||||
versions, err := s3a.getObjectVersionList(bucket, objectKey)
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to get versions for object %s: %v", objectKey, err)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, version := range versions {
|
||||
if version.IsDeleteMarker {
|
||||
deleteMarker := &DeleteMarkerEntry{
|
||||
Key: objectKey,
|
||||
VersionId: version.VersionId,
|
||||
IsLatest: version.IsLatest,
|
||||
LastModified: version.LastModified,
|
||||
Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
|
||||
}
|
||||
allVersions = append(allVersions, deleteMarker)
|
||||
} else {
|
||||
versionEntry := &VersionEntry{
|
||||
Key: objectKey,
|
||||
VersionId: version.VersionId,
|
||||
IsLatest: version.IsLatest,
|
||||
LastModified: version.LastModified,
|
||||
ETag: version.ETag,
|
||||
Size: version.Size,
|
||||
Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
|
||||
StorageClass: "STANDARD",
|
||||
}
|
||||
allVersions = append(allVersions, versionEntry)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by key, then by LastModified and VersionId
|
||||
sort.Slice(allVersions, func(i, j int) bool {
|
||||
var keyI, keyJ string
|
||||
var lastModifiedI, lastModifiedJ time.Time
|
||||
var versionIdI, versionIdJ string
|
||||
|
||||
switch v := allVersions[i].(type) {
|
||||
case *VersionEntry:
|
||||
keyI = v.Key
|
||||
lastModifiedI = v.LastModified
|
||||
versionIdI = v.VersionId
|
||||
case *DeleteMarkerEntry:
|
||||
keyI = v.Key
|
||||
lastModifiedI = v.LastModified
|
||||
versionIdI = v.VersionId
|
||||
}
|
||||
|
||||
switch v := allVersions[j].(type) {
|
||||
case *VersionEntry:
|
||||
keyJ = v.Key
|
||||
lastModifiedJ = v.LastModified
|
||||
versionIdJ = v.VersionId
|
||||
case *DeleteMarkerEntry:
|
||||
keyJ = v.Key
|
||||
lastModifiedJ = v.LastModified
|
||||
versionIdJ = v.VersionId
|
||||
}
|
||||
|
||||
if keyI != keyJ {
|
||||
return keyI < keyJ
|
||||
}
|
||||
if !lastModifiedI.Equal(lastModifiedJ) {
|
||||
return lastModifiedI.After(lastModifiedJ)
|
||||
}
|
||||
return versionIdI < versionIdJ
|
||||
})
|
||||
|
||||
// Build result
|
||||
result := &ListObjectVersionsResult{
|
||||
Name: bucket,
|
||||
Prefix: prefix,
|
||||
KeyMarker: keyMarker,
|
||||
MaxKeys: maxKeys,
|
||||
Delimiter: delimiter,
|
||||
IsTruncated: len(allVersions) > maxKeys,
|
||||
}
|
||||
|
||||
// Limit results
|
||||
if len(allVersions) > maxKeys {
|
||||
allVersions = allVersions[:maxKeys]
|
||||
result.IsTruncated = true
|
||||
|
||||
// Set next markers
|
||||
switch v := allVersions[len(allVersions)-1].(type) {
|
||||
case *VersionEntry:
|
||||
result.NextKeyMarker = v.Key
|
||||
result.NextVersionIdMarker = v.VersionId
|
||||
case *DeleteMarkerEntry:
|
||||
result.NextKeyMarker = v.Key
|
||||
result.NextVersionIdMarker = v.VersionId
|
||||
}
|
||||
}
|
||||
|
||||
// Add versions to result
|
||||
for _, version := range allVersions {
|
||||
switch v := version.(type) {
|
||||
case *VersionEntry:
|
||||
result.Versions = append(result.Versions, *v)
|
||||
case *DeleteMarkerEntry:
|
||||
result.DeleteMarkers = append(result.DeleteMarkers, *v)
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// getObjectVersionList returns all versions of a specific object
|
||||
func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVersion, error) {
|
||||
var versions []*ObjectVersion
|
||||
|
||||
glog.V(2).Infof("getObjectVersionList: looking for versions of %s/%s in .versions directory", bucket, object)
|
||||
|
||||
// All versions are now stored in the .versions directory only
|
||||
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
||||
versionsObjectPath := object + ".versions"
|
||||
glog.V(2).Infof("getObjectVersionList: checking versions directory %s", versionsObjectPath)
|
||||
|
||||
// Get the .versions directory entry to read latest version metadata
|
||||
versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
|
||||
if err != nil {
|
||||
// No versions directory exists, return empty list
|
||||
glog.V(2).Infof("getObjectVersionList: no versions directory found: %v", err)
|
||||
return versions, nil
|
||||
}
|
||||
|
||||
// Get the latest version info from directory metadata
|
||||
var latestVersionId string
|
||||
if versionsEntry.Extended != nil {
|
||||
if latestVersionIdBytes, hasLatestVersionId := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatestVersionId {
|
||||
latestVersionId = string(latestVersionIdBytes)
|
||||
glog.V(2).Infof("getObjectVersionList: latest version ID from directory metadata: %s", latestVersionId)
|
||||
}
|
||||
}
|
||||
|
||||
// List all version files in the .versions directory
|
||||
entries, _, err := s3a.list(bucketDir+"/"+versionsObjectPath, "", "", false, 1000)
|
||||
if err != nil {
|
||||
glog.V(2).Infof("getObjectVersionList: failed to list version files: %v", err)
|
||||
return versions, nil
|
||||
}
|
||||
|
||||
glog.V(2).Infof("getObjectVersionList: found %d entries in versions directory", len(entries))
|
||||
|
||||
for i, entry := range entries {
|
||||
if entry.Extended == nil {
|
||||
glog.V(2).Infof("getObjectVersionList: entry %d has no Extended metadata, skipping", i)
|
||||
continue
|
||||
}
|
||||
|
||||
versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
|
||||
if !hasVersionId {
|
||||
glog.V(2).Infof("getObjectVersionList: entry %d has no version ID, skipping", i)
|
||||
continue
|
||||
}
|
||||
|
||||
versionId := string(versionIdBytes)
|
||||
|
||||
// Check if this version is the latest by comparing with directory metadata
|
||||
isLatest := (versionId == latestVersionId)
|
||||
|
||||
isDeleteMarkerBytes, _ := entry.Extended[s3_constants.ExtDeleteMarkerKey]
|
||||
isDeleteMarker := string(isDeleteMarkerBytes) == "true"
|
||||
|
||||
glog.V(2).Infof("getObjectVersionList: found version %s, isLatest=%v, isDeleteMarker=%v", versionId, isLatest, isDeleteMarker)
|
||||
|
||||
version := &ObjectVersion{
|
||||
VersionId: versionId,
|
||||
IsLatest: isLatest,
|
||||
IsDeleteMarker: isDeleteMarker,
|
||||
LastModified: time.Unix(entry.Attributes.Mtime, 0),
|
||||
Entry: entry,
|
||||
}
|
||||
|
||||
if !isDeleteMarker {
|
||||
// Try to get ETag from Extended attributes first
|
||||
if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag {
|
||||
version.ETag = string(etagBytes)
|
||||
} else {
|
||||
// Fallback: calculate ETag from chunks
|
||||
version.ETag = s3a.calculateETagFromChunks(entry.Chunks)
|
||||
}
|
||||
version.Size = int64(entry.Attributes.FileSize)
|
||||
}
|
||||
|
||||
versions = append(versions, version)
|
||||
}
|
||||
|
||||
// Sort by modification time (newest first)
|
||||
sort.Slice(versions, func(i, j int) bool {
|
||||
return versions[i].LastModified.After(versions[j].LastModified)
|
||||
})
|
||||
|
||||
glog.V(2).Infof("getObjectVersionList: returning %d total versions for %s/%s", len(versions), bucket, object)
|
||||
for i, version := range versions {
|
||||
glog.V(2).Infof("getObjectVersionList: version %d: %s (isLatest=%v, isDeleteMarker=%v)", i, version.VersionId, version.IsLatest, version.IsDeleteMarker)
|
||||
}
|
||||
|
||||
return versions, nil
|
||||
}
|
||||
|
||||
// calculateETagFromChunks calculates ETag from file chunks following S3 multipart rules
|
||||
// This is a wrapper around filer.ETagChunks that adds quotes for S3 compatibility
|
||||
func (s3a *S3ApiServer) calculateETagFromChunks(chunks []*filer_pb.FileChunk) string {
|
||||
if len(chunks) == 0 {
|
||||
return "\"\""
|
||||
}
|
||||
|
||||
// Use the existing filer ETag calculation and add quotes for S3 compatibility
|
||||
etag := filer.ETagChunks(chunks)
|
||||
if etag == "" {
|
||||
return "\"\""
|
||||
}
|
||||
return fmt.Sprintf("\"%s\"", etag)
|
||||
}
|
||||
|
||||
// getSpecificObjectVersion retrieves a specific version of an object
|
||||
func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId string) (*filer_pb.Entry, error) {
|
||||
if versionId == "" {
|
||||
// Get current version
|
||||
return s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), strings.TrimPrefix(object, "/"))
|
||||
}
|
||||
|
||||
// Get specific version from .versions directory
|
||||
versionsDir := s3a.getVersionedObjectDir(bucket, object)
|
||||
versionFile := s3a.getVersionFileName(versionId)
|
||||
|
||||
entry, err := s3a.getEntry(versionsDir, versionFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("version %s not found: %v", versionId, err)
|
||||
}
|
||||
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
// deleteSpecificObjectVersion deletes a specific version of an object
|
||||
func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId string) error {
|
||||
if versionId == "" {
|
||||
return fmt.Errorf("version ID is required for version-specific deletion")
|
||||
}
|
||||
|
||||
versionsDir := s3a.getVersionedObjectDir(bucket, object)
|
||||
versionFile := s3a.getVersionFileName(versionId)
|
||||
|
||||
// Delete the specific version from .versions directory
|
||||
_, err := s3a.getEntry(versionsDir, versionFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("version %s not found: %v", versionId, err)
|
||||
}
|
||||
|
||||
// Version exists, delete it
|
||||
deleteErr := s3a.rm(versionsDir, versionFile, true, false)
|
||||
if deleteErr != nil {
|
||||
// Check if file was already deleted by another process
|
||||
if _, checkErr := s3a.getEntry(versionsDir, versionFile); checkErr != nil {
|
||||
// File doesn't exist anymore, deletion was successful
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("failed to delete version %s: %v", versionId, deleteErr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListObjectVersionsHandler handles the list object versions request
|
||||
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html
|
||||
func (s3a *S3ApiServer) ListObjectVersionsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
bucket, _ := s3_constants.GetBucketAndObject(r)
|
||||
glog.V(3).Infof("ListObjectVersionsHandler %s", bucket)
|
||||
|
||||
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
|
||||
s3err.WriteErrorResponse(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Parse query parameters
|
||||
query := r.URL.Query()
|
||||
prefix := query.Get("prefix")
|
||||
if prefix != "" && !strings.HasPrefix(prefix, "/") {
|
||||
prefix = "/" + prefix
|
||||
}
|
||||
|
||||
keyMarker := query.Get("key-marker")
|
||||
versionIdMarker := query.Get("version-id-marker")
|
||||
delimiter := query.Get("delimiter")
|
||||
|
||||
maxKeysStr := query.Get("max-keys")
|
||||
maxKeys := 1000
|
||||
if maxKeysStr != "" {
|
||||
if mk, err := strconv.Atoi(maxKeysStr); err == nil && mk > 0 {
|
||||
maxKeys = mk
|
||||
}
|
||||
}
|
||||
|
||||
// List versions
|
||||
result, err := s3a.listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter, maxKeys)
|
||||
if err != nil {
|
||||
glog.Errorf("ListObjectVersionsHandler: %v", err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
}
|
||||
|
||||
writeSuccessResponseXML(w, r, result)
|
||||
}
|
||||
|
||||
// getLatestObjectVersion finds the latest version of an object by reading .versions directory metadata
|
||||
func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb.Entry, error) {
|
||||
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
||||
versionsObjectPath := object + ".versions"
|
||||
|
||||
// Get the .versions directory entry to read latest version metadata
|
||||
versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get .versions directory: %v", err)
|
||||
}
|
||||
|
||||
// Check if directory has latest version metadata
|
||||
if versionsEntry.Extended == nil {
|
||||
return nil, fmt.Errorf("no version metadata found in .versions directory for %s/%s", bucket, object)
|
||||
}
|
||||
|
||||
latestVersionIdBytes, hasLatestVersionId := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]
|
||||
latestVersionFileBytes, hasLatestVersionFile := versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey]
|
||||
|
||||
if !hasLatestVersionId || !hasLatestVersionFile {
|
||||
return nil, fmt.Errorf("incomplete latest version metadata in .versions directory for %s/%s", bucket, object)
|
||||
}
|
||||
|
||||
latestVersionId := string(latestVersionIdBytes)
|
||||
latestVersionFile := string(latestVersionFileBytes)
|
||||
|
||||
glog.V(2).Infof("getLatestObjectVersion: found latest version %s (file: %s) for %s/%s", latestVersionId, latestVersionFile, bucket, object)
|
||||
|
||||
// Get the actual latest version file entry
|
||||
latestVersionPath := versionsObjectPath + "/" + latestVersionFile
|
||||
latestVersionEntry, err := s3a.getEntry(bucketDir, latestVersionPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get latest version file %s: %v", latestVersionPath, err)
|
||||
}
|
||||
|
||||
return latestVersionEntry, nil
|
||||
}
|
||||
@@ -50,6 +50,7 @@ type S3ApiServer struct {
|
||||
client util_http_client.HTTPClientInterface
|
||||
bucketRegistry *BucketRegistry
|
||||
credentialManager *credential.CredentialManager
|
||||
bucketConfigCache *BucketConfigCache
|
||||
}
|
||||
|
||||
func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
|
||||
@@ -87,6 +88,7 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
|
||||
filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
|
||||
cb: NewCircuitBreaker(option),
|
||||
credentialManager: iam.credentialManager,
|
||||
bucketConfigCache: NewBucketConfigCache(5 * time.Minute),
|
||||
}
|
||||
|
||||
if option.Config != "" {
|
||||
@@ -288,6 +290,9 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
|
||||
// ListObjectsV2
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectsV2Handler, ACTION_LIST)), "LIST")).Queries("list-type", "2")
|
||||
|
||||
// ListObjectVersions
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectVersionsHandler, ACTION_LIST)), "LIST")).Queries("versions", "")
|
||||
|
||||
// buckets with query
|
||||
// PutBucketOwnershipControls
|
||||
bucket.Methods(http.MethodPut).HandlerFunc(track(s3a.iam.Auth(s3a.PutBucketOwnershipControls, ACTION_ADMIN), "PUT")).Queries("ownershipControls", "")
|
||||
|
||||
Reference in New Issue
Block a user