* Fix: Support object tagging in versioned buckets (Issue #7868) This fix addresses the issue where setting tags on files in versioned buckets would fail with 'filer: no entry is found in filer store' error. Changes: - Updated GetObjectTaggingHandler to check versioning status and retrieve correct object versions - Updated PutObjectTaggingHandler to properly locate and update tags on versioned objects - Updated DeleteObjectTaggingHandler to delete tags from versioned objects - Added proper handling for both specific versions and latest versions - Added distinction between null versions (pre-versioning objects) and versioned objects The fix follows the same versioning-aware pattern already implemented in ACL handlers. Tests: - Added comprehensive test suite for tagging operations on versioned buckets - Tests cover PUT, GET, and DELETE tagging operations on specific versions and latest versions - Tests verify tag isolation between different versions of the same object * Fix: Ensure consistent directory path construction in tagging handlers Changed directory path construction to match the pattern used in ACL handlers: - Added missing '/' before object path when constructing .versions directory path - This ensures compatibility with the filer's expected path structure - Applied to both PutObjectTaggingHandler and DeleteObjectTaggingHandler * Revert: Remove redundant slash in path construction - object already has leading slash from NormalizeObjectKey * Fix: Remove redundant slashes in versioning path construction across handlers - getVersionedObjectDir: object already starts with '/', no need for extra '/' - ACL handlers: same pattern, fix both PutObjectAcl locations - Ensures consistent path construction with object parameter normalization * fix test compilation * Add: Comprehensive ACL tests for versioned and non-versioned buckets - Added s3_acl_versioning_test.go with 5 test cases covering: * GetObjectAcl on versioned buckets * GetObjectAcl on specific versions * PutObjectAcl on versioned buckets * PutObjectAcl on specific versions * Independent ACL management across versions These tests were missing and would have caught the path construction issues we just fixed in the ACL handler. Tests validate that ACL operations work correctly on both versioned and non-versioned objects. * Fix: Correct tagging versioning test file formatting * fix: Update AWS SDK endpoint config and improve cleanup to handle delete markers - Replace deprecated EndpointResolverWithOptions with BaseEndpoint in AWS SDK v2 client configuration - Update cleanupTestBucket to properly delete both object versions and delete markers - Apply changes to both ACL and tagging test files for consistency * Fix S3 multi-delete for versioned objects The bug was in getVersionedObjectDir() which was constructing paths without a slash between the bucket and object key: BEFORE (WRONG): /buckets/mybucket{key}.versions AFTER (FIXED): /buckets/mybucket/{key}/.versions This caused version deletions to claim success but not actually delete files, breaking S3 compatibility tests: - test_versioning_multi_object_delete - test_versioning_multi_object_delete_with_marker - test_versioning_concurrent_multi_object_delete - test_object_lock_multi_delete_object_with_retention Added comprehensive test that reproduces the issue and verifies the fix. * Remove emojis from test output
This commit is contained in:
@@ -306,7 +306,7 @@ func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Reque
|
||||
if versioningConfigured {
|
||||
if versionId != "" && versionId != "null" {
|
||||
// Versioned object - update the specific version file in .versions directory
|
||||
updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + s3_constants.VersionsFolder
|
||||
updateDirectory = s3a.option.BucketsPath + "/" + bucket + object + s3_constants.VersionsFolder
|
||||
} else {
|
||||
// Latest version in versioned bucket - could be null version or versioned object
|
||||
// Extract version ID from the entry to determine where it's stored
|
||||
@@ -322,7 +322,7 @@ func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Reque
|
||||
updateDirectory = s3a.option.BucketsPath + "/" + bucket
|
||||
} else {
|
||||
// Versioned object - stored in .versions directory
|
||||
updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + s3_constants.VersionsFolder
|
||||
updateDirectory = s3a.option.BucketsPath + "/" + bucket + object + s3_constants.VersionsFolder
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
package s3api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||
@@ -20,21 +22,79 @@ func (s3a *S3ApiServer) GetObjectTaggingHandler(w http.ResponseWriter, r *http.R
|
||||
bucket, object := s3_constants.GetBucketAndObject(r)
|
||||
glog.V(3).Infof("GetObjectTaggingHandler %s %s", bucket, object)
|
||||
|
||||
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
|
||||
dir, name := target.DirAndName()
|
||||
// Check for specific version ID in query parameters
|
||||
versionId := r.URL.Query().Get("versionId")
|
||||
|
||||
tags, err := s3a.getTags(dir, name)
|
||||
// Check if versioning is configured for the bucket (Enabled or Suspended)
|
||||
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
glog.Errorf("GetObjectTaggingHandler %s: %v", r.URL, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
} else {
|
||||
glog.Errorf("GetObjectTaggingHandler %s: %v", r.URL, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
|
||||
return
|
||||
}
|
||||
glog.Errorf("GetObjectTaggingHandler: Error checking versioning status for bucket %s: %v", bucket, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
}
|
||||
|
||||
var entry *filer_pb.Entry
|
||||
|
||||
if versioningConfigured {
|
||||
// Handle versioned object tagging retrieval
|
||||
if versionId != "" {
|
||||
// Request for specific version
|
||||
glog.V(2).Infof("GetObjectTaggingHandler: requesting tags for specific version %s of %s%s", versionId, bucket, object)
|
||||
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
|
||||
} else {
|
||||
// Request for latest version
|
||||
glog.V(2).Infof("GetObjectTaggingHandler: requesting tags for latest version of %s%s", bucket, object)
|
||||
entry, err = s3a.getLatestObjectVersion(bucket, object)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("GetObjectTaggingHandler: Failed to get object version %s for %s%s: %v", versionId, bucket, object, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if this is a delete marker
|
||||
if entry.Extended != nil {
|
||||
if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Handle regular (non-versioned) object tagging retrieval
|
||||
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
|
||||
dir, name := target.DirAndName()
|
||||
|
||||
tags, err := s3a.getTags(dir, name)
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
glog.Errorf("GetObjectTaggingHandler %s: %v", r.URL, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
} else {
|
||||
glog.Errorf("GetObjectTaggingHandler %s: %v", r.URL, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
writeSuccessResponseXML(w, r, FromTags(tags))
|
||||
return
|
||||
}
|
||||
|
||||
// Extract tags from the entry's extended attributes
|
||||
tags := make(map[string]string)
|
||||
if entry.Extended != nil {
|
||||
for k, v := range entry.Extended {
|
||||
if len(k) > len(S3TAG_PREFIX) && k[:len(S3TAG_PREFIX)] == S3TAG_PREFIX {
|
||||
tags[k[len(S3TAG_PREFIX):]] = string(v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
writeSuccessResponseXML(w, r, FromTags(tags))
|
||||
|
||||
}
|
||||
@@ -46,9 +106,6 @@ func (s3a *S3ApiServer) PutObjectTaggingHandler(w http.ResponseWriter, r *http.R
|
||||
bucket, object := s3_constants.GetBucketAndObject(r)
|
||||
glog.V(3).Infof("PutObjectTaggingHandler %s %s", bucket, object)
|
||||
|
||||
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
|
||||
dir, name := target.DirAndName()
|
||||
|
||||
tagging := &Tagging{}
|
||||
input, err := io.ReadAll(io.LimitReader(r.Body, r.ContentLength))
|
||||
if err != nil {
|
||||
@@ -69,17 +126,133 @@ func (s3a *S3ApiServer) PutObjectTaggingHandler(w http.ResponseWriter, r *http.R
|
||||
return
|
||||
}
|
||||
|
||||
if err = s3a.setTags(dir, name, tagging.ToTags()); err != nil {
|
||||
// Check for specific version ID in query parameters
|
||||
versionId := r.URL.Query().Get("versionId")
|
||||
|
||||
// Check if versioning is configured for the bucket (Enabled or Suspended)
|
||||
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
glog.Errorf("PutObjectTaggingHandler setTags %s: %v", r.URL, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
} else {
|
||||
glog.Errorf("PutObjectTaggingHandler setTags %s: %v", r.URL, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
|
||||
return
|
||||
}
|
||||
glog.Errorf("PutObjectTaggingHandler: Error checking versioning status for bucket %s: %v", bucket, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
}
|
||||
|
||||
var entry *filer_pb.Entry
|
||||
|
||||
if versioningConfigured {
|
||||
// Handle versioned object tagging modification
|
||||
if versionId != "" {
|
||||
// Request for specific version
|
||||
glog.V(2).Infof("PutObjectTaggingHandler: modifying tags for specific version %s of %s%s", versionId, bucket, object)
|
||||
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
|
||||
} else {
|
||||
// Request for latest version
|
||||
glog.V(2).Infof("PutObjectTaggingHandler: modifying tags for latest version of %s%s", bucket, object)
|
||||
entry, err = s3a.getLatestObjectVersion(bucket, object)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("PutObjectTaggingHandler: Failed to get object version %s for %s%s: %v", versionId, bucket, object, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if this is a delete marker
|
||||
if entry.Extended != nil {
|
||||
if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Handle regular (non-versioned) object tagging modification
|
||||
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
|
||||
dir, name := target.DirAndName()
|
||||
|
||||
if err = s3a.setTags(dir, name, tags); err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
glog.Errorf("PutObjectTaggingHandler setTags %s: %v", r.URL, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
} else {
|
||||
glog.Errorf("PutObjectTaggingHandler setTags %s: %v", r.URL, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
s3err.PostLog(r, http.StatusOK, s3err.ErrNone)
|
||||
return
|
||||
}
|
||||
|
||||
// For versioned objects, determine the correct directory based on the version
|
||||
var updateDirectory string
|
||||
if versionId != "" {
|
||||
// Specific version requested
|
||||
if versionId == "null" {
|
||||
// Null version (pre-versioning object) - stored as regular file
|
||||
updateDirectory = s3a.option.BucketsPath + "/" + bucket
|
||||
} else {
|
||||
// Versioned object - stored in .versions directory
|
||||
updateDirectory = s3a.option.BucketsPath + "/" + bucket + object + s3_constants.VersionsFolder
|
||||
}
|
||||
} else {
|
||||
// Latest version in versioned bucket - could be null version or versioned object
|
||||
// Extract version ID from the entry to determine where it's stored
|
||||
var actualVersionId string
|
||||
if entry.Extended != nil {
|
||||
if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
|
||||
actualVersionId = string(versionIdBytes)
|
||||
}
|
||||
}
|
||||
|
||||
if actualVersionId == "null" || actualVersionId == "" {
|
||||
// Null version (pre-versioning object) - stored as regular file
|
||||
updateDirectory = s3a.option.BucketsPath + "/" + bucket
|
||||
} else {
|
||||
// Versioned object - stored in .versions directory
|
||||
updateDirectory = s3a.option.BucketsPath + "/" + bucket + object + s3_constants.VersionsFolder
|
||||
}
|
||||
}
|
||||
|
||||
// Remove old tags and add new ones
|
||||
for k := range entry.Extended {
|
||||
if len(k) > len(S3TAG_PREFIX) && k[:len(S3TAG_PREFIX)] == S3TAG_PREFIX {
|
||||
delete(entry.Extended, k)
|
||||
}
|
||||
}
|
||||
|
||||
if entry.Extended == nil {
|
||||
entry.Extended = make(map[string][]byte)
|
||||
}
|
||||
for k, v := range tags {
|
||||
entry.Extended[S3TAG_PREFIX+k] = []byte(v)
|
||||
}
|
||||
|
||||
// Update the entry with tags
|
||||
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
request := &filer_pb.UpdateEntryRequest{
|
||||
Directory: updateDirectory,
|
||||
Entry: entry,
|
||||
}
|
||||
|
||||
if _, err := client.UpdateEntry(context.Background(), request); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("PutObjectTaggingHandler: failed to update entry: %v", err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(3).Infof("PutObjectTaggingHandler: Successfully updated tags for %s/%s", bucket, object)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
s3err.PostLog(r, http.StatusOK, s3err.ErrNone)
|
||||
}
|
||||
@@ -91,21 +264,136 @@ func (s3a *S3ApiServer) DeleteObjectTaggingHandler(w http.ResponseWriter, r *htt
|
||||
bucket, object := s3_constants.GetBucketAndObject(r)
|
||||
glog.V(3).Infof("DeleteObjectTaggingHandler %s %s", bucket, object)
|
||||
|
||||
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
|
||||
dir, name := target.DirAndName()
|
||||
// Check for specific version ID in query parameters
|
||||
versionId := r.URL.Query().Get("versionId")
|
||||
|
||||
err := s3a.rmTags(dir, name)
|
||||
// Check if versioning is configured for the bucket (Enabled or Suspended)
|
||||
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
glog.Errorf("DeleteObjectTaggingHandler %s: %v", r.URL, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
} else {
|
||||
glog.Errorf("DeleteObjectTaggingHandler %s: %v", r.URL, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
|
||||
return
|
||||
}
|
||||
glog.Errorf("DeleteObjectTaggingHandler: Error checking versioning status for bucket %s: %v", bucket, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
}
|
||||
|
||||
var entry *filer_pb.Entry
|
||||
|
||||
if versioningConfigured {
|
||||
// Handle versioned object tagging deletion
|
||||
if versionId != "" {
|
||||
// Request for specific version
|
||||
glog.V(2).Infof("DeleteObjectTaggingHandler: deleting tags for specific version %s of %s%s", versionId, bucket, object)
|
||||
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
|
||||
} else {
|
||||
// Request for latest version
|
||||
glog.V(2).Infof("DeleteObjectTaggingHandler: deleting tags for latest version of %s%s", bucket, object)
|
||||
entry, err = s3a.getLatestObjectVersion(bucket, object)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("DeleteObjectTaggingHandler: Failed to get object version %s for %s%s: %v", versionId, bucket, object, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if this is a delete marker
|
||||
if entry.Extended != nil {
|
||||
if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Handle regular (non-versioned) object tagging deletion
|
||||
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
|
||||
dir, name := target.DirAndName()
|
||||
|
||||
err := s3a.rmTags(dir, name)
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
glog.Errorf("DeleteObjectTaggingHandler %s: %v", r.URL, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
|
||||
} else {
|
||||
glog.Errorf("DeleteObjectTaggingHandler %s: %v", r.URL, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
s3err.PostLog(r, http.StatusNoContent, s3err.ErrNone)
|
||||
return
|
||||
}
|
||||
|
||||
// For versioned objects, determine the correct directory based on the version
|
||||
var updateDirectory string
|
||||
if versionId != "" {
|
||||
// Specific version requested
|
||||
if versionId == "null" {
|
||||
// Null version (pre-versioning object) - stored as regular file
|
||||
updateDirectory = s3a.option.BucketsPath + "/" + bucket
|
||||
} else {
|
||||
// Versioned object - stored in .versions directory
|
||||
updateDirectory = s3a.option.BucketsPath + "/" + bucket + object + s3_constants.VersionsFolder
|
||||
}
|
||||
} else {
|
||||
// Latest version in versioned bucket - could be null version or versioned object
|
||||
// Extract version ID from the entry to determine where it's stored
|
||||
var actualVersionId string
|
||||
if entry.Extended != nil {
|
||||
if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
|
||||
actualVersionId = string(versionIdBytes)
|
||||
}
|
||||
}
|
||||
|
||||
if actualVersionId == "null" || actualVersionId == "" {
|
||||
// Null version (pre-versioning object) - stored as regular file
|
||||
updateDirectory = s3a.option.BucketsPath + "/" + bucket
|
||||
} else {
|
||||
// Versioned object - stored in .versions directory
|
||||
updateDirectory = s3a.option.BucketsPath + "/" + bucket + object + s3_constants.VersionsFolder
|
||||
}
|
||||
}
|
||||
|
||||
// Remove all tags
|
||||
hasDeletion := false
|
||||
for k := range entry.Extended {
|
||||
if len(k) > len(S3TAG_PREFIX) && k[:len(S3TAG_PREFIX)] == S3TAG_PREFIX {
|
||||
delete(entry.Extended, k)
|
||||
hasDeletion = true
|
||||
}
|
||||
}
|
||||
|
||||
if !hasDeletion {
|
||||
// No tags to delete - success
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
s3err.PostLog(r, http.StatusNoContent, s3err.ErrNone)
|
||||
return
|
||||
}
|
||||
|
||||
// Update the entry
|
||||
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
request := &filer_pb.UpdateEntryRequest{
|
||||
Directory: updateDirectory,
|
||||
Entry: entry,
|
||||
}
|
||||
|
||||
if _, err := client.UpdateEntry(context.Background(), request); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("DeleteObjectTaggingHandler: failed to update entry: %v", err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(3).Infof("DeleteObjectTaggingHandler: Successfully deleted tags for %s/%s", bucket, object)
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
s3err.PostLog(r, http.StatusNoContent, s3err.ErrNone)
|
||||
}
|
||||
|
||||
@@ -184,4 +184,3 @@ func (s3a *S3ApiServer) generateVersionIdForObject(bucket, object string) string
|
||||
useInvertedFormat := s3a.getVersionIdFormat(bucket, object)
|
||||
return generateVersionId(useInvertedFormat)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user