filer: add conditional update preconditions (#8647)
* filer: add conditional update preconditions * iceberg: tighten metadata CAS preconditions
This commit is contained in:
@@ -24,6 +24,7 @@ import (
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -35,8 +36,9 @@ import (
|
||||
type fakeFilerServer struct {
|
||||
filer_pb.UnimplementedSeaweedFilerServer
|
||||
|
||||
mu sync.Mutex
|
||||
entries map[string]map[string]*filer_pb.Entry // dir → name → entry
|
||||
mu sync.Mutex
|
||||
entries map[string]map[string]*filer_pb.Entry // dir → name → entry
|
||||
beforeUpdate func(*fakeFilerServer, *filer_pb.UpdateEntryRequest) error
|
||||
|
||||
// Counters for assertions
|
||||
createCalls int
|
||||
@@ -138,9 +140,41 @@ func (f *fakeFilerServer) CreateEntry(_ context.Context, req *filer_pb.CreateEnt
|
||||
func (f *fakeFilerServer) UpdateEntry(_ context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
|
||||
f.mu.Lock()
|
||||
f.updateCalls++
|
||||
beforeUpdate := f.beforeUpdate
|
||||
f.beforeUpdate = nil
|
||||
f.mu.Unlock()
|
||||
|
||||
f.putEntry(req.Directory, req.Entry.Name, req.Entry)
|
||||
if beforeUpdate != nil {
|
||||
if err := beforeUpdate(f, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
dirEntries, ok := f.entries[req.Directory]
|
||||
if !ok {
|
||||
return nil, status.Errorf(codes.NotFound, "entry not found: %s/%s", req.Directory, req.Entry.Name)
|
||||
}
|
||||
current := dirEntries[req.Entry.Name]
|
||||
if current == nil {
|
||||
return nil, status.Errorf(codes.NotFound, "entry not found: %s/%s", req.Directory, req.Entry.Name)
|
||||
}
|
||||
for key, expectedValue := range req.ExpectedExtended {
|
||||
actualValue, ok := current.Extended[key]
|
||||
if ok {
|
||||
if !bytes.Equal(actualValue, expectedValue) {
|
||||
return nil, status.Errorf(codes.FailedPrecondition, "extended attribute %q changed", key)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if len(expectedValue) > 0 {
|
||||
return nil, status.Errorf(codes.FailedPrecondition, "extended attribute %q changed", key)
|
||||
}
|
||||
}
|
||||
|
||||
dirEntries[req.Entry.Name] = req.Entry
|
||||
return &filer_pb.UpdateEntryResponse{}, nil
|
||||
}
|
||||
|
||||
@@ -281,7 +315,8 @@ func populateTable(t *testing.T, fs *fakeFilerServer, setup tableSetup) table.Me
|
||||
Name: setup.TableName,
|
||||
IsDirectory: true,
|
||||
Extended: map[string][]byte{
|
||||
s3tables.ExtendedKeyMetadata: xattr,
|
||||
s3tables.ExtendedKeyMetadata: xattr,
|
||||
s3tables.ExtendedKeyMetadataVersion: metadataVersionXattr(metadataVersion),
|
||||
},
|
||||
})
|
||||
|
||||
@@ -1486,6 +1521,68 @@ func TestMetadataVersionCAS(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetadataVersionCASDetectsConcurrentUpdate(t *testing.T) {
|
||||
fs, client := startFakeFiler(t)
|
||||
|
||||
now := time.Now().UnixMilli()
|
||||
setup := tableSetup{
|
||||
BucketName: "test-bucket",
|
||||
Namespace: "ns",
|
||||
TableName: "tbl",
|
||||
Snapshots: []table.Snapshot{
|
||||
{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"},
|
||||
},
|
||||
}
|
||||
populateTable(t, fs, setup)
|
||||
|
||||
tableDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath())
|
||||
fs.beforeUpdate = func(f *fakeFilerServer, req *filer_pb.UpdateEntryRequest) error {
|
||||
entry := f.getEntry(path.Dir(tableDir), path.Base(tableDir))
|
||||
if entry == nil {
|
||||
return fmt.Errorf("table entry not found before concurrent update")
|
||||
}
|
||||
|
||||
updatedEntry := cloneEntryForTest(t, entry)
|
||||
var internalMeta map[string]json.RawMessage
|
||||
if err := json.Unmarshal(updatedEntry.Extended[s3tables.ExtendedKeyMetadata], &internalMeta); err != nil {
|
||||
return fmt.Errorf("unmarshal xattr: %w", err)
|
||||
}
|
||||
|
||||
versionJSON, err := json.Marshal(2)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal version: %w", err)
|
||||
}
|
||||
internalMeta["metadataVersion"] = versionJSON
|
||||
|
||||
updatedXattr, err := json.Marshal(internalMeta)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal xattr: %w", err)
|
||||
}
|
||||
updatedEntry.Extended[s3tables.ExtendedKeyMetadata] = updatedXattr
|
||||
updatedEntry.Extended[s3tables.ExtendedKeyMetadataVersion] = metadataVersionXattr(2)
|
||||
f.putEntry(path.Dir(tableDir), path.Base(tableDir), updatedEntry)
|
||||
return nil
|
||||
}
|
||||
|
||||
err := updateTableMetadataXattr(context.Background(), client, tableDir, 1, []byte(`{}`), "metadata/v2.metadata.json")
|
||||
if err == nil {
|
||||
t.Fatal("expected version conflict error")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "metadata version conflict") {
|
||||
t.Fatalf("expected metadata version conflict, got %q", err)
|
||||
}
|
||||
}
|
||||
|
||||
func cloneEntryForTest(t *testing.T, entry *filer_pb.Entry) *filer_pb.Entry {
|
||||
t.Helper()
|
||||
|
||||
cloned, ok := proto.Clone(entry).(*filer_pb.Entry)
|
||||
if !ok {
|
||||
t.Fatal("clone entry: unexpected type")
|
||||
}
|
||||
return cloned
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Avro manifest content patching for tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -280,9 +281,10 @@ func deleteFilerFile(ctx context.Context, client filer_pb.SeaweedFilerClient, di
|
||||
}
|
||||
|
||||
// updateTableMetadataXattr updates the table entry's metadata xattr with
// the new Iceberg metadata. It verifies that the stored metadataVersion
// matches expectedVersion before writing, and passes the previously read
// metadata-version xattr back to the filer as a server-side precondition.
// If the versions do not match, or a concurrent writer changes the entry
// first, it returns errMetadataVersionConflict so the caller can retry.
// newMetadataLocation is the table-relative path to the new metadata file
// (e.g. "metadata/v3.metadata.json").
|
||||
func updateTableMetadataXattr(ctx context.Context, client filer_pb.SeaweedFilerClient, tableDir string, expectedVersion int, newFullMetadata []byte, newMetadataLocation string) error {
|
||||
@@ -311,13 +313,8 @@ func updateTableMetadataXattr(ctx context.Context, client filer_pb.SeaweedFilerC
|
||||
return fmt.Errorf("unmarshal existing xattr: %w", err)
|
||||
}
|
||||
|
||||
// Compare-and-swap: verify the stored metadataVersion matches what we expect.
|
||||
// NOTE: This is a client-side CAS — two workers could both read the same
|
||||
// version, pass this check, and race at UpdateEntry (last-write-wins).
|
||||
// The proper fix is server-side precondition support on UpdateEntryRequest
|
||||
// (e.g. expect-version or If-Match semantics). Until then, commitWithRetry
|
||||
// with exponential backoff mitigates but does not eliminate the race.
|
||||
// Avoid scheduling concurrent maintenance on the same table.
|
||||
// Verify the stored metadataVersion matches what we expect before issuing
|
||||
// the conditional UpdateEntry request below.
|
||||
versionRaw, ok := internalMeta["metadataVersion"]
|
||||
if !ok {
|
||||
return fmt.Errorf("%w: metadataVersion field missing from xattr", errMetadataVersionConflict)
|
||||
@@ -368,17 +365,29 @@ func updateTableMetadataXattr(ctx context.Context, client filer_pb.SeaweedFilerC
|
||||
return fmt.Errorf("marshal updated xattr: %w", err)
|
||||
}
|
||||
|
||||
expectedVersionXattr := resp.Entry.Extended[s3tables.ExtendedKeyMetadataVersion]
|
||||
resp.Entry.Extended[s3tables.ExtendedKeyMetadata] = updatedXattr
|
||||
resp.Entry.Extended[s3tables.ExtendedKeyMetadataVersion] = metadataVersionXattr(newVersion)
|
||||
_, err = client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
|
||||
Directory: parentDir,
|
||||
Entry: resp.Entry,
|
||||
ExpectedExtended: map[string][]byte{
|
||||
s3tables.ExtendedKeyMetadataVersion: expectedVersionXattr,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
if status.Code(err) == codes.FailedPrecondition {
|
||||
return fmt.Errorf("%w: table metadata changed during update", errMetadataVersionConflict)
|
||||
}
|
||||
return fmt.Errorf("update table entry: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// metadataVersionXattr encodes version as its base-10 ASCII digits, which is
// the byte form stored in the metadata-version extended attribute.
func metadataVersionXattr(version int) []byte {
	return strconv.AppendInt(nil, int64(version), 10)
}
|
||||
|
||||
// generateIcebergVersionToken produces a random hex token, mirroring the
|
||||
// logic in s3tables.generateVersionToken (which is unexported).
|
||||
func generateIcebergVersionToken() string {
|
||||
|
||||
Reference in New Issue
Block a user