Files
seaweedFS/weed/s3api/s3tables/iceberg_layout.go
Chris Lu 995dfc4d5d chore: remove ~50k lines of unreachable dead code (#8913)
* chore: remove unreachable dead code across the codebase

Remove ~50,000 lines of unreachable code identified by static analysis.

Major removals:
- weed/filer/redis_lua: entire unused Redis Lua filer store implementation
- weed/wdclient/net2, resource_pool: unused connection/resource pool packages
- weed/plugin/worker/lifecycle: unused lifecycle plugin worker
- weed/s3api: unused S3 policy templates, presigned URL IAM, streaming copy,
  multipart IAM, key rotation, and various SSE helper functions
- weed/mq/kafka: unused partition mapping, compression, schema, and protocol functions
- weed/mq/offset: unused SQL storage and migration code
- weed/worker: unused registry, task, and monitoring functions
- weed/query: unused SQL engine, parquet scanner, and type functions
- weed/shell: unused EC proportional rebalance functions
- weed/storage/erasure_coding/distribution: unused distribution analysis functions
- Individual unreachable functions removed from 150+ files across admin,
  credential, filer, iam, kms, mount, mq, operation, pb, s3api, server,
  shell, storage, topology, and util packages

* fix(s3): reset shared memory store in IAM test to prevent flaky failure

TestLoadIAMManagerFromConfig_EmptyConfigWithFallbackKey was flaky because
the MemoryStore credential backend is a singleton registered via init().
Earlier tests that create anonymous identities pollute the shared store,
causing LookupAnonymous() to unexpectedly return true.

Fix by calling Reset() on the memory store before the test runs.

* style: run gofmt on changed files

* fix: restore KMS functions used by integration tests

* fix(plugin): prevent panic on send to closed worker session channel

The Plugin.sendToWorker method could panic with "send on closed channel"
when a worker disconnected while a message was being sent. The race was
between streamSession.close() closing the outgoing channel and sendToWorker
writing to it concurrently.

Add a done channel to streamSession that is closed before the outgoing
channel, and check it in sendToWorker's select to safely detect closed
sessions without panicking.
2026-04-03 16:04:27 -07:00

305 lines
10 KiB
Go

package s3tables
import (
pathpkg "path"
"regexp"
"strings"
)
// Iceberg file layout validation
// Apache Iceberg tables follow a specific file layout structure:
// - metadata/ directory containing metadata files (*.json, *.avro)
// - data/ directory containing data files (*.parquet, *.orc, *.avro)
//
// Valid file patterns include:
// - metadata/v*.metadata.json (table metadata)
// - metadata/snap-*.avro (snapshot manifest lists)
// - metadata/*.avro (manifest files)
// - data/*.parquet, data/*.orc, data/*.avro (data files)

// uuidPattern matches a lowercase hyphenated UUID (8-4-4-4-12 hex digits),
// reused by several of the metadata filename patterns below.
const uuidPattern = `[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}`
var (
// icebergAllowedDirs lists the only top-level directories permitted
// directly under a table root.
icebergAllowedDirs = map[string]bool{
"metadata": true,
"data": true,
}
// metadataFilePatterns are the filename shapes accepted under metadata/.
// All patterns are anchored (^...$) so they must match the whole filename.
metadataFilePatterns = []*regexp.Regexp{
regexp.MustCompile(`^v\d+\.metadata\.json$`), // Table metadata: v1.metadata.json, v2.metadata.json
regexp.MustCompile(`^snap-\d+-\d+-` + uuidPattern + `\.avro$`), // Snapshot manifests: snap-123-1-uuid.avro
regexp.MustCompile(`^` + uuidPattern + `-m\d+\.avro$`), // Manifest files: uuid-m0.avro
regexp.MustCompile(`^` + uuidPattern + `\.avro$`), // General manifest files
regexp.MustCompile(`^version-hint\.text$`), // Version hint file
regexp.MustCompile(`^` + uuidPattern + `\.metadata\.json$`), // UUID-named metadata
regexp.MustCompile(`^[^/]+\.stats$`), // Trino/Iceberg stats files
}
// dataFilePatterns are the filename shapes accepted under data/;
// any slash-free name with one of the three data-file extensions.
dataFilePatterns = []*regexp.Regexp{
regexp.MustCompile(`^[^/]+\.parquet$`), // Parquet files
regexp.MustCompile(`^[^/]+\.orc$`), // ORC files
regexp.MustCompile(`^[^/]+\.avro$`), // Avro files
}
// partitionPathPattern matches a single Hive-style partition segment
// such as "year=2024" (identifier, '=', non-empty slash-free value).
partitionPathPattern = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*=[^/]+$`)
// validSubdirectoryPattern matches plain subdirectory names: alphanumerics,
// underscore, hyphen — which also covers UUID-style directory names.
validSubdirectoryPattern = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`)
)
// IcebergLayoutValidator checks that object keys conform to the Apache
// Iceberg table file layout (the metadata/ and data/ trees).
type IcebergLayoutValidator struct{}

