diff --git a/test/s3tables/maintenance/maintenance_integration_test.go b/test/s3tables/maintenance/maintenance_integration_test.go
index 735cca573..4a3857fff 100644
--- a/test/s3tables/maintenance/maintenance_integration_test.go
+++ b/test/s3tables/maintenance/maintenance_integration_test.go
@@ -22,6 +22,7 @@ import (
 	"path"
 	"path/filepath"
 	"strconv"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -30,12 +31,17 @@ import (
 	"github.com/apache/iceberg-go/table"
 	"github.com/aws/aws-sdk-go-v2/aws"
 	v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
+	"github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/credentials"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/parquet-go/parquet-go"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
 
 	"github.com/seaweedfs/seaweedfs/weed/command"
+	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	icebergHandler "github.com/seaweedfs/seaweedfs/weed/plugin/worker/iceberg"
@@ -244,9 +250,39 @@ func newTestSchema() *iceberg.Schema {
 	return iceberg.NewSchema(0,
 		iceberg.NestedField{ID: 1, Type: iceberg.PrimitiveTypes.Int64, Name: "id", Required: true},
 		iceberg.NestedField{ID: 2, Type: iceberg.PrimitiveTypes.String, Name: "name", Required: false},
+		iceberg.NestedField{ID: 3, Type: iceberg.PrimitiveTypes.String, Name: "payload", Required: false},
 	)
 }
 
+// compactRow mirrors the id/name/payload columns of newTestSchema for parquet encoding.
+type compactRow struct {
+	ID      int64  `parquet:"id"`
+	Name    string `parquet:"name"`
+	Payload string `parquet:"payload"`
+}
+
+// buildParquetData encodes rowCount synthetic rows into an in-memory parquet
+// file. Every payload repeats a four-digit counter 128 times to bulk up the file.
+func buildParquetData(t *testing.T, prefix string, rowCount int) []byte {
+	t.Helper()
+
+	rows := make([]compactRow, rowCount)
+	for i := range rows {
+		rows[i] = compactRow{
+			ID:      int64(i + 1),
+			Name:    fmt.Sprintf("%s-%06d", prefix, i),
+			Payload: fmt.Sprintf("%s-%06d-%s", prefix, i, strings.Repeat(fmt.Sprintf("%04d", i%10000), 128)),
+		}
+	}
+
+	var buf bytes.Buffer
+	writer := parquet.NewGenericWriter[compactRow](&buf)
+	_, err := writer.Write(rows)
+	require.NoError(t, err)
+	require.NoError(t, writer.Close())
+	return buf.Bytes()
+}
+
 func buildMetadata(t *testing.T, snapshots []table.Snapshot) table.Metadata {
 	t.Helper()
 	schema := newTestSchema()
@@ -308,6 +344,80 @@ func s3put(t *testing.T, s3Endpoint, bucket, key string, body []byte) {
 	}
 }
 
+// newS3Client builds a path-style S3 client for endpoint using static admin/admin credentials.
+func newS3Client(t *testing.T, endpoint string) *s3.Client {
+	t.Helper()
+
+	cfg, err := config.LoadDefaultConfig(context.Background(),
+		config.WithRegion("us-east-1"),
+		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("admin", "admin", "")),
+	)
+	require.NoError(t, err)
+
+	return s3.NewFromConfig(cfg, func(o *s3.Options) {
+		o.BaseEndpoint = aws.String(endpoint)
+		o.UsePathStyle = true
+	})
+}
+
+// s3putObject uploads body to bucket/key, failing the test on error or after a one-minute timeout.
+func s3putObject(t *testing.T, client *s3.Client, bucket, key string, body []byte) {
+	t.Helper()
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+
+	_, err := client.PutObject(ctx, &s3.PutObjectInput{
+		Bucket: aws.String(bucket),
+		Key:    aws.String(key),
+		Body:   bytes.NewReader(body),
+	})
+	require.NoError(t, err)
+}
+
+// testFilerClient adapts a single filer gRPC client so it can be passed to filer.NewFileReader.
+type testFilerClient struct {
+	client filer_pb.SeaweedFilerClient
+}
+
+func (c testFilerClient) WithFilerClient(_ bool, fn func(filer_pb.SeaweedFilerClient) error) error {
+	return fn(c.client)
+}
+
+func (c testFilerClient) AdjustedUrl(location *filer_pb.Location) string {
+	if location == nil {
+		return ""
+	}
+	if location.PublicUrl != "" {
+		return location.PublicUrl
+	}
+	return location.Url
+}
+
+func (c testFilerClient) GetDataCenter() string {
+	return ""
+}
+
+// readFile returns the content of dir/name, using inline content when present
+// and otherwise reading the entry's chunks through filer.NewFileReader.
+func readFile(t *testing.T, client filer_pb.SeaweedFilerClient, dir, name string) []byte {
+	t.Helper()
+
+	entry := lookupEntry(t, client, dir, name)
+	require.NotNil(t, entry, "readFile(%s, %s): entry missing", dir, name)
+	if len(entry.Content) > 0 || len(entry.Chunks) == 0 {
+		return entry.Content
+	}
+
+	reader := filer.NewFileReader(testFilerClient{client: client}, entry)
+	if closer, ok := reader.(io.Closer); ok {
+		defer closer.Close()
+	}
+	content, err := io.ReadAll(reader)
+	require.NoError(t, err, "readFile(%s, %s): read chunked content", dir, name)
+	return content
+}
+
 // createBucketViaS3 creates a regular S3 bucket via the S3 PUT Bucket API.
 // populateTableViaFiler creates the Iceberg directory structure, metadata xattr,
 // manifests, and data files for a table directly in the filer.
@@ -447,6 +557,32 @@ func writeFile(t *testing.T, ctx context.Context, client filer_pb.SeaweedFilerCl
 	require.Empty(t, resp.Error, "writeFile(%s, %s): resp error", dir, name)
 }
 
+// upsertFile stores content inline at dir/name: it creates the entry when missing,
+// otherwise it overwrites the existing entry's content and drops its chunks.
+func upsertFile(t *testing.T, ctx context.Context, client filer_pb.SeaweedFilerClient, dir, name string, content []byte) {
+	t.Helper()
+
+	entry := lookupEntry(t, client, dir, name)
+	if entry == nil {
+		writeFile(t, ctx, client, dir, name, content)
+		return
+	}
+	if entry.Attributes == nil {
+		entry.Attributes = &filer_pb.FuseAttributes{}
+	}
+	entry.Content = content
+	entry.Chunks = nil
+	entry.Attributes.Mtime = time.Now().Unix()
+	entry.Attributes.FileMode = uint32(0644)
+	entry.Attributes.FileSize = uint64(len(content))
+	resp, err := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
+		Directory: dir,
+		Entry:     entry,
+	})
+	require.NoError(t, err, "upsertFile(%s, %s): rpc error", dir, name)
+	require.NotNil(t, resp, "upsertFile(%s, %s): nil response", dir, name)
+}
+
 func lookupEntry(t *testing.T, client filer_pb.SeaweedFilerClient, dir, name string) *filer_pb.Entry {
 	t.Helper()
 	resp, err := filer_pb.LookupEntry(context.Background(), client, &filer_pb.LookupDirectoryEntryRequest{
@@ -471,6 +607,7 @@ func TestIcebergMaintenanceIntegration(t *testing.T) {
 	}
 
 	t.Run("ExpireSnapshots", testExpireSnapshots)
+	t.Run("CompactDataFiles", testCompactDataFiles)
 	t.Run("RemoveOrphans", testRemoveOrphans)
 	t.Run("RewriteManifests", testRewriteManifests)
 	t.Run("FullMaintenanceCycle", testFullMaintenanceCycle)
@@ -517,6 +654,185 @@ func testExpireSnapshots(t *testing.T) {
 	assert.Greater(t, internalMeta.MetadataVersion, 1, "metadata version should have been incremented")
 }
 
+// testCompactDataFiles uploads three chunk-backed parquet inputs through S3,
+// registers them in the table's manifest, runs CompactDataFiles, and verifies
+// the committed snapshot, manifest list, and per-entry sequence numbers.
+func testCompactDataFiles(t *testing.T) {
+	_, client := shared.filerConn(t)
+	suffix := randomSuffix()
+	bucket := "maint-compact-" + suffix
+	ns := "ns"
+	tbl := "tbl"
+
+	now := time.Now().UnixMilli()
+	snapshots := []table.Snapshot{
+		{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1},
+	}
+	meta := populateTableViaFiler(t, client, shared.s3Endpoint, bucket, ns, tbl, snapshots)
+
+	ctx := context.Background()
+	tablePath := path.Join(ns, tbl)
+	dataDir := path.Join(s3tables.TablesPath, bucket, tablePath, "data")
+	metaDir := path.Join(s3tables.TablesPath, bucket, tablePath, "metadata")
+
+	type parquetFile struct {
+		name        string
+		recordCount int64
+		fileSize    int64
+	}
+
+	s3Client := newS3Client(t, shared.s3Endpoint)
+	var files []parquetFile
+	for i := 1; i <= 3; i++ {
+		name := fmt.Sprintf("compact-input-%d.parquet", i)
+		content := buildParquetData(t, fmt.Sprintf("compact-%d", i), 2000)
+		s3putObject(t, s3Client, bucket, path.Join(tablePath, "data", name), content)
+
+		entry := lookupEntry(t, client, dataDir, name)
+		require.NotNil(t, entry, "uploaded parquet file should exist")
+		require.NotEmpty(t, entry.Chunks, "uploaded parquet file should be chunk-backed")
+
+		files = append(files, parquetFile{
+			name:        name,
+			recordCount: 2000,
+			fileSize:    int64(len(content)),
+		})
+	}
+
+	schema := meta.CurrentSchema()
+	spec := meta.PartitionSpec()
+	version := meta.Version()
+	snapID := snapshots[0].SnapshotID
+
+	entries := make([]iceberg.ManifestEntry, 0, len(files))
+	for _, file := range files {
+		dfBuilder, err := iceberg.NewDataFileBuilder(
+			spec,
+			iceberg.EntryContentData,
+			path.Join("data", file.name),
+			iceberg.ParquetFile,
+			map[int]any{},
+			nil, nil,
+			file.recordCount,
+			file.fileSize,
+		)
+		require.NoError(t, err)
+		entries = append(entries, iceberg.NewManifestEntry(
+			iceberg.EntryStatusADDED, &snapID, nil, nil, dfBuilder.Build(),
+		))
+	}
+
+	manifestName := "manifest-compact.avro"
+	var manifestBuf bytes.Buffer
+	manifest, err := iceberg.WriteManifest(
+		path.Join("metadata", manifestName), &manifestBuf,
+		version, spec, schema, snapID, entries,
+	)
+	require.NoError(t, err)
+	writeFile(t, ctx, client, metaDir, manifestName, manifestBuf.Bytes())
+
+	var manifestListBuf bytes.Buffer
+	seqNum := snapshots[0].SequenceNumber
+	require.NoError(t, iceberg.WriteManifestList(
+		version, &manifestListBuf, snapID, snapshots[0].ParentSnapshotID, &seqNum, 0,
+		[]iceberg.ManifestFile{manifest},
+	))
+	upsertFile(t, ctx, client, metaDir, path.Base(snapshots[0].ManifestList), manifestListBuf.Bytes())
+
+	handler := icebergHandler.NewHandler(nil)
+	compactCfg := icebergHandler.Config{
+		TargetFileSizeBytes: 16 * 1024 * 1024,
+		MinInputFiles:       2,
+		MaxCommitRetries:    3,
+	}
+
+	result, err := handler.CompactDataFiles(ctx, client, bucket, tablePath, compactCfg)
+	require.NoError(t, err)
+	assert.Contains(t, result, "compacted")
+	t.Logf("CompactDataFiles result: %s", result)
+
+	var compacted *filer_pb.Entry
+	listErr := filer_pb.SeaweedList(ctx, client, dataDir, "", func(entry *filer_pb.Entry, isLast bool) error {
+		// Compacted output uses compact-{snapID}-{newSnapID}-{binIdx}.parquet.
+		// The test inputs use compact-input-{n}.parquet and must not satisfy this check.
+		if strings.HasPrefix(entry.Name, "compact-") &&
+			!strings.HasPrefix(entry.Name, "compact-input-") &&
+			strings.HasSuffix(entry.Name, ".parquet") {
+			compacted = entry
+		}
+		return nil
+	}, "", false, 1000)
+	require.NoError(t, listErr)
+	require.NotNil(t, compacted, "compaction should create a merged parquet file")
+
+	tableEntry := lookupEntry(t, client, path.Join(s3tables.TablesPath, bucket, ns), tbl)
+	require.NotNil(t, tableEntry)
+	xattr := tableEntry.Extended[s3tables.ExtendedKeyMetadata]
+	require.NotEmpty(t, xattr)
+
+	var internalMeta struct {
+		MetadataVersion  int    `json:"metadataVersion"`
+		MetadataLocation string `json:"metadataLocation,omitempty"`
+	}
+	require.NoError(t, json.Unmarshal(xattr, &internalMeta))
+	assert.Greater(t, internalMeta.MetadataVersion, 1, "metadata version should advance after compaction")
+
+	// path.Base("") yields ".", so check the raw location before deriving the file name.
+	require.NotEmpty(t, internalMeta.MetadataLocation, "compaction should point table metadata at a committed metadata file")
+	metadataName := path.Base(internalMeta.MetadataLocation)
+	metadataBytes := readFile(t, client, metaDir, metadataName)
+	committedMeta, err := table.ParseMetadataBytes(metadataBytes)
+	require.NoError(t, err)
+
+	currentSnap := committedMeta.CurrentSnapshot()
+	require.NotNil(t, currentSnap, "committed metadata should include a current snapshot")
+	assert.Equal(t, snapshots[0].SequenceNumber+1, currentSnap.SequenceNumber, "compaction should advance the snapshot sequence number")
+
+	// Same path.Base caveat: assert on the raw manifest-list location.
+	require.NotEmpty(t, currentSnap.ManifestList, "committed snapshot should reference a manifest list")
+	manifestListName := path.Base(currentSnap.ManifestList)
+	manifestListBytes := readFile(t, client, metaDir, manifestListName)
+	manifestFiles, err := iceberg.ReadManifestList(bytes.NewReader(manifestListBytes))
+	require.NoError(t, err)
+	require.Len(t, manifestFiles, 1, "compaction should commit a single replacement manifest")
+	assert.Equal(t, snapshots[0].SequenceNumber+1, manifestFiles[0].SequenceNum(), "new manifest should use the committed snapshot sequence number")
+	assert.Equal(t, snapshots[0].SequenceNumber+1, manifestFiles[0].MinSequenceNum(), "compaction manifest should inherit the committed sequence number for added data")
+
+	manifestName = path.Base(manifestFiles[0].FilePath())
+	manifestBytes := readFile(t, client, metaDir, manifestName)
+	manifestEntries, err := iceberg.ReadManifest(manifestFiles[0], bytes.NewReader(manifestBytes), false)
+	require.NoError(t, err)
+	require.Len(t, manifestEntries, len(files)+1, "compaction should replace the input files with one merged output")
+
+	deletedPaths := make(map[string]struct{}, len(files))
+	addedPaths := make(map[string]struct{})
+	for _, entry := range manifestEntries {
+		switch entry.Status() {
+		case iceberg.EntryStatusADDED:
+			addedPaths[entry.DataFile().FilePath()] = struct{}{}
+			assert.Equal(t, snapshots[0].SequenceNumber+1, entry.SequenceNum(), "added entries should inherit the new snapshot sequence number")
+			fileSeqNum := entry.FileSequenceNum()
+			require.NotNil(t, fileSeqNum, "added entries should carry a file sequence number")
+			assert.Equal(t, snapshots[0].SequenceNumber+1, *fileSeqNum)
+		case iceberg.EntryStatusDELETED:
+			deletedPaths[entry.DataFile().FilePath()] = struct{}{}
+			assert.Equal(t, snapshots[0].SequenceNumber, entry.SequenceNum(), "deleted entries should preserve the original data sequence number")
+			fileSeqNum := entry.FileSequenceNum()
+			require.NotNil(t, fileSeqNum, "deleted entries should preserve file sequence numbers")
+			assert.Equal(t, snapshots[0].SequenceNumber, *fileSeqNum)
+		default:
+			t.Fatalf("unexpected manifest entry status %v for %s", entry.Status(), entry.DataFile().FilePath())
+		}
+	}
+
+	require.Len(t, addedPaths, 1, "compaction should add exactly one merged parquet file")
+	assert.Contains(t, addedPaths, path.Join("data", compacted.Name))
+	require.Len(t, deletedPaths, len(files), "compaction should delete every original small input file")
+	for _, file := range files {
+		assert.Contains(t, deletedPaths, path.Join("data", file.name))
+	}
+}
+
 func testRemoveOrphans(t *testing.T) {
 	_, client := shared.filerConn(t)
 	suffix := randomSuffix()
diff --git a/weed/plugin/worker/iceberg/compact.go b/weed/plugin/worker/iceberg/compact.go
index 40d9092eb..d41d8c986 100644
--- a/weed/plugin/worker/iceberg/compact.go
+++ b/weed/plugin/worker/iceberg/compact.go
@@ -109,6 +109,26 @@ func (h *Handler) compactDataFiles(
 	var deletedManifestEntries []iceberg.ManifestEntry
 	totalMerged := 0
 
+	// entrySeqNum returns the entry's data sequence number, or nil when the
+	// entry reports a negative value (presumably "not assigned" — confirm in iceberg-go).
+	entrySeqNum := func(entry iceberg.ManifestEntry) *int64 {
+		seqNum := entry.SequenceNum()
+		if seqNum < 0 {
+			return nil
+		}
+		return &seqNum
+	}
+
+	// entryFileSeqNum returns a copy of the entry's file sequence number,
+	// falling back to entrySeqNum when the entry carries none.
+	entryFileSeqNum := func(entry iceberg.ManifestEntry) *int64 {
+		if fileSeqNum := entry.FileSequenceNum(); fileSeqNum != nil {
+			value := *fileSeqNum
+			return &value
+		}
+		return entrySeqNum(entry)
+	}
+
 	metaDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "metadata")
 	dataDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "data")
 
@@ -189,7 +209,7 @@ func (h *Handler) compactDataFiles(
 			delEntry := iceberg.NewManifestEntry(
 				iceberg.EntryStatusDELETED,
 				&newSnapID,
-				nil, nil,
+				entrySeqNum(entry), entryFileSeqNum(entry),
 				entry.DataFile(),
 			)
 			deletedManifestEntries = append(deletedManifestEntries, delEntry)
@@ -221,7 +241,7 @@ func (h *Handler) compactDataFiles(
 			existingEntry := iceberg.NewManifestEntry(
 				iceberg.EntryStatusEXISTING,
 				func() *int64 { id := entry.SnapshotID(); return &id }(),
-				nil, nil,
+				entrySeqNum(entry), entryFileSeqNum(entry),
 				entry.DataFile(),
 			)
 			manifestEntries = append(manifestEntries, existingEntry)
diff --git a/weed/plugin/worker/iceberg/filer_io.go b/weed/plugin/worker/iceberg/filer_io.go
index eddc42b0b..db72b42a6 100644
--- a/weed/plugin/worker/iceberg/filer_io.go
+++ b/weed/plugin/worker/iceberg/filer_io.go
@@ -9,12 +9,15 @@ import (
 	"io"
 	"path"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/apache/iceberg-go/table"
+	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
+	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -25,6 +28,33 @@ type filerFileEntry struct {
 	Entry *filer_pb.Entry
 }
 
+// initGlobalHTTPClientOnce guards the one-time util_http.InitGlobalHttpClient call made before chunk reads.
+var initGlobalHTTPClientOnce sync.Once
+
+// singleFilerClient wraps one filer gRPC client so it can be passed to filer.NewFileReader.
+type singleFilerClient struct {
+	client filer_pb.SeaweedFilerClient
+}
+
+func (c singleFilerClient) WithFilerClient(_ bool, fn func(filer_pb.SeaweedFilerClient) error) error {
+	return fn(c.client)
+}
+
+// AdjustedUrl prefers the location's PublicUrl and falls back to its Url; a nil location yields "".
+func (c singleFilerClient) AdjustedUrl(location *filer_pb.Location) string {
+	if location == nil {
+		return ""
+	}
+	if location.PublicUrl != "" {
+		return location.PublicUrl
+	}
+	return location.Url
+}
+
+func (c singleFilerClient) GetDataCenter() string {
+	return ""
+}
+
 // listFilerEntries lists all entries in a directory.
 func listFilerEntries(ctx context.Context, client filer_pb.SeaweedFilerClient, dir, prefix string) ([]*filer_pb.Entry, error) {
 	var entries []*filer_pb.Entry
@@ -174,15 +204,24 @@ func loadFileByIcebergPath(ctx context.Context, client filer_pb.SeaweedFilerClie
 		return nil, fmt.Errorf("file not found: %s/%s", dir, fileName)
 	}
 
-	// Inline content is available for small files (metadata, manifests, and
-	// manifest lists written by saveFilerFile). Larger files uploaded via S3
-	// are stored as chunks with empty Content — detect this and return a
-	// clear error rather than silently returning empty data.
-	if len(resp.Entry.Content) == 0 && len(resp.Entry.Chunks) > 0 {
-		return nil, fmt.Errorf("file %s/%s is stored in chunks; only inline content is supported", dir, fileName)
+	// Small files written by saveFilerFile carry inline content; return it
+	// directly. An entry with neither content nor chunks is returned as empty.
+	if len(resp.Entry.Content) > 0 || len(resp.Entry.Chunks) == 0 {
+		return resp.Entry.Content, nil
 	}
 
-	return resp.Entry.Content, nil
+	// Larger files (e.g. S3 uploads) are chunk-backed: read them through
+	// filer.NewFileReader. NOTE(review): the global HTTP client is initialized first, presumably because chunk fetches use it — confirm.
+	initGlobalHTTPClientOnce.Do(util_http.InitGlobalHttpClient)
+	reader := filer.NewFileReader(singleFilerClient{client: client}, resp.Entry)
+	if closer, ok := reader.(io.Closer); ok {
+		defer closer.Close()
+	}
+	data, err := io.ReadAll(reader)
+	if err != nil {
+		return nil, fmt.Errorf("read chunked file %s/%s: %w", dir, fileName, err)
+	}
+	return data, nil
 }
 
 // normalizeIcebergPath converts an Iceberg path (which may be an S3 URL, an