S3: Perf related (#7463)
* reduce checks * s3 object lookup optimization * Only check versioning configuration if client requests * Consolidate SSE Entry Lookups * optimize * revert optimization for versioned objects * Removed: getObjectEntryForSSE() function * refactor * Refactoring: Added fetchObjectEntryRequired * avoid refetching * return early if not found * reuse objects from conditional check * clear cache when creating bucket
This commit is contained in:
@@ -264,6 +264,8 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
|
||||
versionId := r.URL.Query().Get("versionId")
|
||||
|
||||
// Check if versioning is configured for the bucket (Enabled or Suspended)
|
||||
// Note: We need to check this even if versionId is empty, because versioned buckets
|
||||
// handle even "get latest version" requests differently (through .versions directory)
|
||||
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
@@ -344,31 +346,47 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
|
||||
destUrl = s3a.toFilerUrl(bucket, object)
|
||||
}
|
||||
|
||||
// Check if this is a range request to an SSE object and modify the approach
|
||||
// Fetch the correct entry for SSE processing (respects versionId)
|
||||
// This consolidates entry lookups to avoid multiple filer calls
|
||||
var objectEntryForSSE *filer_pb.Entry
|
||||
originalRangeHeader := r.Header.Get("Range")
|
||||
var sseObject = false
|
||||
|
||||
// Pre-check if this object is SSE encrypted to avoid filer range conflicts
|
||||
if originalRangeHeader != "" {
|
||||
bucket, object := s3_constants.GetBucketAndObject(r)
|
||||
objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
|
||||
if objectEntry, err := s3a.getEntry("", objectPath); err == nil {
|
||||
primarySSEType := s3a.detectPrimarySSEType(objectEntry)
|
||||
if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS {
|
||||
sseObject = true
|
||||
// Temporarily remove Range header to get full encrypted data from filer
|
||||
r.Header.Del("Range")
|
||||
|
||||
if versioningConfigured {
|
||||
// For versioned objects, reuse the already-fetched entry
|
||||
objectEntryForSSE = entry
|
||||
} else {
|
||||
// For non-versioned objects, try to reuse entry from conditional header check
|
||||
if result.Entry != nil {
|
||||
// Reuse entry fetched during conditional header check (optimization)
|
||||
objectEntryForSSE = result.Entry
|
||||
glog.V(3).Infof("GetObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object)
|
||||
} else {
|
||||
// No conditional headers were checked, fetch entry for SSE processing
|
||||
var fetchErr error
|
||||
objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object)
|
||||
if fetchErr != nil {
|
||||
glog.Errorf("GetObjectHandler: failed to get entry for SSE check: %v", fetchErr)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
}
|
||||
if objectEntryForSSE == nil {
|
||||
// Not found, return error early to avoid another lookup in proxyToFiler
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch the correct entry for SSE processing (respects versionId)
|
||||
objectEntryForSSE, err := s3a.getObjectEntryForSSE(r, versioningConfigured, entry)
|
||||
if err != nil {
|
||||
glog.Errorf("GetObjectHandler: %v", err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
// Check if this is an SSE object for Range request handling
|
||||
// This applies to both versioned and non-versioned objects
|
||||
if originalRangeHeader != "" && objectEntryForSSE != nil {
|
||||
primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE)
|
||||
if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS {
|
||||
sseObject = true
|
||||
// Temporarily remove Range header to get full encrypted data from filer
|
||||
r.Header.Del("Range")
|
||||
}
|
||||
}
|
||||
|
||||
s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
|
||||
@@ -415,6 +433,8 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
|
||||
versionId := r.URL.Query().Get("versionId")
|
||||
|
||||
// Check if versioning is configured for the bucket (Enabled or Suspended)
|
||||
// Note: We need to check this even if versionId is empty, because versioned buckets
|
||||
// handle even "get latest version" requests differently (through .versions directory)
|
||||
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
@@ -494,11 +514,31 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
|
||||
}
|
||||
|
||||
// Fetch the correct entry for SSE processing (respects versionId)
|
||||
objectEntryForSSE, err := s3a.getObjectEntryForSSE(r, versioningConfigured, entry)
|
||||
if err != nil {
|
||||
glog.Errorf("HeadObjectHandler: %v", err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
// For versioned objects, reuse already-fetched entry; for non-versioned, try to reuse from conditional check
|
||||
var objectEntryForSSE *filer_pb.Entry
|
||||
if versioningConfigured {
|
||||
objectEntryForSSE = entry
|
||||
} else {
|
||||
// For non-versioned objects, try to reuse entry from conditional header check
|
||||
if result.Entry != nil {
|
||||
// Reuse entry fetched during conditional header check (optimization)
|
||||
objectEntryForSSE = result.Entry
|
||||
glog.V(3).Infof("HeadObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object)
|
||||
} else {
|
||||
// No conditional headers were checked, fetch entry for SSE processing
|
||||
var fetchErr error
|
||||
objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object)
|
||||
if fetchErr != nil {
|
||||
glog.Errorf("HeadObjectHandler: failed to get entry for SSE check: %v", fetchErr)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
}
|
||||
if objectEntryForSSE == nil {
|
||||
// Not found, return error early to avoid another lookup in proxyToFiler
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
|
||||
@@ -658,21 +698,27 @@ func writeFinalResponse(w http.ResponseWriter, proxyResponse *http.Response, bod
|
||||
return statusCode, bytesTransferred
|
||||
}
|
||||
|
||||
// getObjectEntryForSSE fetches the correct filer entry for SSE processing
|
||||
// For versioned objects, it reuses the already-fetched entry
|
||||
// For non-versioned objects, it fetches the entry from the filer
|
||||
func (s3a *S3ApiServer) getObjectEntryForSSE(r *http.Request, versioningConfigured bool, versionedEntry *filer_pb.Entry) (*filer_pb.Entry, error) {
|
||||
if versioningConfigured {
|
||||
// For versioned objects, we already have the correct entry
|
||||
return versionedEntry, nil
|
||||
}
|
||||
|
||||
// For non-versioned objects, fetch the entry
|
||||
bucket, object := s3_constants.GetBucketAndObject(r)
|
||||
// fetchObjectEntry fetches the filer entry for an object
|
||||
// Returns nil if not found (not an error), or propagates other errors
|
||||
func (s3a *S3ApiServer) fetchObjectEntry(bucket, object string) (*filer_pb.Entry, error) {
|
||||
objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
|
||||
fetchedEntry, err := s3a.getEntry("", objectPath)
|
||||
if err != nil && !errors.Is(err, filer_pb.ErrNotFound) {
|
||||
return nil, fmt.Errorf("failed to get entry for SSE check %s: %w", objectPath, err)
|
||||
fetchedEntry, fetchErr := s3a.getEntry("", objectPath)
|
||||
if fetchErr != nil {
|
||||
if errors.Is(fetchErr, filer_pb.ErrNotFound) {
|
||||
return nil, nil // Not found is not an error for SSE check
|
||||
}
|
||||
return nil, fetchErr // Propagate other errors
|
||||
}
|
||||
return fetchedEntry, nil
|
||||
}
|
||||
|
||||
// fetchObjectEntryRequired fetches the filer entry for an object
|
||||
// Returns an error if the object is not found or any other error occurs
|
||||
func (s3a *S3ApiServer) fetchObjectEntryRequired(bucket, object string) (*filer_pb.Entry, error) {
|
||||
objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
|
||||
fetchedEntry, fetchErr := s3a.getEntry("", objectPath)
|
||||
if fetchErr != nil {
|
||||
return nil, fetchErr // Return error for both not-found and other errors
|
||||
}
|
||||
return fetchedEntry, nil
|
||||
}
|
||||
@@ -750,7 +796,7 @@ func (s3a *S3ApiServer) handleSSECResponse(r *http.Request, proxyResponse *http.
|
||||
if sseCChunks >= 1 {
|
||||
|
||||
// Handle chunked SSE-C objects - each chunk needs independent decryption
|
||||
multipartReader, decErr := s3a.createMultipartSSECDecryptedReader(r, proxyResponse)
|
||||
multipartReader, decErr := s3a.createMultipartSSECDecryptedReader(r, proxyResponse, entry)
|
||||
if decErr != nil {
|
||||
glog.Errorf("Failed to create multipart SSE-C decrypted reader: %v", decErr)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
@@ -966,7 +1012,7 @@ func (s3a *S3ApiServer) handleSSEKMSResponse(r *http.Request, proxyResponse *htt
|
||||
var decryptedReader io.Reader
|
||||
if isMultipartSSEKMS {
|
||||
// Handle multipart SSE-KMS objects - each chunk needs independent decryption
|
||||
multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse)
|
||||
multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse, entry)
|
||||
if decErr != nil {
|
||||
glog.Errorf("Failed to create multipart SSE-KMS decrypted reader: %v", decErr)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
@@ -1271,16 +1317,8 @@ func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string {
|
||||
}
|
||||
|
||||
// createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects
|
||||
func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) {
|
||||
// Get the object path from the request
|
||||
bucket, object := s3_constants.GetBucketAndObject(r)
|
||||
objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
|
||||
|
||||
// Get the object entry from filer to access chunk information
|
||||
entry, err := s3a.getEntry("", objectPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get object entry for multipart SSE-KMS decryption: %v", err)
|
||||
}
|
||||
func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) {
|
||||
// Entry is passed from caller to avoid redundant filer lookup
|
||||
|
||||
// Sort chunks by offset to ensure correct order
|
||||
chunks := entry.GetChunks()
|
||||
@@ -1531,22 +1569,14 @@ func (r *SSERangeReader) Read(p []byte) (n int, err error) {
|
||||
|
||||
// createMultipartSSECDecryptedReader creates a decrypted reader for multipart SSE-C objects
|
||||
// Each chunk has its own IV and encryption key from the original multipart parts
|
||||
func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) {
|
||||
func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) {
|
||||
// Parse SSE-C headers from the request for decryption key
|
||||
customerKey, err := ParseSSECHeaders(r)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid SSE-C headers for multipart decryption: %v", err)
|
||||
}
|
||||
|
||||
// Get the object path from the request
|
||||
bucket, object := s3_constants.GetBucketAndObject(r)
|
||||
objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
|
||||
|
||||
// Get the object entry from filer to access chunk information
|
||||
entry, err := s3a.getEntry("", objectPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get object entry for multipart SSE-C decryption: %v", err)
|
||||
}
|
||||
// Entry is passed from caller to avoid redundant filer lookup
|
||||
|
||||
// Sort chunks by offset to ensure correct order
|
||||
chunks := entry.GetChunks()
|
||||
|
||||
Reference in New Issue
Block a user