s3tables: fix shared table-location bucket mapping collisions (#8286)
* s3tables: prevent shared table-location bucket mapping overwrite * Update weed/s3api/bucket_paths.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
This commit is contained in:
@@ -49,7 +49,7 @@ type BucketRegistry struct {
|
||||
metadataCache map[string]*BucketMetaData
|
||||
metadataCacheLock sync.RWMutex
|
||||
|
||||
tableLocationCache map[string]string // Cache for table location mappings (bucket -> table path)
|
||||
tableLocationCache map[string]string // Cache for table location mappings (external bucket -> mapped root path)
|
||||
tableLocationLock sync.RWMutex
|
||||
|
||||
notFound map[string]struct{}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package s3api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
@@ -58,8 +60,13 @@ func (s3a *S3ApiServer) tableLocationDir(bucket string) (string, bool) {
|
||||
|
||||
entry, err := s3a.getEntry(s3tables.GetTableLocationMappingDir(), bucket)
|
||||
tablePath := ""
|
||||
if err == nil && entry != nil && len(entry.Content) > 0 {
|
||||
tablePath = strings.TrimSpace(string(entry.Content))
|
||||
if err == nil && entry != nil {
|
||||
if entry.IsDirectory {
|
||||
tablePath, err = s3a.readTableLocationMappingFromDirectory(bucket)
|
||||
} else if len(entry.Content) > 0 {
|
||||
// Backward compatibility with legacy single-file mappings.
|
||||
tablePath = normalizeTableLocationMappingPath(string(entry.Content))
|
||||
}
|
||||
}
|
||||
|
||||
// Only cache definitive results: successful lookup (tablePath set) or definitive not-found (ErrNotFound)
|
||||
@@ -82,6 +89,77 @@ func (s3a *S3ApiServer) tableLocationDir(bucket string) (string, bool) {
|
||||
return tablePath, true
|
||||
}
|
||||
|
||||
func (s3a *S3ApiServer) readTableLocationMappingFromDirectory(bucket string) (string, error) {
|
||||
mappingDir := s3tables.GetTableLocationMappingPath(bucket)
|
||||
var mappedPath string
|
||||
conflict := false
|
||||
|
||||
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
|
||||
Directory: mappingDir,
|
||||
Limit: 4294967295, // math.MaxUint32
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
resp, recvErr := stream.Recv()
|
||||
if recvErr == io.EOF {
|
||||
return nil
|
||||
}
|
||||
if recvErr != nil {
|
||||
return recvErr
|
||||
}
|
||||
if resp == nil || resp.Entry == nil || resp.Entry.IsDirectory || len(resp.Entry.Content) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
candidate := normalizeTableLocationMappingPath(string(resp.Entry.Content))
|
||||
if candidate == "" {
|
||||
continue
|
||||
}
|
||||
if mappedPath == "" {
|
||||
mappedPath = candidate
|
||||
continue
|
||||
}
|
||||
if mappedPath != candidate {
|
||||
conflict = true
|
||||
return nil
|
||||
}
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if conflict {
|
||||
glog.V(1).Infof("table location mapping conflict for %s: multiple mapped roots found", bucket)
|
||||
return "", nil
|
||||
}
|
||||
return mappedPath, nil
|
||||
}
|
||||
|
||||
func normalizeTableLocationMappingPath(rawPath string) string {
|
||||
rawPath = strings.TrimSpace(rawPath)
|
||||
if rawPath == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
normalized := path.Clean("/" + strings.TrimPrefix(rawPath, "/"))
|
||||
tablesPrefix := strings.TrimSuffix(s3tables.TablesPath, "/") + "/"
|
||||
if !strings.HasPrefix(normalized, tablesPrefix) {
|
||||
return normalized
|
||||
}
|
||||
|
||||
remaining := strings.TrimPrefix(normalized, tablesPrefix)
|
||||
bucketName, _, _ := strings.Cut(remaining, "/")
|
||||
if bucketName == "" {
|
||||
return ""
|
||||
}
|
||||
return path.Join(s3tables.TablesPath, bucketName)
|
||||
}
|
||||
|
||||
func (s3a *S3ApiServer) bucketRoot(bucket string) string {
|
||||
// Returns the unified buckets root path for all bucket types
|
||||
return s3a.option.BucketsPath
|
||||
|
||||
45
weed/s3api/bucket_paths_test.go
Normal file
45
weed/s3api/bucket_paths_test.go
Normal file
@@ -0,0 +1,45 @@
|
||||
package s3api
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestNormalizeTableLocationMappingPath(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
raw string
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "legacy table path maps to bucket root",
|
||||
raw: "/buckets/warehouse/analytics/orders",
|
||||
want: "/buckets/warehouse",
|
||||
},
|
||||
{
|
||||
name: "already bucket root",
|
||||
raw: "/buckets/warehouse",
|
||||
want: "/buckets/warehouse",
|
||||
},
|
||||
{
|
||||
name: "relative buckets path normalized and reduced",
|
||||
raw: "buckets/warehouse/analytics/orders",
|
||||
want: "/buckets/warehouse",
|
||||
},
|
||||
{
|
||||
name: "non buckets path preserved",
|
||||
raw: "/tmp/custom/path",
|
||||
want: "/tmp/custom/path",
|
||||
},
|
||||
{
|
||||
name: "empty path",
|
||||
raw: "",
|
||||
want: "",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
if got := normalizeTableLocationMappingPath(tc.raw); got != tc.want {
|
||||
t.Fatalf("normalizeTableLocationMappingPath(%q)=%q, want %q", tc.raw, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -929,7 +929,7 @@ func (h *S3TablesHandler) handleDeleteTable(w http.ResponseWriter, r *http.Reque
|
||||
if err := h.deleteDirectory(r.Context(), client, tablePath); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := h.deleteTableLocationMapping(r.Context(), client, metadata.MetadataLocation); err != nil {
|
||||
if err := h.deleteTableLocationMapping(r.Context(), client, metadata.MetadataLocation, tablePath); err != nil {
|
||||
glog.V(1).Infof("failed to delete table location mapping for %s: %v", metadata.MetadataLocation, err)
|
||||
}
|
||||
return nil
|
||||
@@ -1139,29 +1139,98 @@ func (h *S3TablesHandler) updateTableLocationMapping(ctx context.Context, client
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
tableBucketPath, ok := tableBucketPathFromTablePath(tablePath)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid table path for location mapping: %s", tablePath)
|
||||
}
|
||||
|
||||
if err := h.ensureDirectory(ctx, client, GetTableLocationMappingDir()); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := h.ensureTableLocationMappingBucketDir(ctx, client, newTableLocationBucket); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If the metadata location changed, delete the stale mapping for the old bucket
|
||||
// If the metadata location changed, remove this table's stale mapping entry from the old bucket.
|
||||
if oldMetadataLocation != "" && oldMetadataLocation != newMetadataLocation {
|
||||
oldTableLocationBucket, ok := parseTableLocationBucket(oldMetadataLocation)
|
||||
if ok && oldTableLocationBucket != newTableLocationBucket {
|
||||
oldMappingPath := GetTableLocationMappingPath(oldTableLocationBucket)
|
||||
if err := h.deleteEntryIfExists(ctx, client, oldMappingPath); err != nil {
|
||||
if err := h.removeTableLocationMappingEntry(ctx, client, oldTableLocationBucket, tablePath); err != nil {
|
||||
glog.V(1).Infof("failed to delete stale mapping for %s: %v", oldTableLocationBucket, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return h.upsertFile(ctx, client, GetTableLocationMappingPath(newTableLocationBucket), []byte(tablePath))
|
||||
return h.upsertFile(ctx, client, GetTableLocationMappingEntryPath(newTableLocationBucket, tablePath), []byte(tableBucketPath))
|
||||
}
|
||||
|
||||
func (h *S3TablesHandler) deleteTableLocationMapping(ctx context.Context, client filer_pb.SeaweedFilerClient, metadataLocation string) error {
|
||||
func (h *S3TablesHandler) deleteTableLocationMapping(ctx context.Context, client filer_pb.SeaweedFilerClient, metadataLocation, tablePath string) error {
|
||||
tableLocationBucket, ok := parseTableLocationBucket(metadataLocation)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return h.deleteEntryIfExists(ctx, client, GetTableLocationMappingPath(tableLocationBucket))
|
||||
return h.removeTableLocationMappingEntry(ctx, client, tableLocationBucket, tablePath)
|
||||
}
|
||||
|
||||
func (h *S3TablesHandler) ensureTableLocationMappingBucketDir(ctx context.Context, client filer_pb.SeaweedFilerClient, tableLocationBucket string) error {
|
||||
mappingDir := GetTableLocationMappingDir()
|
||||
bucketMappingPath := GetTableLocationMappingPath(tableLocationBucket)
|
||||
|
||||
resp, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{
|
||||
Directory: mappingDir,
|
||||
Name: tableLocationBucket,
|
||||
})
|
||||
if err == nil {
|
||||
if resp != nil && resp.Entry != nil && resp.Entry.IsDirectory {
|
||||
return nil
|
||||
}
|
||||
if removeErr := h.deleteEntryIfExists(ctx, client, bucketMappingPath); removeErr != nil && !errors.Is(removeErr, filer_pb.ErrNotFound) {
|
||||
return removeErr
|
||||
}
|
||||
} else if !errors.Is(err, filer_pb.ErrNotFound) {
|
||||
return err
|
||||
}
|
||||
|
||||
return h.ensureDirectory(ctx, client, bucketMappingPath)
|
||||
}
|
||||
|
||||
func (h *S3TablesHandler) removeTableLocationMappingEntry(ctx context.Context, client filer_pb.SeaweedFilerClient, tableLocationBucket, tablePath string) error {
|
||||
entryPath := GetTableLocationMappingEntryPath(tableLocationBucket, tablePath)
|
||||
if err := h.deleteEntryIfExists(ctx, client, entryPath); err != nil && !errors.Is(err, filer_pb.ErrNotFound) {
|
||||
return err
|
||||
}
|
||||
return h.removeTableLocationMappingBucketDirIfEmpty(ctx, client, tableLocationBucket)
|
||||
}
|
||||
|
||||
func (h *S3TablesHandler) removeTableLocationMappingBucketDirIfEmpty(ctx context.Context, client filer_pb.SeaweedFilerClient, tableLocationBucket string) error {
|
||||
bucketMappingPath := GetTableLocationMappingPath(tableLocationBucket)
|
||||
|
||||
stream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
|
||||
Directory: bucketMappingPath,
|
||||
Limit: 1,
|
||||
})
|
||||
if err != nil {
|
||||
if errors.Is(err, filer_pb.ErrNotFound) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
resp, recvErr := stream.Recv()
|
||||
if recvErr == io.EOF {
|
||||
break
|
||||
}
|
||||
if recvErr != nil {
|
||||
return recvErr
|
||||
}
|
||||
if resp != nil && resp.Entry != nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
if err := h.deleteEntryIfExists(ctx, client, bucketMappingPath); err != nil && !errors.Is(err, filer_pb.ErrNotFound) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
77
weed/s3api/s3tables/table_location_mapping_test.go
Normal file
77
weed/s3api/s3tables/table_location_mapping_test.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package s3tables
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestGetTableLocationMappingEntryPathPerTable(t *testing.T) {
|
||||
tableLocationBucket := "shared-location--table-s3"
|
||||
tablePathA := GetTablePath("warehouse", "analytics", "orders")
|
||||
tablePathB := GetTablePath("warehouse", "analytics", "customers")
|
||||
|
||||
entryPathA := GetTableLocationMappingEntryPath(tableLocationBucket, tablePathA)
|
||||
entryPathARepeat := GetTableLocationMappingEntryPath(tableLocationBucket, tablePathA)
|
||||
entryPathB := GetTableLocationMappingEntryPath(tableLocationBucket, tablePathB)
|
||||
|
||||
if entryPathA != entryPathARepeat {
|
||||
t.Fatalf("mapping entry path should be deterministic: %q != %q", entryPathA, entryPathARepeat)
|
||||
}
|
||||
if entryPathA == entryPathB {
|
||||
t.Fatalf("mapping entry path should differ per table path: %q == %q", entryPathA, entryPathB)
|
||||
}
|
||||
|
||||
expectedPrefix := GetTableLocationMappingPath(tableLocationBucket) + "/"
|
||||
if !strings.HasPrefix(entryPathA, expectedPrefix) {
|
||||
t.Fatalf("mapping entry path %q should start with %q", entryPathA, expectedPrefix)
|
||||
}
|
||||
if strings.TrimPrefix(entryPathA, expectedPrefix) == "" {
|
||||
t.Fatalf("mapping entry name should not be empty: %q", entryPathA)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTableBucketPathFromTablePath(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
tablePath string
|
||||
expected string
|
||||
ok bool
|
||||
}{
|
||||
{
|
||||
name: "valid table path",
|
||||
tablePath: GetTablePath("warehouse", "analytics", "orders"),
|
||||
expected: GetTableBucketPath("warehouse"),
|
||||
ok: true,
|
||||
},
|
||||
{
|
||||
name: "valid table bucket root",
|
||||
tablePath: GetTableBucketPath("warehouse"),
|
||||
expected: GetTableBucketPath("warehouse"),
|
||||
ok: true,
|
||||
},
|
||||
{
|
||||
name: "invalid non-tables path",
|
||||
tablePath: "/tmp/warehouse/analytics/orders",
|
||||
expected: "",
|
||||
ok: false,
|
||||
},
|
||||
{
|
||||
name: "invalid empty bucket segment",
|
||||
tablePath: "/buckets/",
|
||||
expected: "",
|
||||
ok: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
actual, ok := tableBucketPathFromTablePath(tc.tablePath)
|
||||
if ok != tc.ok {
|
||||
t.Fatalf("tableBucketPathFromTablePath(%q) ok=%v, want %v", tc.tablePath, ok, tc.ok)
|
||||
}
|
||||
if actual != tc.expected {
|
||||
t.Fatalf("tableBucketPathFromTablePath(%q)=%q, want %q", tc.tablePath, actual, tc.expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package s3tables
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/sha1"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"net/url"
|
||||
@@ -118,6 +119,33 @@ func GetTableLocationMappingPath(tableLocationBucket string) string {
|
||||
return path.Join(GetTableLocationMappingDir(), tableLocationBucket)
|
||||
}
|
||||
|
||||
// GetTableLocationMappingEntryPath returns the filer path for a table-specific mapping entry.
|
||||
// Each table gets its own entry so multiple tables can share the same external table-location bucket.
|
||||
func GetTableLocationMappingEntryPath(tableLocationBucket, tablePath string) string {
|
||||
return path.Join(GetTableLocationMappingPath(tableLocationBucket), tableLocationMappingEntryName(tablePath))
|
||||
}
|
||||
|
||||
// tableLocationMappingEntryName derives the mapping entry name for a table
// path: the hex-encoded SHA-1 digest of the path after it has been trimmed and
// cleaned into an absolute slash-separated form, so equivalent spellings of the
// same path produce the same name.
func tableLocationMappingEntryName(tablePath string) string {
	withoutLeadingSlash := strings.TrimPrefix(tablePath, "/")
	canonical := path.Clean("/" + strings.TrimSpace(withoutLeadingSlash))
	digest := sha1.Sum([]byte(canonical))
	return hex.EncodeToString(digest[:])
}
|
||||
|
||||
func tableBucketPathFromTablePath(tablePath string) (string, bool) {
|
||||
normalized := path.Clean("/" + strings.TrimSpace(strings.TrimPrefix(tablePath, "/")))
|
||||
tablesPrefix := strings.TrimSuffix(TablesPath, "/") + "/"
|
||||
if !strings.HasPrefix(normalized, tablesPrefix) {
|
||||
return "", false
|
||||
}
|
||||
|
||||
remaining := strings.TrimPrefix(normalized, tablesPrefix)
|
||||
bucketName, _, _ := strings.Cut(remaining, "/")
|
||||
if bucketName == "" {
|
||||
return "", false
|
||||
}
|
||||
return path.Join(TablesPath, bucketName), true
|
||||
}
|
||||
|
||||
// Metadata structures
|
||||
|
||||
type tableBucketMetadata struct {
|
||||
|
||||
Reference in New Issue
Block a user