From acea36a181cbdab9d8cc2406db6a290c213561ad Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 16 Mar 2026 12:33:32 -0700 Subject: [PATCH] filer: add conditional update preconditions (#8647) * filer: add conditional update preconditions * iceberg: tighten metadata CAS preconditions --- other/java/client/src/main/proto/filer.proto | 1 + weed/pb/filer.proto | 1 + weed/pb/filer_pb/filer.pb.go | 178 ++++++++++--------- weed/plugin/worker/iceberg/exec_test.go | 105 ++++++++++- weed/plugin/worker/iceberg/filer_io.go | 29 +-- weed/s3api/s3tables/handler.go | 9 +- weed/server/filer_grpc_server.go | 31 ++++ 7 files changed, 254 insertions(+), 100 deletions(-) diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 78dd58b1f..e17e51fe7 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -213,6 +213,7 @@ message UpdateEntryRequest { Entry entry = 2; bool is_from_other_cluster = 3; repeated int32 signatures = 4; + map<string, bytes> expected_extended = 5; } message UpdateEntryResponse { SubscribeMetadataResponse metadata_event = 1; diff --git a/weed/pb/filer.proto b/weed/pb/filer.proto index 78dd58b1f..e17e51fe7 100644 --- a/weed/pb/filer.proto +++ b/weed/pb/filer.proto @@ -213,6 +213,7 @@ message UpdateEntryRequest { Entry entry = 2; bool is_from_other_cluster = 3; repeated int32 signatures = 4; + map<string, bytes> expected_extended = 5; } message UpdateEntryResponse { SubscribeMetadataResponse metadata_event = 1; diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go index e86fc9d2a..c90884b78 100644 --- a/weed/pb/filer_pb/filer.pb.go +++ b/weed/pb/filer_pb/filer.pb.go @@ -1175,6 +1175,7 @@ type UpdateEntryRequest struct { Entry *Entry `protobuf:"bytes,2,opt,name=entry,proto3" json:"entry,omitempty"` IsFromOtherCluster bool `protobuf:"varint,3,opt,name=is_from_other_cluster,json=isFromOtherCluster,proto3" json:"is_from_other_cluster,omitempty"` Signatures []int32 
`protobuf:"varint,4,rep,packed,name=signatures,proto3" json:"signatures,omitempty"` + ExpectedExtended map[string][]byte `protobuf:"bytes,5,rep,name=expected_extended,json=expectedExtended,proto3" json:"expected_extended,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -1237,6 +1238,13 @@ func (x *UpdateEntryRequest) GetSignatures() []int32 { return nil } +func (x *UpdateEntryRequest) GetExpectedExtended() map[string][]byte { + if x != nil { + return x.ExpectedExtended + } + return nil +} + type UpdateEntryResponse struct { state protoimpl.MessageState `protogen:"open.v1"` MetadataEvent *SubscribeMetadataResponse `protobuf:"bytes,1,opt,name=metadata_event,json=metadataEvent,proto3" json:"metadata_event,omitempty"` @@ -4234,7 +4242,7 @@ type LocateBrokerResponse_Resource struct { func (x *LocateBrokerResponse_Resource) Reset() { *x = LocateBrokerResponse_Resource{} - mi := &file_filer_proto_msgTypes[68] + mi := &file_filer_proto_msgTypes[69] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -4246,7 +4254,7 @@ func (x *LocateBrokerResponse_Resource) String() string { func (*LocateBrokerResponse_Resource) ProtoMessage() {} func (x *LocateBrokerResponse_Resource) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[68] + mi := &file_filer_proto_msgTypes[69] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -4300,7 +4308,7 @@ type FilerConf_PathConf struct { func (x *FilerConf_PathConf) Reset() { *x = FilerConf_PathConf{} - mi := &file_filer_proto_msgTypes[69] + mi := &file_filer_proto_msgTypes[70] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -4312,7 +4320,7 @@ func (x *FilerConf_PathConf) String() string { func (*FilerConf_PathConf) ProtoMessage() {} func (x *FilerConf_PathConf) ProtoReflect() protoreflect.Message { 
- mi := &file_filer_proto_msgTypes[69] + mi := &file_filer_proto_msgTypes[70] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -4550,14 +4558,18 @@ const file_filer_proto_rawDesc = "" + "\x1bskip_check_parent_directory\x18\x06 \x01(\bR\x18skipCheckParentDirectory\"w\n" + "\x13CreateEntryResponse\x12\x14\n" + "\x05error\x18\x01 \x01(\tR\x05error\x12J\n" + - "\x0emetadata_event\x18\x02 \x01(\v2#.filer_pb.SubscribeMetadataResponseR\rmetadataEvent\"\xac\x01\n" + + "\x0emetadata_event\x18\x02 \x01(\v2#.filer_pb.SubscribeMetadataResponseR\rmetadataEvent\"\xd2\x02\n" + "\x12UpdateEntryRequest\x12\x1c\n" + "\tdirectory\x18\x01 \x01(\tR\tdirectory\x12%\n" + "\x05entry\x18\x02 \x01(\v2\x0f.filer_pb.EntryR\x05entry\x121\n" + "\x15is_from_other_cluster\x18\x03 \x01(\bR\x12isFromOtherCluster\x12\x1e\n" + "\n" + "signatures\x18\x04 \x03(\x05R\n" + - "signatures\"a\n" + + "signatures\x12_\n" + + "\x11expected_extended\x18\x05 \x03(\v22.filer_pb.UpdateEntryRequest.ExpectedExtendedEntryR\x10expectedExtended\x1aC\n" + + "\x15ExpectedExtendedEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\fR\x05value:\x028\x01\"a\n" + "\x13UpdateEntryResponse\x12J\n" + "\x0emetadata_event\x18\x01 \x01(\v2#.filer_pb.SubscribeMetadataResponseR\rmetadataEvent\"\x80\x01\n" + "\x14AppendToEntryRequest\x12\x1c\n" + @@ -4865,7 +4877,7 @@ func file_filer_proto_rawDescGZIP() []byte { } var file_filer_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_filer_proto_msgTypes = make([]protoimpl.MessageInfo, 70) +var file_filer_proto_msgTypes = make([]protoimpl.MessageInfo, 71) var file_filer_proto_goTypes = []any{ (SSEType)(0), // 0: filer_pb.SSEType (*LookupDirectoryEntryRequest)(nil), // 1: filer_pb.LookupDirectoryEntryRequest @@ -4935,9 +4947,10 @@ var file_filer_proto_goTypes = []any{ (*TransferLocksRequest)(nil), // 65: filer_pb.TransferLocksRequest (*TransferLocksResponse)(nil), // 66: 
filer_pb.TransferLocksResponse nil, // 67: filer_pb.Entry.ExtendedEntry - nil, // 68: filer_pb.LookupVolumeResponse.LocationsMapEntry - (*LocateBrokerResponse_Resource)(nil), // 69: filer_pb.LocateBrokerResponse.Resource - (*FilerConf_PathConf)(nil), // 70: filer_pb.FilerConf.PathConf + nil, // 68: filer_pb.UpdateEntryRequest.ExpectedExtendedEntry + nil, // 69: filer_pb.LookupVolumeResponse.LocationsMapEntry + (*LocateBrokerResponse_Resource)(nil), // 70: filer_pb.LocateBrokerResponse.Resource + (*FilerConf_PathConf)(nil), // 71: filer_pb.FilerConf.PathConf } var file_filer_proto_depIdxs = []int32{ 6, // 0: filer_pb.LookupDirectoryEntryResponse.entry:type_name -> filer_pb.Entry @@ -4956,77 +4969,78 @@ var file_filer_proto_depIdxs = []int32{ 6, // 13: filer_pb.CreateEntryRequest.entry:type_name -> filer_pb.Entry 43, // 14: filer_pb.CreateEntryResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse 6, // 15: filer_pb.UpdateEntryRequest.entry:type_name -> filer_pb.Entry - 43, // 16: filer_pb.UpdateEntryResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse - 9, // 17: filer_pb.AppendToEntryRequest.chunks:type_name -> filer_pb.FileChunk - 43, // 18: filer_pb.DeleteEntryResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse - 8, // 19: filer_pb.StreamRenameEntryResponse.event_notification:type_name -> filer_pb.EventNotification - 29, // 20: filer_pb.AssignVolumeResponse.location:type_name -> filer_pb.Location - 29, // 21: filer_pb.Locations.locations:type_name -> filer_pb.Location - 68, // 22: filer_pb.LookupVolumeResponse.locations_map:type_name -> filer_pb.LookupVolumeResponse.LocationsMapEntry - 31, // 23: filer_pb.CollectionListResponse.collections:type_name -> filer_pb.Collection - 8, // 24: filer_pb.SubscribeMetadataResponse.event_notification:type_name -> filer_pb.EventNotification - 6, // 25: filer_pb.TraverseBfsMetadataResponse.entry:type_name -> filer_pb.Entry - 69, // 26: 
filer_pb.LocateBrokerResponse.resources:type_name -> filer_pb.LocateBrokerResponse.Resource - 70, // 27: filer_pb.FilerConf.locations:type_name -> filer_pb.FilerConf.PathConf - 6, // 28: filer_pb.CacheRemoteObjectToLocalClusterResponse.entry:type_name -> filer_pb.Entry - 43, // 29: filer_pb.CacheRemoteObjectToLocalClusterResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse - 64, // 30: filer_pb.TransferLocksRequest.locks:type_name -> filer_pb.Lock - 28, // 31: filer_pb.LookupVolumeResponse.LocationsMapEntry.value:type_name -> filer_pb.Locations - 1, // 32: filer_pb.SeaweedFiler.LookupDirectoryEntry:input_type -> filer_pb.LookupDirectoryEntryRequest - 3, // 33: filer_pb.SeaweedFiler.ListEntries:input_type -> filer_pb.ListEntriesRequest - 13, // 34: filer_pb.SeaweedFiler.CreateEntry:input_type -> filer_pb.CreateEntryRequest - 15, // 35: filer_pb.SeaweedFiler.UpdateEntry:input_type -> filer_pb.UpdateEntryRequest - 17, // 36: filer_pb.SeaweedFiler.AppendToEntry:input_type -> filer_pb.AppendToEntryRequest - 19, // 37: filer_pb.SeaweedFiler.DeleteEntry:input_type -> filer_pb.DeleteEntryRequest - 21, // 38: filer_pb.SeaweedFiler.AtomicRenameEntry:input_type -> filer_pb.AtomicRenameEntryRequest - 23, // 39: filer_pb.SeaweedFiler.StreamRenameEntry:input_type -> filer_pb.StreamRenameEntryRequest - 25, // 40: filer_pb.SeaweedFiler.AssignVolume:input_type -> filer_pb.AssignVolumeRequest - 27, // 41: filer_pb.SeaweedFiler.LookupVolume:input_type -> filer_pb.LookupVolumeRequest - 32, // 42: filer_pb.SeaweedFiler.CollectionList:input_type -> filer_pb.CollectionListRequest - 34, // 43: filer_pb.SeaweedFiler.DeleteCollection:input_type -> filer_pb.DeleteCollectionRequest - 36, // 44: filer_pb.SeaweedFiler.Statistics:input_type -> filer_pb.StatisticsRequest - 38, // 45: filer_pb.SeaweedFiler.Ping:input_type -> filer_pb.PingRequest - 40, // 46: filer_pb.SeaweedFiler.GetFilerConfiguration:input_type -> filer_pb.GetFilerConfigurationRequest - 44, // 47: 
filer_pb.SeaweedFiler.TraverseBfsMetadata:input_type -> filer_pb.TraverseBfsMetadataRequest - 42, // 48: filer_pb.SeaweedFiler.SubscribeMetadata:input_type -> filer_pb.SubscribeMetadataRequest - 42, // 49: filer_pb.SeaweedFiler.SubscribeLocalMetadata:input_type -> filer_pb.SubscribeMetadataRequest - 51, // 50: filer_pb.SeaweedFiler.KvGet:input_type -> filer_pb.KvGetRequest - 53, // 51: filer_pb.SeaweedFiler.KvPut:input_type -> filer_pb.KvPutRequest - 56, // 52: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:input_type -> filer_pb.CacheRemoteObjectToLocalClusterRequest - 58, // 53: filer_pb.SeaweedFiler.DistributedLock:input_type -> filer_pb.LockRequest - 60, // 54: filer_pb.SeaweedFiler.DistributedUnlock:input_type -> filer_pb.UnlockRequest - 62, // 55: filer_pb.SeaweedFiler.FindLockOwner:input_type -> filer_pb.FindLockOwnerRequest - 65, // 56: filer_pb.SeaweedFiler.TransferLocks:input_type -> filer_pb.TransferLocksRequest - 2, // 57: filer_pb.SeaweedFiler.LookupDirectoryEntry:output_type -> filer_pb.LookupDirectoryEntryResponse - 4, // 58: filer_pb.SeaweedFiler.ListEntries:output_type -> filer_pb.ListEntriesResponse - 14, // 59: filer_pb.SeaweedFiler.CreateEntry:output_type -> filer_pb.CreateEntryResponse - 16, // 60: filer_pb.SeaweedFiler.UpdateEntry:output_type -> filer_pb.UpdateEntryResponse - 18, // 61: filer_pb.SeaweedFiler.AppendToEntry:output_type -> filer_pb.AppendToEntryResponse - 20, // 62: filer_pb.SeaweedFiler.DeleteEntry:output_type -> filer_pb.DeleteEntryResponse - 22, // 63: filer_pb.SeaweedFiler.AtomicRenameEntry:output_type -> filer_pb.AtomicRenameEntryResponse - 24, // 64: filer_pb.SeaweedFiler.StreamRenameEntry:output_type -> filer_pb.StreamRenameEntryResponse - 26, // 65: filer_pb.SeaweedFiler.AssignVolume:output_type -> filer_pb.AssignVolumeResponse - 30, // 66: filer_pb.SeaweedFiler.LookupVolume:output_type -> filer_pb.LookupVolumeResponse - 33, // 67: filer_pb.SeaweedFiler.CollectionList:output_type -> filer_pb.CollectionListResponse 
- 35, // 68: filer_pb.SeaweedFiler.DeleteCollection:output_type -> filer_pb.DeleteCollectionResponse - 37, // 69: filer_pb.SeaweedFiler.Statistics:output_type -> filer_pb.StatisticsResponse - 39, // 70: filer_pb.SeaweedFiler.Ping:output_type -> filer_pb.PingResponse - 41, // 71: filer_pb.SeaweedFiler.GetFilerConfiguration:output_type -> filer_pb.GetFilerConfigurationResponse - 45, // 72: filer_pb.SeaweedFiler.TraverseBfsMetadata:output_type -> filer_pb.TraverseBfsMetadataResponse - 43, // 73: filer_pb.SeaweedFiler.SubscribeMetadata:output_type -> filer_pb.SubscribeMetadataResponse - 43, // 74: filer_pb.SeaweedFiler.SubscribeLocalMetadata:output_type -> filer_pb.SubscribeMetadataResponse - 52, // 75: filer_pb.SeaweedFiler.KvGet:output_type -> filer_pb.KvGetResponse - 54, // 76: filer_pb.SeaweedFiler.KvPut:output_type -> filer_pb.KvPutResponse - 57, // 77: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:output_type -> filer_pb.CacheRemoteObjectToLocalClusterResponse - 59, // 78: filer_pb.SeaweedFiler.DistributedLock:output_type -> filer_pb.LockResponse - 61, // 79: filer_pb.SeaweedFiler.DistributedUnlock:output_type -> filer_pb.UnlockResponse - 63, // 80: filer_pb.SeaweedFiler.FindLockOwner:output_type -> filer_pb.FindLockOwnerResponse - 66, // 81: filer_pb.SeaweedFiler.TransferLocks:output_type -> filer_pb.TransferLocksResponse - 57, // [57:82] is the sub-list for method output_type - 32, // [32:57] is the sub-list for method input_type - 32, // [32:32] is the sub-list for extension type_name - 32, // [32:32] is the sub-list for extension extendee - 0, // [0:32] is the sub-list for field type_name + 68, // 16: filer_pb.UpdateEntryRequest.expected_extended:type_name -> filer_pb.UpdateEntryRequest.ExpectedExtendedEntry + 43, // 17: filer_pb.UpdateEntryResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse + 9, // 18: filer_pb.AppendToEntryRequest.chunks:type_name -> filer_pb.FileChunk + 43, // 19: 
filer_pb.DeleteEntryResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse + 8, // 20: filer_pb.StreamRenameEntryResponse.event_notification:type_name -> filer_pb.EventNotification + 29, // 21: filer_pb.AssignVolumeResponse.location:type_name -> filer_pb.Location + 29, // 22: filer_pb.Locations.locations:type_name -> filer_pb.Location + 69, // 23: filer_pb.LookupVolumeResponse.locations_map:type_name -> filer_pb.LookupVolumeResponse.LocationsMapEntry + 31, // 24: filer_pb.CollectionListResponse.collections:type_name -> filer_pb.Collection + 8, // 25: filer_pb.SubscribeMetadataResponse.event_notification:type_name -> filer_pb.EventNotification + 6, // 26: filer_pb.TraverseBfsMetadataResponse.entry:type_name -> filer_pb.Entry + 70, // 27: filer_pb.LocateBrokerResponse.resources:type_name -> filer_pb.LocateBrokerResponse.Resource + 71, // 28: filer_pb.FilerConf.locations:type_name -> filer_pb.FilerConf.PathConf + 6, // 29: filer_pb.CacheRemoteObjectToLocalClusterResponse.entry:type_name -> filer_pb.Entry + 43, // 30: filer_pb.CacheRemoteObjectToLocalClusterResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse + 64, // 31: filer_pb.TransferLocksRequest.locks:type_name -> filer_pb.Lock + 28, // 32: filer_pb.LookupVolumeResponse.LocationsMapEntry.value:type_name -> filer_pb.Locations + 1, // 33: filer_pb.SeaweedFiler.LookupDirectoryEntry:input_type -> filer_pb.LookupDirectoryEntryRequest + 3, // 34: filer_pb.SeaweedFiler.ListEntries:input_type -> filer_pb.ListEntriesRequest + 13, // 35: filer_pb.SeaweedFiler.CreateEntry:input_type -> filer_pb.CreateEntryRequest + 15, // 36: filer_pb.SeaweedFiler.UpdateEntry:input_type -> filer_pb.UpdateEntryRequest + 17, // 37: filer_pb.SeaweedFiler.AppendToEntry:input_type -> filer_pb.AppendToEntryRequest + 19, // 38: filer_pb.SeaweedFiler.DeleteEntry:input_type -> filer_pb.DeleteEntryRequest + 21, // 39: filer_pb.SeaweedFiler.AtomicRenameEntry:input_type -> filer_pb.AtomicRenameEntryRequest + 23, // 
40: filer_pb.SeaweedFiler.StreamRenameEntry:input_type -> filer_pb.StreamRenameEntryRequest + 25, // 41: filer_pb.SeaweedFiler.AssignVolume:input_type -> filer_pb.AssignVolumeRequest + 27, // 42: filer_pb.SeaweedFiler.LookupVolume:input_type -> filer_pb.LookupVolumeRequest + 32, // 43: filer_pb.SeaweedFiler.CollectionList:input_type -> filer_pb.CollectionListRequest + 34, // 44: filer_pb.SeaweedFiler.DeleteCollection:input_type -> filer_pb.DeleteCollectionRequest + 36, // 45: filer_pb.SeaweedFiler.Statistics:input_type -> filer_pb.StatisticsRequest + 38, // 46: filer_pb.SeaweedFiler.Ping:input_type -> filer_pb.PingRequest + 40, // 47: filer_pb.SeaweedFiler.GetFilerConfiguration:input_type -> filer_pb.GetFilerConfigurationRequest + 44, // 48: filer_pb.SeaweedFiler.TraverseBfsMetadata:input_type -> filer_pb.TraverseBfsMetadataRequest + 42, // 49: filer_pb.SeaweedFiler.SubscribeMetadata:input_type -> filer_pb.SubscribeMetadataRequest + 42, // 50: filer_pb.SeaweedFiler.SubscribeLocalMetadata:input_type -> filer_pb.SubscribeMetadataRequest + 51, // 51: filer_pb.SeaweedFiler.KvGet:input_type -> filer_pb.KvGetRequest + 53, // 52: filer_pb.SeaweedFiler.KvPut:input_type -> filer_pb.KvPutRequest + 56, // 53: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:input_type -> filer_pb.CacheRemoteObjectToLocalClusterRequest + 58, // 54: filer_pb.SeaweedFiler.DistributedLock:input_type -> filer_pb.LockRequest + 60, // 55: filer_pb.SeaweedFiler.DistributedUnlock:input_type -> filer_pb.UnlockRequest + 62, // 56: filer_pb.SeaweedFiler.FindLockOwner:input_type -> filer_pb.FindLockOwnerRequest + 65, // 57: filer_pb.SeaweedFiler.TransferLocks:input_type -> filer_pb.TransferLocksRequest + 2, // 58: filer_pb.SeaweedFiler.LookupDirectoryEntry:output_type -> filer_pb.LookupDirectoryEntryResponse + 4, // 59: filer_pb.SeaweedFiler.ListEntries:output_type -> filer_pb.ListEntriesResponse + 14, // 60: filer_pb.SeaweedFiler.CreateEntry:output_type -> filer_pb.CreateEntryResponse + 16, // 61: 
filer_pb.SeaweedFiler.UpdateEntry:output_type -> filer_pb.UpdateEntryResponse + 18, // 62: filer_pb.SeaweedFiler.AppendToEntry:output_type -> filer_pb.AppendToEntryResponse + 20, // 63: filer_pb.SeaweedFiler.DeleteEntry:output_type -> filer_pb.DeleteEntryResponse + 22, // 64: filer_pb.SeaweedFiler.AtomicRenameEntry:output_type -> filer_pb.AtomicRenameEntryResponse + 24, // 65: filer_pb.SeaweedFiler.StreamRenameEntry:output_type -> filer_pb.StreamRenameEntryResponse + 26, // 66: filer_pb.SeaweedFiler.AssignVolume:output_type -> filer_pb.AssignVolumeResponse + 30, // 67: filer_pb.SeaweedFiler.LookupVolume:output_type -> filer_pb.LookupVolumeResponse + 33, // 68: filer_pb.SeaweedFiler.CollectionList:output_type -> filer_pb.CollectionListResponse + 35, // 69: filer_pb.SeaweedFiler.DeleteCollection:output_type -> filer_pb.DeleteCollectionResponse + 37, // 70: filer_pb.SeaweedFiler.Statistics:output_type -> filer_pb.StatisticsResponse + 39, // 71: filer_pb.SeaweedFiler.Ping:output_type -> filer_pb.PingResponse + 41, // 72: filer_pb.SeaweedFiler.GetFilerConfiguration:output_type -> filer_pb.GetFilerConfigurationResponse + 45, // 73: filer_pb.SeaweedFiler.TraverseBfsMetadata:output_type -> filer_pb.TraverseBfsMetadataResponse + 43, // 74: filer_pb.SeaweedFiler.SubscribeMetadata:output_type -> filer_pb.SubscribeMetadataResponse + 43, // 75: filer_pb.SeaweedFiler.SubscribeLocalMetadata:output_type -> filer_pb.SubscribeMetadataResponse + 52, // 76: filer_pb.SeaweedFiler.KvGet:output_type -> filer_pb.KvGetResponse + 54, // 77: filer_pb.SeaweedFiler.KvPut:output_type -> filer_pb.KvPutResponse + 57, // 78: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:output_type -> filer_pb.CacheRemoteObjectToLocalClusterResponse + 59, // 79: filer_pb.SeaweedFiler.DistributedLock:output_type -> filer_pb.LockResponse + 61, // 80: filer_pb.SeaweedFiler.DistributedUnlock:output_type -> filer_pb.UnlockResponse + 63, // 81: filer_pb.SeaweedFiler.FindLockOwner:output_type -> 
filer_pb.FindLockOwnerResponse + 66, // 82: filer_pb.SeaweedFiler.TransferLocks:output_type -> filer_pb.TransferLocksResponse + 58, // [58:83] is the sub-list for method output_type + 33, // [33:58] is the sub-list for method input_type + 33, // [33:33] is the sub-list for extension type_name + 33, // [33:33] is the sub-list for extension extendee + 0, // [0:33] is the sub-list for field type_name } func init() { file_filer_proto_init() } @@ -5040,7 +5054,7 @@ func file_filer_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_filer_proto_rawDesc), len(file_filer_proto_rawDesc)), NumEnums: 1, - NumMessages: 70, + NumMessages: 71, NumExtensions: 0, NumServices: 1, }, diff --git a/weed/plugin/worker/iceberg/exec_test.go b/weed/plugin/worker/iceberg/exec_test.go index 25c3cc276..cbb4c088a 100644 --- a/weed/plugin/worker/iceberg/exec_test.go +++ b/weed/plugin/worker/iceberg/exec_test.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" ) // --------------------------------------------------------------------------- @@ -35,8 +36,9 @@ import ( type fakeFilerServer struct { filer_pb.UnimplementedSeaweedFilerServer - mu sync.Mutex - entries map[string]map[string]*filer_pb.Entry // dir → name → entry + mu sync.Mutex + entries map[string]map[string]*filer_pb.Entry // dir → name → entry + beforeUpdate func(*fakeFilerServer, *filer_pb.UpdateEntryRequest) error // Counters for assertions createCalls int @@ -138,9 +140,41 @@ func (f *fakeFilerServer) CreateEntry(_ context.Context, req *filer_pb.CreateEnt func (f *fakeFilerServer) UpdateEntry(_ context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) { f.mu.Lock() f.updateCalls++ + beforeUpdate := f.beforeUpdate + f.beforeUpdate = nil f.mu.Unlock() - f.putEntry(req.Directory, req.Entry.Name, req.Entry) + if 
beforeUpdate != nil { + if err := beforeUpdate(f, req); err != nil { + return nil, err + } + } + + f.mu.Lock() + defer f.mu.Unlock() + + dirEntries, ok := f.entries[req.Directory] + if !ok { + return nil, status.Errorf(codes.NotFound, "entry not found: %s/%s", req.Directory, req.Entry.Name) + } + current := dirEntries[req.Entry.Name] + if current == nil { + return nil, status.Errorf(codes.NotFound, "entry not found: %s/%s", req.Directory, req.Entry.Name) + } + for key, expectedValue := range req.ExpectedExtended { + actualValue, ok := current.Extended[key] + if ok { + if !bytes.Equal(actualValue, expectedValue) { + return nil, status.Errorf(codes.FailedPrecondition, "extended attribute %q changed", key) + } + continue + } + if len(expectedValue) > 0 { + return nil, status.Errorf(codes.FailedPrecondition, "extended attribute %q changed", key) + } + } + + dirEntries[req.Entry.Name] = req.Entry return &filer_pb.UpdateEntryResponse{}, nil } @@ -281,7 +315,8 @@ func populateTable(t *testing.T, fs *fakeFilerServer, setup tableSetup) table.Me Name: setup.TableName, IsDirectory: true, Extended: map[string][]byte{ - s3tables.ExtendedKeyMetadata: xattr, + s3tables.ExtendedKeyMetadata: xattr, + s3tables.ExtendedKeyMetadataVersion: metadataVersionXattr(metadataVersion), }, }) @@ -1486,6 +1521,68 @@ func TestMetadataVersionCAS(t *testing.T) { } } +func TestMetadataVersionCASDetectsConcurrentUpdate(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "ns", + TableName: "tbl", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + }, + } + populateTable(t, fs, setup) + + tableDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath()) + fs.beforeUpdate = func(f *fakeFilerServer, req *filer_pb.UpdateEntryRequest) error { + entry := f.getEntry(path.Dir(tableDir), path.Base(tableDir)) + if entry == nil { + return 
fmt.Errorf("table entry not found before concurrent update") + } + + updatedEntry := cloneEntryForTest(t, entry) + var internalMeta map[string]json.RawMessage + if err := json.Unmarshal(updatedEntry.Extended[s3tables.ExtendedKeyMetadata], &internalMeta); err != nil { + return fmt.Errorf("unmarshal xattr: %w", err) + } + + versionJSON, err := json.Marshal(2) + if err != nil { + return fmt.Errorf("marshal version: %w", err) + } + internalMeta["metadataVersion"] = versionJSON + + updatedXattr, err := json.Marshal(internalMeta) + if err != nil { + return fmt.Errorf("marshal xattr: %w", err) + } + updatedEntry.Extended[s3tables.ExtendedKeyMetadata] = updatedXattr + updatedEntry.Extended[s3tables.ExtendedKeyMetadataVersion] = metadataVersionXattr(2) + f.putEntry(path.Dir(tableDir), path.Base(tableDir), updatedEntry) + return nil + } + + err := updateTableMetadataXattr(context.Background(), client, tableDir, 1, []byte(`{}`), "metadata/v2.metadata.json") + if err == nil { + t.Fatal("expected version conflict error") + } + if !strings.Contains(err.Error(), "metadata version conflict") { + t.Fatalf("expected metadata version conflict, got %q", err) + } +} + +func cloneEntryForTest(t *testing.T, entry *filer_pb.Entry) *filer_pb.Entry { + t.Helper() + + cloned, ok := proto.Clone(entry).(*filer_pb.Entry) + if !ok { + t.Fatal("clone entry: unexpected type") + } + return cloned +} + // --------------------------------------------------------------------------- // Avro manifest content patching for tests // --------------------------------------------------------------------------- diff --git a/weed/plugin/worker/iceberg/filer_io.go b/weed/plugin/worker/iceberg/filer_io.go index db72b42a6..406ae4560 100644 --- a/weed/plugin/worker/iceberg/filer_io.go +++ b/weed/plugin/worker/iceberg/filer_io.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "path" + "strconv" "strings" "sync" "time" @@ -280,9 +281,10 @@ func deleteFilerFile(ctx context.Context, client filer_pb.SeaweedFilerClient, di } // 
updateTableMetadataXattr updates the table entry's metadata xattr with -// the new Iceberg metadata. It performs a compare-and-swap: if the stored -// metadataVersion does not match expectedVersion, it returns -// errMetadataVersionConflict so the caller can retry. +// the new Iceberg metadata. It verifies the stored metadataVersion before +// writing and passes the previous metadata xattr back to the filer as a +// server-side precondition so concurrent writers fail with a retryable +// metadata version conflict. // newMetadataLocation is the table-relative path to the new metadata file // (e.g. "metadata/v3.metadata.json"). func updateTableMetadataXattr(ctx context.Context, client filer_pb.SeaweedFilerClient, tableDir string, expectedVersion int, newFullMetadata []byte, newMetadataLocation string) error { @@ -311,13 +313,8 @@ func updateTableMetadataXattr(ctx context.Context, client filer_pb.SeaweedFilerC return fmt.Errorf("unmarshal existing xattr: %w", err) } - // Compare-and-swap: verify the stored metadataVersion matches what we expect. - // NOTE: This is a client-side CAS — two workers could both read the same - // version, pass this check, and race at UpdateEntry (last-write-wins). - // The proper fix is server-side precondition support on UpdateEntryRequest - // (e.g. expect-version or If-Match semantics). Until then, commitWithRetry - // with exponential backoff mitigates but does not eliminate the race. - // Avoid scheduling concurrent maintenance on the same table. + // Verify the stored metadataVersion matches what we expect before issuing + // the conditional UpdateEntry request below. 
versionRaw, ok := internalMeta["metadataVersion"] if !ok { return fmt.Errorf("%w: metadataVersion field missing from xattr", errMetadataVersionConflict) @@ -368,17 +365,29 @@ func updateTableMetadataXattr(ctx context.Context, client filer_pb.SeaweedFilerC return fmt.Errorf("marshal updated xattr: %w", err) } + expectedVersionXattr := resp.Entry.Extended[s3tables.ExtendedKeyMetadataVersion] resp.Entry.Extended[s3tables.ExtendedKeyMetadata] = updatedXattr + resp.Entry.Extended[s3tables.ExtendedKeyMetadataVersion] = metadataVersionXattr(newVersion) _, err = client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{ Directory: parentDir, Entry: resp.Entry, + ExpectedExtended: map[string][]byte{ + s3tables.ExtendedKeyMetadataVersion: expectedVersionXattr, + }, }) if err != nil { + if status.Code(err) == codes.FailedPrecondition { + return fmt.Errorf("%w: table metadata changed during update", errMetadataVersionConflict) + } return fmt.Errorf("update table entry: %w", err) } return nil } +func metadataVersionXattr(version int) []byte { + return []byte(strconv.Itoa(version)) +} + // generateIcebergVersionToken produces a random hex token, mirroring the // logic in s3tables.generateVersionToken (which is unexported). 
func generateIcebergVersionToken() string { diff --git a/weed/s3api/s3tables/handler.go b/weed/s3api/s3tables/handler.go index 1ead82309..2572a49c3 100644 --- a/weed/s3api/s3tables/handler.go +++ b/weed/s3api/s3tables/handler.go @@ -21,10 +21,11 @@ const ( DefaultRegion = "us-east-1" // Extended entry attributes for metadata storage - ExtendedKeyTableBucket = "s3tables.tableBucket" - ExtendedKeyMetadata = "s3tables.metadata" - ExtendedKeyPolicy = "s3tables.policy" - ExtendedKeyTags = "s3tables.tags" + ExtendedKeyTableBucket = "s3tables.tableBucket" + ExtendedKeyMetadata = "s3tables.metadata" + ExtendedKeyMetadataVersion = "s3tables.metadataVersion" + ExtendedKeyPolicy = "s3tables.policy" + ExtendedKeyTags = "s3tables.tags" // Maximum request body size (10MB) maxRequestBodySize = 10 * 1024 * 1024 diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 219703ad5..1b7b050c0 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -1,6 +1,7 @@ package weed_server import ( + "bytes" "context" "fmt" "os" @@ -17,6 +18,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/wdclient" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) { @@ -202,6 +205,9 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr if err != nil { return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err) } + if err := validateUpdateEntryPreconditions(entry, req.ExpectedExtended); err != nil { + return &filer_pb.UpdateEntryResponse{}, err + } chunks, garbage, err2 := fs.cleanupChunks(ctx, fullpath, entry, req.Entry) if err2 != nil { @@ -235,6 +241,31 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req 
*filer_pb.UpdateEntr return resp, err } +func validateUpdateEntryPreconditions(entry *filer.Entry, expectedExtended map[string][]byte) error { + if len(expectedExtended) == 0 { + return nil + } + + for key, expectedValue := range expectedExtended { + var actualValue []byte + var ok bool + if entry != nil { + actualValue, ok = entry.Extended[key] + } + if ok { + if !bytes.Equal(actualValue, expectedValue) { + return status.Errorf(codes.FailedPrecondition, "extended attribute %q changed", key) + } + continue + } + if len(expectedValue) > 0 { + return status.Errorf(codes.FailedPrecondition, "extended attribute %q changed", key) + } + } + + return nil +} + func (fs *FilerServer) cleanupChunks(ctx context.Context, fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) { // remove old chunks if not included in the new ones