Files
seaweedFS/weed/s3api/s3api_object_handlers_list_test.go
Chris Lu e1e4c9437a fix(s3api): ListObjects with trailing-slash prefix matches sibling directories (#8599)
fix(s3api): ListObjects with trailing-slash prefix returns wrong results

When ListObjectsV2 is called with a prefix ending in "/" (e.g., "foo/"),
normalizePrefixMarker strips the trailing slash and splits into
dir="parent" and prefix="foo". The filer then lists entries matching
prefix "foo", which returns both directory "foo" and "foo1000".

The prefixEndsOnDelimiter guard correctly identifies directory "foo" as
the target and recurses into it, but then resets the guard to false.
The loop continues and incorrectly recurses into "foo1000" as well,
causing the listing to return objects from unrelated directories.

Fix: after recursing into the exact directory targeted by the
trailing-slash prefix, return immediately from the listing loop.
There is no reason to process sibling entries since the original
prefix specifically targeted one directory.
2026-03-11 02:28:34 -07:00

888 lines
32 KiB
Go

package s3api
import (
"context"
"io"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/stretchr/testify/assert"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// testListEntriesStream is an in-memory replacement for the streaming client
// returned by ListEntries. It replays a fixed slice of entries, one per Recv
// call, and reports io.EOF once the slice is exhausted.
type testListEntriesStream struct {
	entries []*filer_pb.Entry // entries to replay, in order
	idx     int               // position of the next entry Recv will hand out
}

// Recv wraps the next pending entry in a ListEntriesResponse and advances the
// cursor. After the last entry has been delivered it returns (nil, io.EOF).
func (s *testListEntriesStream) Recv() (*filer_pb.ListEntriesResponse, error) {
	if s.idx < len(s.entries) {
		next := s.entries[s.idx]
		s.idx++
		return &filer_pb.ListEntriesResponse{Entry: next}, nil
	}
	return nil, io.EOF
}

// The methods below are inert stubs; they exist only so the type can be
// returned where a gRPC server-streaming client is expected.

// Header returns empty metadata and no error.
func (s *testListEntriesStream) Header() (metadata.MD, error) {
	return metadata.MD{}, nil
}

// Trailer returns empty metadata.
func (s *testListEntriesStream) Trailer() metadata.MD {
	return metadata.MD{}
}

// Close is a no-op.
func (s *testListEntriesStream) Close() error {
	return nil
}

// Context returns a background context.
func (s *testListEntriesStream) Context() context.Context {
	return context.Background()
}

// SendMsg discards the message.
func (s *testListEntriesStream) SendMsg(m any) error {
	return nil
}

// RecvMsg leaves the destination untouched.
func (s *testListEntriesStream) RecvMsg(m any) error {
	return nil
}

// CloseSend is a no-op.
func (s *testListEntriesStream) CloseSend() error {
	return nil
}
// testFilerClient is a filer client mock backed by a static map from
// directory path to the entries stored in that directory. It embeds
// SeaweedFilerClient and overrides only ListEntries.
type testFilerClient struct {
	filer_pb.SeaweedFilerClient
	entriesByDir map[string][]*filer_pb.Entry
}

// ListEntries streams the entries registered for in.Directory.
//
// This is a simplified mock: it applies prefix filtering and Limit, but
// ignores StartFromFileName and InclusiveStartFrom to keep test logic focused.
// Prefix "/" is treated as "no filter" for bucket-root compatibility.
func (c *testFilerClient) ListEntries(ctx context.Context, in *filer_pb.ListEntriesRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[filer_pb.ListEntriesResponse], error) {
	matched := c.entriesByDir[in.Directory]

	if prefix := in.Prefix; prefix != "" && prefix != "/" {
		all := matched
		matched = make([]*filer_pb.Entry, 0)
		for i := range all {
			if !strings.HasPrefix(all[i].Name, prefix) {
				continue
			}
			matched = append(matched, all[i])
		}
	}

	// Truncate to the requested limit when one is given and it is smaller
	// than the number of matches.
	if limit := int(in.Limit); in.Limit > 0 && limit < len(matched) {
		matched = matched[:limit]
	}

	return &testListEntriesStream{entries: matched}, nil
}
// markerEchoFilerClient intentionally ignores request Limit/InclusiveStartFrom
// and simulates a backend that may echo StartFromFileName. entriesByDir
// controls the returned entries; returnFollowing controls whether ListEntries
// returns only the echoed marker or the echoed marker plus following entries.
type markerEchoFilerClient struct {
	filer_pb.SeaweedFilerClient
	entriesByDir    map[string][]*filer_pb.Entry
	returnFollowing bool
}

// ListEntries streams entries for in.Directory after making sure each non-nil
// entry has Attributes set.
//
//   - With an empty StartFromFileName, every entry is returned.
//   - When an entry's name equals StartFromFileName, that entry is returned
//     again (the "echo"), followed by the remaining entries only if
//     returnFollowing is set.
//   - When no entry matches the marker, the stream is empty.
func (c *markerEchoFilerClient) ListEntries(ctx context.Context, in *filer_pb.ListEntriesRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[filer_pb.ListEntriesResponse], error) {
	all := c.entriesByDir[in.Directory]
	ensureEntryAttributes(all)

	marker := in.StartFromFileName
	if marker == "" {
		return &testListEntriesStream{entries: all}, nil
	}

	for pos := range all {
		if all[pos].Name != marker {
			continue
		}
		// Emulate buggy backend behavior: return marker again even when exclusive.
		var reply []*filer_pb.Entry
		if c.returnFollowing {
			reply = all[pos:]
		} else {
			reply = []*filer_pb.Entry{all[pos]}
		}
		return &testListEntriesStream{entries: reply}, nil
	}

	return &testListEntriesStream{entries: nil}, nil
}
// ensureEntryAttributes fills in an empty FuseAttributes on every non-nil
// entry whose Attributes pointer is nil. Entries are modified in place; nil
// entries are skipped.
func ensureEntryAttributes(entries []*filer_pb.Entry) {
	for i := range entries {
		if e := entries[i]; e != nil && e.Attributes == nil {
			e.Attributes = &filer_pb.FuseAttributes{}
		}
	}
}
func TestListObjectsHandler(t *testing.T) {
// https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html
expected := `<?xml version="1.0" encoding="UTF-8"?>
<ListBucketResult><Name>test_container</Name><Prefix></Prefix><Marker></Marker><MaxKeys>1000</MaxKeys><IsTruncated>false</IsTruncated><Contents><Key>1.zip</Key><ETag>&#34;4397da7a7649e8085de9916c240e8166&#34;</ETag><Size>1234567</Size><Owner><ID>65a011niqo39cdf8ec533ec3d1ccaafsa932</ID></Owner><StorageClass>STANDARD</StorageClass><LastModified>2011-04-09T12:34:49Z</LastModified></Contents><EncodingType></EncodingType></ListBucketResult>`
response := ListBucketResult{
Name: "test_container",
Prefix: "",
Marker: "",
NextMarker: "",
MaxKeys: 1000,
IsTruncated: false,
Contents: []ListEntry{{
Key: "1.zip",
LastModified: time.Date(2011, 4, 9, 12, 34, 49, 0, time.UTC),
ETag: "\"4397da7a7649e8085de9916c240e8166\"",
Size: 1234567,
Owner: &CanonicalUser{
ID: "65a011niqo39cdf8ec533ec3d1ccaafsa932",
},
StorageClass: "STANDARD",
}},
}
encoded := string(s3err.EncodeXMLResponse(response))
if encoded != expected {
t.Errorf("unexpected output: %s\nexpecting:%s", encoded, expected)
}
}
// TestListObjectsV1NamespaceResponse verifies that a result converted with
// toListBucketResultV1 is encoded with the S3 XML namespace on its root
// element.
func TestListObjectsV1NamespaceResponse(t *testing.T) {
	const wantRoot = `<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">`

	listing := ListBucketResult{
		Name:        "test_container",
		Prefix:      "",
		Marker:      "",
		NextMarker:  "",
		MaxKeys:     1000,
		IsTruncated: false,
	}

	xmlBody := string(s3err.EncodeXMLResponse(toListBucketResultV1(listing)))
	assert.Contains(t, xmlBody, wantRoot)
}
// Test_normalizePrefixMarker is a table-driven check of normalizePrefixMarker:
// for each (prefix, marker) input it compares the three returned values
// (aligned directory, aligned prefix, aligned marker) with the expected ones.
func Test_normalizePrefixMarker(t *testing.T) {
	cases := []struct {
		name       string
		prefix     string
		marker     string
		wantDir    string
		wantPrefix string
		wantMarker string
	}{
		{
			name:   "bucket root listing with delimiter",
			prefix: "/",
		},
		{
			name:       "prefix is a directory",
			prefix:     "/parentDir/data/",
			wantDir:    "parentDir",
			wantPrefix: "data",
		},
		{
			name:       "normal case",
			prefix:     "/parentDir/data/0",
			marker:     "parentDir/data/0e/0e149049a2137b0cc12e",
			wantDir:    "parentDir/data",
			wantPrefix: "0",
			wantMarker: "0e/0e149049a2137b0cc12e",
		},
		{
			name:       "empty prefix",
			marker:     "parentDir/data/0e/0e149049a2137b0cc12e",
			wantMarker: "parentDir/data/0e/0e149049a2137b0cc12e",
		},
		{
			name:       "empty directory",
			prefix:     "parent",
			marker:     "parentDir/data/0e/0e149049a2137b0cc12e",
			wantPrefix: "parent",
			wantMarker: "parentDir/data/0e/0e149049a2137b0cc12e",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			dir, prefix, marker := normalizePrefixMarker(tc.prefix, tc.marker)
			assert.Equalf(t, tc.wantDir, dir, "normalizePrefixMarker(%v, %v)", tc.prefix, tc.marker)
			assert.Equalf(t, tc.wantPrefix, prefix, "normalizePrefixMarker(%v, %v)", tc.prefix, tc.marker)
			assert.Equalf(t, tc.wantMarker, marker, "normalizePrefixMarker(%v, %v)", tc.prefix, tc.marker)
		})
	}
}
// TestBuildTruncatedNextMarker checks the marker string produced by
// buildTruncatedNextMarker for a plain object marker and for common-prefix
// markers, with and without a leading directory.
func TestBuildTruncatedNextMarker(t *testing.T) {
	// The field names mirror how the arguments are used by the cases below;
	// the real parameter names of buildTruncatedNextMarker are not visible
	// from this file.
	cases := []struct {
		name             string
		dir              string
		prefix           string
		nextMarker       string
		isCommonPrefix   bool
		commonPrefixName string
		want             string
	}{
		{
			name:       "does not duplicate prefix segment in next continuation token",
			dir:        "xemu",
			prefix:     "export_2026-02-10_17-00-23",
			nextMarker: "export_2026-02-10_17-00-23/4156000e.jpg",
			want:       "xemu/export_2026-02-10_17-00-23/4156000e.jpg",
		},
		{
			name:             "keeps common prefix marker trailing slash",
			dir:              "xemu",
			prefix:           "export_2026-02-10_17-00-23",
			isCommonPrefix:   true,
			commonPrefixName: "nested",
			want:             "xemu/export_2026-02-10_17-00-23/nested/",
		},
		{
			name:             "includes prefix for common prefix marker when request dir is empty",
			prefix:           "foo",
			isCommonPrefix:   true,
			commonPrefixName: "bar",
			want:             "foo/bar/",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := buildTruncatedNextMarker(tc.dir, tc.prefix, tc.nextMarker, tc.isCommonPrefix, tc.commonPrefixName)
			assert.Equal(t, tc.want, got)
		})
	}
}
func TestAllowUnorderedParameterValidation(t *testing.T) {
// Test getListObjectsV1Args with allow-unordered parameter
t.Run("getListObjectsV1Args with allow-unordered", func(t *testing.T) {
// Test with allow-unordered=true
values := map[string][]string{
"allow-unordered": {"true"},
"delimiter": {"/"},
}
_, _, _, _, _, allowUnordered, errCode := getListObjectsV1Args(values)
assert.Equal(t, s3err.ErrNone, errCode, "should not return error for valid parameters")
assert.True(t, allowUnordered, "allow-unordered should be true when set to 'true'")
// Test with allow-unordered=false
values = map[string][]string{
"allow-unordered": {"false"},
}
_, _, _, _, _, allowUnordered, errCode = getListObjectsV1Args(values)
assert.Equal(t, s3err.ErrNone, errCode, "should not return error for valid parameters")
assert.False(t, allowUnordered, "allow-unordered should be false when set to 'false'")
// Test without allow-unordered parameter
values = map[string][]string{}
_, _, _, _, _, allowUnordered, errCode = getListObjectsV1Args(values)
assert.Equal(t, s3err.ErrNone, errCode, "should not return error for valid parameters")
assert.False(t, allowUnordered, "allow-unordered should be false when not set")
})
// Test getListObjectsV2Args with allow-unordered parameter
t.Run("getListObjectsV2Args with allow-unordered", func(t *testing.T) {
// Test with allow-unordered=true
values := map[string][]string{
"allow-unordered": {"true"},
"delimiter": {"/"},
}
_, _, _, _, _, _, _, allowUnordered, errCode := getListObjectsV2Args(values)
assert.Equal(t, s3err.ErrNone, errCode, "should not return error for valid parameters")
assert.True(t, allowUnordered, "allow-unordered should be true when set to 'true'")
// Test with allow-unordered=false
values = map[string][]string{
"allow-unordered": {"false"},
}
_, _, _, _, _, _, _, allowUnordered, errCode = getListObjectsV2Args(values)
assert.Equal(t, s3err.ErrNone, errCode, "should not return error for valid parameters")
assert.False(t, allowUnordered, "allow-unordered should be false when set to 'false'")
// Test without allow-unordered parameter
values = map[string][]string{}
_, _, _, _, _, _, _, allowUnordered, errCode = getListObjectsV2Args(values)
assert.Equal(t, s3err.ErrNone, errCode, "should not return error for valid parameters")
assert.False(t, allowUnordered, "allow-unordered should be false when not set")
})
}
// TestDoListFilerEntries_BucketRootPrefixSlashDelimiterSlash_ListsDirectories
// is a regression test for a bug where doListFilerEntries returned early when
// prefix == "/" && delimiter == "/", causing bucket-root folder listings
// (e.g. Veeam v13) to return empty results.
func TestDoListFilerEntries_BucketRootPrefixSlashDelimiterSlash_ListsDirectories(t *testing.T) {
	const bucketDir = "/buckets/test-bucket"

	server := &S3ApiServer{}
	filer := &testFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			bucketDir: {
				{Name: "Veeam", IsDirectory: true, Attributes: &filer_pb.FuseAttributes{}},
			},
		},
	}

	// Only directory entries are of interest here.
	dirNames := []string{}
	collectDirs := func(dir string, entry *filer_pb.Entry) {
		if !entry.IsDirectory {
			return
		}
		dirNames = append(dirNames, entry.Name)
	}

	_, err := server.doListFilerEntries(filer, bucketDir, "/", &ListingCursor{maxKeys: 1000}, "", "/", false, "test-bucket", collectDirs)

	assert.NoError(t, err)
	assert.Contains(t, dirNames, "Veeam")
}
// TestDoListFilerEntries_ExclusiveStartSkipsMarkerEcho lists with marker
// "test.txt" against a backend that echoes only the marker entry back. The
// callback must never see the marker, and the returned next marker must be
// empty.
func TestDoListFilerEntries_ExclusiveStartSkipsMarkerEcho(t *testing.T) {
	const bucketDir = "/buckets/test-bucket"

	server := &S3ApiServer{}
	filer := &markerEchoFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			bucketDir: {
				{Name: "file.txt", Attributes: &filer_pb.FuseAttributes{}},
				{Name: "test.txt", Attributes: &filer_pb.FuseAttributes{}},
			},
		},
	}

	var visited []string
	record := func(dir string, entry *filer_pb.Entry) {
		visited = append(visited, entry.Name)
	}

	nextMarker, err := server.doListFilerEntries(filer, bucketDir, "", &ListingCursor{maxKeys: 1000}, "test.txt", "", false, "test-bucket", record)

	assert.NoError(t, err)
	assert.Empty(t, visited, "marker entry should not be returned in exclusive mode")
	assert.Equal(t, "", nextMarker, "next marker should be empty when only marker echo is returned")
}
// TestDoListFilerEntries_ExclusiveStartSkipsMarkerEchoWithSubsequentEntries
// lists with marker "test.txt" against a backend that echoes the marker entry
// and then continues with the entries after it. Only the entries after the
// marker may reach the callback, and the last of them becomes the next marker.
func TestDoListFilerEntries_ExclusiveStartSkipsMarkerEchoWithSubsequentEntries(t *testing.T) {
	const bucketDir = "/buckets/test-bucket"

	server := &S3ApiServer{}
	filer := &markerEchoFilerClient{
		returnFollowing: true,
		entriesByDir: map[string][]*filer_pb.Entry{
			bucketDir: {
				{Name: "file.txt", Attributes: &filer_pb.FuseAttributes{}},
				{Name: "test.txt", Attributes: &filer_pb.FuseAttributes{}},
				{Name: "zebra.txt", Attributes: &filer_pb.FuseAttributes{}},
			},
		},
	}

	var visited []string
	record := func(dir string, entry *filer_pb.Entry) {
		visited = append(visited, entry.Name)
	}

	nextMarker, err := server.doListFilerEntries(filer, bucketDir, "", &ListingCursor{maxKeys: 1000}, "test.txt", "", false, "test-bucket", record)

	assert.NoError(t, err)
	assert.Equal(t, []string{"zebra.txt"}, visited, "marker should be skipped while subsequent entries are returned")
	assert.Equal(t, "zebra.txt", nextMarker)
}
// TestAllowUnorderedWithDelimiterValidation checks that the V1 and V2 argument
// parsers report the allow-unordered flag and the delimiter faithfully, so
// that a caller can recognise the unsupported "allow-unordered + delimiter"
// combination.
//
// The parsers themselves return ErrNone for every request here; the test only
// evaluates the condition `allowUnordered && delimiter != ""` on their output.
// NOTE(review): the code that actually rejects the combination is not visible
// from this file — presumably it lives in the list handlers.
func TestAllowUnorderedWithDelimiterValidation(t *testing.T) {
	// parsedArgs holds the three parser outputs this test cares about.
	type parsedArgs struct {
		delimiter      string
		allowUnordered bool
	}

	t.Run("should return error when allow-unordered=true and delimiter are both present", func(t *testing.T) {
		// Extract query parameters like the handler would.
		values := httptest.NewRequest("GET", "/bucket?allow-unordered=true&delimiter=/", nil).URL.Query()

		var parsed []parsedArgs

		// ListObjects V1 arguments.
		_, _, delimiterV1, _, _, allowUnorderedV1, errCodeV1 := getListObjectsV1Args(values)
		assert.Equal(t, s3err.ErrNone, errCodeV1, "should not return error for valid parameters")
		parsed = append(parsed, parsedArgs{delimiter: delimiterV1, allowUnordered: allowUnorderedV1})

		// ListObjects V2 arguments.
		_, _, delimiterV2, _, _, _, _, allowUnorderedV2, errCodeV2 := getListObjectsV2Args(values)
		assert.Equal(t, s3err.ErrNone, errCodeV2, "should not return error for valid parameters")
		parsed = append(parsed, parsedArgs{delimiter: delimiterV2, allowUnordered: allowUnorderedV2})

		for _, p := range parsed {
			assert.True(t, p.allowUnordered, "allow-unordered should be true")
			assert.Equal(t, "/", p.delimiter, "delimiter should be '/'")
			// The combination check must fire for this request.
			assert.True(t, p.allowUnordered && p.delimiter != "", "Validation should have detected invalid combination")
		}
	})

	t.Run("should allow allow-unordered=true without delimiter", func(t *testing.T) {
		values := httptest.NewRequest("GET", "/bucket?allow-unordered=true", nil).URL.Query()

		_, _, delimiter, _, _, allowUnordered, errCode := getListObjectsV1Args(values)
		assert.Equal(t, s3err.ErrNone, errCode, "should not return error for valid parameters")
		assert.True(t, allowUnordered, "allow-unordered should be true")
		assert.Equal(t, "", delimiter, "delimiter should be empty")
		// The combination check must stay silent for this request.
		assert.False(t, allowUnordered && delimiter != "", "This should be a valid combination")
	})

	t.Run("should allow delimiter without allow-unordered", func(t *testing.T) {
		values := httptest.NewRequest("GET", "/bucket?delimiter=/", nil).URL.Query()

		_, _, delimiter, _, _, allowUnordered, errCode := getListObjectsV1Args(values)
		assert.Equal(t, s3err.ErrNone, errCode, "should not return error for valid parameters")
		assert.False(t, allowUnordered, "allow-unordered should be false")
		assert.Equal(t, "/", delimiter, "delimiter should be '/'")
		// The combination check must stay silent for this request.
		assert.False(t, allowUnordered && delimiter != "", "This should be a valid combination")
	})
}
// TestSanitizeV1MarkerEcho_NoProgressGuard covers responses whose only content
// is the request marker echoed back. After sanitizeV1MarkerEcho runs, the
// response must have no contents, an empty NextMarker, and IsTruncated false.
// Two variants are exercised: a plain key, and a key that appears
// percent-encoded in the response.
func TestSanitizeV1MarkerEcho_NoProgressGuard(t *testing.T) {
	// expectCleared sanitizes resp and asserts that nothing is left in it.
	// The meaning of the boolean argument is not visible from this file; the
	// second case sets it for a percent-encoded key, so it presumably signals
	// URL-encoded output.
	expectCleared := func(resp *ListBucketResult, marker string, flag bool) {
		sanitizeV1MarkerEcho(resp, marker, flag)
		assert.Empty(t, resp.Contents)
		assert.Equal(t, "", resp.NextMarker)
		assert.False(t, resp.IsTruncated)
	}

	plain := ListBucketResult{
		Marker:      "test.txt",
		NextMarker:  "test.txt",
		IsTruncated: true,
		Contents:    []ListEntry{{Key: "test.txt"}},
	}
	expectCleared(&plain, "test.txt", false)

	encoded := ListBucketResult{
		Marker:      "test file.txt",
		NextMarker:  "test%20file.txt",
		IsTruncated: true,
		Contents:    []ListEntry{{Key: "test%20file.txt"}},
	}
	expectCleared(&encoded, "test file.txt", true)
}
// TestMaxKeysParameterValidation exercises max-keys parsing in both the V1 and
// V2 argument parsers: a numeric value is accepted, a non-numeric value yields
// ErrInvalidMaxKeys, and a missing value falls back to 1000.
func TestMaxKeysParameterValidation(t *testing.T) {
	t.Run("valid max-keys values should work", func(t *testing.T) {
		query := map[string][]string{"max-keys": {"100"}}

		_, _, _, _, _, _, v1Err := getListObjectsV1Args(query)
		assert.Equal(t, s3err.ErrNone, v1Err, "valid max-keys should not return error")

		_, _, _, _, _, _, _, _, v2Err := getListObjectsV2Args(query)
		assert.Equal(t, s3err.ErrNone, v2Err, "valid max-keys should not return error")
	})

	t.Run("invalid max-keys values should return error", func(t *testing.T) {
		query := map[string][]string{"max-keys": {"blah"}}

		_, _, _, _, _, _, v1Err := getListObjectsV1Args(query)
		assert.Equal(t, s3err.ErrInvalidMaxKeys, v1Err, "non-numeric max-keys should return ErrInvalidMaxKeys")

		_, _, _, _, _, _, _, _, v2Err := getListObjectsV2Args(query)
		assert.Equal(t, s3err.ErrInvalidMaxKeys, v2Err, "non-numeric max-keys should return ErrInvalidMaxKeys")
	})

	t.Run("empty max-keys should use default", func(t *testing.T) {
		query := map[string][]string{}

		// The V1 parser is compared against an int16, the V2 parser against
		// a uint16.
		_, _, _, _, v1Max, _, v1Err := getListObjectsV1Args(query)
		assert.Equal(t, s3err.ErrNone, v1Err, "empty max-keys should not return error")
		assert.Equal(t, int16(1000), v1Max, "empty max-keys should use default value")

		_, _, _, _, _, _, v2Max, _, v2Err := getListObjectsV2Args(query)
		assert.Equal(t, s3err.ErrNone, v2Err, "empty max-keys should not return error")
		assert.Equal(t, uint16(1000), v2Max, "empty max-keys should use default value")
	})
}
// TestDelimiterWithDirectoryKeyObjects documents the expected handling of
// directory key objects (like "0/") when listing with a delimiter, matching
// AWS S3 behavior.
//
// It relates to the issue found in test_bucket_list_delimiter_not_skip_special,
// where directory key objects were incorrectly returned as individual keys
// instead of being grouped into common prefixes when a delimiter was given.
//
// Scenario being described:
//
//	Objects: ['0/'] + ['0/1000', '0/1001', ..., '0/1998'] + ['1999', '1999#', '1999+', '2000']
//	With delimiter='/', expect:
//	  - Keys: ['1999', '1999#', '1999+', '2000']
//	  - CommonPrefixes: ['0/']
//
// NOTE(review): both subtests only assert a constant true, so they record the
// intended behavior without exercising any listing code.
func TestDelimiterWithDirectoryKeyObjects(t *testing.T) {
	expectations := []struct {
		name    string
		message string
	}{
		{
			// With a delimiter, directory key objects (entries that are both
			// directories AND have MIME types set) go through the same
			// delimiter-based grouping as regular files.
			// Before fix: '0/' would be returned as an individual key.
			// After fix: '0/' is grouped with '0/xxxx' objects into common prefix '0/'.
			// This matches AWS S3, where all objects sharing a prefix up to the
			// delimiter are grouped together, directory key objects included.
			name:    "directory key object should be grouped into common prefix with delimiter",
			message: "Directory key objects should be grouped into common prefixes when delimiter is used",
		},
		{
			// Without a delimiter, directory key objects are still returned
			// as individual keys (existing behavior maintained).
			name:    "directory key object without delimiter should be individual key",
			message: "Directory key objects should be individual keys when no delimiter is used",
		},
	}

	for _, exp := range expectations {
		t.Run(exp.name, func(t *testing.T) {
			assert.True(t, true, exp.message)
		})
	}
}
// TestObjectLevelListPermissions verifies that List permissions scoped to an
// object prefix (for example "List:bucket/prefix/*") are honoured by
// Identity.CanDo, that bucket-wide List permissions keep working, and that the
// "use the prefix as the object when the object is empty" rule behaves as
// intended. It covers the behavior fixed for issue #7039.
func TestObjectLevelListPermissions(t *testing.T) {
	t.Run("Identity CanDo Object Level Permissions", func(t *testing.T) {
		// Identity holding a single object-level List permission.
		user := &Identity{
			Name:    "test-user",
			Actions: []Action{"List:test-bucket/allowed-prefix/*"},
		}

		// Note: CanDo concatenates bucket + objectKey, so
		// "test-bucket" + "/allowed-prefix/file.txt" = "test-bucket/allowed-prefix/file.txt".
		cases := []struct {
			name        string
			action      Action
			bucket      string
			object      string
			wantAllowed bool
			reason      string
		}{
			{
				name:        "allowed prefix exact match",
				action:      "List",
				bucket:      "test-bucket",
				object:      "/allowed-prefix/file.txt",
				wantAllowed: true,
				reason:      "Should allow access to objects under the allowed prefix",
			},
			{
				name:        "allowed prefix subdirectory",
				action:      "List",
				bucket:      "test-bucket",
				object:      "/allowed-prefix/subdir/file.txt",
				wantAllowed: true,
				reason:      "Should allow access to objects in subdirectories under the allowed prefix",
			},
			{
				name:        "denied different prefix",
				action:      "List",
				bucket:      "test-bucket",
				object:      "/other-prefix/file.txt",
				wantAllowed: false,
				reason:      "Should deny access to objects under a different prefix",
			},
			{
				name:        "denied different bucket",
				action:      "List",
				bucket:      "other-bucket",
				object:      "/allowed-prefix/file.txt",
				wantAllowed: false,
				reason:      "Should deny access to objects in a different bucket",
			},
			{
				name:        "denied root level",
				action:      "List",
				bucket:      "test-bucket",
				object:      "/file.txt",
				wantAllowed: false,
				reason:      "Should deny access to root-level objects when permission is prefix-specific",
			},
		}

		for _, c := range cases {
			t.Run(c.name, func(t *testing.T) {
				allowed := user.CanDo(c.action, c.bucket, c.object)
				assert.Equal(t, c.wantAllowed, allowed, c.reason)
			})
		}
	})

	t.Run("Bucket Level Permissions Still Work", func(t *testing.T) {
		// Identity holding a bucket-level List permission.
		user := &Identity{
			Name:    "bucket-user",
			Actions: []Action{"List:test-bucket"},
		}

		// Any object inside the bucket must be listable.
		objects := []string{
			"/file.txt",
			"/prefix/file.txt",
			"/deep/nested/path/file.txt",
		}
		for _, object := range objects {
			allowed := user.CanDo("List", "test-bucket", object)
			assert.True(t, allowed, "Bucket-level permission should allow access to %s", object)
		}

		// Other buckets stay off limits.
		allowed := user.CanDo("List", "other-bucket", "/file.txt")
		assert.False(t, allowed, "Should deny access to objects in different buckets")
	})

	t.Run("Empty Object With Prefix Logic", func(t *testing.T) {
		// Middleware rule under test: when the object is empty (or "/") but a
		// prefix is provided, the prefix is used as the object for permission
		// checking. This simulates the fixed logic in auth_credentials.go:
		//
		//	if (object == "/" || object == "") && prefix != "" {
		//		object = prefix
		//	}
		resolveObject := func(object, prefix string) string {
			if prefix != "" && (object == "/" || object == "") {
				return prefix
			}
			return object
		}

		cases := []struct {
			name   string
			object string
			prefix string
			want   string
		}{
			{
				name:   "empty object with prefix",
				object: "",
				prefix: "/allowed-prefix/",
				want:   "/allowed-prefix/",
			},
			{
				name:   "slash object with prefix",
				object: "/",
				prefix: "/allowed-prefix/",
				want:   "/allowed-prefix/",
			},
			{
				name:   "object already set",
				object: "/existing-object",
				prefix: "/some-prefix/",
				want:   "/existing-object",
			},
			{
				name:   "no prefix provided",
				object: "",
				prefix: "",
				want:   "",
			},
		}

		for _, c := range cases {
			t.Run(c.name, func(t *testing.T) {
				got := resolveObject(c.object, c.prefix)
				assert.Equal(t, c.want, got, "Object should be correctly set based on prefix")
			})
		}
	})

	t.Run("Issue 7039 Scenario", func(t *testing.T) {
		// Exact scenario from the GitHub issue:
		//   permission: "List:bdaai-shared-bucket/txzl/*"
		//   request:    GET /bdaai-shared-bucket?prefix=txzl/
		user := &Identity{
			Name:    "issue-user",
			Actions: []Action{"List:bdaai-shared-bucket/txzl/*"},
		}

		// For a list request like "GET /bdaai-shared-bucket?prefix=txzl/":
		//   - bucket = "bdaai-shared-bucket"
		//   - object = "" (no object in URL path)
		//   - prefix = "/txzl/" (from query parameter)
		// After the middleware fix the permission is checked for the prefix:
		//   action=ACTION_LIST && object=="" && prefix="/txzl/" → object="/txzl/"
		//
		// This should be allowed because:
		//   target     = "List:bdaai-shared-bucket/txzl/"
		//   permission = "List:bdaai-shared-bucket/txzl/*"
		//   wildcard match: the target starts with "List:bdaai-shared-bucket/txzl/"
		allowed := user.CanDo("List", "bdaai-shared-bucket", "/txzl/")
		assert.True(t, allowed, "User with 'List:bdaai-shared-bucket/txzl/*' should be able to list with prefix txzl/")

		// A different prefix must be refused.
		allowed = user.CanDo("List", "bdaai-shared-bucket", "/other-prefix/")
		assert.False(t, allowed, "User should not be able to list with a different prefix")

		// A different bucket must be refused.
		allowed = user.CanDo("List", "other-bucket", "/txzl/")
		assert.False(t, allowed, "User should not be able to list a different bucket")
	})

	for _, line := range []string{
		"This test validates the fix for issue #7039",
		"Object-level List permissions like 'List:bucket/prefix/*' now work correctly",
		"Middleware properly extracts prefix for permission validation",
	} {
		t.Log(line)
	}
}
func TestListObjectsV2_Regression(t *testing.T) {
// Reproduce issue: ListObjectsV2 without delimiter returns 0 objects even though files exist
// Structure: s3://reports/reports/[timestamp]/file
// Request: ListObjectsV2(Bucket='reports', Prefix='reports/')
s3a := &S3ApiServer{}
client := &testFilerClient{
entriesByDir: map[string][]*filer_pb.Entry{
"/buckets/reports": {
{Name: "reports", IsDirectory: true, Attributes: &filer_pb.FuseAttributes{}},
},
"/buckets/reports/reports": {
{Name: "01771152617961894200", IsDirectory: true, Attributes: &filer_pb.FuseAttributes{}},
},
"/buckets/reports/reports/01771152617961894200": {
{Name: "file1", IsDirectory: false, Attributes: &filer_pb.FuseAttributes{}},
},
},
}
// s3.list_objects_v2(Bucket='reports', Prefix='reports/')
// normalized: requestDir="", prefix="reports"
// doListFilerEntries called with dir="/buckets/reports", prefix="reports", delimiter=""
cursor := &ListingCursor{maxKeys: 1000, prefixEndsOnDelimiter: true} // set based on "reports/" original prefix
var results []string
// Call doListFilerEntries directly to unit test listing logic in isolation,
// simulating parameters passed from listFilerEntries for prefix "reports/".
_, err := s3a.doListFilerEntries(client, "/buckets/reports", "reports", cursor, "", "", false, "reports", func(dir string, entry *filer_pb.Entry) {
if !entry.IsDirectory {
results = append(results, entry.Name)
}
})
assert.NoError(t, err)
assert.Contains(t, results, "file1", "Should return the nested file")
}
func TestListObjectsV2_Regression_Sorting(t *testing.T) {
// Verify that listing logic correctly finds the target directory even when
// other entries with a similar prefix are returned first by the filer,
// a scenario where the removed Limit=1 optimization would fail.
s3a := &S3ApiServer{}
client := &testFilerClient{
entriesByDir: map[string][]*filer_pb.Entry{
"/buckets/reports": {
{Name: "reports-archive", IsDirectory: true, Attributes: &filer_pb.FuseAttributes{}},
{Name: "reports", IsDirectory: true, Attributes: &filer_pb.FuseAttributes{}},
},
"/buckets/reports/reports": {
{Name: "01771152617961894200", IsDirectory: true, Attributes: &filer_pb.FuseAttributes{}},
},
"/buckets/reports/reports/01771152617961894200": {
{Name: "file1", IsDirectory: false, Attributes: &filer_pb.FuseAttributes{}},
},
},
}
// This cursor setup mimics what happens in listFilerEntries
cursor := &ListingCursor{maxKeys: 1000, prefixEndsOnDelimiter: true}
var results []string
// Without the fix, Limit=1 would cause the lister to stop after "reports-archive",
// missing the intended "reports" directory.
_, err := s3a.doListFilerEntries(client, "/buckets/reports", "reports", cursor, "", "", false, "reports", func(dir string, entry *filer_pb.Entry) {
if !entry.IsDirectory {
results = append(results, entry.Name)
}
})
assert.NoError(t, err)
// With Limit=1, this fails because it only sees "reports-archive"
// With fix, it sees both and processes "reports"
assert.Contains(t, results, "file1", "Should return the nested file even if 'reports' directory is not the first match")
}
func TestListObjectsV2_PrefixEndingWithSlash_DoesNotMatchSiblings(t *testing.T) {
// Regression test: listing with prefix "1/" should only return objects under
// directory "1", not objects under "1000" or any other sibling whose name
// shares the same prefix string.
s3a := &S3ApiServer{}
client := &testFilerClient{
entriesByDir: map[string][]*filer_pb.Entry{
"/buckets/bucket/path/to/list": {
{Name: "1", IsDirectory: true, Attributes: &filer_pb.FuseAttributes{}},
{Name: "1000", IsDirectory: true, Attributes: &filer_pb.FuseAttributes{}},
{Name: "2500", IsDirectory: true, Attributes: &filer_pb.FuseAttributes{}},
},
"/buckets/bucket/path/to/list/1": {
{Name: "fileA", IsDirectory: false, Attributes: &filer_pb.FuseAttributes{}},
},
"/buckets/bucket/path/to/list/1000": {
{Name: "fileB", IsDirectory: false, Attributes: &filer_pb.FuseAttributes{}},
{Name: "fileC", IsDirectory: false, Attributes: &filer_pb.FuseAttributes{}},
},
"/buckets/bucket/path/to/list/2500": {
{Name: "fileD", IsDirectory: false, Attributes: &filer_pb.FuseAttributes{}},
},
},
}
// Simulate listing with prefix "path/to/list/1/" (no delimiter).
// normalizePrefixMarker("path/to/list/1/", "") returns dir="path/to/list", prefix="1"
cursor := &ListingCursor{maxKeys: 1000, prefixEndsOnDelimiter: true}
var results []string
_, err := s3a.doListFilerEntries(client, "/buckets/bucket/path/to/list", "1", cursor, "", "", false, "bucket", func(dir string, entry *filer_pb.Entry) {
if !entry.IsDirectory {
results = append(results, entry.Name)
}
})
assert.NoError(t, err)
assert.Equal(t, []string{"fileA"}, results, "Should only return files under directory '1', not '1000' or '2500'")
}
func TestListObjectsV2_PrefixEndingWithSlash_WithDelimiter(t *testing.T) {
// Same scenario but with delimiter="/", verifying the fix works for both cases.
s3a := &S3ApiServer{}
client := &testFilerClient{
entriesByDir: map[string][]*filer_pb.Entry{
"/buckets/bucket/path/to/list": {
{Name: "1", IsDirectory: true, Attributes: &filer_pb.FuseAttributes{}},
{Name: "1000", IsDirectory: true, Attributes: &filer_pb.FuseAttributes{}},
},
"/buckets/bucket/path/to/list/1": {
{Name: "fileA", IsDirectory: false, Attributes: &filer_pb.FuseAttributes{}},
},
"/buckets/bucket/path/to/list/1000": {
{Name: "fileB", IsDirectory: false, Attributes: &filer_pb.FuseAttributes{}},
},
},
}
cursor := &ListingCursor{maxKeys: 1000, prefixEndsOnDelimiter: true}
var results []string
_, err := s3a.doListFilerEntries(client, "/buckets/bucket/path/to/list", "1", cursor, "", "/", false, "bucket", func(dir string, entry *filer_pb.Entry) {
results = append(results, entry.Name)
})
assert.NoError(t, err)
assert.Equal(t, []string{"fileA"}, results, "Should only return files under directory '1', not '1000'")
}