diff --git a/test/s3/normal/get_object_attributes_test.go b/test/s3/normal/get_object_attributes_test.go index 95dc7d8f7..a80b24baf 100644 --- a/test/s3/normal/get_object_attributes_test.go +++ b/test/s3/normal/get_object_attributes_test.go @@ -10,14 +10,14 @@ import ( "testing" "time" - "github.com/aws/aws-sdk-go/aws" - v1credentials "github.com/aws/aws-sdk-go/aws/credentials" - v1signer "github.com/aws/aws-sdk-go/aws/signer/v4" - v1s3 "github.com/aws/aws-sdk-go/service/s3" v2aws "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/credentials" v2s3 "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/aws/aws-sdk-go/aws" + v1credentials "github.com/aws/aws-sdk-go/aws/credentials" + v1signer "github.com/aws/aws-sdk-go/aws/signer/v4" + v1s3 "github.com/aws/aws-sdk-go/service/s3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -187,8 +187,8 @@ func testGetObjectAttributesMultipart(t *testing.T, cluster *TestCluster) { // Test pagination: MaxParts=1 resp2, err := client.GetObjectAttributes(context.Background(), &v2s3.GetObjectAttributesInput{ - Bucket: v2aws.String(bucketName), - Key: v2aws.String(objectKey), + Bucket: v2aws.String(bucketName), + Key: v2aws.String(objectKey), MaxParts: v2aws.Int32(1), ObjectAttributes: []types.ObjectAttributes{ types.ObjectAttributesObjectParts, diff --git a/test/s3tables/maintenance/maintenance_integration_test.go b/test/s3tables/maintenance/maintenance_integration_test.go new file mode 100644 index 000000000..735cca573 --- /dev/null +++ b/test/s3tables/maintenance/maintenance_integration_test.go @@ -0,0 +1,681 @@ +// Package maintenance contains integration tests for the iceberg table +// maintenance plugin worker. Tests start a real weed mini cluster, create +// tables via the S3 Tables API, populate Iceberg metadata via the filer +// gRPC API, and then exercise the iceberg.Handler operations against the +// live filer. 
+package maintenance + +import ( + "bytes" + "context" + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "net" + "net/http" + "os" + "path" + "path/filepath" + "strconv" + "sync" + "testing" + "time" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" + "github.com/aws/aws-sdk-go-v2/aws" + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/seaweedfs/seaweedfs/weed/command" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + icebergHandler "github.com/seaweedfs/seaweedfs/weed/plugin/worker/iceberg" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3tables" +) + +// --------------------------------------------------------------------------- +// Cluster lifecycle (mirrors test/s3tables/table-buckets pattern) +// --------------------------------------------------------------------------- + +type testCluster struct { + dataDir string + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + filerGrpcPort int + s3Port int + s3Endpoint string + isRunning bool +} + +var shared *testCluster + +func TestMain(m *testing.M) { + flag.Parse() + if testing.Short() { + os.Exit(m.Run()) + } + + testDir, err := os.MkdirTemp("", "seaweed-iceberg-maint-*") + if err != nil { + fmt.Fprintf(os.Stderr, "SKIP: failed to create temp dir: %v\n", err) + os.Exit(0) + } + + cluster, err := startCluster(testDir, nil) + if err != nil { + fmt.Fprintf(os.Stderr, "SKIP: failed to start cluster: %v\n", err) + os.RemoveAll(testDir) + os.Exit(0) + } + shared = cluster + + code := m.Run() + shared.stop() + os.RemoveAll(testDir) + os.Exit(code) +} + +func startCluster(testDir string, extraArgs []string) (*testCluster, error) { + ports, err := findPorts(10) + if err != nil { + return nil, err + } + 
masterPort, masterGrpc := ports[0], ports[1] + volumePort, volumeGrpc := ports[2], ports[3] + filerPort, filerGrpc := ports[4], ports[5] + s3Port, s3Grpc := ports[6], ports[7] + adminPort, adminGrpc := ports[8], ports[9] + + _ = os.Remove(filepath.Join(testDir, "mini.options")) + + // Empty security.toml disables JWT auth. + if err := os.WriteFile(filepath.Join(testDir, "security.toml"), []byte("# test\n"), 0644); err != nil { + return nil, err + } + if os.Getenv("AWS_ACCESS_KEY_ID") == "" { + os.Setenv("AWS_ACCESS_KEY_ID", "admin") + } + if os.Getenv("AWS_SECRET_ACCESS_KEY") == "" { + os.Setenv("AWS_SECRET_ACCESS_KEY", "admin") + } + + ctx, cancel := context.WithCancel(context.Background()) + c := &testCluster{ + dataDir: testDir, + ctx: ctx, + cancel: cancel, + filerGrpcPort: filerGrpc, + s3Port: s3Port, + s3Endpoint: fmt.Sprintf("http://127.0.0.1:%d", s3Port), + } + + c.wg.Add(1) + go func() { + defer c.wg.Done() + oldDir, _ := os.Getwd() + oldArgs := os.Args + defer func() { os.Chdir(oldDir); os.Args = oldArgs }() + os.Chdir(testDir) + + args := []string{ + "-dir=" + testDir, + "-master.dir=" + testDir, + "-master.port=" + strconv.Itoa(masterPort), + "-master.port.grpc=" + strconv.Itoa(masterGrpc), + "-volume.port=" + strconv.Itoa(volumePort), + "-volume.port.grpc=" + strconv.Itoa(volumeGrpc), + "-volume.port.public=" + strconv.Itoa(volumePort), + "-volume.publicUrl=127.0.0.1:" + strconv.Itoa(volumePort), + "-filer.port=" + strconv.Itoa(filerPort), + "-filer.port.grpc=" + strconv.Itoa(filerGrpc), + "-s3.port=" + strconv.Itoa(s3Port), + "-s3.port.grpc=" + strconv.Itoa(s3Grpc), + "-admin.port=" + strconv.Itoa(adminPort), + "-admin.port.grpc=" + strconv.Itoa(adminGrpc), + "-webdav.port=0", + "-admin.ui=false", + "-master.volumeSizeLimitMB=32", + "-ip=127.0.0.1", + "-master.peers=none", + "-s3.iam.readOnly=false", + } + args = append(args, extraArgs...) + os.Args = append([]string{"weed"}, args...) 
+ glog.MaxSize = 1024 * 1024 + for _, cmd := range command.Commands { + if cmd.Name() == "mini" && cmd.Run != nil { + cmd.Flag.Parse(os.Args[1:]) + command.MiniClusterCtx = ctx + cmd.Run(cmd, cmd.Flag.Args()) + command.MiniClusterCtx = nil + return + } + } + }() + + if err := waitReady(c.s3Endpoint, 30*time.Second); err != nil { + cancel() + return nil, err + } + c.isRunning = true + return c, nil +} + +func (c *testCluster) stop() { + if c.cancel != nil { + c.cancel() + } + if c.isRunning { + time.Sleep(500 * time.Millisecond) + } + done := make(chan struct{}) + go func() { c.wg.Wait(); close(done) }() + select { + case <-done: + case <-time.After(2 * time.Second): + } +} + +func (c *testCluster) filerConn(t *testing.T) (*grpc.ClientConn, filer_pb.SeaweedFilerClient) { + t.Helper() + addr := fmt.Sprintf("127.0.0.1:%d", c.filerGrpcPort) + conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + t.Cleanup(func() { conn.Close() }) + return conn, filer_pb.NewSeaweedFilerClient(conn) +} + +func findPorts(n int) ([]int, error) { + ls := make([]*net.TCPListener, n) + ps := make([]int, n) + for i := 0; i < n; i++ { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + for j := 0; j < i; j++ { + ls[j].Close() + } + return nil, err + } + ls[i] = l.(*net.TCPListener) + ps[i] = ls[i].Addr().(*net.TCPAddr).Port + } + for _, l := range ls { + l.Close() + } + return ps, nil +} + +func waitReady(endpoint string, timeout time.Duration) error { + client := &http.Client{Timeout: 1 * time.Second} + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + resp, err := client.Get(endpoint) + if err == nil { + resp.Body.Close() + time.Sleep(500 * time.Millisecond) + return nil + } + time.Sleep(200 * time.Millisecond) + } + return fmt.Errorf("timeout waiting for %s", endpoint) +} + +func randomSuffix() string { + b := make([]byte, 4) + rand.Read(b) + return fmt.Sprintf("%x", b) +} + +// 
--------------------------------------------------------------------------- +// Helpers for populating Iceberg table state via filer gRPC +// --------------------------------------------------------------------------- + +func newTestSchema() *iceberg.Schema { + return iceberg.NewSchema(0, + iceberg.NestedField{ID: 1, Type: iceberg.PrimitiveTypes.Int64, Name: "id", Required: true}, + iceberg.NestedField{ID: 2, Type: iceberg.PrimitiveTypes.String, Name: "name", Required: false}, + ) +} + +func buildMetadata(t *testing.T, snapshots []table.Snapshot) table.Metadata { + t.Helper() + schema := newTestSchema() + meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec, table.UnsortedSortOrder, "s3://test/table", nil) + require.NoError(t, err) + if len(snapshots) == 0 { + return meta + } + + // Iceberg validates that snapshot timestamps >= metadata lastUpdatedMs. + // Offset all timestamps so they're safely after the metadata was created. + baseMs := time.Now().UnixMilli() + 100 + for i := range snapshots { + snapshots[i].TimestampMs = baseMs + int64(i) + } + + builder, err := table.MetadataBuilderFromBase(meta, "s3://test/table") + require.NoError(t, err) + var lastID int64 + for i := range snapshots { + s := snapshots[i] + require.NoError(t, builder.AddSnapshot(&s)) + lastID = s.SnapshotID + } + require.NoError(t, builder.SetSnapshotRef(table.MainBranch, lastID, table.BranchRef)) + result, err := builder.Build() + require.NoError(t, err) + return result +} + +// s3put performs an S3-signed PUT request. When key is empty it creates a +// bucket; otherwise it uploads an object. 
+func s3put(t *testing.T, s3Endpoint, bucket, key string, body []byte) {
+	t.Helper()
+	urlPath := "/" + bucket
+	if key != "" {
+		urlPath += "/" + key
+	}
+	var reader *bytes.Reader
+	if body != nil {
+		reader = bytes.NewReader(body)
+	} else {
+		reader = bytes.NewReader(nil)
+	}
+	req, err := http.NewRequest(http.MethodPut, s3Endpoint+urlPath, reader)
+	require.NoError(t, err)
+	req.Header.Set("Host", req.URL.Host)
+	hash := sha256.Sum256(body)
+	err = v4.NewSigner().SignHTTP(context.Background(),
+		aws.Credentials{AccessKeyID: "admin", SecretAccessKey: "admin"},
+		req, hex.EncodeToString(hash[:]), "s3", "us-east-1", time.Now())
+	require.NoError(t, err)
+	resp, err := (&http.Client{Timeout: 10 * time.Second}).Do(req)
+	require.NoError(t, err)
+	defer resp.Body.Close()
+	if resp.StatusCode >= 400 {
+		b, _ := io.ReadAll(resp.Body)
+		require.FailNowf(t, "s3put failed", "s3put(%s/%s) → %d: %s", bucket, key, resp.StatusCode, string(b))
+	}
+}
+
+// populateTableViaFiler creates the S3 bucket (via s3put with an empty key),
+// the Iceberg directory structure, metadata xattr, manifests, and data files
+// for a table directly in the filer, and marks the bucket/table entries with
+// the s3tables xattrs via filer gRPC — no S3 Tables API calls are involved;
+// the table state is fabricated directly for maintenance-handler testing.
+func populateTableViaFiler( + t *testing.T, + client filer_pb.SeaweedFilerClient, + s3Endpoint, bucketName, namespace, tableName string, + snapshots []table.Snapshot, +) table.Metadata { + t.Helper() + ctx := context.Background() + meta := buildMetadata(t, snapshots) + fullJSON, err := json.Marshal(meta) + require.NoError(t, err) + + tablePath := path.Join(namespace, tableName) + bucketsPath := s3tables.TablesPath + tableFilerPath := path.Join(bucketsPath, bucketName, tablePath) + metaDir := path.Join(tableFilerPath, "metadata") + dataDir := path.Join(tableFilerPath, "data") + + // Build the table metadata xattr + internalMeta := map[string]interface{}{ + "metadataVersion": 1, + "metadataLocation": "metadata/v1.metadata.json", + "metadata": map[string]interface{}{ + "fullMetadata": json.RawMessage(fullJSON), + }, + } + xattr, err := json.Marshal(internalMeta) + require.NoError(t, err) + + // Create the S3 bucket via PUT, then create the directory tree via S3 PUT + // object with zero-byte content (the filer creates intermediate dirs). + s3put(t, s3Endpoint, bucketName, "", nil) // create bucket + s3put(t, s3Endpoint, bucketName, namespace+"/.dir", []byte{}) // create ns dir + s3put(t, s3Endpoint, bucketName, tablePath+"/.dir", []byte{}) // create table dir + s3put(t, s3Endpoint, bucketName, tablePath+"/metadata/.dir", []byte{}) // create metadata dir + s3put(t, s3Endpoint, bucketName, tablePath+"/data/.dir", []byte{}) // create data dir + + // Now set the table bucket and table xattrs via filer gRPC. + // Mark bucket as table bucket. 
+ bucketEntry := lookupEntry(t, client, bucketsPath, bucketName) + require.NotNil(t, bucketEntry, "bucket should exist after S3 PUT") + if bucketEntry.Extended == nil { + bucketEntry.Extended = make(map[string][]byte) + } + bucketEntry.Extended[s3tables.ExtendedKeyTableBucket] = []byte("true") + _, err = client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{ + Directory: bucketsPath, Entry: bucketEntry, + }) + require.NoError(t, err) + + // Set table metadata xattr. + nsDir := path.Join(bucketsPath, bucketName, namespace) + tableEntry := lookupEntry(t, client, nsDir, tableName) + require.NotNil(t, tableEntry, "table dir should exist after S3 PUT") + if tableEntry.Extended == nil { + tableEntry.Extended = make(map[string][]byte) + } + tableEntry.Extended[s3tables.ExtendedKeyMetadata] = xattr + _, err = client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{ + Directory: nsDir, Entry: tableEntry, + }) + require.NoError(t, err) + + // Write manifest + manifest list + data file for each snapshot + schema := meta.CurrentSchema() + spec := meta.PartitionSpec() + version := meta.Version() + + for _, snap := range snapshots { + if snap.ManifestList == "" { + continue + } + snapID := snap.SnapshotID + dataFileName := fmt.Sprintf("snap-%d-data.parquet", snapID) + dfBuilder, err := iceberg.NewDataFileBuilder( + spec, iceberg.EntryContentData, + path.Join("data", dataFileName), iceberg.ParquetFile, + map[int]any{}, nil, nil, 10, 4096, + ) + require.NoError(t, err) + + entry := iceberg.NewManifestEntry( + iceberg.EntryStatusADDED, &snapID, nil, nil, dfBuilder.Build(), + ) + + // Manifest + manifestName := fmt.Sprintf("manifest-%d.avro", snapID) + var manifestBuf bytes.Buffer + mf, err := iceberg.WriteManifest( + path.Join("metadata", manifestName), &manifestBuf, + version, spec, schema, snapID, []iceberg.ManifestEntry{entry}, + ) + require.NoError(t, err) + writeFile(t, ctx, client, metaDir, manifestName, manifestBuf.Bytes()) + + // Manifest list + mlName := 
path.Base(snap.ManifestList) + var mlBuf bytes.Buffer + parent := snap.ParentSnapshotID + seqNum := snap.SequenceNumber + require.NoError(t, iceberg.WriteManifestList( + version, &mlBuf, snapID, parent, &seqNum, 0, + []iceberg.ManifestFile{mf}, + )) + writeFile(t, ctx, client, metaDir, mlName, mlBuf.Bytes()) + + // Data file (dummy content) + writeFile(t, ctx, client, dataDir, dataFileName, []byte("fake-parquet")) + } + + // Wait for snapshot timestamps (set slightly in the future by + // buildMetadata) to move into the past so expiration checks work. + time.Sleep(200 * time.Millisecond) + + return meta +} + +func writeFile(t *testing.T, ctx context.Context, client filer_pb.SeaweedFilerClient, dir, name string, content []byte) { + t.Helper() + resp, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ + Directory: dir, + Entry: &filer_pb.Entry{ + Name: name, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), Crtime: time.Now().Unix(), + FileMode: uint32(0644), FileSize: uint64(len(content)), + }, + Content: content, + }, + }) + require.NoError(t, err, "writeFile(%s, %s): rpc error", dir, name) + require.Empty(t, resp.Error, "writeFile(%s, %s): resp error", dir, name) +} + +func lookupEntry(t *testing.T, client filer_pb.SeaweedFilerClient, dir, name string) *filer_pb.Entry { + t.Helper() + resp, err := filer_pb.LookupEntry(context.Background(), client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, Name: name, + }) + if err != nil { + if errors.Is(err, filer_pb.ErrNotFound) { + return nil + } + require.NoError(t, err, "lookupEntry(%s, %s): unexpected error", dir, name) + } + return resp.Entry +} + +// --------------------------------------------------------------------------- +// Integration tests +// --------------------------------------------------------------------------- + +func TestIcebergMaintenanceIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + 
t.Run("ExpireSnapshots", testExpireSnapshots) + t.Run("RemoveOrphans", testRemoveOrphans) + t.Run("RewriteManifests", testRewriteManifests) + t.Run("FullMaintenanceCycle", testFullMaintenanceCycle) +} + +func testExpireSnapshots(t *testing.T) { + _, client := shared.filerConn(t) + suffix := randomSuffix() + bucket := "maint-expire-" + suffix + ns := "ns" + tbl := "tbl" + + now := time.Now().UnixMilli() + snapshots := []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + {SnapshotID: 2, TimestampMs: now + 1, ManifestList: "metadata/snap-2.avro"}, + {SnapshotID: 3, TimestampMs: now + 2, ManifestList: "metadata/snap-3.avro"}, + } + populateTableViaFiler(t, client, shared.s3Endpoint, bucket, ns, tbl, snapshots) + + handler := icebergHandler.NewHandler(nil) + config := icebergHandler.Config{ + SnapshotRetentionHours: 0, // instant expiry — everything eligible + MaxSnapshotsToKeep: 1, // keep only the current snapshot + MaxCommitRetries: 3, + } + + result, err := handler.ExpireSnapshots(context.Background(), client, bucket, path.Join(ns, tbl), config) + require.NoError(t, err) + assert.Contains(t, result, "expired") + t.Logf("ExpireSnapshots result: %s", result) + + // Verify metadata was updated + entry := lookupEntry(t, client, path.Join(s3tables.TablesPath, bucket, ns), tbl) + require.NotNil(t, entry) + xattr := entry.Extended[s3tables.ExtendedKeyMetadata] + require.NotEmpty(t, xattr) + + // Parse updated metadata to verify snapshot count + var internalMeta struct { + MetadataVersion int `json:"metadataVersion"` + } + require.NoError(t, json.Unmarshal(xattr, &internalMeta)) + assert.Greater(t, internalMeta.MetadataVersion, 1, "metadata version should have been incremented") +} + +func testRemoveOrphans(t *testing.T) { + _, client := shared.filerConn(t) + suffix := randomSuffix() + bucket := "maint-orphan-" + suffix + ns := "ns" + tbl := "tbl" + + now := time.Now().UnixMilli() + snapshots := []table.Snapshot{ + {SnapshotID: 1, 
TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + } + populateTableViaFiler(t, client, shared.s3Endpoint, bucket, ns, tbl, snapshots) + + ctx := context.Background() + tablePath := path.Join(ns, tbl) + tableFilerPath := path.Join(s3tables.TablesPath, bucket, tablePath) + dataDir := path.Join(tableFilerPath, "data") + metaDir := path.Join(tableFilerPath, "metadata") + + // Create orphan files (old enough to be removed) + oldTime := time.Now().Add(-200 * time.Hour).Unix() + writeOrphan := func(dir, name string) { + resp, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ + Directory: dir, + Entry: &filer_pb.Entry{ + Name: name, + Attributes: &filer_pb.FuseAttributes{ + Mtime: oldTime, Crtime: oldTime, + FileMode: uint32(0644), FileSize: 100, + }, + Content: []byte("orphan"), + }, + }) + require.NoError(t, err, "writeOrphan(%s, %s): rpc error", dir, name) + require.Empty(t, resp.Error, "writeOrphan(%s, %s): resp error", dir, name) + } + writeOrphan(dataDir, "orphan-data.parquet") + writeOrphan(metaDir, "orphan-meta.avro") + + // Create a recent orphan (should NOT be removed) + writeFile(t, ctx, client, dataDir, "recent-orphan.parquet", []byte("new")) + + handler := icebergHandler.NewHandler(nil) + config := icebergHandler.Config{ + OrphanOlderThanHours: 72, + MaxCommitRetries: 3, + } + + result, err := handler.RemoveOrphans(ctx, client, bucket, tablePath, config) + require.NoError(t, err) + assert.Contains(t, result, "removed") + t.Logf("RemoveOrphans result: %s", result) + + // Verify orphans were removed + assert.Nil(t, lookupEntry(t, client, dataDir, "orphan-data.parquet"), "old orphan data file should be deleted") + assert.Nil(t, lookupEntry(t, client, metaDir, "orphan-meta.avro"), "old orphan meta file should be deleted") + + // Verify recent orphan was kept + assert.NotNil(t, lookupEntry(t, client, dataDir, "recent-orphan.parquet"), "recent orphan should be kept") +} + +func testRewriteManifests(t *testing.T) { + _, client := 
shared.filerConn(t) + suffix := randomSuffix() + bucket := "maint-rewrite-" + suffix + ns := "ns" + tbl := "tbl" + + // Each snapshot gets its own manifest list with one manifest. The current + // snapshot (latest) therefore has only 1 manifest — below the rewrite + // threshold. This tests the threshold check against a real filer. + now := time.Now().UnixMilli() + snapshots := []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + } + populateTableViaFiler(t, client, shared.s3Endpoint, bucket, ns, tbl, snapshots) + + handler := icebergHandler.NewHandler(nil) + config := icebergHandler.Config{ + MinManifestsToRewrite: 5, // threshold higher than 1 manifest + MaxCommitRetries: 3, + } + + tablePath := path.Join(ns, tbl) + result, err := handler.RewriteManifests(context.Background(), client, bucket, tablePath, config) + require.NoError(t, err) + assert.Contains(t, result, "below threshold") + t.Logf("RewriteManifests result: %s", result) +} + +func testFullMaintenanceCycle(t *testing.T) { + _, client := shared.filerConn(t) + suffix := randomSuffix() + bucket := "maint-full-" + suffix + ns := "ns" + tbl := "tbl" + + now := time.Now().UnixMilli() + snapshots := []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"}, + {SnapshotID: 2, TimestampMs: now + 1, ManifestList: "metadata/snap-2.avro"}, + {SnapshotID: 3, TimestampMs: now + 2, ManifestList: "metadata/snap-3.avro"}, + } + populateTableViaFiler(t, client, shared.s3Endpoint, bucket, ns, tbl, snapshots) + + ctx := context.Background() + tablePath := path.Join(ns, tbl) + + // Add an orphan + dataDir := path.Join(s3tables.TablesPath, bucket, tablePath, "data") + oldTime := time.Now().Add(-200 * time.Hour).Unix() + orphanResp, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ + Directory: dataDir, + Entry: &filer_pb.Entry{ + Name: "orphan.parquet", + Attributes: &filer_pb.FuseAttributes{ + Mtime: oldTime, Crtime: oldTime, + FileMode: 
uint32(0644), FileSize: 100, + }, + Content: []byte("orphan"), + }, + }) + require.NoError(t, err) + require.Empty(t, orphanResp.Error) + + handler := icebergHandler.NewHandler(nil) + + // Step 1: Expire snapshots + expireConfig := icebergHandler.Config{ + SnapshotRetentionHours: 0, // instant expiry + MaxSnapshotsToKeep: 1, + MaxCommitRetries: 3, + } + result, err := handler.ExpireSnapshots(ctx, client, bucket, tablePath, expireConfig) + require.NoError(t, err) + assert.Contains(t, result, "expired") + t.Logf("Step 1 (expire): %s", result) + + // Step 2: Remove orphans + orphanConfig := icebergHandler.Config{ + OrphanOlderThanHours: 72, + MaxCommitRetries: 3, + } + result, err = handler.RemoveOrphans(ctx, client, bucket, tablePath, orphanConfig) + require.NoError(t, err) + t.Logf("Step 2 (orphans): %s", result) + // The orphan and the unreferenced files from expired snapshots should be gone + assert.Contains(t, result, "removed") + + // Step 3: Verify metadata is consistent + entry := lookupEntry(t, client, path.Join(s3tables.TablesPath, bucket, ns), tbl) + require.NotNil(t, entry) + xattr := entry.Extended[s3tables.ExtendedKeyMetadata] + require.NotEmpty(t, xattr) + + var internalMeta struct { + MetadataVersion int `json:"metadataVersion"` + } + require.NoError(t, json.Unmarshal(xattr, &internalMeta)) + assert.Greater(t, internalMeta.MetadataVersion, 1, "metadata version should have advanced through the cycle") + t.Logf("Final metadata version: %d", internalMeta.MetadataVersion) +} diff --git a/weed/admin/plugin/plugin.go b/weed/admin/plugin/plugin.go index c04a66c78..4351c2172 100644 --- a/weed/admin/plugin/plugin.go +++ b/weed/admin/plugin/plugin.go @@ -78,7 +78,7 @@ type Plugin struct { schedulerRun map[string]*schedulerRunInfo schedulerLoopMu sync.Mutex schedulerLoopState schedulerLoopState - schedulerWakeCh chan struct{} + schedulerWakeCh chan struct{} dedupeMu sync.Mutex recentDedupeByType map[string]map[string]time.Time @@ -392,7 +392,6 @@ func (r *Plugin) 
SaveJobTypeConfig(config *plugin_pb.PersistedJobTypeConfig) err return nil } - func (r *Plugin) LoadDescriptor(jobType string) (*plugin_pb.JobTypeDescriptor, error) { return r.store.LoadDescriptor(jobType) } diff --git a/weed/admin/topology/types.go b/weed/admin/topology/types.go index d7c1d5352..f38c984c3 100644 --- a/weed/admin/topology/types.go +++ b/weed/admin/topology/types.go @@ -15,7 +15,7 @@ const ( TaskTypeBalance TaskType = "balance" TaskTypeErasureCoding TaskType = "erasure_coding" TaskTypeReplication TaskType = "replication" - TaskTypeECBalance TaskType = "ec_balance" + TaskTypeECBalance TaskType = "ec_balance" ) // Common task status constants diff --git a/weed/command/volume.go b/weed/command/volume.go index 34deaee42..3d9f4ce71 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -351,7 +351,7 @@ func parseVolumeTags(tagsArg string, folderCount int) [][]string { tagEntries = strings.Split(tagsArg, ",") } folderTags := make([][]string, folderCount) - + // If exactly one tag entry provided, replicate it to all folders if len(tagEntries) == 1 { normalized := util.NormalizeTagList(strings.Split(tagEntries[0], ":")) diff --git a/weed/filer/filer_lazy_remote_test.go b/weed/filer/filer_lazy_remote_test.go index 42dae97e6..6ad5e4449 100644 --- a/weed/filer/filer_lazy_remote_test.go +++ b/weed/filer/filer_lazy_remote_test.go @@ -29,29 +29,29 @@ import ( // --- minimal FilerStore stub --- type stubFilerStore struct { - mu sync.Mutex - entries map[string]*Entry - kv map[string][]byte - insertErr error + mu sync.Mutex + entries map[string]*Entry + kv map[string][]byte + insertErr error deleteErrByPath map[string]error } func newStubFilerStore() *stubFilerStore { return &stubFilerStore{ - entries: make(map[string]*Entry), - kv: make(map[string][]byte), + entries: make(map[string]*Entry), + kv: make(map[string][]byte), deleteErrByPath: make(map[string]error), } } -func (s *stubFilerStore) GetName() string { return "stub" } +func (s *stubFilerStore) 
GetName() string { return "stub" } func (s *stubFilerStore) Initialize(util.Configuration, string) error { return nil } -func (s *stubFilerStore) Shutdown() {} +func (s *stubFilerStore) Shutdown() {} func (s *stubFilerStore) BeginTransaction(ctx context.Context) (context.Context, error) { return ctx, nil } -func (s *stubFilerStore) CommitTransaction(context.Context) error { return nil } -func (s *stubFilerStore) RollbackTransaction(context.Context) error { return nil } +func (s *stubFilerStore) CommitTransaction(context.Context) error { return nil } +func (s *stubFilerStore) RollbackTransaction(context.Context) error { return nil } func (s *stubFilerStore) KvPut(_ context.Context, key []byte, value []byte) error { s.mu.Lock() defer s.mu.Unlock() @@ -245,9 +245,9 @@ func (c *stubRemoteClient) ListDirectory(_ context.Context, loc *remote_pb.Remot } return nil } -func (c *stubRemoteClient) ListBuckets() ([]*remote_storage.Bucket, error) { return nil, nil } -func (c *stubRemoteClient) CreateBucket(string) error { return nil } -func (c *stubRemoteClient) DeleteBucket(string) error { return nil } +func (c *stubRemoteClient) ListBuckets() ([]*remote_storage.Bucket, error) { return nil, nil } +func (c *stubRemoteClient) CreateBucket(string) error { return nil } +func (c *stubRemoteClient) DeleteBucket(string) error { return nil } // --- stub RemoteStorageClientMaker --- diff --git a/weed/filer/remote_storage_test.go b/weed/filer/remote_storage_test.go index d71527e7e..04f12c742 100644 --- a/weed/filer/remote_storage_test.go +++ b/weed/filer/remote_storage_test.go @@ -52,9 +52,9 @@ func TestFilerRemoteStorage_FindMountDirectory_LongestPrefixWins(t *testing.T) { }) tests := []struct { - path string - wantMount string - wantBucket string + path string + wantMount string + wantBucket string }{ {"/buckets/mybucket/file.txt", "/buckets/mybucket", "bucket-root"}, {"/buckets/mybucket/prefix/file.txt", "/buckets/mybucket/prefix", "bucket-prefix"}, diff --git 
a/weed/iam/integration/trust_policy_principal_test.go b/weed/iam/integration/trust_policy_principal_test.go index 6e4cb0906..8de9f78d7 100644 --- a/weed/iam/integration/trust_policy_principal_test.go +++ b/weed/iam/integration/trust_policy_principal_test.go @@ -22,13 +22,13 @@ func TestTrustPolicyAWSUserPrincipal(t *testing.T) { ctx := context.Background() const ( - accountID = "000000000000" - backendUser = "backend" - backendArn = "arn:aws:iam::" + accountID + ":user/" + backendUser - otherUser = "other" - otherArn = "arn:aws:iam::" + accountID + ":user/" + otherUser - clientRoleN = "ClientRole" - clientRoleA = "arn:aws:iam::role/" + clientRoleN + accountID = "000000000000" + backendUser = "backend" + backendArn = "arn:aws:iam::" + accountID + ":user/" + backendUser + otherUser = "other" + otherArn = "arn:aws:iam::" + accountID + ":user/" + otherUser + clientRoleN = "ClientRole" + clientRoleA = "arn:aws:iam::role/" + clientRoleN ) // Create role with trust policy restricted to a specific AWS user principal diff --git a/weed/iam/responses.go b/weed/iam/responses.go index fd91e27f4..6ce19cbd6 100644 --- a/weed/iam/responses.go +++ b/weed/iam/responses.go @@ -362,7 +362,7 @@ type ListAttachedGroupPoliciesResponse struct { // ListGroupsForUserResponse is the response for ListGroupsForUser action. 
type ListGroupsForUserResponse struct { - XMLName xml.Name `xml:"https://iam.amazonaws.com/doc/2010-05-08/ ListGroupsForUserResponse"` + XMLName xml.Name `xml:"https://iam.amazonaws.com/doc/2010-05-08/ ListGroupsForUserResponse"` ListGroupsForUserResult struct { Groups []*iam.Group `xml:"Groups>member"` IsTruncated bool `xml:"IsTruncated"` diff --git a/weed/iamapi/iamapi_group_handlers.go b/weed/iamapi/iamapi_group_handlers.go index b2a3545f6..620fc1f62 100644 --- a/weed/iamapi/iamapi_group_handlers.go +++ b/weed/iamapi/iamapi_group_handlers.go @@ -326,4 +326,3 @@ func buildUserGroupsIndex(s3cfg *iam_pb.S3ApiConfiguration) map[string][]string } return index } - diff --git a/weed/iamapi/iamapi_response.go b/weed/iamapi/iamapi_response.go index 56fcb1be1..021e4a0eb 100644 --- a/weed/iamapi/iamapi_response.go +++ b/weed/iamapi/iamapi_response.go @@ -9,19 +9,19 @@ import ( // Type aliases for IAM response types from shared package type ( - CommonResponse = iamlib.CommonResponse - ListUsersResponse = iamlib.ListUsersResponse - ListAccessKeysResponse = iamlib.ListAccessKeysResponse - DeleteAccessKeyResponse = iamlib.DeleteAccessKeyResponse - CreatePolicyResponse = iamlib.CreatePolicyResponse - CreateUserResponse = iamlib.CreateUserResponse - DeleteUserResponse = iamlib.DeleteUserResponse - GetUserResponse = iamlib.GetUserResponse - UpdateUserResponse = iamlib.UpdateUserResponse - CreateAccessKeyResponse = iamlib.CreateAccessKeyResponse - UpdateAccessKeyResponse = iamlib.UpdateAccessKeyResponse - PutUserPolicyResponse = iamlib.PutUserPolicyResponse - DeleteUserPolicyResponse = iamlib.DeleteUserPolicyResponse + CommonResponse = iamlib.CommonResponse + ListUsersResponse = iamlib.ListUsersResponse + ListAccessKeysResponse = iamlib.ListAccessKeysResponse + DeleteAccessKeyResponse = iamlib.DeleteAccessKeyResponse + CreatePolicyResponse = iamlib.CreatePolicyResponse + CreateUserResponse = iamlib.CreateUserResponse + DeleteUserResponse = iamlib.DeleteUserResponse + 
GetUserResponse = iamlib.GetUserResponse + UpdateUserResponse = iamlib.UpdateUserResponse + CreateAccessKeyResponse = iamlib.CreateAccessKeyResponse + UpdateAccessKeyResponse = iamlib.UpdateAccessKeyResponse + PutUserPolicyResponse = iamlib.PutUserPolicyResponse + DeleteUserPolicyResponse = iamlib.DeleteUserPolicyResponse GetUserPolicyResponse = iamlib.GetUserPolicyResponse GetPolicyResponse = iamlib.GetPolicyResponse DeletePolicyResponse = iamlib.DeletePolicyResponse @@ -30,12 +30,12 @@ type ( DetachUserPolicyResponse = iamlib.DetachUserPolicyResponse ListAttachedUserPoliciesResponse = iamlib.ListAttachedUserPoliciesResponse ErrorResponse = iamlib.ErrorResponse - ServiceAccountInfo = iamlib.ServiceAccountInfo - CreateServiceAccountResponse = iamlib.CreateServiceAccountResponse - DeleteServiceAccountResponse = iamlib.DeleteServiceAccountResponse - ListServiceAccountsResponse = iamlib.ListServiceAccountsResponse - GetServiceAccountResponse = iamlib.GetServiceAccountResponse - UpdateServiceAccountResponse = iamlib.UpdateServiceAccountResponse + ServiceAccountInfo = iamlib.ServiceAccountInfo + CreateServiceAccountResponse = iamlib.CreateServiceAccountResponse + DeleteServiceAccountResponse = iamlib.DeleteServiceAccountResponse + ListServiceAccountsResponse = iamlib.ListServiceAccountsResponse + GetServiceAccountResponse = iamlib.GetServiceAccountResponse + UpdateServiceAccountResponse = iamlib.UpdateServiceAccountResponse // Group response types CreateGroupResponse = iamlib.CreateGroupResponse DeleteGroupResponse = iamlib.DeleteGroupResponse diff --git a/weed/mount/meta_cache/meta_cache.go b/weed/mount/meta_cache/meta_cache.go index a03959cc0..4925a23b6 100644 --- a/weed/mount/meta_cache/meta_cache.go +++ b/weed/mount/meta_cache/meta_cache.go @@ -37,8 +37,8 @@ type MetaCache struct { applyDone chan struct{} applyStateMu sync.Mutex applyClosed bool - buildingDirs map[util.FullPath]*directoryBuildState - dedupRing dedupRingBuffer + buildingDirs 
map[util.FullPath]*directoryBuildState + dedupRing dedupRingBuffer } var errMetaCacheClosed = errors.New("metadata cache is shut down") @@ -98,8 +98,8 @@ func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, root util.FullPat invalidateFunc: func(fullpath util.FullPath, entry *filer_pb.Entry) { invalidateFunc(fullpath, entry) }, - applyCh: make(chan metadataApplyRequest, 128), - applyDone: make(chan struct{}), + applyCh: make(chan metadataApplyRequest, 128), + applyDone: make(chan struct{}), buildingDirs: make(map[util.FullPath]*directoryBuildState), dedupRing: newDedupRingBuffer(), } diff --git a/weed/plugin/worker/ec_balance_handler.go b/weed/plugin/worker/ec_balance_handler.go index 7f5d1d7a0..cff5e670c 100644 --- a/weed/plugin/worker/ec_balance_handler.go +++ b/weed/plugin/worker/ec_balance_handler.go @@ -162,10 +162,10 @@ func (h *ECBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { }, }, DefaultValues: map[string]*plugin_pb.ConfigValue{ - "imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2}}, - "min_server_count": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3}}, + "imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2}}, + "min_server_count": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3}}, "min_interval_seconds": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 60 * 60}}, - "preferred_tags": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + "preferred_tags": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, }, }, AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{ @@ -180,10 +180,10 @@ func (h *ECBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { JobTypeMaxRuntimeSeconds: 1800, }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ - "imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2}}, - "min_server_count": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3}}, + "imbalance_threshold": 
{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2}}, + "min_server_count": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3}}, "min_interval_seconds": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 60 * 60}}, - "preferred_tags": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + "preferred_tags": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, }, } } @@ -527,11 +527,11 @@ func buildECBalanceProposal(result *workertypes.TaskDetectionResult) (*plugin_pb }, }, Labels: map[string]string{ - "task_type": "ec_balance", - "volume_id": fmt.Sprintf("%d", result.VolumeID), - "collection": result.Collection, - "source_node": sourceNode, - "target_node": targetNode, + "task_type": "ec_balance", + "volume_id": fmt.Sprintf("%d", result.VolumeID), + "collection": result.Collection, + "source_node": sourceNode, + "target_node": targetNode, }, }, nil } diff --git a/weed/plugin/worker/ec_balance_handler_test.go b/weed/plugin/worker/ec_balance_handler_test.go index a5d23c5ea..d5583e50b 100644 --- a/weed/plugin/worker/ec_balance_handler_test.go +++ b/weed/plugin/worker/ec_balance_handler_test.go @@ -12,9 +12,9 @@ import ( func TestDeriveECBalanceWorkerConfig(t *testing.T) { tests := []struct { - name string - values map[string]*plugin_pb.ConfigValue - check func(t *testing.T, config *ecBalanceWorkerConfig) + name string + values map[string]*plugin_pb.ConfigValue + check func(t *testing.T, config *ecBalanceWorkerConfig) }{ { name: "nil values uses defaults", diff --git a/weed/plugin/worker/erasure_coding_handler.go b/weed/plugin/worker/erasure_coding_handler.go index a8a028133..54211b07f 100644 --- a/weed/plugin/worker/erasure_coding_handler.go +++ b/weed/plugin/worker/erasure_coding_handler.go @@ -11,8 +11,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" - "github.com/seaweedfs/seaweedfs/weed/util" ecstorage 
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/wildcard" erasurecodingtask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types" diff --git a/weed/plugin/worker/iceberg/compact.go b/weed/plugin/worker/iceberg/compact.go index b285404e3..62fa42ecb 100644 --- a/weed/plugin/worker/iceberg/compact.go +++ b/weed/plugin/worker/iceberg/compact.go @@ -364,14 +364,23 @@ func buildCompactionBins(entries []iceberg.ManifestEntry, targetSize int64, minF // splitOversizedBin splits a bin whose total size exceeds targetSize into // sub-bins that each stay under targetSize while meeting minFiles. +// Entries are sorted by size descending before splitting so that large +// files are placed first, improving bin packing efficiency. func splitOversizedBin(bin compactionBin, targetSize int64, minFiles int) []compactionBin { + // Sort largest-first for better packing. 
+ sorted := make([]iceberg.ManifestEntry, len(bin.Entries)) + copy(sorted, bin.Entries) + sort.Slice(sorted, func(i, j int) bool { + return sorted[i].DataFile().FileSizeBytes() > sorted[j].DataFile().FileSizeBytes() + }) + var bins []compactionBin current := compactionBin{ PartitionKey: bin.PartitionKey, Partition: bin.Partition, } - for _, entry := range bin.Entries { - if current.TotalSize > 0 && current.TotalSize+entry.DataFile().FileSizeBytes() > targetSize && len(current.Entries) >= minFiles { + for _, entry := range sorted { + if current.TotalSize > 0 && current.TotalSize+entry.DataFile().FileSizeBytes() > targetSize { bins = append(bins, current) current = compactionBin{ PartitionKey: bin.PartitionKey, @@ -381,9 +390,44 @@ func splitOversizedBin(bin compactionBin, targetSize int64, minFiles int) []comp current.Entries = append(current.Entries, entry) current.TotalSize += entry.DataFile().FileSizeBytes() } - if len(current.Entries) >= minFiles { + if len(current.Entries) > 0 { bins = append(bins, current) } + + // Merge any bin with fewer than minFiles entries into its neighbor. + // Repeat until no more merges are needed. This handles runts at the + // end (from the split) and small leading bins (from largest-first + // sorting placing single large files into their own bins). + for changed := true; changed && len(bins) > 1; { + changed = false + for i := 0; i < len(bins); i++ { + if len(bins[i].Entries) >= minFiles { + continue + } + // Pick the better neighbor to merge into. + var target int + if i == 0 { + target = 1 + } else if i == len(bins)-1 { + target = i - 1 + } else if bins[i-1].TotalSize <= bins[i+1].TotalSize { + target = i - 1 + } else { + target = i + 1 + } + bins[target].Entries = append(bins[target].Entries, bins[i].Entries...) + bins[target].TotalSize += bins[i].TotalSize + bins = append(bins[:i], bins[i+1:]...) 
+ changed = true + break // restart scan after structural change + } + } + + // Final guard: if a single remaining bin has fewer than minFiles + // (entire input too small), return nothing. + if len(bins) == 1 && len(bins[0].Entries) < minFiles { + return nil + } return bins } @@ -413,10 +457,11 @@ func partitionKey(partition map[int]any) string { } // mergeParquetFiles reads multiple small Parquet files and merges them into -// a single Parquet file. It reads rows from each source and writes them to -// the output using the schema from the first file. -// -// Files are loaded into memory (appropriate for compacting small files). +// a single Parquet file. Files are processed one at a time: each source file +// is loaded, its rows are streamed into the output writer, and then its data +// is released before the next file is loaded. This keeps peak memory +// proportional to the size of a single input file plus the output buffer, +// rather than the sum of all inputs. func mergeParquetFiles( ctx context.Context, filerClient filer_pb.SeaweedFilerClient, @@ -427,72 +472,84 @@ func mergeParquetFiles( return nil, 0, fmt.Errorf("no entries to merge") } - // Read all source files and create parquet readers - type sourceFile struct { - reader *parquet.Reader - data []byte + // Load the first file to obtain the schema for the writer. 
+ firstData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, entries[0].DataFile().FilePath()) + if err != nil { + return nil, 0, fmt.Errorf("read parquet file %s: %w", entries[0].DataFile().FilePath(), err) } - var sources []sourceFile - defer func() { - for _, src := range sources { - if src.reader != nil { - src.reader.Close() + firstReader := parquet.NewReader(bytes.NewReader(firstData)) + parquetSchema := firstReader.Schema() + if parquetSchema == nil { + firstReader.Close() + return nil, 0, fmt.Errorf("no parquet schema found in %s", entries[0].DataFile().FilePath()) + } + + var outputBuf bytes.Buffer + writer := parquet.NewWriter(&outputBuf, parquetSchema) + + // drainReader streams all rows from reader into writer, then closes reader. + // source identifies the input file for error messages. + var totalRows int64 + rows := make([]parquet.Row, 256) + drainReader := func(reader *parquet.Reader, source string) error { + defer reader.Close() + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + n, readErr := reader.ReadRows(rows) + if n > 0 { + if _, writeErr := writer.WriteRows(rows[:n]); writeErr != nil { + return fmt.Errorf("write rows from %s: %w", source, writeErr) + } + totalRows += int64(n) + } + if readErr != nil { + if readErr == io.EOF { + return nil + } + return fmt.Errorf("read rows from %s: %w", source, readErr) } } - }() + } - var parquetSchema *parquet.Schema - for _, entry := range entries { + // Drain the first file. + firstSource := entries[0].DataFile().FilePath() + if err := drainReader(firstReader, firstSource); err != nil { + writer.Close() + return nil, 0, err + } + firstData = nil // allow GC + + // Process remaining files one at a time. 
+ for _, entry := range entries[1:] { select { case <-ctx.Done(): + writer.Close() return nil, 0, ctx.Err() default: } data, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, entry.DataFile().FilePath()) if err != nil { + writer.Close() return nil, 0, fmt.Errorf("read parquet file %s: %w", entry.DataFile().FilePath(), err) } reader := parquet.NewReader(bytes.NewReader(data)) - readerSchema := reader.Schema() - if parquetSchema == nil { - parquetSchema = readerSchema - } else if !schemasEqual(parquetSchema, readerSchema) { + if !schemasEqual(parquetSchema, reader.Schema()) { + reader.Close() + writer.Close() return nil, 0, fmt.Errorf("schema mismatch in %s: cannot merge files with different schemas", entry.DataFile().FilePath()) } - sources = append(sources, sourceFile{reader: reader, data: data}) - } - if parquetSchema == nil { - return nil, 0, fmt.Errorf("no parquet schema found") - } - - // Write merged output - var outputBuf bytes.Buffer - writer := parquet.NewWriter(&outputBuf, parquetSchema) - - var totalRows int64 - rows := make([]parquet.Row, 256) - - for _, src := range sources { - for { - n, err := src.reader.ReadRows(rows) - if n > 0 { - if _, writeErr := writer.WriteRows(rows[:n]); writeErr != nil { - writer.Close() - return nil, 0, fmt.Errorf("write rows: %w", writeErr) - } - totalRows += int64(n) - } - if err != nil { - if err == io.EOF { - break - } - writer.Close() - return nil, 0, fmt.Errorf("read rows: %w", err) - } + if err := drainReader(reader, entry.DataFile().FilePath()); err != nil { + writer.Close() + return nil, 0, err } + // data goes out of scope here, eligible for GC before next iteration. 
} if err := writer.Close(); err != nil { diff --git a/weed/plugin/worker/iceberg/config.go b/weed/plugin/worker/iceberg/config.go index 329ddca4b..f5a6557a1 100644 --- a/weed/plugin/worker/iceberg/config.go +++ b/weed/plugin/worker/iceberg/config.go @@ -18,6 +18,7 @@ const ( defaultMaxCommitRetries = 5 defaultTargetFileSizeBytes = 256 * 1024 * 1024 defaultMinInputFiles = 5 + defaultMinManifestsToRewrite = 5 defaultOperations = "all" ) @@ -29,6 +30,7 @@ type Config struct { MaxCommitRetries int64 TargetFileSizeBytes int64 MinInputFiles int64 + MinManifestsToRewrite int64 Operations string } @@ -42,6 +44,7 @@ func ParseConfig(values map[string]*plugin_pb.ConfigValue) Config { MaxCommitRetries: readInt64Config(values, "max_commit_retries", defaultMaxCommitRetries), TargetFileSizeBytes: readInt64Config(values, "target_file_size_bytes", defaultTargetFileSizeBytes), MinInputFiles: readInt64Config(values, "min_input_files", defaultMinInputFiles), + MinManifestsToRewrite: readInt64Config(values, "min_manifests_to_rewrite", defaultMinManifestsToRewrite), Operations: readStringConfig(values, "operations", defaultOperations), } @@ -64,6 +67,9 @@ func ParseConfig(values map[string]*plugin_pb.ConfigValue) Config { if cfg.MinInputFiles < 2 { cfg.MinInputFiles = defaultMinInputFiles } + if cfg.MinManifestsToRewrite < 2 { + cfg.MinManifestsToRewrite = 2 + } return cfg } diff --git a/weed/plugin/worker/iceberg/exec_test.go b/weed/plugin/worker/iceberg/exec_test.go index 32c45a45d..32789df88 100644 --- a/weed/plugin/worker/iceberg/exec_test.go +++ b/weed/plugin/worker/iceberg/exec_test.go @@ -649,8 +649,9 @@ func TestRewriteManifestsBelowThreshold(t *testing.T) { handler := NewHandler(nil) config := Config{ - MinInputFiles: 10, // threshold higher than actual count (1) - MaxCommitRetries: 3, + MinInputFiles: 10, + MinManifestsToRewrite: 10, // threshold higher than actual manifest count (1) + MaxCommitRetries: 3, } result, err := handler.rewriteManifests(context.Background(), 
client, setup.BucketName, setup.tablePath(), config) @@ -775,8 +776,8 @@ func TestDetectWithFakeFiler(t *testing.T) { handler := NewHandler(nil) config := Config{ - SnapshotRetentionHours: 0, // everything is expired - MaxSnapshotsToKeep: 2, // 3 > 2, needs maintenance + SnapshotRetentionHours: 0, // everything is expired + MaxSnapshotsToKeep: 2, // 3 > 2, needs maintenance MaxCommitRetries: 3, } @@ -785,7 +786,7 @@ func TestDetectWithFakeFiler(t *testing.T) { client, config, "", "", "", // no filters - 0, // no limit + 0, // no limit ) if err != nil { t.Fatalf("scanTablesForMaintenance failed: %v", err) diff --git a/weed/plugin/worker/iceberg/handler.go b/weed/plugin/worker/iceberg/handler.go index fcdbc1511..e128a7575 100644 --- a/weed/plugin/worker/iceberg/handler.go +++ b/weed/plugin/worker/iceberg/handler.go @@ -165,6 +165,21 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { }, }, }, + { + SectionId: "manifests", + Title: "Manifest Rewriting", + Description: "Controls for merging small manifests.", + Fields: []*plugin_pb.ConfigField{ + { + Name: "min_manifests_to_rewrite", + Label: "Min Manifests", + Description: "Minimum number of manifests before rewriting is triggered.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2}}, + }, + }, + }, { SectionId: "general", Title: "General", @@ -190,8 +205,9 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { }, }, DefaultValues: map[string]*plugin_pb.ConfigValue{ - "target_file_size_bytes": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeBytes}}, - "min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinInputFiles}}, + "target_file_size_bytes": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeBytes}}, + "min_input_files": {Kind: 
&plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinInputFiles}}, + "min_manifests_to_rewrite": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinManifestsToRewrite}}, "snapshot_retention_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultSnapshotRetentionHours}}, "max_snapshots_to_keep": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxSnapshotsToKeep}}, "orphan_older_than_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultOrphanOlderThanHours}}, @@ -211,8 +227,8 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { JobTypeMaxRuntimeSeconds: 3600, // 1 hour max }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ - "target_file_size_bytes": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeBytes}}, - "min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinInputFiles}}, + "target_file_size_bytes": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeBytes}}, + "min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinInputFiles}}, "snapshot_retention_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultSnapshotRetentionHours}}, "max_snapshots_to_keep": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxSnapshotsToKeep}}, "orphan_older_than_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultOrphanOlderThanHours}}, @@ -255,11 +271,10 @@ func (h *Handler) Detect(ctx context.Context, request *plugin_pb.RunDetectionReq namespaceFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "namespace_filter", "")) tableFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "table_filter", "")) - // Connect to filer to scan table buckets - filerAddress := filerAddresses[0] - conn, err := grpc.NewClient(filerAddress, h.grpcDialOption) + // Connect to filer — try each address until one succeeds. 
+ filerAddress, conn, err := h.connectToFiler(filerAddresses) if err != nil { - return fmt.Errorf("connect to filer %s: %w", filerAddress, err) + return fmt.Errorf("connect to filer: %w", err) } defer conn.Close() filerClient := filer_pb.NewSeaweedFilerClient(conn) @@ -329,6 +344,12 @@ func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequ if bucketName == "" || namespace == "" || tableName == "" || filerAddress == "" { return fmt.Errorf("missing required parameters: bucket_name=%q, namespace=%q, table_name=%q, filer_address=%q", bucketName, namespace, tableName, filerAddress) } + // Reject path traversal in bucket/namespace/table names. + for _, name := range []string{bucketName, namespace, tableName} { + if strings.Contains(name, "..") || strings.ContainsAny(name, "/\\") { + return fmt.Errorf("invalid name %q: must not contain path separators or '..'", name) + } + } if tablePath == "" { tablePath = path.Join(namespace, tableName) } @@ -467,5 +488,24 @@ func (h *Handler) sendEmptyDetection(sender pluginworker.DetectionSender) error }) } +// connectToFiler tries each filer address in order and returns the first +// successful gRPC connection. If all addresses fail, it returns a +// consolidated error. +func (h *Handler) connectToFiler(addresses []string) (string, *grpc.ClientConn, error) { + var lastErr error + for _, addr := range addresses { + conn, err := grpc.NewClient(addr, h.grpcDialOption) + if err != nil { + lastErr = fmt.Errorf("filer %s: %w", addr, err) + continue + } + return addr, conn, nil + } + if lastErr == nil { + lastErr = fmt.Errorf("no filer addresses provided") + } + return "", nil, lastErr +} + // Ensure Handler implements JobHandler. 
var _ pluginworker.JobHandler = (*Handler)(nil) diff --git a/weed/plugin/worker/iceberg/handler_test.go b/weed/plugin/worker/iceberg/handler_test.go index 8727d26d5..7db6bb0dd 100644 --- a/weed/plugin/worker/iceberg/handler_test.go +++ b/weed/plugin/worker/iceberg/handler_test.go @@ -363,11 +363,11 @@ func TestManifestRewriteNestedPathConsistency(t *testing.T) { func TestNormalizeIcebergPath(t *testing.T) { tests := []struct { - name string + name string icebergPath string - bucket string - tablePath string - expected string + bucket string + tablePath string + expected string }{ { "relative metadata path", diff --git a/weed/plugin/worker/iceberg/operations.go b/weed/plugin/worker/iceberg/operations.go index 607283c21..c091b0d4b 100644 --- a/weed/plugin/worker/iceberg/operations.go +++ b/weed/plugin/worker/iceberg/operations.go @@ -322,8 +322,8 @@ func (h *Handler) rewriteManifests( return "", fmt.Errorf("parse manifest list: %w", err) } - if int64(len(manifests)) < config.MinInputFiles { - return fmt.Sprintf("only %d manifests, below threshold of %d", len(manifests), config.MinInputFiles), nil + if int64(len(manifests)) < config.MinManifestsToRewrite { + return fmt.Sprintf("only %d manifests, below threshold of %d", len(manifests), config.MinManifestsToRewrite), nil } // Collect all entries from data manifests, grouped by partition spec ID diff --git a/weed/plugin/worker/iceberg/testing_api.go b/weed/plugin/worker/iceberg/testing_api.go new file mode 100644 index 000000000..c0fdda5ff --- /dev/null +++ b/weed/plugin/worker/iceberg/testing_api.go @@ -0,0 +1,26 @@ +package iceberg + +import ( + "context" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +// The following methods export the internal maintenance operations for use +// by integration tests. They are intentionally thin wrappers. 
+ +func (h *Handler) ExpireSnapshots(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketName, tablePath string, config Config) (string, error) { + return h.expireSnapshots(ctx, client, bucketName, tablePath, config) +} + +func (h *Handler) RemoveOrphans(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketName, tablePath string, config Config) (string, error) { + return h.removeOrphans(ctx, client, bucketName, tablePath, config) +} + +func (h *Handler) RewriteManifests(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketName, tablePath string, config Config) (string, error) { + return h.rewriteManifests(ctx, client, bucketName, tablePath, config) +} + +func (h *Handler) CompactDataFiles(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketName, tablePath string, config Config) (string, error) { + return h.compactDataFiles(ctx, client, bucketName, tablePath, config) +} diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index a84420824..3b0286f83 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -14,8 +14,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" - balancetask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" "github.com/seaweedfs/seaweedfs/weed/util/wildcard" + balancetask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types" "google.golang.org/grpc" "google.golang.org/protobuf/proto" @@ -1391,7 +1391,7 @@ func buildBatchVolumeBalanceProposals( }, Labels: map[string]string{ "task_type": "balance", - "batch": "true", + "batch": "true", "batch_size": fmt.Sprintf("%d", len(moves)), }, }) diff --git a/weed/s3api/s3_constants/s3_action_strings.go b/weed/s3api/s3_constants/s3_action_strings.go index 46b3eb8c7..20f41e8c2 
100644 --- a/weed/s3api/s3_constants/s3_action_strings.go +++ b/weed/s3api/s3_constants/s3_action_strings.go @@ -8,7 +8,7 @@ const ( S3_ACTION_PUT_OBJECT = "s3:PutObject" S3_ACTION_DELETE_OBJECT = "s3:DeleteObject" S3_ACTION_DELETE_OBJECT_VERSION = "s3:DeleteObjectVersion" - S3_ACTION_GET_OBJECT_VERSION = "s3:GetObjectVersion" + S3_ACTION_GET_OBJECT_VERSION = "s3:GetObjectVersion" S3_ACTION_GET_OBJECT_ATTRIBUTES = "s3:GetObjectAttributes" // Object ACL operations diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go index 5780f2899..31ee47cdb 100644 --- a/weed/server/filer_server_handlers_proxy.go +++ b/weed/server/filer_server_handlers_proxy.go @@ -22,7 +22,7 @@ import ( const proxyReadConcurrencyPerVolumeServer = 16 var ( - proxySemaphores sync.Map // host -> chan struct{} + proxySemaphores sync.Map // host -> chan struct{} ) func (fs *FilerServer) maybeAddVolumeJwtAuthorization(r *http.Request, fileId string, isWrite bool) { diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 2fe720327..11239420e 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -44,8 +44,8 @@ type Topology struct { pulse int64 - volumeSizeLimit uint64 - replicationAsMin bool + volumeSizeLimit uint64 + replicationAsMin bool vacuumDisabledByOperator atomic.Bool // true when operator manually disables vacuum vacuumDisabledByPlugin atomic.Bool // true when disabled by the vacuum plugin monitor adminServerConnectedFunc func() bool // optional callback to check admin server presence diff --git a/weed/worker/tasks/balance/detection_test.go b/weed/worker/tasks/balance/detection_test.go index d55fbd5d4..d62e6b9a5 100644 --- a/weed/worker/tasks/balance/detection_test.go +++ b/weed/worker/tasks/balance/detection_test.go @@ -551,11 +551,11 @@ func TestDetection_SkipsPreExistingPendingTasks(t *testing.T) { for i := 0; i < 15; i++ { volID := uint32(1 + i) err := at.AddPendingTask(topology.TaskSpec{ - TaskID: 
fmt.Sprintf("existing-%d", volID), - TaskType: topology.TaskTypeBalance, - VolumeID: volID, - VolumeSize: 1024, - Sources: []topology.TaskSourceSpec{{ServerID: "node-a", DiskID: 1}}, + TaskID: fmt.Sprintf("existing-%d", volID), + TaskType: topology.TaskTypeBalance, + VolumeID: volID, + VolumeSize: 1024, + Sources: []topology.TaskSourceSpec{{ServerID: "node-a", DiskID: 1}}, Destinations: []topology.TaskDestinationSpec{{ServerID: "node-b", DiskID: 2}}, }) if err != nil { @@ -717,9 +717,9 @@ func TestDetection_FourServers_DestinationSpreading(t *testing.T) { // the effective volume distribution is within the configured threshold. func TestDetection_ConvergenceVerification(t *testing.T) { tests := []struct { - name string - counts []int // volumes per server - threshold float64 + name string + counts []int // volumes per server + threshold float64 }{ {"2-server-big-gap", []int{100, 10}, 0.2}, {"3-server-staircase", []int{90, 50, 10}, 0.2}, diff --git a/weed/worker/tasks/balance/replica_placement.go b/weed/worker/tasks/balance/replica_placement.go index b46745df2..c90ad6fb0 100644 --- a/weed/worker/tasks/balance/replica_placement.go +++ b/weed/worker/tasks/balance/replica_placement.go @@ -143,4 +143,3 @@ func findTopRackKeys(m map[rackKey]int) (topKeys []rackKey, max int) { } return } - diff --git a/weed/worker/tasks/ec_balance/detection_test.go b/weed/worker/tasks/ec_balance/detection_test.go index 81e6ffb41..214137ab3 100644 --- a/weed/worker/tasks/ec_balance/detection_test.go +++ b/weed/worker/tasks/ec_balance/detection_test.go @@ -213,7 +213,7 @@ func TestDetectGlobalImbalance(t *testing.T) { "node1": { nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 5, ecShards: map[uint32]*ecVolumeInfo{ - 100: {collection: "col1", shardBits: 0x3FFF}, // 14 shards + 100: {collection: "col1", shardBits: 0x3FFF}, // 14 shards 200: {collection: "col1", shardBits: 0b111111}, // 6 shards }, }, diff --git a/weed/worker/types/task_types.go 
b/weed/worker/types/task_types.go index 8b1b6494a..bebd4738b 100644 --- a/weed/worker/types/task_types.go +++ b/weed/worker/types/task_types.go @@ -15,7 +15,7 @@ const ( TaskTypeErasureCoding TaskType = "erasure_coding" TaskTypeBalance TaskType = "balance" TaskTypeReplication TaskType = "replication" - TaskTypeECBalance TaskType = "ec_balance" + TaskTypeECBalance TaskType = "ec_balance" ) // TaskStatus represents the status of a maintenance task