iceberg: cache detection planning results (#8667)
* iceberg: cache detection planning results
* iceberg: tighten planning index cache handling
* iceberg: remove dead planning-index metadata fallback
* iceberg: preserve partial planning index caches
* iceberg: scope planning index caching per op
* iceberg: rename copy vars to avoid shadowing builtin
This commit is contained in:
@@ -3,7 +3,6 @@ package iceberg
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path"
|
||||
"strings"
|
||||
@@ -121,30 +120,14 @@ func (h *Handler) scanTablesForMaintenance(
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse the internal metadata to get FullMetadata
|
||||
var internalMeta struct {
|
||||
MetadataLocation string `json:"metadataLocation,omitempty"`
|
||||
Metadata *struct {
|
||||
FullMetadata json.RawMessage `json:"fullMetadata,omitempty"`
|
||||
} `json:"metadata,omitempty"`
|
||||
}
|
||||
if err := json.Unmarshal(metadataBytes, &internalMeta); err != nil {
|
||||
glog.V(2).Infof("iceberg maintenance: skipping %s/%s/%s: cannot parse metadata: %v", bucketName, nsName, tblName, err)
|
||||
continue
|
||||
}
|
||||
if internalMeta.Metadata == nil || len(internalMeta.Metadata.FullMetadata) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
icebergMeta, err := table.ParseMetadataBytes(internalMeta.Metadata.FullMetadata)
|
||||
icebergMeta, metadataFileName, planningIndex, err := parseTableMetadataEnvelope(metadataBytes)
|
||||
if err != nil {
|
||||
glog.V(2).Infof("iceberg maintenance: skipping %s/%s/%s: cannot parse iceberg metadata: %v", bucketName, nsName, tblName, err)
|
||||
continue
|
||||
}
|
||||
|
||||
tablePath := path.Join(nsName, tblName)
|
||||
metadataFileName := metadataFileNameFromLocation(internalMeta.MetadataLocation, bucketName, tablePath)
|
||||
needsWork, err := h.tableNeedsMaintenance(ctx, filerClient, bucketName, tablePath, icebergMeta, metadataFileName, config, ops)
|
||||
needsWork, err := h.tableNeedsMaintenance(ctx, filerClient, bucketName, tablePath, icebergMeta, metadataFileName, planningIndex, config, ops)
|
||||
if err != nil {
|
||||
glog.V(2).Infof("iceberg maintenance: skipping %s/%s/%s: cannot evaluate maintenance need: %v", bucketName, nsName, tblName, err)
|
||||
continue
|
||||
@@ -192,6 +175,7 @@ func (h *Handler) tableNeedsMaintenance(
|
||||
bucketName, tablePath string,
|
||||
meta table.Metadata,
|
||||
metadataFileName string,
|
||||
cachedPlanningIndex *planningIndex,
|
||||
config Config,
|
||||
ops []string,
|
||||
) (bool, error) {
|
||||
@@ -205,22 +189,66 @@ func (h *Handler) tableNeedsMaintenance(
|
||||
}
|
||||
}
|
||||
|
||||
loadManifests := func() ([]iceberg.ManifestFile, error) {
|
||||
return loadCurrentManifests(ctx, filerClient, bucketName, tablePath, meta)
|
||||
}
|
||||
|
||||
var currentManifests []iceberg.ManifestFile
|
||||
var manifestsErr error
|
||||
manifestsLoaded := false
|
||||
var manifestsLoaded bool
|
||||
getCurrentManifests := func() ([]iceberg.ManifestFile, error) {
|
||||
if manifestsLoaded {
|
||||
return currentManifests, manifestsErr
|
||||
}
|
||||
currentManifests, manifestsErr = loadManifests()
|
||||
currentManifests, manifestsErr = loadCurrentManifests(ctx, filerClient, bucketName, tablePath, meta)
|
||||
manifestsLoaded = true
|
||||
return currentManifests, manifestsErr
|
||||
}
|
||||
|
||||
computedPlanningIndexes := make(map[string]*planningIndex)
|
||||
planningIndexLoaded := make(map[string]bool)
|
||||
planningIndexErrs := make(map[string]error)
|
||||
getPlanningIndex := func(op string) (*planningIndex, error) {
|
||||
if planningIndexLoaded[op] {
|
||||
return computedPlanningIndexes[op], planningIndexErrs[op]
|
||||
}
|
||||
planningIndexLoaded[op] = true
|
||||
|
||||
manifests, err := getCurrentManifests()
|
||||
if err != nil {
|
||||
planningIndexErrs[op] = err
|
||||
return nil, err
|
||||
}
|
||||
index, err := buildPlanningIndexFromManifests(ctx, filerClient, bucketName, tablePath, meta, config, []string{op}, manifests)
|
||||
if err != nil {
|
||||
planningIndexErrs[op] = err
|
||||
return nil, err
|
||||
}
|
||||
computedPlanningIndexes[op] = index
|
||||
if index != nil {
|
||||
if err := persistPlanningIndex(ctx, filerClient, bucketName, tablePath, index); err != nil {
|
||||
glog.V(2).Infof("iceberg maintenance: unable to persist planning index for %s/%s: %v", bucketName, tablePath, err)
|
||||
}
|
||||
}
|
||||
return index, nil
|
||||
}
|
||||
checkPlanningIndex := func(op string, eligibleFn func(*planningIndex, Config) (bool, bool)) (bool, error) {
|
||||
if cachedPlanningIndex != nil && cachedPlanningIndex.matchesSnapshot(meta) {
|
||||
if eligible, ok := eligibleFn(cachedPlanningIndex, config); ok {
|
||||
return eligible, nil
|
||||
}
|
||||
}
|
||||
|
||||
index, err := getPlanningIndex(op)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if index == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
eligible, _ := eligibleFn(index, config)
|
||||
return eligible, nil
|
||||
}
|
||||
|
||||
var opEvalErrors []string
|
||||
planningIndexErrorReported := false
|
||||
|
||||
for _, op := range ops {
|
||||
switch op {
|
||||
@@ -228,26 +256,27 @@ func (h *Handler) tableNeedsMaintenance(
|
||||
// Handled by the metadata-only check above.
|
||||
continue
|
||||
case "compact":
|
||||
manifests, err := getCurrentManifests()
|
||||
eligible, err := checkPlanningIndex(op, (*planningIndex).compactionEligible)
|
||||
if err != nil {
|
||||
opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
|
||||
continue
|
||||
}
|
||||
eligible, err := hasEligibleCompaction(ctx, filerClient, bucketName, tablePath, manifests, config)
|
||||
if err != nil {
|
||||
opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
|
||||
if !planningIndexErrorReported {
|
||||
opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
|
||||
planningIndexErrorReported = true
|
||||
}
|
||||
continue
|
||||
}
|
||||
if eligible {
|
||||
return true, nil
|
||||
}
|
||||
case "rewrite_manifests":
|
||||
manifests, err := getCurrentManifests()
|
||||
eligible, err := checkPlanningIndex(op, (*planningIndex).rewriteManifestsEligible)
|
||||
if err != nil {
|
||||
opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
|
||||
if !planningIndexErrorReported {
|
||||
opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
|
||||
planningIndexErrorReported = true
|
||||
}
|
||||
continue
|
||||
}
|
||||
if countDataManifests(manifests) >= config.MinManifestsToRewrite {
|
||||
if eligible {
|
||||
return true, nil
|
||||
}
|
||||
case "remove_orphans":
|
||||
|
||||
@@ -1255,6 +1255,349 @@ func TestDetectSchedulesManifestRewriteWithoutSnapshotPressure(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestDetectUsesPlanningIndexForRepeatedCompactionScans verifies that the
// first detection scan persists a compaction planning index into the table
// metadata xattr, and that a second scan is answered from that cache — proven
// by corrupting the manifest list on disk and observing that the cached
// verdict still yields the same compaction candidate.
func TestDetectUsesPlanningIndexForRepeatedCompactionScans(t *testing.T) {
	fs, client := startFakeFiler(t)

	now := time.Now().UnixMilli()
	setup := tableSetup{
		BucketName: "test-bucket",
		Namespace:  "analytics",
		TableName:  "events",
		Snapshots: []table.Snapshot{
			{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1},
		},
	}
	meta := populateTable(t, fs, setup)
	// Three files well under the 4096-byte target make the table
	// compaction-eligible with MinInputFiles=2.
	writeCurrentSnapshotManifests(t, fs, setup, meta, [][]iceberg.ManifestEntry{
		makeManifestEntries(t, []testEntrySpec{
			{path: "data/small-1.parquet", size: 1024, partition: map[int]any{}},
			{path: "data/small-2.parquet", size: 1024, partition: map[int]any{}},
			{path: "data/small-3.parquet", size: 1024, partition: map[int]any{}},
		}, 1),
	})

	handler := NewHandler(nil)
	config := Config{
		TargetFileSizeBytes: 4096,
		MinInputFiles:       2,
		Operations:          "compact",
	}

	tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0)
	if err != nil {
		t.Fatalf("scanTablesForMaintenance failed: %v", err)
	}
	if len(tables) != 1 {
		t.Fatalf("expected 1 compaction candidate, got %d", len(tables))
	}

	// The first scan should have persisted a planning index (with a
	// compaction section) inside the table entry's metadata xattr.
	tableDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.Namespace)
	tableEntry := fs.getEntry(tableDir, setup.TableName)
	if tableEntry == nil {
		t.Fatal("table entry not found")
	}

	var envelope struct {
		PlanningIndex *planningIndex `json:"planningIndex,omitempty"`
	}
	if err := json.Unmarshal(tableEntry.Extended[s3tables.ExtendedKeyMetadata], &envelope); err != nil {
		t.Fatalf("parse table metadata xattr: %v", err)
	}
	if envelope.PlanningIndex == nil || envelope.PlanningIndex.Compaction == nil {
		t.Fatal("expected persisted compaction planning index after first scan")
	}

	// Corrupt the manifest list so any rescan that re-reads manifests would
	// fail; a successful second scan therefore proves the cache was used.
	metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata")
	fs.putEntry(metaDir, "snap-1.avro", &filer_pb.Entry{
		Name: "snap-1.avro",
		Attributes: &filer_pb.FuseAttributes{
			Mtime:    time.Now().Unix(),
			FileSize: uint64(len("broken")),
		},
		Content: []byte("broken"),
	})

	tables, err = handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0)
	if err != nil {
		t.Fatalf("scanTablesForMaintenance with cached planning index failed: %v", err)
	}
	if len(tables) != 1 {
		t.Fatalf("expected cached planning index to preserve 1 compaction candidate, got %d", len(tables))
	}
}
|
||||
|
||||
// TestDetectInvalidatesPlanningIndexWhenCompactionConfigChanges verifies
// that a cached compaction verdict is not reused across configuration
// changes: a scan with min_input_files=3 caches "not eligible", and a later
// scan with min_input_files=2 must invalidate that cache and re-plan.
func TestDetectInvalidatesPlanningIndexWhenCompactionConfigChanges(t *testing.T) {
	fs, client := startFakeFiler(t)

	now := time.Now().UnixMilli()
	setup := tableSetup{
		BucketName: "test-bucket",
		Namespace:  "analytics",
		TableName:  "events",
		Snapshots: []table.Snapshot{
			{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1},
		},
	}
	meta := populateTable(t, fs, setup)
	// Only two small files: eligible at MinInputFiles=2, not at 3.
	writeCurrentSnapshotManifests(t, fs, setup, meta, [][]iceberg.ManifestEntry{
		makeManifestEntries(t, []testEntrySpec{
			{path: "data/small-1.parquet", size: 1024, partition: map[int]any{}},
			{path: "data/small-2.parquet", size: 1024, partition: map[int]any{}},
		}, 1),
	})

	handler := NewHandler(nil)
	initialConfig := Config{
		TargetFileSizeBytes: 4096,
		MinInputFiles:       3,
		Operations:          "compact",
	}

	tables, err := handler.scanTablesForMaintenance(context.Background(), client, initialConfig, "", "", "", 0)
	if err != nil {
		t.Fatalf("initial scanTablesForMaintenance failed: %v", err)
	}
	if len(tables) != 0 {
		t.Fatalf("expected no compaction candidates with min_input_files=3, got %d", len(tables))
	}

	// Same snapshot, looser threshold: the cached verdict was computed under
	// a different config hash and must be discarded, not trusted.
	updatedConfig := Config{
		TargetFileSizeBytes: 4096,
		MinInputFiles:       2,
		Operations:          "compact",
	}

	tables, err = handler.scanTablesForMaintenance(context.Background(), client, updatedConfig, "", "", "", 0)
	if err != nil {
		t.Fatalf("updated scanTablesForMaintenance failed: %v", err)
	}
	if len(tables) != 1 {
		t.Fatalf("expected planning index invalidation to yield 1 compaction candidate, got %d", len(tables))
	}
}
|
||||
|
||||
// TestDetectPlanningIndexPreservesUnscannedSections verifies that a scan
// evaluating only one operation does not wipe the cached section of another:
// after a compact-only scan and then a rewrite_manifests-only scan, the
// persisted planning index must contain both sections.
func TestDetectPlanningIndexPreservesUnscannedSections(t *testing.T) {
	fs, client := startFakeFiler(t)

	now := time.Now().UnixMilli()
	setup := tableSetup{
		BucketName: "test-bucket",
		Namespace:  "analytics",
		TableName:  "events",
		Snapshots: []table.Snapshot{
			{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1},
		},
	}
	meta := populateTable(t, fs, setup)
	writeCurrentSnapshotManifests(t, fs, setup, meta, [][]iceberg.ManifestEntry{
		makeManifestEntries(t, []testEntrySpec{
			{path: "data/small-1.parquet", size: 1024, partition: map[int]any{}},
			{path: "data/small-2.parquet", size: 1024, partition: map[int]any{}},
		}, 1),
	})

	handler := NewHandler(nil)
	// First scan: compact only — caches the compaction section.
	compactConfig := Config{
		TargetFileSizeBytes: 4096,
		MinInputFiles:       2,
		Operations:          "compact",
	}
	if _, err := handler.scanTablesForMaintenance(context.Background(), client, compactConfig, "", "", "", 0); err != nil {
		t.Fatalf("compact scanTablesForMaintenance failed: %v", err)
	}

	// Second scan: rewrite_manifests only — must add its section without
	// discarding the compaction section cached above.
	rewriteConfig := Config{
		MinManifestsToRewrite: 5,
		Operations:            "rewrite_manifests",
	}
	if _, err := handler.scanTablesForMaintenance(context.Background(), client, rewriteConfig, "", "", "", 0); err != nil {
		t.Fatalf("rewrite scanTablesForMaintenance failed: %v", err)
	}

	tableDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.Namespace)
	tableEntry := fs.getEntry(tableDir, setup.TableName)
	if tableEntry == nil {
		t.Fatal("table entry not found")
	}

	var envelope struct {
		PlanningIndex *planningIndex `json:"planningIndex,omitempty"`
	}
	if err := json.Unmarshal(tableEntry.Extended[s3tables.ExtendedKeyMetadata], &envelope); err != nil {
		t.Fatalf("parse table metadata xattr: %v", err)
	}
	if envelope.PlanningIndex == nil {
		t.Fatal("expected persisted planning index")
	}
	if envelope.PlanningIndex.Compaction == nil {
		t.Fatal("expected compaction section to be preserved")
	}
	if envelope.PlanningIndex.RewriteManifests == nil {
		t.Fatal("expected rewrite_manifests section to be added")
	}
}
|
||||
|
||||
// TestTableNeedsMaintenanceCachesPlanningIndexBuildError verifies that when
// the manifest-list load fails, the shared failure is reported exactly once
// even though both requested operations would each trigger a planning-index
// build (the error is cached per op, and duplicate reports are suppressed).
func TestTableNeedsMaintenanceCachesPlanningIndexBuildError(t *testing.T) {
	fs, client := startFakeFiler(t)

	now := time.Now().UnixMilli()
	setup := tableSetup{
		BucketName: "test-bucket",
		Namespace:  "analytics",
		TableName:  "events",
		Snapshots: []table.Snapshot{
			{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1},
		},
	}
	meta := populateTable(t, fs, setup)
	writeCurrentSnapshotManifests(t, fs, setup, meta, [][]iceberg.ManifestEntry{
		makeManifestEntries(t, []testEntrySpec{
			{path: "data/small-1.parquet", size: 1024, partition: map[int]any{}},
			{path: "data/small-2.parquet", size: 1024, partition: map[int]any{}},
		}, 1),
	})

	// Overwrite the manifest list with garbage so every manifest load fails.
	metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata")
	fs.putEntry(metaDir, "snap-1.avro", &filer_pb.Entry{
		Name: "snap-1.avro",
		Attributes: &filer_pb.FuseAttributes{
			Mtime:    time.Now().Unix(),
			FileSize: uint64(len("broken")),
		},
		Content: []byte("broken"),
	})

	handler := NewHandler(nil)
	config := Config{
		TargetFileSizeBytes: 4096,
		MinInputFiles:       2,
		MinManifestsToRewrite: 2,
		Operations:            "compact,rewrite_manifests",
	}
	ops, err := parseOperations(config.Operations)
	if err != nil {
		t.Fatalf("parseOperations: %v", err)
	}

	// No cached planning index is supplied (nil), forcing a fresh build.
	needsWork, err := handler.tableNeedsMaintenance(context.Background(), client, setup.BucketName, setup.tablePath(), meta, "v1.metadata.json", nil, config, ops)
	if err == nil {
		t.Fatal("expected planning-index build error")
	}
	if needsWork {
		t.Fatal("expected no maintenance result on planning-index build error")
	}
	// Both ops hit the same manifest-load failure; it must surface once.
	if strings.Count(err.Error(), "parse manifest list") != 1 {
		t.Fatalf("expected planning-index build error to be reported once, got %q", err)
	}
}
|
||||
|
||||
// TestTableNeedsMaintenanceScopesPlanningIndexBuildErrorsPerOperation
// verifies that a planning failure in one operation's index build (here a
// corrupted data manifest breaks compaction planning) does not prevent the
// other operation (rewrite_manifests, which only counts manifests) from
// reporting maintenance work.
func TestTableNeedsMaintenanceScopesPlanningIndexBuildErrorsPerOperation(t *testing.T) {
	fs, client := startFakeFiler(t)

	now := time.Now().UnixMilli()
	setup := tableSetup{
		BucketName: "test-bucket",
		Namespace:  "analytics",
		TableName:  "events",
		Snapshots: []table.Snapshot{
			{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1},
		},
	}
	meta := populateTable(t, fs, setup)
	// Two separate manifests so the data-manifest count meets the rewrite
	// threshold of 2.
	writeCurrentSnapshotManifests(t, fs, setup, meta, [][]iceberg.ManifestEntry{
		makeManifestEntries(t, []testEntrySpec{
			{path: "data/small-1.parquet", size: 1024, partition: map[int]any{}},
		}, 1),
		makeManifestEntries(t, []testEntrySpec{
			{path: "data/small-2.parquet", size: 1024, partition: map[int]any{}},
		}, 1),
	})

	// Corrupt one manifest file: compaction planning (which reads entries)
	// fails, while manifest counting still succeeds.
	metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata")
	fs.putEntry(metaDir, "detect-manifest-1.avro", &filer_pb.Entry{
		Name: "detect-manifest-1.avro",
		Attributes: &filer_pb.FuseAttributes{
			Mtime:    time.Now().Unix(),
			FileSize: uint64(len("broken")),
		},
		Content: []byte("broken"),
	})

	handler := NewHandler(nil)
	config := Config{
		TargetFileSizeBytes: 4096,
		MinInputFiles:       2,
		MinManifestsToRewrite: 2,
		Operations:            "compact,rewrite_manifests",
	}
	ops, err := parseOperations(config.Operations)
	if err != nil {
		t.Fatalf("parseOperations: %v", err)
	}

	needsWork, err := handler.tableNeedsMaintenance(context.Background(), client, setup.BucketName, setup.tablePath(), meta, "v1.metadata.json", nil, config, ops)
	if err != nil {
		t.Fatalf("expected rewrite_manifests planning to survive compaction planning error, got %v", err)
	}
	if !needsWork {
		t.Fatal("expected rewrite_manifests maintenance despite compaction planning error")
	}
}
|
||||
|
||||
// TestPersistPlanningIndexUsesMetadataXattrCASGuard verifies that
// persistPlanningIndex issues its UpdateEntry with an ExpectedExtended
// compare-and-swap guard containing the metadata xattr (and its version
// xattr) values read before the write, so a concurrent metadata commit
// cannot be clobbered.
func TestPersistPlanningIndexUsesMetadataXattrCASGuard(t *testing.T) {
	fs, client := startFakeFiler(t)

	now := time.Now().UnixMilli()
	setup := tableSetup{
		BucketName: "test-bucket",
		Namespace:  "analytics",
		TableName:  "events",
		Snapshots: []table.Snapshot{
			{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1},
		},
	}
	populateTable(t, fs, setup)

	tableDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath())
	tableEntry := fs.getEntry(path.Dir(tableDir), path.Base(tableDir))
	if tableEntry == nil {
		t.Fatal("table entry not found")
	}

	// Intercept the UpdateEntry request and capture a deep copy of its
	// ExpectedExtended map (copied so later mutation can't skew the check).
	observedExpectedExtended := make(chan map[string][]byte, 1)
	fs.beforeUpdate = func(_ *fakeFilerServer, req *filer_pb.UpdateEntryRequest) error {
		cloned := make(map[string][]byte, len(req.ExpectedExtended))
		for key, value := range req.ExpectedExtended {
			cloned[key] = append([]byte(nil), value...)
		}
		observedExpectedExtended <- cloned
		return nil
	}

	err := persistPlanningIndex(context.Background(), client, setup.BucketName, setup.tablePath(), &planningIndex{
		SnapshotID:   1,
		ManifestList: "metadata/snap-1.avro",
		UpdatedAtMs:  time.Now().UnixMilli(),
	})
	if err != nil {
		t.Fatalf("persistPlanningIndex: %v", err)
	}

	// Non-blocking receive: the hook must already have fired exactly once.
	var expectedExtended map[string][]byte
	select {
	case expectedExtended = <-observedExpectedExtended:
	default:
		t.Fatal("expected persistPlanningIndex to issue an UpdateEntry request")
	}

	if got := expectedExtended[s3tables.ExtendedKeyMetadata]; !bytes.Equal(got, tableEntry.Extended[s3tables.ExtendedKeyMetadata]) {
		t.Fatal("expected metadata xattr to be included in ExpectedExtended")
	}
	if got := expectedExtended[s3tables.ExtendedKeyMetadataVersion]; !bytes.Equal(got, tableEntry.Extended[s3tables.ExtendedKeyMetadataVersion]) {
		t.Fatal("expected metadata version xattr to be preserved in ExpectedExtended")
	}
}
|
||||
|
||||
func TestDetectDoesNotScheduleManifestRewriteFromDeleteManifestsOnly(t *testing.T) {
|
||||
fs, client := startFakeFiler(t)
|
||||
|
||||
|
||||
weed/plugin/worker/iceberg/planning_index.go — new file, 269 lines
@@ -0,0 +1,269 @@
|
||||
package iceberg
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/apache/iceberg-go"
|
||||
"github.com/apache/iceberg-go/table"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// planningIndex is a per-snapshot cache of maintenance planning results,
// persisted in the table metadata xattr so repeated detection scans can
// avoid re-reading manifest files for an unchanged snapshot.
type planningIndex struct {
	SnapshotID        int64                          `json:"snapshotId"`              // snapshot the index was built against
	ManifestList      string                         `json:"manifestList,omitempty"`  // manifest list path of that snapshot
	UpdatedAtMs       int64                          `json:"updatedAtMs"`             // build time, Unix milliseconds
	DataManifestCount int64                          `json:"dataManifestCount,omitempty"`
	Compaction        *planningIndexCompaction       `json:"compaction,omitempty"`
	RewriteManifests  *planningIndexRewriteManifests `json:"rewriteManifests,omitempty"`
}

// planningIndexCompaction caches the compaction-eligibility verdict together
// with a hash of the config knobs it was computed under, so a config change
// invalidates the cached verdict.
type planningIndexCompaction struct {
	ConfigHash string `json:"configHash"`
	Eligible   bool   `json:"eligible"`
}

// planningIndexRewriteManifests caches the manifest-rewrite verdict together
// with the threshold it was evaluated against.
type planningIndexRewriteManifests struct {
	Threshold int64 `json:"threshold"`
	Eligible  bool  `json:"eligible"`
}

// tableMetadataEnvelope mirrors the shape of the table metadata xattr: the
// metadata location/version, the embedded full Iceberg metadata, and an
// optional cached planning index. PlanningIndex stays raw so an invalid
// cache can be dropped without failing the metadata parse.
type tableMetadataEnvelope struct {
	MetadataVersion  int    `json:"metadataVersion"`
	MetadataLocation string `json:"metadataLocation,omitempty"`
	Metadata         *struct {
		FullMetadata json.RawMessage `json:"fullMetadata,omitempty"`
	} `json:"metadata,omitempty"`
	PlanningIndex json.RawMessage `json:"planningIndex,omitempty"`
}
|
||||
|
||||
func parseTableMetadataEnvelope(metadataBytes []byte) (table.Metadata, string, *planningIndex, error) {
|
||||
var envelope tableMetadataEnvelope
|
||||
if err := json.Unmarshal(metadataBytes, &envelope); err != nil {
|
||||
return nil, "", nil, fmt.Errorf("parse metadata xattr: %w", err)
|
||||
}
|
||||
if envelope.Metadata == nil || len(envelope.Metadata.FullMetadata) == 0 {
|
||||
return nil, "", nil, fmt.Errorf("no fullMetadata in table xattr")
|
||||
}
|
||||
|
||||
meta, err := table.ParseMetadataBytes(envelope.Metadata.FullMetadata)
|
||||
if err != nil {
|
||||
return nil, "", nil, fmt.Errorf("parse iceberg metadata: %w", err)
|
||||
}
|
||||
|
||||
var index *planningIndex
|
||||
if len(envelope.PlanningIndex) > 0 {
|
||||
if err := json.Unmarshal(envelope.PlanningIndex, &index); err != nil {
|
||||
glog.V(2).Infof("iceberg maintenance: ignoring invalid planning index cache: %v", err)
|
||||
index = nil
|
||||
}
|
||||
}
|
||||
|
||||
metadataFileName := metadataFileNameFromLocation(envelope.MetadataLocation, "", "")
|
||||
if metadataFileName == "" {
|
||||
metadataFileName = fmt.Sprintf("v%d.metadata.json", envelope.MetadataVersion)
|
||||
}
|
||||
return meta, metadataFileName, index, nil
|
||||
}
|
||||
|
||||
func (idx *planningIndex) matchesSnapshot(meta table.Metadata) bool {
|
||||
if idx == nil {
|
||||
return false
|
||||
}
|
||||
currentSnap := meta.CurrentSnapshot()
|
||||
if currentSnap == nil || currentSnap.ManifestList == "" {
|
||||
return false
|
||||
}
|
||||
return idx.SnapshotID == currentSnap.SnapshotID && idx.ManifestList == currentSnap.ManifestList
|
||||
}
|
||||
|
||||
func (idx *planningIndex) compactionEligible(config Config) (bool, bool) {
|
||||
if idx == nil || idx.Compaction == nil {
|
||||
return false, false
|
||||
}
|
||||
if idx.Compaction.ConfigHash != compactionPlanningConfigHash(config) {
|
||||
return false, false
|
||||
}
|
||||
return idx.Compaction.Eligible, true
|
||||
}
|
||||
|
||||
func (idx *planningIndex) rewriteManifestsEligible(config Config) (bool, bool) {
|
||||
if idx == nil || idx.RewriteManifests == nil {
|
||||
return false, false
|
||||
}
|
||||
if idx.RewriteManifests.Threshold != config.MinManifestsToRewrite {
|
||||
return false, false
|
||||
}
|
||||
return idx.RewriteManifests.Eligible, true
|
||||
}
|
||||
|
||||
func compactionPlanningConfigHash(config Config) string {
|
||||
return fmt.Sprintf("target=%d|min=%d", config.TargetFileSizeBytes, config.MinInputFiles)
|
||||
}
|
||||
|
||||
// operationRequested reports whether wanted appears in the requested ops list.
func operationRequested(ops []string, wanted string) bool {
	for i := 0; i < len(ops); i++ {
		if ops[i] == wanted {
			return true
		}
	}
	return false
}
|
||||
|
||||
func mergePlanningIndexSections(index, existing *planningIndex) *planningIndex {
|
||||
if index == nil || existing == nil {
|
||||
return index
|
||||
}
|
||||
if index.SnapshotID != existing.SnapshotID || index.ManifestList != existing.ManifestList {
|
||||
return index
|
||||
}
|
||||
if index.Compaction == nil && existing.Compaction != nil {
|
||||
compactionCopy := *existing.Compaction
|
||||
index.Compaction = &compactionCopy
|
||||
}
|
||||
if index.RewriteManifests == nil && existing.RewriteManifests != nil {
|
||||
rewriteCopy := *existing.RewriteManifests
|
||||
index.RewriteManifests = &rewriteCopy
|
||||
}
|
||||
return index
|
||||
}
|
||||
|
||||
func buildPlanningIndex(
|
||||
ctx context.Context,
|
||||
filerClient filer_pb.SeaweedFilerClient,
|
||||
bucketName, tablePath string,
|
||||
meta table.Metadata,
|
||||
config Config,
|
||||
ops []string,
|
||||
) (*planningIndex, error) {
|
||||
currentSnap := meta.CurrentSnapshot()
|
||||
if currentSnap == nil || currentSnap.ManifestList == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
manifests, err := loadCurrentManifests(ctx, filerClient, bucketName, tablePath, meta)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return buildPlanningIndexFromManifests(ctx, filerClient, bucketName, tablePath, meta, config, ops, manifests)
|
||||
}
|
||||
|
||||
func buildPlanningIndexFromManifests(
|
||||
ctx context.Context,
|
||||
filerClient filer_pb.SeaweedFilerClient,
|
||||
bucketName, tablePath string,
|
||||
meta table.Metadata,
|
||||
config Config,
|
||||
ops []string,
|
||||
manifests []iceberg.ManifestFile,
|
||||
) (*planningIndex, error) {
|
||||
currentSnap := meta.CurrentSnapshot()
|
||||
if currentSnap == nil || currentSnap.ManifestList == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
index := &planningIndex{
|
||||
SnapshotID: currentSnap.SnapshotID,
|
||||
ManifestList: currentSnap.ManifestList,
|
||||
UpdatedAtMs: time.Now().UnixMilli(),
|
||||
DataManifestCount: countDataManifests(manifests),
|
||||
}
|
||||
|
||||
if operationRequested(ops, "compact") {
|
||||
eligible, err := hasEligibleCompaction(ctx, filerClient, bucketName, tablePath, manifests, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
index.Compaction = &planningIndexCompaction{
|
||||
ConfigHash: compactionPlanningConfigHash(config),
|
||||
Eligible: eligible,
|
||||
}
|
||||
}
|
||||
|
||||
if operationRequested(ops, "rewrite_manifests") {
|
||||
index.RewriteManifests = &planningIndexRewriteManifests{
|
||||
Threshold: config.MinManifestsToRewrite,
|
||||
Eligible: index.DataManifestCount >= config.MinManifestsToRewrite,
|
||||
}
|
||||
}
|
||||
|
||||
return index, nil
|
||||
}
|
||||
|
||||
// persistPlanningIndex stores the planning index inside the table entry's
// metadata xattr. It merges in any still-valid sections of the previously
// persisted index, then writes with a compare-and-swap guard on the existing
// xattr values so a concurrent metadata commit is never clobbered. Losing the
// CAS race is treated as success: the cache is best effort.
func persistPlanningIndex(
	ctx context.Context,
	client filer_pb.SeaweedFilerClient,
	bucketName, tablePath string,
	index *planningIndex,
) error {
	if index == nil {
		return nil
	}

	tableDir := path.Join(s3tables.TablesPath, bucketName, tablePath)
	tableName := path.Base(tableDir)
	parentDir := path.Dir(tableDir)

	resp, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{
		Directory: parentDir,
		Name:      tableName,
	})
	if err != nil {
		return fmt.Errorf("lookup table entry: %w", err)
	}
	if resp == nil || resp.Entry == nil {
		return fmt.Errorf("table entry not found")
	}

	existingXattr, ok := resp.Entry.Extended[s3tables.ExtendedKeyMetadata]
	if !ok || len(existingXattr) == 0 {
		return fmt.Errorf("no metadata xattr on table entry")
	}

	// Decode as a generic map so envelope fields we don't model survive the
	// round trip unchanged.
	var internalMeta map[string]json.RawMessage
	if err := json.Unmarshal(existingXattr, &internalMeta); err != nil {
		return fmt.Errorf("unmarshal metadata xattr: %w", err)
	}
	// Best-effort merge: keep sections of the previously persisted index that
	// this scan did not recompute. A parse failure just skips the merge.
	if _, _, existingIndex, err := parseTableMetadataEnvelope(existingXattr); err == nil {
		index = mergePlanningIndexSections(index, existingIndex)
	}

	indexJSON, err := json.Marshal(index)
	if err != nil {
		return fmt.Errorf("marshal planning index: %w", err)
	}
	internalMeta["planningIndex"] = indexJSON

	updatedXattr, err := json.Marshal(internalMeta)
	if err != nil {
		return fmt.Errorf("marshal updated metadata xattr: %w", err)
	}

	// CAS guard: the update applies only if the metadata xattr (and, when
	// present, the metadata version xattr) still hold the values read above.
	expectedExtended := map[string][]byte{
		s3tables.ExtendedKeyMetadata: existingXattr,
	}
	if expectedVersionXattr, ok := resp.Entry.Extended[s3tables.ExtendedKeyMetadataVersion]; ok && len(expectedVersionXattr) > 0 {
		expectedExtended[s3tables.ExtendedKeyMetadataVersion] = expectedVersionXattr
	}
	resp.Entry.Extended[s3tables.ExtendedKeyMetadata] = updatedXattr
	_, err = client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
		Directory:        parentDir,
		Entry:            resp.Entry,
		ExpectedExtended: expectedExtended,
	})
	if err != nil {
		if status.Code(err) == codes.FailedPrecondition {
			// Lost the CAS race to a concurrent writer; drop the cache write.
			return nil
		}
		return fmt.Errorf("update table entry: %w", err)
	}

	return nil
}
|
||||
Reference in New Issue
Block a user