fix: ListBuckets returns empty for users with bucket-specific permissions (#7799)

* fix: ListBuckets returns empty for users with bucket-specific permissions (#7796)

The ListBucketsHandler was using sequential AND logic where ownership check
happened before permission check. If a user had 'List:bucketname' permission
but didn't own the bucket (different AmzIdentityId or missing owner metadata),
the bucket was filtered out before the permission check could run.

Changed to OR logic: a bucket is now visible if the user owns it OR has
explicit permission to list it. This allows users with bucket-specific
permissions like 'List:geoserver' to see buckets they have access to,
even if they don't own them.

Changes:
- Modified ListBucketsHandler to check both ownership and permission,
  including bucket if either check passes
- Renamed isBucketVisibleToIdentity to isBucketOwnedByIdentity for clarity
- Added comprehensive tests in TestListBucketsIssue7796

Fixes #7796

* address review comments: optimize permission check and add integration test

- Skip permission check if user is already the owner (performance optimization)
- Add integration test that simulates the complete handler filtering logic
  to verify the combination of ownership OR permission check works correctly

* add visibility assertions to each sub-test for self-contained verification

Each sub-test now verifies the final outcome using isOwner || canList logic,
making tests more robust and independently verifiable.
This commit is contained in:
Chris Lu
2025-12-17 00:09:13 -08:00
committed by GitHub
parent 9e9c97ec61
commit a77b145590
2 changed files with 302 additions and 25 deletions

View File

@@ -776,3 +776,270 @@ func TestListBucketsIssue7647(t *testing.T) {
assert.False(t, isVisible, "Non-admin should not see buckets without owner metadata")
})
}
// TestListBucketsIssue7796 reproduces and verifies the fix for issue #7796
// where a user with bucket-specific List permission (e.g., "List:geoserver")
// couldn't see buckets they have access to but don't own
func TestListBucketsIssue7796(t *testing.T) {
t.Run("user with bucket-specific List permission can see bucket they don't own", func(t *testing.T) {
// Simulate the exact scenario from issue #7796:
// User "geoserver" with ["List:geoserver", "Read:geoserver", "Write:geoserver", ...] permissions
// But the bucket "geoserver" was created by a different user (e.g., admin)
geoserverIdentity := &Identity{
Name: "geoserver",
Credentials: []*Credential{
{
AccessKey: "geoserver",
SecretKey: "secret",
},
},
Actions: []Action{
Action("List:geoserver"),
Action("Read:geoserver"),
Action("Write:geoserver"),
Action("Admin:geoserver"),
Action("List:geoserver-ttl"),
Action("Read:geoserver-ttl"),
Action("Write:geoserver-ttl"),
},
}
// Bucket was created by admin, not by geoserver user
geoserverBucket := &filer_pb.Entry{
Name: "geoserver",
IsDirectory: true,
Extended: map[string][]byte{
s3_constants.AmzIdentityId: []byte("admin"), // Different owner
},
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
Mtime: time.Now().Unix(),
},
}
// Test ownership check - should return false (not owned by geoserver)
isOwner := isBucketOwnedByIdentity(geoserverBucket, geoserverIdentity)
assert.False(t, isOwner, "geoserver user should not be owner of bucket created by admin")
// Test permission check - should return true (has List:geoserver permission)
canList := geoserverIdentity.canDo(s3_constants.ACTION_LIST, "geoserver", "")
assert.True(t, canList, "geoserver user with List:geoserver should be able to list geoserver bucket")
// Verify the combined visibility logic: ownership OR permission
isVisible := isOwner || canList
assert.True(t, isVisible, "Bucket should be visible due to permission (even though not owner)")
})
t.Run("user with bucket-specific permission sees bucket without owner metadata", func(t *testing.T) {
// Bucket exists but has no owner metadata (legacy bucket or created before ownership tracking)
geoserverIdentity := &Identity{
Name: "geoserver",
Actions: []Action{
Action("List:geoserver"),
Action("Read:geoserver"),
},
}
bucketWithoutOwner := &filer_pb.Entry{
Name: "geoserver",
IsDirectory: true,
Extended: map[string][]byte{}, // No owner metadata
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
},
}
// Not owner (no owner metadata)
isOwner := isBucketOwnedByIdentity(bucketWithoutOwner, geoserverIdentity)
assert.False(t, isOwner, "No owner metadata means not owned")
// But has permission
canList := geoserverIdentity.canDo(s3_constants.ACTION_LIST, "geoserver", "")
assert.True(t, canList, "Has explicit List:geoserver permission")
// Verify the combined visibility logic: ownership OR permission
isVisible := isOwner || canList
assert.True(t, isVisible, "Bucket should be visible due to permission (even without owner metadata)")
})
t.Run("user cannot see bucket they neither own nor have permission for", func(t *testing.T) {
// User has no ownership and no permission for the bucket
geoserverIdentity := &Identity{
Name: "geoserver",
Actions: []Action{
Action("List:geoserver"),
Action("Read:geoserver"),
},
}
otherBucket := &filer_pb.Entry{
Name: "otherbucket",
IsDirectory: true,
Extended: map[string][]byte{
s3_constants.AmzIdentityId: []byte("admin"),
},
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
},
}
// Not owner
isOwner := isBucketOwnedByIdentity(otherBucket, geoserverIdentity)
assert.False(t, isOwner, "geoserver doesn't own otherbucket")
// No permission for this bucket
canList := geoserverIdentity.canDo(s3_constants.ACTION_LIST, "otherbucket", "")
assert.False(t, canList, "geoserver has no List permission for otherbucket")
// Verify the combined visibility logic: ownership OR permission
isVisible := isOwner || canList
assert.False(t, isVisible, "Bucket should NOT be visible (neither owner nor has permission)")
})
t.Run("user with wildcard permission sees matching buckets", func(t *testing.T) {
// User has "List:geo*" permission - should see any bucket starting with "geo"
geoIdentity := &Identity{
Name: "geouser",
Actions: []Action{
Action("List:geo*"),
Action("Read:geo*"),
},
}
geoBucket := &filer_pb.Entry{
Name: "geoserver",
IsDirectory: true,
Extended: map[string][]byte{
s3_constants.AmzIdentityId: []byte("admin"),
},
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
},
}
geoTTLBucket := &filer_pb.Entry{
Name: "geoserver-ttl",
IsDirectory: true,
Extended: map[string][]byte{
s3_constants.AmzIdentityId: []byte("admin"),
},
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
},
}
// Not owner of either bucket
isOwnerGeo := isBucketOwnedByIdentity(geoBucket, geoIdentity)
isOwnerGeoTTL := isBucketOwnedByIdentity(geoTTLBucket, geoIdentity)
assert.False(t, isOwnerGeo)
assert.False(t, isOwnerGeoTTL)
// But has permission via wildcard
canListGeo := geoIdentity.canDo(s3_constants.ACTION_LIST, "geoserver", "")
canListGeoTTL := geoIdentity.canDo(s3_constants.ACTION_LIST, "geoserver-ttl", "")
assert.True(t, canListGeo)
assert.True(t, canListGeoTTL)
// Verify the combined visibility logic for matching buckets
assert.True(t, isOwnerGeo || canListGeo, "geoserver bucket should be visible via wildcard permission")
assert.True(t, isOwnerGeoTTL || canListGeoTTL, "geoserver-ttl bucket should be visible via wildcard permission")
// Should NOT have permission for unrelated buckets
canListOther := geoIdentity.canDo(s3_constants.ACTION_LIST, "otherbucket", "")
assert.False(t, canListOther, "No permission for otherbucket")
assert.False(t, false || canListOther, "otherbucket should NOT be visible (no ownership, no permission)")
})
t.Run("integration test: complete handler filtering logic", func(t *testing.T) {
// This test simulates the complete filtering logic as used in ListBucketsHandler
// to verify that the combination of ownership OR permission check works correctly
// User "geoserver" with bucket-specific permissions (same as issue #7796)
geoserverIdentity := &Identity{
Name: "geoserver",
Credentials: []*Credential{
{AccessKey: "geoserver", SecretKey: "secret"},
},
Actions: []Action{
Action("List:geoserver"),
Action("Read:geoserver"),
Action("Write:geoserver"),
Action("Admin:geoserver"),
Action("List:geoserver-ttl"),
Action("Read:geoserver-ttl"),
Action("Write:geoserver-ttl"),
},
}
// Create test buckets with various ownership scenarios
buckets := []*filer_pb.Entry{
{
// Bucket owned by admin but geoserver has permission
Name: "geoserver",
IsDirectory: true,
Extended: map[string][]byte{s3_constants.AmzIdentityId: []byte("admin")},
Attributes: &filer_pb.FuseAttributes{Crtime: time.Now().Unix()},
},
{
// Bucket with no owner but geoserver has permission
Name: "geoserver-ttl",
IsDirectory: true,
Extended: map[string][]byte{},
Attributes: &filer_pb.FuseAttributes{Crtime: time.Now().Unix()},
},
{
// Bucket owned by geoserver (should be visible via ownership)
Name: "geoserver-owned",
IsDirectory: true,
Extended: map[string][]byte{s3_constants.AmzIdentityId: []byte("geoserver")},
Attributes: &filer_pb.FuseAttributes{Crtime: time.Now().Unix()},
},
{
// Bucket owned by someone else, no permission for geoserver
Name: "otherbucket",
IsDirectory: true,
Extended: map[string][]byte{s3_constants.AmzIdentityId: []byte("otheruser")},
Attributes: &filer_pb.FuseAttributes{Crtime: time.Now().Unix()},
},
}
// Simulate the exact filtering logic from ListBucketsHandler
var visibleBuckets []string
for _, entry := range buckets {
if !entry.IsDirectory {
continue
}
// Check ownership
isOwner := isBucketOwnedByIdentity(entry, geoserverIdentity)
// Skip permission check if user is already the owner (optimization)
if !isOwner {
// Check permission
hasPermission := geoserverIdentity.canDo(s3_constants.ACTION_LIST, entry.Name, "")
if !hasPermission {
continue
}
}
visibleBuckets = append(visibleBuckets, entry.Name)
}
// Expected: geoserver should see:
// - "geoserver" (has List:geoserver permission, even though owned by admin)
// - "geoserver-ttl" (has List:geoserver-ttl permission, even though no owner)
// - "geoserver-owned" (owns this bucket)
// NOT "otherbucket" (neither owns nor has permission)
expectedBuckets := []string{"geoserver", "geoserver-ttl", "geoserver-owned"}
assert.ElementsMatch(t, expectedBuckets, visibleBuckets,
"geoserver should see buckets they own OR have permission for")
// Verify "otherbucket" is NOT in the list
assert.NotContains(t, visibleBuckets, "otherbucket",
"geoserver should NOT see buckets they neither own nor have permission for")
})
}