// NewIcebergLayoutValidator returns a ready-to-use layout validator.
func NewIcebergLayoutValidator() *IcebergLayoutValidator {
	validator := &IcebergLayoutValidator{}
	return validator
}
// ValidateFilePath checks that a file path (relative to the table root,
// e.g. "metadata/v1.metadata.json" or "data/file.parquet") conforms to the
// Iceberg table layout. It returns an *IcebergLayoutError on violation.
func (v *IcebergLayoutValidator) ValidateFilePath(relativePath string) error {
	// Drop a single leading slash so "/data/x" is treated like "data/x".
	relativePath = strings.TrimPrefix(relativePath, "/")
	if relativePath == "" {
		return &IcebergLayoutError{
			Code:    ErrCodeInvalidIcebergLayout,
			Message: "empty file path",
		}
	}

	topDir, rest, hasSubpath := strings.Cut(relativePath, "/")

	// Only the two canonical Iceberg directories are allowed at the top level.
	if !icebergAllowedDirs[topDir] {
		return &IcebergLayoutError{
			Code:    ErrCodeInvalidIcebergLayout,
			Message: "files must be placed in 'metadata/' or 'data/' directories",
		}
	}

	// A bare top-level key like "data" (no slash at all) is rejected;
	// "data/" and "metadata/" themselves are accepted below.
	if !hasSubpath {
		return &IcebergLayoutError{
			Code:    ErrCodeInvalidIcebergLayout,
			Message: "must be a directory (use trailing slash) or contain a subpath",
		}
	}
	if rest == "" {
		return nil // paths like "data/" or "metadata/" are fine
	}

	if topDir == "metadata" {
		return v.validateMetadataFile(rest)
	}
	return v.validateDataFile(rest)
}
// validateDirectoryPath validates the intermediate directory segments of a
// path. isMetadata selects the rules: the metadata/ tree must stay flat (any
// subdirectory is rejected), while the data/ tree accepts Hive-style
// partition segments (key=value) and plain subdirectory names.
func validateDirectoryPath(normalizedPath string, isMetadata bool) error {
	if isMetadata {
		// Flat structure enforced under metadata/ — no subdirectories at all.
		return &IcebergLayoutError{
			Code:    ErrCodeInvalidIcebergLayout,
			Message: "metadata directory does not support subdirectories",
		}
	}
	for _, segment := range strings.Split(normalizedPath, "/") {
		switch {
		case segment == "":
			return &IcebergLayoutError{
				Code:    ErrCodeInvalidIcebergLayout,
				Message: "invalid partition or subdirectory format in data path: empty segment",
			}
		case partitionPathPattern.MatchString(segment), isValidSubdirectory(segment):
			// Acceptable partition (e.g. year=2024) or plain subdirectory name.
		default:
			return &IcebergLayoutError{
				Code:    ErrCodeInvalidIcebergLayout,
				Message: "invalid partition or subdirectory format in data path",
			}
		}
	}
	return nil
}
// validateFilePatterns validates a filename against allowed patterns
// isMetadata indicates if we're validating metadata files (true) or data files (false)
func validateFilePatterns(filename string, isMetadata bool) error {
var patterns []*regexp.Regexp
var errorMsg string
if isMetadata {
patterns = metadataFilePatterns
errorMsg = "invalid metadata file format: must be a valid Iceberg metadata, manifest, or snapshot file"
} else {
patterns = dataFilePatterns
errorMsg = "invalid data file format: must be .parquet, .orc, or .avro"
}
for _, pattern := range patterns {
if pattern.MatchString(filename) {
return nil
}
}
return &IcebergLayoutError{
Code: ErrCodeInvalidIcebergLayout,
Message: errorMsg,
}
}
// validateFile applies the shared validation logic for both trees:
//  1. A trailing "/" marks a directory key: validate its segments and stop.
//  2. Otherwise validate any intermediate directories, then the filename
//     against the allowed patterns.
//  3. Under data/ only, a final segment that looks like a partition or a
//     subdirectory (a directory key written without the trailing slash,
//     e.g. "data/year=2024") is also accepted.
//
// isMetadata selects metadata rules (true) or data rules (false).
func (v *IcebergLayoutValidator) validateFile(path string, isMetadata bool) error {
	if strings.HasSuffix(path, "/") {
		// Directory key: strip the trailing slash and validate segments only.
		return validateDirectoryPath(strings.TrimSuffix(path, "/"), isMetadata)
	}

	// Validate intermediate directories, if the path contains any.
	if idx := strings.LastIndex(path, "/"); idx != -1 {
		if err := validateDirectoryPath(path[:idx], isMetadata); err != nil {
			return err
		}
	}

	filename := pathpkg.Base(path)
	patternErr := validateFilePatterns(filename, isMetadata)
	if patternErr == nil {
		return nil
	}
	// Tolerate a data directory named without a trailing slash.
	if !isMetadata && (partitionPathPattern.MatchString(filename) || isValidSubdirectory(filename)) {
		return nil
	}
	return patternErr
}
// validateMetadataFile validates a path under the metadata/ directory by
// delegating to validateFile with the metadata rule set enabled.
func (v *IcebergLayoutValidator) validateMetadataFile(path string) error {
	const isMetadata = true
	return v.validateFile(path, isMetadata)
}
// validateDataFile validates a path under the data/ directory by delegating
// to validateFile with the metadata rule set disabled.
func (v *IcebergLayoutValidator) validateDataFile(path string) error {
	const isMetadata = false
	return v.validateFile(path, isMetadata)
}
// isValidSubdirectory reports whether name is an acceptable plain
// subdirectory segment: alphanumerics, underscores, and hyphens, which also
// covers UUID-style directory names.
func isValidSubdirectory(name string) bool {
	return validSubdirectoryPattern.MatchString(name)
}
// IcebergLayoutError describes a violation of the Iceberg table file layout.
// Code is a machine-readable identifier; Message is the human-readable detail.
type IcebergLayoutError struct {
	Code string
	Message string
}

