Iceberg commit reliability: preserve statistics updates and return 409 conflicts (#8277)
* iceberg: harden table commit updates and conflict handling * iceberg: refine commit retry and statistics patching * iceberg: cleanup metadata on non-conflict commit errors
This commit is contained in:
@@ -6,7 +6,9 @@ package iceberg
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand/v2"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
@@ -272,6 +274,196 @@ func (s *Server) saveMetadataFile(ctx context.Context, bucketName, tablePath, me
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) deleteMetadataFile(ctx context.Context, bucketName, tablePath, metadataFileName string) error {
|
||||
opCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
metadataDir := path.Join(s3tables.TablesPath, bucketName)
|
||||
if tablePath != "" {
|
||||
metadataDir = path.Join(metadataDir, tablePath)
|
||||
}
|
||||
metadataDir = path.Join(metadataDir, "metadata")
|
||||
return s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
return filer_pb.DoRemove(opCtx, client, metadataDir, metadataFileName, true, false, true, false, nil)
|
||||
})
|
||||
}
|
||||
|
||||
type statisticsUpdate struct {
|
||||
set *table.StatisticsFile
|
||||
remove *int64
|
||||
}
|
||||
|
||||
var ErrIncompleteSetStatistics = errors.New("set-statistics requires snapshot-id, statistics-path, file-size-in-bytes, and file-footer-size-in-bytes")
|
||||
|
||||
type commitAction struct {
|
||||
Action string `json:"action"`
|
||||
}
|
||||
|
||||
type setStatisticsUpdate struct {
|
||||
Action string `json:"action"`
|
||||
SnapshotID *int64 `json:"snapshot-id,omitempty"`
|
||||
StatisticsPath string `json:"statistics-path,omitempty"`
|
||||
FileSizeInBytes *int64 `json:"file-size-in-bytes,omitempty"`
|
||||
FileFooterSizeInBytes *int64 `json:"file-footer-size-in-bytes,omitempty"`
|
||||
KeyMetadata *string `json:"key-metadata,omitempty"`
|
||||
BlobMetadata []table.BlobMetadata `json:"blob-metadata,omitempty"`
|
||||
Statistics *table.StatisticsFile `json:"statistics,omitempty"`
|
||||
}
|
||||
|
||||
func (u *setStatisticsUpdate) asStatisticsFile() (*table.StatisticsFile, error) {
|
||||
if u.Statistics != nil {
|
||||
if u.Statistics.BlobMetadata == nil {
|
||||
u.Statistics.BlobMetadata = []table.BlobMetadata{}
|
||||
}
|
||||
return u.Statistics, nil
|
||||
}
|
||||
if u.SnapshotID == nil || u.StatisticsPath == "" || u.FileSizeInBytes == nil || u.FileFooterSizeInBytes == nil {
|
||||
return nil, ErrIncompleteSetStatistics
|
||||
}
|
||||
|
||||
stats := &table.StatisticsFile{
|
||||
SnapshotID: *u.SnapshotID,
|
||||
StatisticsPath: u.StatisticsPath,
|
||||
FileSizeInBytes: *u.FileSizeInBytes,
|
||||
FileFooterSizeInBytes: *u.FileFooterSizeInBytes,
|
||||
KeyMetadata: u.KeyMetadata,
|
||||
BlobMetadata: u.BlobMetadata,
|
||||
}
|
||||
if stats.BlobMetadata == nil {
|
||||
stats.BlobMetadata = []table.BlobMetadata{}
|
||||
}
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
type removeStatisticsUpdate struct {
|
||||
Action string `json:"action"`
|
||||
SnapshotID int64 `json:"snapshot-id"`
|
||||
}
|
||||
|
||||
func parseCommitUpdates(rawUpdates []json.RawMessage) (table.Updates, []statisticsUpdate, error) {
|
||||
filtered := make([]json.RawMessage, 0, len(rawUpdates))
|
||||
statisticsUpdates := make([]statisticsUpdate, 0)
|
||||
|
||||
for _, raw := range rawUpdates {
|
||||
var action commitAction
|
||||
if err := json.Unmarshal(raw, &action); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
switch action.Action {
|
||||
case "set-statistics":
|
||||
var setUpdate setStatisticsUpdate
|
||||
if err := json.Unmarshal(raw, &setUpdate); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
stats, err := setUpdate.asStatisticsFile()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
statisticsUpdates = append(statisticsUpdates, statisticsUpdate{set: stats})
|
||||
case "remove-statistics":
|
||||
var removeUpdate removeStatisticsUpdate
|
||||
if err := json.Unmarshal(raw, &removeUpdate); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
snapshotID := removeUpdate.SnapshotID
|
||||
statisticsUpdates = append(statisticsUpdates, statisticsUpdate{remove: &snapshotID})
|
||||
default:
|
||||
filtered = append(filtered, raw)
|
||||
}
|
||||
}
|
||||
|
||||
if len(filtered) == 0 {
|
||||
return nil, statisticsUpdates, nil
|
||||
}
|
||||
|
||||
data, err := json.Marshal(filtered)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
var updates table.Updates
|
||||
if err := json.Unmarshal(data, &updates); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return updates, statisticsUpdates, nil
|
||||
}
|
||||
|
||||
func applyStatisticsUpdates(metadataBytes []byte, updates []statisticsUpdate) ([]byte, error) {
|
||||
if len(updates) == 0 {
|
||||
return metadataBytes, nil
|
||||
}
|
||||
|
||||
var metadata map[string]json.RawMessage
|
||||
if err := json.Unmarshal(metadataBytes, &metadata); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var statistics []table.StatisticsFile
|
||||
if rawStatistics, ok := metadata["statistics"]; ok && len(rawStatistics) > 0 {
|
||||
if err := json.Unmarshal(rawStatistics, &statistics); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
statisticsBySnapshot := make(map[int64]table.StatisticsFile, len(statistics))
|
||||
orderedSnapshotIDs := make([]int64, 0, len(statistics))
|
||||
inOrder := make(map[int64]bool, len(statistics))
|
||||
for _, stat := range statistics {
|
||||
statisticsBySnapshot[stat.SnapshotID] = stat
|
||||
if !inOrder[stat.SnapshotID] {
|
||||
orderedSnapshotIDs = append(orderedSnapshotIDs, stat.SnapshotID)
|
||||
inOrder[stat.SnapshotID] = true
|
||||
}
|
||||
}
|
||||
|
||||
for _, update := range updates {
|
||||
if update.set != nil {
|
||||
statisticsBySnapshot[update.set.SnapshotID] = *update.set
|
||||
if !inOrder[update.set.SnapshotID] {
|
||||
orderedSnapshotIDs = append(orderedSnapshotIDs, update.set.SnapshotID)
|
||||
inOrder[update.set.SnapshotID] = true
|
||||
}
|
||||
continue
|
||||
}
|
||||
if update.remove != nil {
|
||||
delete(statisticsBySnapshot, *update.remove)
|
||||
}
|
||||
}
|
||||
|
||||
statistics = make([]table.StatisticsFile, 0, len(statisticsBySnapshot))
|
||||
for _, snapshotID := range orderedSnapshotIDs {
|
||||
stat, ok := statisticsBySnapshot[snapshotID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
statistics = append(statistics, stat)
|
||||
}
|
||||
|
||||
if len(statistics) == 0 {
|
||||
delete(metadata, "statistics")
|
||||
} else {
|
||||
data, err := json.Marshal(statistics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
metadata["statistics"] = data
|
||||
}
|
||||
|
||||
return json.Marshal(metadata)
|
||||
}
|
||||
|
||||
func isS3TablesConflict(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
if errors.Is(err, s3tables.ErrVersionTokenMismatch) {
|
||||
return true
|
||||
}
|
||||
var tableErr *s3tables.S3TablesError
|
||||
return errors.As(err, &tableErr) && tableErr.Type == s3tables.ErrCodeConflict
|
||||
}
|
||||
|
||||
// parseNamespace parses the namespace from path parameter.
|
||||
// Iceberg uses unit separator (0x1F) for multi-level namespaces.
|
||||
// Note: mux already decodes URL-encoded path parameters, so we only split by unit separator.
|
||||
@@ -1043,7 +1235,8 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
|
||||
// Extract identity from context
|
||||
identityName := s3_constants.GetIdentityNameFromContext(r)
|
||||
|
||||
// Parse the commit request, skipping update actions not supported by iceberg-go.
|
||||
// Parse commit request and keep statistics updates separate because iceberg-go v0.4.0
|
||||
// does not decode set/remove-statistics update actions yet.
|
||||
var raw struct {
|
||||
Identifier *TableIdentifier `json:"identifier,omitempty"`
|
||||
Requirements json.RawMessage `json:"requirements"`
|
||||
@@ -1056,6 +1249,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
var req CommitTableRequest
|
||||
req.Identifier = raw.Identifier
|
||||
var statisticsUpdates []statisticsUpdate
|
||||
if len(raw.Requirements) > 0 {
|
||||
if err := json.Unmarshal(raw.Requirements, &req.Requirements); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid requirements: "+err.Error())
|
||||
@@ -1063,178 +1257,174 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
if len(raw.Updates) > 0 {
|
||||
filtered := make([]json.RawMessage, 0, len(raw.Updates))
|
||||
for _, update := range raw.Updates {
|
||||
var action struct {
|
||||
Action string `json:"action"`
|
||||
}
|
||||
if err := json.Unmarshal(update, &action); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid update: "+err.Error())
|
||||
var err error
|
||||
req.Updates, statisticsUpdates, err = parseCommitUpdates(raw.Updates)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid updates: "+err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
maxCommitAttempts := 3
|
||||
generatedLegacyUUID := uuid.New()
|
||||
for attempt := 1; attempt <= maxCommitAttempts; attempt++ {
|
||||
getReq := &s3tables.GetTableRequest{
|
||||
TableBucketARN: bucketARN,
|
||||
Namespace: namespace,
|
||||
Name: tableName,
|
||||
}
|
||||
var getResp s3tables.GetTableResponse
|
||||
|
||||
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
mgrClient := s3tables.NewManagerClient(client)
|
||||
return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName)
|
||||
})
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "not found") {
|
||||
writeError(w, http.StatusNotFound, "NoSuchTableException", fmt.Sprintf("Table does not exist: %s", tableName))
|
||||
return
|
||||
}
|
||||
if action.Action == "set-statistics" {
|
||||
glog.V(1).Infof("Iceberg: CommitTable GetTable error: %v", err)
|
||||
writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
location := tableLocationFromMetadataLocation(getResp.MetadataLocation)
|
||||
if location == "" {
|
||||
location = fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), tableName)
|
||||
}
|
||||
tableUUID := uuid.Nil
|
||||
if getResp.Metadata != nil && getResp.Metadata.Iceberg != nil && getResp.Metadata.Iceberg.TableUUID != "" {
|
||||
if parsed, parseErr := uuid.Parse(getResp.Metadata.Iceberg.TableUUID); parseErr == nil {
|
||||
tableUUID = parsed
|
||||
}
|
||||
}
|
||||
if tableUUID == uuid.Nil {
|
||||
tableUUID = generatedLegacyUUID
|
||||
}
|
||||
|
||||
var currentMetadata table.Metadata
|
||||
if getResp.Metadata != nil && len(getResp.Metadata.FullMetadata) > 0 {
|
||||
currentMetadata, err = table.ParseMetadataBytes(getResp.Metadata.FullMetadata)
|
||||
if err != nil {
|
||||
glog.Errorf("Iceberg: Failed to parse current metadata for %s: %v", tableName, err)
|
||||
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse current metadata")
|
||||
return
|
||||
}
|
||||
} else {
|
||||
currentMetadata = newTableMetadata(tableUUID, location, nil, nil, nil, nil)
|
||||
}
|
||||
if currentMetadata == nil {
|
||||
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to build current metadata")
|
||||
return
|
||||
}
|
||||
|
||||
for _, requirement := range req.Requirements {
|
||||
if err := requirement.Validate(currentMetadata); err != nil {
|
||||
writeError(w, http.StatusConflict, "CommitFailedException", "Requirement failed: "+err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
builder, err := table.MetadataBuilderFromBase(currentMetadata, getResp.MetadataLocation)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to create metadata builder: "+err.Error())
|
||||
return
|
||||
}
|
||||
for _, update := range req.Updates {
|
||||
if err := update.Apply(builder); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply update: "+err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
newMetadata, err := builder.Build()
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to build new metadata: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
metadataVersion := getResp.MetadataVersion + 1
|
||||
metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion)
|
||||
newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), metadataFileName)
|
||||
|
||||
metadataBytes, err := json.Marshal(newMetadata)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to serialize metadata: "+err.Error())
|
||||
return
|
||||
}
|
||||
// iceberg-go does not currently support set/remove-statistics updates in MetadataBuilder.
|
||||
// Patch the encoded metadata JSON and parse it back to keep the response object consistent.
|
||||
metadataBytes, err = applyStatisticsUpdates(metadataBytes, statisticsUpdates)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply statistics updates: "+err.Error())
|
||||
return
|
||||
}
|
||||
newMetadata, err = table.ParseMetadataBytes(metadataBytes)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse committed metadata: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
metadataBucket, metadataPath, err := parseS3Location(location)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+err.Error())
|
||||
return
|
||||
}
|
||||
if err := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
updateReq := &s3tables.UpdateTableRequest{
|
||||
TableBucketARN: bucketARN,
|
||||
Namespace: namespace,
|
||||
Name: tableName,
|
||||
VersionToken: getResp.VersionToken,
|
||||
Metadata: &s3tables.TableMetadata{
|
||||
Iceberg: &s3tables.IcebergMetadata{
|
||||
TableUUID: tableUUID.String(),
|
||||
},
|
||||
FullMetadata: metadataBytes,
|
||||
},
|
||||
MetadataVersion: metadataVersion,
|
||||
MetadataLocation: newMetadataLocation,
|
||||
}
|
||||
|
||||
err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
mgrClient := s3tables.NewManagerClient(client)
|
||||
return s.tablesManager.Execute(r.Context(), mgrClient, "UpdateTable", updateReq, nil, identityName)
|
||||
})
|
||||
if err == nil {
|
||||
result := CommitTableResponse{
|
||||
MetadataLocation: newMetadataLocation,
|
||||
Metadata: newMetadata,
|
||||
}
|
||||
writeJSON(w, http.StatusOK, result)
|
||||
return
|
||||
}
|
||||
|
||||
if isS3TablesConflict(err) {
|
||||
if cleanupErr := s.deleteMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName); cleanupErr != nil {
|
||||
glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s on conflict: %v", newMetadataLocation, cleanupErr)
|
||||
}
|
||||
if attempt < maxCommitAttempts {
|
||||
glog.V(1).Infof("Iceberg: CommitTable conflict for %s (attempt %d/%d), retrying", tableName, attempt, maxCommitAttempts)
|
||||
jitter := time.Duration(rand.Int64N(int64(25 * time.Millisecond)))
|
||||
time.Sleep(time.Duration(50*attempt)*time.Millisecond + jitter)
|
||||
continue
|
||||
}
|
||||
filtered = append(filtered, update)
|
||||
}
|
||||
if len(filtered) > 0 {
|
||||
updatesBytes, err := json.Marshal(filtered)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse updates: "+err.Error())
|
||||
return
|
||||
}
|
||||
if err := json.Unmarshal(updatesBytes, &req.Updates); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid updates: "+err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// First, load current table metadata
|
||||
getReq := &s3tables.GetTableRequest{
|
||||
TableBucketARN: bucketARN,
|
||||
Namespace: namespace,
|
||||
Name: tableName,
|
||||
}
|
||||
var getResp s3tables.GetTableResponse
|
||||
|
||||
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
mgrClient := s3tables.NewManagerClient(client)
|
||||
return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "not found") {
|
||||
writeError(w, http.StatusNotFound, "NoSuchTableException", fmt.Sprintf("Table does not exist: %s", tableName))
|
||||
writeError(w, http.StatusConflict, "CommitFailedException", "Version token mismatch")
|
||||
return
|
||||
}
|
||||
glog.V(1).Infof("Iceberg: CommitTable GetTable error: %v", err)
|
||||
writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Build the current metadata
|
||||
location := tableLocationFromMetadataLocation(getResp.MetadataLocation)
|
||||
if location == "" {
|
||||
location = fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), tableName)
|
||||
}
|
||||
tableUUID := uuid.Nil
|
||||
if getResp.Metadata != nil && getResp.Metadata.Iceberg != nil && getResp.Metadata.Iceberg.TableUUID != "" {
|
||||
if parsed, err := uuid.Parse(getResp.Metadata.Iceberg.TableUUID); err == nil {
|
||||
tableUUID = parsed
|
||||
if cleanupErr := s.deleteMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName); cleanupErr != nil {
|
||||
glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s after update failure: %v", newMetadataLocation, cleanupErr)
|
||||
}
|
||||
}
|
||||
if tableUUID == uuid.Nil {
|
||||
tableUUID = uuid.New()
|
||||
}
|
||||
|
||||
var currentMetadata table.Metadata
|
||||
if getResp.Metadata != nil && len(getResp.Metadata.FullMetadata) > 0 {
|
||||
var err error
|
||||
currentMetadata, err = table.ParseMetadataBytes(getResp.Metadata.FullMetadata)
|
||||
if err != nil {
|
||||
glog.Errorf("Iceberg: Failed to parse current metadata for %s: %v", tableName, err)
|
||||
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse current metadata")
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// Fallback for tables without persisted full metadata (legacy or error state)
|
||||
currentMetadata = newTableMetadata(tableUUID, location, nil, nil, nil, nil)
|
||||
}
|
||||
|
||||
if currentMetadata == nil {
|
||||
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to build current metadata")
|
||||
return
|
||||
}
|
||||
|
||||
// Validate all requirements against current metadata
|
||||
for _, requirement := range req.Requirements {
|
||||
if err := requirement.Validate(currentMetadata); err != nil {
|
||||
writeError(w, http.StatusConflict, "CommitFailedException", "Requirement failed: "+err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Apply updates using MetadataBuilder
|
||||
builder, err := table.MetadataBuilderFromBase(currentMetadata, getResp.MetadataLocation)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to create metadata builder: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
for _, update := range req.Updates {
|
||||
if err := update.Apply(builder); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply update: "+err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Build the new metadata
|
||||
newMetadata, err := builder.Build()
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to build new metadata: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Determine next metadata version
|
||||
metadataVersion := getResp.MetadataVersion + 1
|
||||
metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion)
|
||||
newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), metadataFileName)
|
||||
|
||||
// Serialize metadata to JSON
|
||||
metadataBytes, err := json.Marshal(newMetadata)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to serialize metadata: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// 1. Save metadata file to filer
|
||||
metadataBucket, metadataPath, err := parseS3Location(location)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+err.Error())
|
||||
return
|
||||
}
|
||||
if err := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Persist the new metadata and update the table reference
|
||||
updateReq := &s3tables.UpdateTableRequest{
|
||||
TableBucketARN: bucketARN,
|
||||
Namespace: namespace,
|
||||
Name: tableName,
|
||||
VersionToken: getResp.VersionToken,
|
||||
Metadata: &s3tables.TableMetadata{
|
||||
Iceberg: &s3tables.IcebergMetadata{
|
||||
TableUUID: tableUUID.String(),
|
||||
},
|
||||
FullMetadata: metadataBytes,
|
||||
},
|
||||
MetadataVersion: metadataVersion,
|
||||
MetadataLocation: newMetadataLocation,
|
||||
}
|
||||
|
||||
err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
mgrClient := s3tables.NewManagerClient(client)
|
||||
// 1. Write metadata file (this would normally be an S3 PutObject,
|
||||
// but s3tables manager handles the metadata storage logic)
|
||||
// For now, we assume s3tables.UpdateTable handles the reference update.
|
||||
return s.tablesManager.Execute(r.Context(), mgrClient, "UpdateTable", updateReq, nil, identityName)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("Iceberg: CommitTable UpdateTable error: %v", err)
|
||||
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to commit table update: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Return the new metadata
|
||||
result := CommitTableResponse{
|
||||
MetadataLocation: newMetadataLocation,
|
||||
Metadata: newMetadata,
|
||||
}
|
||||
writeJSON(w, http.StatusOK, result)
|
||||
}
|
||||
|
||||
// newTableMetadata creates a new table.Metadata object with the given parameters.
|
||||
|
||||
103
weed/s3api/iceberg/iceberg_commit_updates_test.go
Normal file
103
weed/s3api/iceberg/iceberg_commit_updates_test.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package iceberg
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/apache/iceberg-go/table"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
|
||||
)
|
||||
|
||||
func TestParseCommitUpdatesSeparatesStatistics(t *testing.T) {
|
||||
raw := []json.RawMessage{
|
||||
json.RawMessage(`{"action":"set-statistics","snapshot-id":10,"statistics-path":"s3://bucket/table/metadata/stats.puffin","file-size-in-bytes":100,"file-footer-size-in-bytes":20,"blob-metadata":[]}`),
|
||||
json.RawMessage(`{"action":"set-properties","updates":{"k":"v"}}`),
|
||||
}
|
||||
|
||||
updates, stats, err := parseCommitUpdates(raw)
|
||||
if err != nil {
|
||||
t.Fatalf("parseCommitUpdates() error = %v", err)
|
||||
}
|
||||
if len(stats) != 1 {
|
||||
t.Fatalf("statistics updates = %d, want 1", len(stats))
|
||||
}
|
||||
if stats[0].set == nil || stats[0].set.SnapshotID != 10 {
|
||||
t.Fatalf("unexpected statistics update: %#v", stats[0])
|
||||
}
|
||||
if len(updates) != 1 {
|
||||
t.Fatalf("decoded updates = %d, want 1", len(updates))
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseCommitUpdatesRejectsIncompleteSetStatistics(t *testing.T) {
|
||||
raw := []json.RawMessage{
|
||||
json.RawMessage(`{"action":"set-statistics","snapshot-id":10}`),
|
||||
}
|
||||
|
||||
_, _, err := parseCommitUpdates(raw)
|
||||
if err == nil {
|
||||
t.Fatalf("parseCommitUpdates() expected error")
|
||||
}
|
||||
if !errors.Is(err, ErrIncompleteSetStatistics) {
|
||||
t.Fatalf("parseCommitUpdates() error = %v, want ErrIncompleteSetStatistics", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestApplyStatisticsUpdatesUpsertAndRemove(t *testing.T) {
|
||||
metadata := []byte(`{"format-version":2,"statistics":[{"snapshot-id":1,"statistics-path":"s3://bucket/stats-1.puffin","file-size-in-bytes":10,"file-footer-size-in-bytes":1,"blob-metadata":[]},{"snapshot-id":2,"statistics-path":"s3://bucket/stats-2.puffin","file-size-in-bytes":20,"file-footer-size-in-bytes":2,"blob-metadata":[]}]} `)
|
||||
|
||||
snapshotID := int64(2)
|
||||
setUpdates := []statisticsUpdate{
|
||||
{
|
||||
set: &statisticsFileForTest,
|
||||
},
|
||||
{
|
||||
remove: &snapshotID,
|
||||
},
|
||||
}
|
||||
|
||||
updated, err := applyStatisticsUpdates(metadata, setUpdates)
|
||||
if err != nil {
|
||||
t.Fatalf("applyStatisticsUpdates() error = %v", err)
|
||||
}
|
||||
|
||||
var decoded map[string]json.RawMessage
|
||||
if err := json.Unmarshal(updated, &decoded); err != nil {
|
||||
t.Fatalf("json.Unmarshal(updated) error = %v", err)
|
||||
}
|
||||
|
||||
var stats []map[string]any
|
||||
if err := json.Unmarshal(decoded["statistics"], &stats); err != nil {
|
||||
t.Fatalf("json.Unmarshal(statistics) error = %v", err)
|
||||
}
|
||||
if len(stats) != 1 {
|
||||
t.Fatalf("statistics length = %d, want 1", len(stats))
|
||||
}
|
||||
if got := int64(stats[0]["snapshot-id"].(float64)); got != 1 {
|
||||
t.Fatalf("remaining snapshot-id = %d, want 1", got)
|
||||
}
|
||||
if got := int64(stats[0]["file-size-in-bytes"].(float64)); got != 11 {
|
||||
t.Fatalf("remaining file-size-in-bytes = %d, want 11", got)
|
||||
}
|
||||
}
|
||||
|
||||
var statisticsFileForTest = table.StatisticsFile{
|
||||
SnapshotID: 1,
|
||||
StatisticsPath: "s3://bucket/stats-1.puffin",
|
||||
FileSizeInBytes: 11,
|
||||
FileFooterSizeInBytes: 2,
|
||||
BlobMetadata: []table.BlobMetadata{},
|
||||
}
|
||||
|
||||
func TestIsS3TablesConflict(t *testing.T) {
|
||||
if !isS3TablesConflict(s3tables.ErrVersionTokenMismatch) {
|
||||
t.Fatalf("expected ErrVersionTokenMismatch to be conflict")
|
||||
}
|
||||
if !isS3TablesConflict(&s3tables.S3TablesError{Type: s3tables.ErrCodeConflict, Message: "Version token mismatch"}) {
|
||||
t.Fatalf("expected S3Tables conflict error to be conflict")
|
||||
}
|
||||
if isS3TablesConflict(errors.New("other")) {
|
||||
t.Fatalf("unexpected conflict for non-conflict error")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user