Files
seaweedFS/weed/s3api/iceberg/metadata_files.go
Chris Lu 0b3867dca3 filer: add structured error codes to CreateEntryResponse (#8767)
* filer: add FilerError enum and error_code field to CreateEntryResponse

Add a machine-readable error code alongside the existing string error
field. This follows the precedent set by PublishMessageResponse in the
MQ broker proto. The string field is kept for human readability and
backward compatibility.

Defined codes: OK, ENTRY_NAME_TOO_LONG, PARENT_IS_FILE,
EXISTING_IS_DIRECTORY, EXISTING_IS_FILE, ENTRY_ALREADY_EXISTS.

* filer: add sentinel errors and error code mapping in filer_pb

Define sentinel errors (ErrEntryNameTooLong, ErrParentIsFile, etc.) in
the filer_pb package so both the filer and consumers can reference them
without circular imports.

Add FilerErrorToSentinel() to map proto error codes to sentinels, and
update CreateEntryWithResponse() to check error_code first, falling back
to the string-based path for backward compatibility with old servers.

* filer: return wrapped sentinel errors and set proto error codes

Replace fmt.Errorf string errors in filer.CreateEntry, UpdateEntry, and
ensureParentDirectoryEntry with wrapped filer_pb sentinel errors (using
%w). This preserves errors.Is() traversal on the server side.

In the gRPC CreateEntry handler, map sentinel errors to the
corresponding FilerError proto codes using errors.Is(), setting both
resp.Error (string, for backward compat) and resp.ErrorCode (enum).

* S3: use errors.Is() with filer sentinels instead of string matching

Replace fragile string-based error matching in filerErrorToS3Error and
other S3 API consumers with errors.Is() checks against filer_pb sentinel
errors. This works because the updated CreateEntryWithResponse helper
reconstructs sentinel errors from the proto FilerError code.

Update iceberg stage_create and metadata_files to check resp.ErrorCode
instead of parsing resp.Error strings. Update SSE-S3 to use errors.Is()
for the already-exists check.

String matching is retained only for non-filer errors (gRPC transport
errors, checksum validation) that don't go through CreateEntryResponse.

* filer: remove backward-compat string fallbacks for error codes

Clients and servers are always deployed together, so there is no need
for backward-compatibility fallback paths that parse resp.Error strings
when resp.ErrorCode is unset. Simplify all consumers to rely solely on
the structured error code.

* iceberg: ensure unknown non-OK error codes are not silently ignored

When FilerErrorToSentinel returns nil for an unrecognized error code,
return an error including the code and message rather than falling
through to return nil.

* filer: fix redundant error message and restore error wrapping in helper

Use request path instead of resp.Error in the sentinel error format
string to avoid duplicating the sentinel message (e.g. "entry already
exists: entry already exists"). Restore %w wrapping with errors.New()
in the fallback paths so callers can use errors.Is()/errors.As().

* filer: promote file to directory on path conflict instead of erroring

S3 allows both "foo/bar" (object) and "foo/bar/xyzzy" (another object)
to coexist because S3 has a flat key space. When ensureParentDirectoryEntry
finds a parent path that is a file instead of a directory, promote it to
a directory by setting ModeDir while preserving the original content and
chunks. Use Store.UpdateEntry directly to bypass the Filer.UpdateEntry
type-change guard.

This fixes the S3 compatibility test failures where creating overlapping
keys (e.g. "foo/bar" then "foo/bar/xyzzy") returned ExistingObjectIsFile.
2026-03-24 17:08:22 -07:00

161 lines
4.9 KiB
Go

package iceberg
import (
	"context"
	"errors"
	"fmt"
	"os"
	"path"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
)
// saveMetadataFile writes an Iceberg metadata JSON file to the filer at
// <s3tables.TablesPath>/<bucketName>/<tablePath>/metadata/<metadataFileName>.
// It creates the bucket, table, and metadata directories along the way when
// they are missing. Empty tablePath segments (from leading, trailing, or
// repeated slashes) are skipped. All filer calls share a 30-second timeout
// derived from ctx.
//
// A non-OK filer error code is reported as an error wrapping the matching
// filer_pb sentinel when FilerErrorToSentinel knows the code, so callers can
// use errors.Is. Otherwise the raw code and message are included, so unknown
// codes are never silently ignored.
func (s *Server) saveMetadataFile(ctx context.Context, bucketName, tablePath, metadataFileName string, content []byte) error {
	// Create context with timeout for file operations
	opCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	return s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		bucketsPath := s3tables.TablesPath

		// createFailure converts a non-OK CreateEntryResponse into an error
		// prefixed with what. It wraps the sentinel when one exists and
		// otherwise reports the code and message verbatim.
		createFailure := func(what string, resp *filer_pb.CreateEntryResponse) error {
			if sentinel := filer_pb.FilerErrorToSentinel(resp.ErrorCode); sentinel != nil {
				return fmt.Errorf("%s: %w", what, sentinel)
			}
			return fmt.Errorf("%s: code=%v %s", what, resp.ErrorCode, resp.Error)
		}

		// ensureDir makes sure parent/name exists, creating it as a directory
		// when the lookup reports it missing. Any existing entry is accepted
		// as-is. errorContext names the directory in returned errors.
		ensureDir := func(parent, name, errorContext string) error {
			_, err := filer_pb.LookupEntry(opCtx, client, &filer_pb.LookupDirectoryEntryRequest{
				Directory: parent,
				Name:      name,
			})
			if err == nil {
				return nil
			}
			// errors.Is also matches ErrNotFound if it ever arrives wrapped.
			if !errors.Is(err, filer_pb.ErrNotFound) {
				return fmt.Errorf("lookup %s failed: %w", errorContext, err)
			}

			// Use one timestamp so Mtime and Crtime agree.
			now := time.Now().Unix()
			resp, createErr := client.CreateEntry(opCtx, &filer_pb.CreateEntryRequest{
				Directory: parent,
				Entry: &filer_pb.Entry{
					Name:        name,
					IsDirectory: true,
					Attributes: &filer_pb.FuseAttributes{
						Mtime:    now,
						Crtime:   now,
						FileMode: uint32(0755 | os.ModeDir),
					},
				},
			})
			if createErr != nil {
				return fmt.Errorf("failed to create %s: %w", errorContext, createErr)
			}
			switch resp.ErrorCode {
			case filer_pb.FilerError_OK, filer_pb.FilerError_ENTRY_ALREADY_EXISTS:
				// ENTRY_ALREADY_EXISTS means the entry appeared between the lookup
				// and the create (e.g. a concurrent creator). It counts as success.
				return nil
			}
			return createFailure("failed to create "+errorContext, resp)
		}

		bucketDir := path.Join(bucketsPath, bucketName)

		// 1. Ensure bucket directory exists: <bucketsPath>/<bucket>
		if err := ensureDir(bucketsPath, bucketName, "bucket directory"); err != nil {
			return err
		}

		// 2. Ensure each table path segment exists under the bucket directory.
		// NOTE(review): "." and ".." segments are not rejected here, while
		// path.Join below collapses them. Confirm tablePath is validated upstream.
		tableDir := bucketDir
		for _, segment := range strings.Split(tablePath, "/") {
			if segment == "" {
				continue
			}
			if err := ensureDir(tableDir, segment, "table directory"); err != nil {
				return err
			}
			tableDir = path.Join(tableDir, segment)
		}

		// 3. Ensure metadata directory exists: <bucketsPath>/<bucket>/<tablePath>/metadata
		metadataDir := path.Join(tableDir, "metadata")
		if err := ensureDir(tableDir, "metadata", "metadata directory"); err != nil {
			return err
		}

		// 4. Write the file, storing the JSON inline in Entry.Content.
		now := time.Now().Unix()
		resp, err := client.CreateEntry(opCtx, &filer_pb.CreateEntryRequest{
			Directory: metadataDir,
			Entry: &filer_pb.Entry{
				Name: metadataFileName,
				Attributes: &filer_pb.FuseAttributes{
					Mtime:    now,
					Crtime:   now,
					FileMode: uint32(0644),
					FileSize: uint64(len(content)),
				},
				Content: content,
				Extended: map[string][]byte{
					"Mime-Type": []byte("application/json"),
				},
			},
		})
		if err != nil {
			return fmt.Errorf("failed to write metadata file: %w", err)
		}
		if resp.ErrorCode != filer_pb.FilerError_OK {
			return createFailure("failed to write metadata file", resp)
		}
		return nil
	})
}
// deleteMetadataFile removes metadataFileName from
// <s3tables.TablesPath>/<bucketName>/<tablePath>/metadata via filer_pb.DoRemove.
// The removal runs under a 30-second timeout derived from ctx.
func (s *Server) deleteMetadataFile(ctx context.Context, bucketName, tablePath, metadataFileName string) error {
	opCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	// path.Join ignores empty elements, so an empty tablePath yields
	// <TablesPath>/<bucket>/metadata without a special case.
	dir := path.Join(s3tables.TablesPath, bucketName, tablePath, "metadata")

	remove := func(client filer_pb.SeaweedFilerClient) error {
		return filer_pb.DoRemove(opCtx, client, dir, metadataFileName, true, false, true, false, nil)
	}
	return s.filerClient.WithFilerClient(false, remove)
}
// loadMetadataFile fetches metadataFileName from
// <s3tables.TablesPath>/<bucketName>/<tablePath>/metadata and returns a copy of
// the entry's inline Content. The lookup runs under a 30-second timeout derived
// from ctx. Lookup errors are returned unchanged, and a missing entry in the
// lookup response is reported as an error.
// NOTE(review): only Entry.Content is read, so an entry whose data is stored
// in chunks would come back empty. That matches how saveMetadataFile writes
// the file; confirm no other writer stores these files as chunks.
func (s *Server) loadMetadataFile(ctx context.Context, bucketName, tablePath, metadataFileName string) ([]byte, error) {
	opCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	// path.Join ignores empty elements, so an empty tablePath needs no special case.
	dir := path.Join(s3tables.TablesPath, bucketName, tablePath, "metadata")

	var data []byte
	if err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		found, lookupErr := filer_pb.LookupEntry(opCtx, client, &filer_pb.LookupDirectoryEntryRequest{
			Directory: dir,
			Name:      metadataFileName,
		})
		switch {
		case lookupErr != nil:
			return lookupErr
		case found == nil || found.Entry == nil:
			return fmt.Errorf("lookup returned nil entry for %s/%s", dir, metadataFileName)
		}
		// Copy so the caller does not alias the response buffer.
		data = append([]byte(nil), found.Entry.Content...)
		return nil
	}); err != nil {
		return nil, err
	}
	return data, nil
}