// Error implements the error interface by returning the detail message.
func (e *IcebergLayoutError) Error() string {
	msg := e.Message
	return msg
}

// ErrCodeInvalidIcebergLayout is the error code reported for every
// Iceberg layout violation.
const (
	ErrCodeInvalidIcebergLayout = "InvalidIcebergLayout"
)
// TableBucketFileValidator validates file uploads targeted at table buckets.
type TableBucketFileValidator struct {
	layoutValidator *IcebergLayoutValidator // shared Iceberg layout rules
}

// NewTableBucketFileValidator returns a validator wired with a fresh
// Iceberg layout validator.
func NewTableBucketFileValidator() *TableBucketFileValidator {
	validator := &TableBucketFileValidator{}
	validator.layoutValidator = NewIcebergLayoutValidator()
	return validator
}
// ValidateTableBucketUpload checks whether a write to fullPath (a complete
// filer path such as /buckets/mybucket/myns/mytable/data/file.parquet)
// conforms to the Iceberg table layout. Paths outside the table-bucket tree
// pass through with nil; layout violations yield an *IcebergLayoutError.
func (v *TableBucketFileValidator) ValidateTableBucketUpload(fullPath string) error {
	prefix := TablesPath + "/"
	if !strings.HasPrefix(fullPath, prefix) {
		// Not a table bucket path; no validation needed.
		return nil
	}

	// Relative layout: {bucket}/{namespace}/{table}/{path-within-table}
	segments := strings.SplitN(strings.TrimPrefix(fullPath, prefix), "/", 4)

	emptySegmentErr := func() error {
		return &IcebergLayoutError{
			Code:    ErrCodeInvalidIcebergLayout,
			Message: "bucket, namespace, and table segments cannot be empty",
		}
	}

	if len(segments) < 4 {
		// Creating the bucket, namespace, or table directory itself is allowed
		// as long as every segment present is non-empty.
		for _, s := range segments {
			if s == "" {
				return emptySegmentErr()
			}
		}
		return nil
	}

	// Full path: bucket, namespace, and table segments must be non-empty.
	if segments[0] == "" || segments[1] == "" || segments[2] == "" {
		return emptySegmentErr()
	}

	// The last segment is the path within the table
	// (e.g. data/file.parquet or metadata/v1.metadata.json).
	tableRelativePath := segments[3]
	if tableRelativePath == "" {
		return nil
	}
	// A leading slash or a double slash inside the table path means an
	// empty segment somewhere — reject it.
	if strings.HasPrefix(tableRelativePath, "/") || strings.Contains(tableRelativePath, "//") {
		return emptySegmentErr()
	}
	return v.layoutValidator.ValidateFilePath(tableRelativePath)
}