Files
seaweedFS/weed/plugin/worker/iceberg/operations.go
Chris Lu e5c0889473 iceberg: add delete file rewrite maintenance (#8664)
* iceberg: add delete file rewrite maintenance

* iceberg: preserve untouched delete files during rewrites

* iceberg: share detection threshold defaults

* iceberg: add partition-scoped maintenance filters (#8665)

* iceberg: add partition-scoped maintenance filters

* iceberg: tighten where-filter partition matching
2026-03-16 21:11:09 -07:00

662 lines
22 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package iceberg
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"math/rand/v2"
"path"
"sort"
"strings"
"time"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
)
// errStalePlan is returned by a commit mutation when the table head has
// advanced since planning. The caller should not retry the same plan.
var errStalePlan = errors.New("stale plan: table head changed since planning")
// errMetadataVersionConflict is returned when the xattr update detects a
// concurrent metadata version change (compare-and-swap failure).
var errMetadataVersionConflict = errors.New("metadata version conflict")
// ---------------------------------------------------------------------------
// Operation: Expire Snapshots
// ---------------------------------------------------------------------------
// expireSnapshots removes old snapshots from the table metadata and cleans up
// their manifest list files.
// expireSnapshots removes old snapshots from the table metadata and cleans up
// their manifest list files.
//
// Retention policy: the current snapshot is always kept; the newest
// MaxSnapshotsToKeep snapshots (current counts toward that quota) are kept
// regardless of age; older snapshots are expired only once their age exceeds
// the SnapshotRetentionHours window. After the metadata commit succeeds,
// files referenced exclusively by expired snapshots are deleted best-effort.
//
// Returns a human-readable summary, a metrics map (snapshots expired, files
// deleted, duration), and an error if planning or the commit failed.
func (h *Handler) expireSnapshots(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	config Config,
) (string, map[string]int64, error) {
	start := time.Now()

	// Load current metadata
	meta, metadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath)
	if err != nil {
		return "", nil, fmt.Errorf("load metadata: %w", err)
	}

	snapshots := meta.Snapshots()
	if len(snapshots) == 0 {
		return "no snapshots", nil, nil
	}

	// Determine which snapshots to expire. currentSnapID stays 0 when the
	// table has no current snapshot.
	currentSnap := meta.CurrentSnapshot()
	var currentSnapID int64
	if currentSnap != nil {
		currentSnapID = currentSnap.SnapshotID
	}

	retentionMs := config.SnapshotRetentionHours * 3600 * 1000
	nowMs := time.Now().UnixMilli()

	// Sort snapshots by timestamp descending (most recent first) so that
	// the keep-count logic always preserves the newest snapshots.
	sorted := make([]table.Snapshot, len(snapshots))
	copy(sorted, snapshots)
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].TimestampMs > sorted[j].TimestampMs
	})

	// Walk from newest to oldest. The current snapshot is always kept.
	// Among the remaining, keep up to MaxSnapshotsToKeep-1 (since current
	// counts toward the quota). Expire the rest only if they exceed the
	// retention window; snapshots within the window are kept regardless.
	var toExpire []int64
	var kept int64
	for _, snap := range sorted {
		if snap.SnapshotID == currentSnapID {
			kept++
			continue
		}
		age := nowMs - snap.TimestampMs
		if kept < config.MaxSnapshotsToKeep {
			kept++
			continue
		}
		if age > retentionMs {
			toExpire = append(toExpire, snap.SnapshotID)
		} else {
			kept++
		}
	}

	if len(toExpire) == 0 {
		return "no snapshots expired", nil, nil
	}

	// Split snapshots into expired and kept sets
	expireSet := make(map[int64]struct{}, len(toExpire))
	for _, id := range toExpire {
		expireSet[id] = struct{}{}
	}
	var expiredSnaps, keptSnaps []table.Snapshot
	for _, snap := range sorted {
		if _, ok := expireSet[snap.SnapshotID]; ok {
			expiredSnaps = append(expiredSnaps, snap)
		} else {
			keptSnaps = append(keptSnaps, snap)
		}
	}

	// Collect all files referenced by each set before modifying metadata.
	// This lets us determine which files become unreferenced. Any collection
	// error aborts the whole operation: deleting based on an incomplete
	// reference set could destroy live data.
	expiredFiles, err := collectSnapshotFiles(ctx, filerClient, bucketName, tablePath, expiredSnaps)
	if err != nil {
		return "", nil, fmt.Errorf("collect expired snapshot files: %w", err)
	}
	keptFiles, err := collectSnapshotFiles(ctx, filerClient, bucketName, tablePath, keptSnaps)
	if err != nil {
		return "", nil, fmt.Errorf("collect kept snapshot files: %w", err)
	}

	// Normalize kept file paths for consistent comparison
	normalizedKept := make(map[string]struct{}, len(keptFiles))
	for f := range keptFiles {
		normalizedKept[normalizeIcebergPath(f, bucketName, tablePath)] = struct{}{}
	}

	// Use MetadataBuilder to remove snapshots and create new metadata
	err = h.commitWithRetry(ctx, filerClient, bucketName, tablePath, metadataFileName, config, func(currentMeta table.Metadata, builder *table.MetadataBuilder) error {
		// Guard: verify table head hasn't changed since we planned. If the
		// current snapshot differs from what planning observed, the expire
		// set may reference live data — abort rather than retry this plan.
		cs := currentMeta.CurrentSnapshot()
		if (cs == nil) != (currentSnapID == 0) || (cs != nil && cs.SnapshotID != currentSnapID) {
			return errStalePlan
		}
		return builder.RemoveSnapshots(toExpire)
	})
	if err != nil {
		return "", nil, fmt.Errorf("commit snapshot expiration: %w", err)
	}

	// Delete files exclusively referenced by expired snapshots (best-effort).
	// A file still reachable from any kept snapshot is left alone; individual
	// delete failures are logged and skipped.
	tableBasePath := path.Join(s3tables.TablesPath, bucketName, tablePath)
	deletedCount := 0
	for filePath := range expiredFiles {
		normalized := normalizeIcebergPath(filePath, bucketName, tablePath)
		if _, stillReferenced := normalizedKept[normalized]; stillReferenced {
			continue
		}
		dir := path.Join(tableBasePath, path.Dir(normalized))
		fileName := path.Base(normalized)
		if delErr := deleteFilerFile(ctx, filerClient, dir, fileName); delErr != nil {
			glog.Warningf("iceberg maintenance: failed to delete unreferenced file %s: %v", filePath, delErr)
		} else {
			deletedCount++
		}
	}

	metrics := map[string]int64{
		MetricSnapshotsExpired: int64(len(toExpire)),
		MetricFilesDeleted:     int64(deletedCount),
		MetricDurationMs:       time.Since(start).Milliseconds(),
	}
	return fmt.Sprintf("expired %d snapshot(s), deleted %d unreferenced file(s)", len(toExpire), deletedCount), metrics, nil
}
// collectSnapshotFiles returns all file paths (manifest lists, manifest files,
// data files) referenced by the given snapshots. It returns an error if any
// manifest list or manifest cannot be read/parsed, to prevent delete decisions
// based on incomplete reference data.
// collectSnapshotFiles returns every file path (manifest list, manifest,
// data file) referenced by the given snapshots. Any manifest list or
// manifest that cannot be read or parsed is reported as an error, so that
// callers never make delete decisions from an incomplete reference set.
func collectSnapshotFiles(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	snapshots []table.Snapshot,
) (map[string]struct{}, error) {
	referenced := make(map[string]struct{})
	for _, s := range snapshots {
		listPath := s.ManifestList
		if listPath == "" {
			// Snapshot carries no manifest list; nothing to collect.
			continue
		}
		referenced[listPath] = struct{}{}

		listBytes, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, listPath)
		if err != nil {
			return nil, fmt.Errorf("read manifest list %s: %w", listPath, err)
		}
		manifestFiles, err := iceberg.ReadManifestList(bytes.NewReader(listBytes))
		if err != nil {
			return nil, fmt.Errorf("parse manifest list %s: %w", listPath, err)
		}

		// Record each manifest and every data file it lists.
		for _, manifest := range manifestFiles {
			manifestPath := manifest.FilePath()
			referenced[manifestPath] = struct{}{}

			rawManifest, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, manifestPath)
			if err != nil {
				return nil, fmt.Errorf("read manifest %s: %w", manifestPath, err)
			}
			manifestEntries, err := iceberg.ReadManifest(manifest, bytes.NewReader(rawManifest), false)
			if err != nil {
				return nil, fmt.Errorf("parse manifest %s: %w", manifestPath, err)
			}
			for _, me := range manifestEntries {
				referenced[me.DataFile().FilePath()] = struct{}{}
			}
		}
	}
	return referenced, nil
}
// ---------------------------------------------------------------------------
// Operation: Remove Orphans
// ---------------------------------------------------------------------------
// removeOrphans finds and deletes unreferenced files from the table's
// metadata/ and data/ directories.
// removeOrphans finds and deletes unreferenced files from the table's
// metadata/ and data/ directories.
func (h *Handler) removeOrphans(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	config Config,
) (string, map[string]int64, error) {
	begin := time.Now()

	// Read the table's current metadata; everything it references is live.
	meta, metadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath)
	if err != nil {
		return "", nil, fmt.Errorf("load metadata: %w", err)
	}

	candidates, err := collectOrphanCandidates(ctx, filerClient, bucketName, tablePath, meta, metadataFileName, config.OrphanOlderThanHours)
	if err != nil {
		return "", nil, fmt.Errorf("collect orphan candidates: %w", err)
	}

	// Deletion is best-effort: a failed delete is logged and skipped.
	var removed int64
	for _, c := range candidates {
		if delErr := deleteFilerFile(ctx, filerClient, c.Dir, c.Entry.Name); delErr != nil {
			glog.Warningf("iceberg maintenance: failed to delete orphan %s/%s: %v", c.Dir, c.Entry.Name, delErr)
			continue
		}
		removed++
	}

	metrics := map[string]int64{
		MetricOrphansRemoved: removed,
		MetricDurationMs:     time.Since(begin).Milliseconds(),
	}
	return fmt.Sprintf("removed %d orphan file(s)", removed), metrics, nil
}
// collectOrphanCandidates walks the table's metadata/ and data/ directories
// and returns the files that are (a) not referenced by any snapshot, the
// current metadata file, or the metadata log, and (b) older than the
// orphanOlderThanHours safety threshold.
func collectOrphanCandidates(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	meta table.Metadata,
	metadataFileName string,
	orphanOlderThanHours int64,
) ([]filerFileEntry, error) {
	// Everything reachable from a snapshot is referenced; a collection
	// failure is fatal so we never delete based on partial information.
	referenced, err := collectSnapshotFiles(ctx, filerClient, bucketName, tablePath, meta.Snapshots())
	if err != nil {
		return nil, fmt.Errorf("collect referenced files: %w", err)
	}
	// The current metadata file and the metadata-log entries are also live.
	referenced[path.Join("metadata", metadataFileName)] = struct{}{}
	for logEntry := range meta.PreviousFiles() {
		referenced[logEntry.MetadataFile] = struct{}{}
	}

	// Index both the raw and the normalized spelling of each reference so
	// either form matches the relative paths produced by the walk below.
	refSet := make(map[string]struct{}, len(referenced))
	for raw := range referenced {
		refSet[raw] = struct{}{}
		refSet[normalizeIcebergPath(raw, bucketName, tablePath)] = struct{}{}
	}

	base := path.Join(s3tables.TablesPath, bucketName, tablePath)
	cutoff := time.Now().Add(-time.Duration(orphanOlderThanHours) * time.Hour)

	var orphans []filerFileEntry
	for _, sub := range []string{"metadata", "data"} {
		dir := path.Join(base, sub)
		walked, walkErr := walkFilerEntries(ctx, filerClient, dir)
		if walkErr != nil {
			glog.V(2).Infof("iceberg maintenance: cannot walk %s: %v", dir, walkErr)
			continue
		}
		for _, fe := range walked {
			rel := strings.TrimPrefix(path.Join(fe.Dir, fe.Entry.Name), base+"/")
			if _, live := refSet[rel]; live {
				continue
			}
			// Without attributes we cannot check the file's age, so it is
			// never treated as an orphan.
			if fe.Entry.Attributes == nil {
				continue
			}
			// Files newer than the safety threshold are kept; they may be
			// part of an in-flight write.
			if time.Unix(fe.Entry.Attributes.Mtime, 0).After(cutoff) {
				continue
			}
			orphans = append(orphans, fe)
		}
	}
	return orphans, nil
}
// ---------------------------------------------------------------------------
// Operation: Rewrite Manifests
// ---------------------------------------------------------------------------
// rewriteManifests merges small manifests into fewer, larger ones.
// rewriteManifests merges small manifests into fewer, larger ones.
//
// Data manifests under the current snapshot are read, optionally filtered by
// the partition predicate in config.Where, and merged into one new manifest
// per partition spec ID. Delete manifests — and data manifests that do not
// fully match the predicate — are carried forward untouched. The result is
// committed as a new snapshot; on commit failure all staged artifacts are
// cleaned up best-effort.
//
// Returns a human-readable summary, a metrics map (manifests rewritten,
// total entries, duration), and an error on any failure.
func (h *Handler) rewriteManifests(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	config Config,
) (string, map[string]int64, error) {
	start := time.Now()

	// Load current metadata
	meta, metadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath)
	if err != nil {
		return "", nil, fmt.Errorf("load metadata: %w", err)
	}

	// Optional partition-scoped filter; nil when config.Where is empty.
	predicate, err := parsePartitionPredicate(config.Where, meta)
	if err != nil {
		return "", nil, err
	}

	currentSnap := meta.CurrentSnapshot()
	if currentSnap == nil || currentSnap.ManifestList == "" {
		return "no current snapshot", nil, nil
	}

	// Read manifest list
	manifestListData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, currentSnap.ManifestList)
	if err != nil {
		return "", nil, fmt.Errorf("read manifest list: %w", err)
	}
	manifests, err := iceberg.ReadManifestList(bytes.NewReader(manifestListData))
	if err != nil {
		return "", nil, fmt.Errorf("parse manifest list: %w", err)
	}

	// Separate data manifests from delete manifests. Only data manifests
	// are candidates for rewriting; delete manifests are carried forward.
	var dataManifests []iceberg.ManifestFile
	for _, mf := range manifests {
		if mf.ManifestContent() == iceberg.ManifestContentData {
			dataManifests = append(dataManifests, mf)
		}
	}

	// Collect all entries from data manifests, grouped by partition spec ID
	// so we write one merged manifest per spec (required for spec-evolved tables).
	type specEntries struct {
		specID  int32
		spec    iceberg.PartitionSpec
		entries []iceberg.ManifestEntry
	}
	specMap := make(map[int32]*specEntries)

	// Build a lookup from spec ID to PartitionSpec
	specByID := specByID(meta)

	// Manifests that only partially match the predicate are carried forward
	// as-is rather than split.
	var carriedDataManifests []iceberg.ManifestFile
	var manifestsRewritten int64
	for _, mf := range dataManifests {
		manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath())
		if err != nil {
			return "", nil, fmt.Errorf("read manifest %s: %w", mf.FilePath(), err)
		}
		entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true)
		if err != nil {
			return "", nil, fmt.Errorf("parse manifest %s: %w", mf.FilePath(), err)
		}
		if predicate != nil {
			spec, found := specByID[int(mf.PartitionSpecID())]
			if !found {
				return "", nil, fmt.Errorf("partition spec %d not found in table metadata", mf.PartitionSpecID())
			}
			// A manifest is rewritten only if every one of its entries
			// matches the predicate (empty manifests never match).
			allMatch := len(entries) > 0
			for _, entry := range entries {
				match, err := predicate.Matches(spec, entry.DataFile().Partition())
				if err != nil {
					return "", nil, err
				}
				if !match {
					allMatch = false
					break
				}
			}
			if !allMatch {
				carriedDataManifests = append(carriedDataManifests, mf)
				continue
			}
		}
		sid := mf.PartitionSpecID()
		se, ok := specMap[sid]
		if !ok {
			ps, found := specByID[int(sid)]
			if !found {
				return "", nil, fmt.Errorf("partition spec %d not found in table metadata", sid)
			}
			se = &specEntries{specID: sid, spec: ps}
			specMap[sid] = se
		}
		se.entries = append(se.entries, entries...)
		manifestsRewritten++
	}

	if manifestsRewritten < config.MinManifestsToRewrite {
		return fmt.Sprintf("only %d data manifests, below threshold of %d", manifestsRewritten, config.MinManifestsToRewrite), nil, nil
	}
	if len(specMap) == 0 {
		return "no data entries to rewrite", nil, nil
	}

	schema := meta.CurrentSchema()
	version := meta.Version()
	snapshotID := currentSnap.SnapshotID
	// New snapshot ID derived from the wall clock (milliseconds).
	newSnapshotID := time.Now().UnixMilli()
	newSeqNum := currentSnap.SequenceNumber + 1
	artifactSuffix := compactRandomSuffix()
	metaDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "metadata")

	// Track written artifacts so we can clean them up if the commit fails.
	type artifact struct {
		dir, fileName string
	}
	var writtenArtifacts []artifact
	committed := false
	defer func() {
		if committed || len(writtenArtifacts) == 0 {
			return
		}
		// Use a detached context so cleanup still runs if ctx was canceled.
		cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		for _, a := range writtenArtifacts {
			if err := deleteFilerFile(cleanupCtx, filerClient, a.dir, a.fileName); err != nil {
				glog.Warningf("iceberg rewrite-manifests: failed to clean up artifact %s/%s: %v", a.dir, a.fileName, err)
			}
		}
	}()

	// Write one merged manifest per partition spec
	var newManifests []iceberg.ManifestFile
	newManifests = append(newManifests, carriedDataManifests...)
	totalEntries := 0
	for _, se := range specMap {
		totalEntries += len(se.entries)
		manifestFileName := fmt.Sprintf("merged-%d-%s-spec%d.avro", newSnapshotID, artifactSuffix, se.specID)
		manifestPath := path.Join("metadata", manifestFileName)
		var manifestBuf bytes.Buffer
		mergedManifest, err := iceberg.WriteManifest(
			manifestPath,
			&manifestBuf,
			version,
			se.spec,
			schema,
			newSnapshotID,
			se.entries,
		)
		if err != nil {
			return "", nil, fmt.Errorf("write merged manifest for spec %d: %w", se.specID, err)
		}
		if err := saveFilerFile(ctx, filerClient, metaDir, manifestFileName, manifestBuf.Bytes()); err != nil {
			return "", nil, fmt.Errorf("save merged manifest for spec %d: %w", se.specID, err)
		}
		writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestFileName})
		newManifests = append(newManifests, mergedManifest)
	}

	// Include any delete manifests that were not rewritten
	for _, mf := range manifests {
		if mf.ManifestContent() != iceberg.ManifestContentData {
			newManifests = append(newManifests, mf)
		}
	}

	var manifestListBuf bytes.Buffer
	err = iceberg.WriteManifestList(version, &manifestListBuf, newSnapshotID, &snapshotID, &newSeqNum, 0, newManifests)
	if err != nil {
		return "", nil, fmt.Errorf("write manifest list: %w", err)
	}

	// Save new manifest list
	manifestListFileName := fmt.Sprintf("snap-%d-%s.avro", newSnapshotID, artifactSuffix)
	if err := saveFilerFile(ctx, filerClient, metaDir, manifestListFileName, manifestListBuf.Bytes()); err != nil {
		return "", nil, fmt.Errorf("save manifest list: %w", err)
	}
	writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestListFileName})

	// Create new snapshot with the rewritten manifest list
	manifestListLocation := path.Join("metadata", manifestListFileName)
	err = h.commitWithRetry(ctx, filerClient, bucketName, tablePath, metadataFileName, config, func(currentMeta table.Metadata, builder *table.MetadataBuilder) error {
		// Guard: verify table head hasn't advanced since we planned.
		// The merged manifest and manifest list were built against snapshotID;
		// if the head moved, they reference stale state.
		cs := currentMeta.CurrentSnapshot()
		if cs == nil || cs.SnapshotID != snapshotID {
			return errStalePlan
		}
		newSnapshot := &table.Snapshot{
			SnapshotID:       newSnapshotID,
			ParentSnapshotID: &snapshotID,
			SequenceNumber:   cs.SequenceNumber + 1,
			TimestampMs:      time.Now().UnixMilli(),
			ManifestList:     manifestListLocation,
			Summary: &table.Summary{
				Operation:  table.OpReplace,
				Properties: map[string]string{"maintenance": "rewrite_manifests"},
			},
			SchemaID: func() *int {
				id := schema.ID
				return &id
			}(),
		}
		if err := builder.AddSnapshot(newSnapshot); err != nil {
			return err
		}
		// Point the main branch at the new snapshot.
		return builder.SetSnapshotRef(
			table.MainBranch,
			newSnapshotID,
			table.BranchRef,
		)
	})
	if err != nil {
		return "", nil, fmt.Errorf("commit manifest rewrite: %w", err)
	}
	// Disarm the deferred artifact cleanup: the commit owns the files now.
	committed = true

	metrics := map[string]int64{
		MetricManifestsRewritten: manifestsRewritten,
		MetricEntriesTotal:       int64(totalEntries),
		MetricDurationMs:         time.Since(start).Milliseconds(),
	}
	return fmt.Sprintf("rewrote %d manifests into %d (%d entries)", manifestsRewritten, len(specMap), totalEntries), metrics, nil
}
// ---------------------------------------------------------------------------
// Commit Protocol with Retry
// ---------------------------------------------------------------------------
// commitWithRetry implements optimistic concurrency for metadata updates.
// It reads the current metadata, applies the mutation, writes a new metadata
// file, and updates the table entry. On version conflict, it retries.
// commitWithRetry implements optimistic concurrency for metadata updates.
// It reads the current metadata, applies the mutation, writes a new metadata
// file, and updates the table entry. On version conflict, it retries.
//
// The mutate callback receives the freshly loaded metadata and a builder
// seeded from it; returning an error from mutate aborts the commit without
// retrying (this is how planners report errStalePlan). If the builder
// reports no changes, the commit is a no-op and returns nil.
func (h *Handler) commitWithRetry(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	bucketName, tablePath, currentMetadataFileName string,
	config Config,
	mutate func(currentMeta table.Metadata, builder *table.MetadataBuilder) error,
) error {
	// Clamp the configured retry count to a sane range.
	maxRetries := config.MaxCommitRetries
	if maxRetries <= 0 || maxRetries > 20 {
		maxRetries = defaultMaxCommitRetries
	}
	for attempt := int64(0); attempt < maxRetries; attempt++ {
		if attempt > 0 {
			// Exponential backoff with jitter before each retry, capped at
			// maxBackoff; honors context cancellation while waiting.
			backoff := time.Duration(50*(1<<(attempt-1))) * time.Millisecond // exponential: 50ms, 100ms, 200ms, ...
			const maxBackoff = 5 * time.Second
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
			jitter := time.Duration(rand.Int64N(int64(backoff) / 5)) // 0-20% of backoff
			timer := time.NewTimer(backoff + jitter)
			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
				return ctx.Err()
			}
		}

		// Load current metadata (re-read every attempt so retries see the
		// latest committed state).
		meta, metaFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath)
		if err != nil {
			return fmt.Errorf("load metadata (attempt %d): %w", attempt, err)
		}

		// Build new metadata — pass the current metadata file path so the
		// metadata log correctly records where the previous version lives.
		currentMetaFilePath := path.Join("metadata", metaFileName)
		builder, err := table.MetadataBuilderFromBase(meta, currentMetaFilePath)
		if err != nil {
			return fmt.Errorf("create metadata builder (attempt %d): %w", attempt, err)
		}

		// Apply the mutation
		if err := mutate(meta, builder); err != nil {
			return fmt.Errorf("apply mutation (attempt %d): %w", attempt, err)
		}

		if !builder.HasChanges() {
			return nil // nothing to commit
		}

		newMeta, err := builder.Build()
		if err != nil {
			return fmt.Errorf("build metadata (attempt %d): %w", attempt, err)
		}

		// Serialize
		metadataBytes, err := json.Marshal(newMeta)
		if err != nil {
			return fmt.Errorf("marshal metadata (attempt %d): %w", attempt, err)
		}

		// Determine new metadata file name. Include a timestamp suffix so
		// concurrent writers stage to distinct files instead of clobbering.
		currentVersion := extractMetadataVersion(metaFileName)
		newVersion := currentVersion + 1
		newMetadataFileName := fmt.Sprintf("v%d-%d.metadata.json", newVersion, time.Now().UnixNano())

		// Save new metadata file
		metaDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "metadata")
		if err := saveFilerFile(ctx, filerClient, metaDir, newMetadataFileName, metadataBytes); err != nil {
			return fmt.Errorf("save metadata file (attempt %d): %w", attempt, err)
		}

		// Update the table entry's xattr with new metadata (CAS on version)
		tableDir := path.Join(s3tables.TablesPath, bucketName, tablePath)
		newMetadataLocation := path.Join("metadata", newMetadataFileName)
		err = updateTableMetadataXattr(ctx, filerClient, tableDir, currentVersion, metadataBytes, newMetadataLocation)
		if err != nil {
			// Use a detached context for cleanup so staged files are removed
			// even if the original context was canceled.
			cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 10*time.Second)
			if !errors.Is(err, errMetadataVersionConflict) {
				// Non-conflict error (permissions, transport, etc.): fail immediately.
				_ = deleteFilerFile(cleanupCtx, filerClient, metaDir, newMetadataFileName)
				cleanupCancel()
				return fmt.Errorf("update table xattr (attempt %d): %w", attempt, err)
			}
			// Version conflict: clean up the new metadata file and retry
			_ = deleteFilerFile(cleanupCtx, filerClient, metaDir, newMetadataFileName)
			cleanupCancel()
			if attempt < maxRetries-1 {
				glog.V(1).Infof("iceberg maintenance: version conflict on %s/%s, retrying (attempt %d)", bucketName, tablePath, attempt)
				continue
			}
			return fmt.Errorf("update table xattr (attempt %d): %w", attempt, err)
		}
		return nil
	}
	return fmt.Errorf("exceeded max commit retries (%d)", maxRetries)
}
}