Files
seaweedFS/weed/server/filer_grpc_server.go
Chris Lu 0b3867dca3 filer: add structured error codes to CreateEntryResponse (#8767)
* filer: add FilerError enum and error_code field to CreateEntryResponse

Add a machine-readable error code alongside the existing string error
field. This follows the precedent set by PublishMessageResponse in the
MQ broker proto. The string field is kept for human readability and
backward compatibility.

Defined codes: OK, ENTRY_NAME_TOO_LONG, PARENT_IS_FILE,
EXISTING_IS_DIRECTORY, EXISTING_IS_FILE, ENTRY_ALREADY_EXISTS.

* filer: add sentinel errors and error code mapping in filer_pb

Define sentinel errors (ErrEntryNameTooLong, ErrParentIsFile, etc.) in
the filer_pb package so both the filer and consumers can reference them
without circular imports.

Add FilerErrorToSentinel() to map proto error codes to sentinels, and
update CreateEntryWithResponse() to check error_code first, falling back
to the string-based path for backward compatibility with old servers.

* filer: return wrapped sentinel errors and set proto error codes

Replace fmt.Errorf string errors in filer.CreateEntry, UpdateEntry, and
ensureParentDirectoryEntry with wrapped filer_pb sentinel errors (using
%w). This preserves errors.Is() traversal on the server side.

In the gRPC CreateEntry handler, map sentinel errors to the
corresponding FilerError proto codes using errors.Is(), setting both
resp.Error (string, for backward compat) and resp.ErrorCode (enum).

* S3: use errors.Is() with filer sentinels instead of string matching

Replace fragile string-based error matching in filerErrorToS3Error and
other S3 API consumers with errors.Is() checks against filer_pb sentinel
errors. This works because the updated CreateEntryWithResponse helper
reconstructs sentinel errors from the proto FilerError code.

Update iceberg stage_create and metadata_files to check resp.ErrorCode
instead of parsing resp.Error strings. Update SSE-S3 to use errors.Is()
for the already-exists check.

String matching is retained only for non-filer errors (gRPC transport
errors, checksum validation) that don't go through CreateEntryResponse.

* filer: remove backward-compat string fallbacks for error codes

Clients and servers are always deployed together, so there is no need
for backward-compatibility fallback paths that parse resp.Error strings
when resp.ErrorCode is unset. Simplify all consumers to rely solely on
the structured error code.

* iceberg: ensure unknown non-OK error codes are not silently ignored

When FilerErrorToSentinel returns nil for an unrecognized error code,
return an error including the code and message rather than falling
through to return nil.

* filer: fix redundant error message and restore error wrapping in helper

Use request path instead of resp.Error in the sentinel error format
string to avoid duplicating the sentinel message (e.g. "entry already
exists: entry already exists"). Restore %w wrapping with errors.New()
in the fallback paths so callers can use errors.Is()/errors.As().

* filer: promote file to directory on path conflict instead of erroring

S3 allows both "foo/bar" (object) and "foo/bar/xyzzy" (another object)
to coexist because S3 has a flat key space. When ensureParentDirectoryEntry
finds a parent path that is a file instead of a directory, promote it to
a directory by setting ModeDir while preserving the original content and
chunks. Use Store.UpdateEntry directly to bypass the Filer.UpdateEntry
type-change guard.

This fixes the S3 compatibility test failures where creating overlapping
keys (e.g. "foo/bar" then "foo/bar/xyzzy") returned ExistingObjectIsFile.
2026-03-24 17:08:22 -07:00

451 lines
14 KiB
Go

package weed_server
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"path/filepath"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// LookupDirectoryEntry resolves req.Directory/req.Name to a single filer entry.
//
// A missing entry is reported as filer_pb.ErrNotFound together with an empty
// (non-nil) response; any other lookup failure is logged at V(3) and returned
// with a nil response.
func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
	displayPath := filepath.Join(req.Directory, req.Name)
	glog.V(4).InfofCtx(ctx, "LookupDirectoryEntry %s", displayPath)

	entry, lookupErr := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
	switch {
	case lookupErr == filer_pb.ErrNotFound:
		return &filer_pb.LookupDirectoryEntryResponse{}, lookupErr
	case lookupErr != nil:
		glog.V(3).InfofCtx(ctx, "LookupDirectoryEntry %s: %+v, ", displayPath, lookupErr)
		return nil, lookupErr
	}

	return &filer_pb.LookupDirectoryEntryResponse{Entry: entry.ToProtoEntry()}, nil
}
// ListEntries streams the entries of req.Directory back to the client, page by
// page, until req.Limit entries were sent (fs.option.DirListingLimit when the
// request leaves Limit at 0) or the directory is exhausted.
//
// The first response carries the listing snapshot timestamp: req.SnapshotTsNs
// when the client supplied one, otherwise the current time.
func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) (err error) {
	glog.V(4).Infof("ListEntries %v", req)

	// Number of entries we may still send in total.
	remaining := int(req.Limit)
	if remaining == 0 {
		remaining = fs.option.DirListingLimit
	}

	// Never ask the store for a bigger page than the overall budget.
	pageSize := filer.PaginationSize
	if remaining < pageSize {
		pageSize = remaining
	}

	cursor := req.StartFromFileName
	inclusive := req.InclusiveStartFrom

	snapshotTsNs := req.SnapshotTsNs
	if snapshotTsNs == 0 {
		snapshotTsNs = time.Now().UnixNano()
	}
	snapshotDelivered := false

	for remaining > 0 {
		gotAny := false
		var listErr error

		cursor, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), cursor, inclusive, int64(pageSize), req.Prefix, "", "", func(entry *filer.Entry) (bool, error) {
			gotAny = true

			out := &filer_pb.ListEntriesResponse{Entry: entry.ToProtoEntry()}
			if !snapshotDelivered {
				// Only the very first response carries the snapshot timestamp.
				out.SnapshotTsNs = snapshotTsNs
				snapshotDelivered = true
			}

			// The send error is recorded in the named result so it can be
			// reported after the listing call returns.
			if err = stream.Send(out); err != nil {
				return false, err
			}

			remaining--
			// Stop the current page as soon as the budget is used up.
			return remaining != 0, nil
		})

		if listErr != nil {
			return listErr
		}
		if err != nil {
			return err
		}
		if !gotAny {
			// An empty page means the directory has been fully listed.
			break
		}
		// Later pages resume strictly after the last name already sent.
		inclusive = false
	}

	// For empty directories we intentionally do NOT send a snapshot-only
	// response (Entry == nil). Many consumers (Java FilerClient, S3 listing,
	// etc.) treat any received response as an entry. The Go client-side
	// DoSeaweedListWithSnapshot generates a client-side cutoff when the
	// server sends no snapshot, so snapshot consistency is preserved
	// without a server-side send.
	return nil
}
// LookupVolume resolves req.VolumeIds to their server locations through the
// master client, which consults its cache and falls back to querying the master.
//
// Whatever locations were resolved are placed in the response even when the
// lookup also reports an error, and that error is returned alongside.
// NOTE(review): over gRPC the response message is generally not delivered to
// the client when the handler returns a non-nil error, so the partial results
// are presumably only visible to in-process callers — confirm.
func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) {
	vidLocations, lookupErr := fs.filer.MasterClient.LookupVolumeIdsWithFallback(ctx, req.VolumeIds)

	locationsMap := make(map[string]*filer_pb.Locations, len(vidLocations))
	for vid, locs := range vidLocations {
		locationsMap[vid] = &filer_pb.Locations{Locations: wdclientLocationsToPb(locs)}
	}

	return &filer_pb.LookupVolumeResponse{LocationsMap: locationsMap}, lookupErr
}
// wdclientLocationsToPb converts master-client volume locations into their
// protobuf form, copying Url, PublicUrl, GrpcPort and DataCenter.
//
// The result always has the same length as the input and is never nil.
func wdclientLocationsToPb(locations []wdclient.Location) []*filer_pb.Location {
	converted := make([]*filer_pb.Location, len(locations))
	for i := range locations {
		src := &locations[i]
		converted[i] = &filer_pb.Location{
			Url:        src.Url,
			PublicUrl:  src.PublicUrl,
			GrpcPort:   uint32(src.GrpcPort),
			DataCenter: src.DataCenter,
		}
	}
	return converted
}
// lookupFileId returns one "http://<volume server>/<fileId>" URL per known
// location of the volume that holds fileId.
//
// It fails when fileId cannot be parsed or when the master client has no
// location for the volume. ctx is part of the lookup-function signature and is
// not used here.
func (fs *FilerServer) lookupFileId(ctx context.Context, fileId string) (targetUrls []string, err error) {
	fid, parseErr := needle.ParseFileIdFromString(fileId)
	if parseErr != nil {
		return nil, parseErr
	}

	locations, found := fs.filer.MasterClient.GetLocations(uint32(fid.VolumeId))
	if !found || len(locations) == 0 {
		return nil, fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId)
	}

	urls := make([]string, 0, len(locations))
	for i := range locations {
		urls = append(urls, fmt.Sprintf("http://%s/%s", locations[i].Url, fileId))
	}
	return urls, nil
}
// CreateEntry handles the CreateEntry RPC. It compacts and possibly
// manifestizes the incoming chunks, applies the path's storage-option TTL to
// non-remote entries, and persists the entry through the filer.
//
// Filer-level failures are reported inside the response (resp.Error for
// humans, resp.ErrorCode for machines) with a nil gRPC error. Only a missing
// entry, a chunk-cleanup failure, or a storage-option failure is returned as
// a gRPC error.
func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {
	// Every step below dereferences req.Entry; reject a malformed request
	// instead of panicking inside the handler.
	if req.GetEntry() == nil {
		return nil, status.Errorf(codes.InvalidArgument, "CreateEntry %s: missing entry", req.GetDirectory())
	}
	glog.V(4).InfofCtx(ctx, "CreateEntry %v/%v", req.Directory, req.Entry.Name)
	resp = &filer_pb.CreateEntryResponse{}

	chunks, garbage, cleanupErr := fs.cleanupChunks(ctx, util.Join(req.Directory, req.Entry.Name), nil, req.Entry)
	if cleanupErr != nil {
		return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %w", req.Directory, req.Entry.Name, cleanupErr)
	}

	so, err := fs.detectStorageOption(ctx, string(util.NewFullPath(req.Directory, req.Entry.Name)), "", "", 0, "", "", "", "")
	if err != nil {
		return nil, err
	}

	newEntry := filer.FromPbEntry(req.Directory, req.Entry)
	newEntry.Chunks = chunks
	// Don't apply TTL to remote entries - they're managed by remote storage.
	// Local entries inherit the configured TTL unless they carry their own.
	if newEntry.Remote == nil {
		if newEntry.TtlSec == 0 {
			newEntry.TtlSec = so.TtlSeconds
		}
	} else {
		newEntry.TtlSec = 0
	}

	ctx, eventSink := filer.WithMetadataEventSink(ctx)
	createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures, req.SkipCheckParentDirectory, so.MaxFileNameLength)
	if createErr == nil {
		// Chunks superseded during cleanup are only deleted once the new
		// entry has been stored successfully.
		fs.filer.DeleteChunksNotRecursive(garbage)
		resp.MetadataEvent = eventSink.Last()
		return resp, nil
	}

	glog.V(3).InfofCtx(ctx, "CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr)
	resp.Error = createErr.Error()
	// Map the filer's sentinel errors to the structured code. Errors that
	// match no sentinel leave ErrorCode at its zero value.
	switch {
	case errors.Is(createErr, filer_pb.ErrEntryNameTooLong):
		resp.ErrorCode = filer_pb.FilerError_ENTRY_NAME_TOO_LONG
	case errors.Is(createErr, filer_pb.ErrParentIsFile):
		resp.ErrorCode = filer_pb.FilerError_PARENT_IS_FILE
	case errors.Is(createErr, filer_pb.ErrExistingIsDirectory):
		resp.ErrorCode = filer_pb.FilerError_EXISTING_IS_DIRECTORY
	case errors.Is(createErr, filer_pb.ErrExistingIsFile):
		resp.ErrorCode = filer_pb.FilerError_EXISTING_IS_FILE
	case errors.Is(createErr, filer_pb.ErrEntryAlreadyExists):
		resp.ErrorCode = filer_pb.FilerError_ENTRY_ALREADY_EXISTS
	}
	return resp, nil
}
// UpdateEntry handles the UpdateEntry RPC. It replaces the existing entry at
// req.Directory/req.Entry.Name with req.Entry.
//
// Steps:
//   - Rejects the update with FailedPrecondition when req.ExpectedExtended
//     does not match the stored extended attributes.
//   - Skips the write entirely when nothing changed.
//   - On success, deletes chunks the new entry no longer references and
//     emits an update notification.
func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
	glog.V(4).InfofCtx(ctx, "UpdateEntry %v", req)
	// Every step below dereferences req.Entry; reject a malformed request
	// instead of panicking inside the handler.
	if req.GetEntry() == nil {
		return &filer_pb.UpdateEntryResponse{}, status.Errorf(codes.InvalidArgument, "UpdateEntry %s: missing entry", req.GetDirectory())
	}

	fullpath := util.Join(req.Directory, req.Entry.Name)
	entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath))
	if err != nil {
		return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %w", fullpath, err)
	}

	if err := validateUpdateEntryPreconditions(entry, req.ExpectedExtended); err != nil {
		return &filer_pb.UpdateEntryResponse{}, err
	}

	chunks, garbage, cleanupErr := fs.cleanupChunks(ctx, fullpath, entry, req.Entry)
	if cleanupErr != nil {
		return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %w", fullpath, cleanupErr)
	}

	newEntry := filer.FromPbEntry(req.Directory, req.Entry)
	newEntry.Chunks = chunks
	// Don't apply TTL to remote entries - they're managed by remote storage.
	if newEntry.Remote != nil {
		newEntry.TtlSec = 0
	}

	if filer.EqualEntry(entry, newEntry) {
		// Nothing changed: skip the store write and the update notification.
		return &filer_pb.UpdateEntryResponse{}, nil
	}

	ctx, eventSink := filer.WithMetadataEventSink(ctx)
	resp := &filer_pb.UpdateEntryResponse{}
	if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err != nil {
		glog.V(3).InfofCtx(ctx, "UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err)
		return resp, err
	}

	fs.filer.DeleteChunksNotRecursive(garbage)
	fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster, req.Signatures)
	resp.MetadataEvent = eventSink.Last()
	return resp, nil
}
// validateUpdateEntryPreconditions enforces compare-and-swap style
// preconditions on extended attributes before an update is applied.
//
// For every key in expectedExtended:
//   - If the key is present on entry, its stored value must equal the
//     expected value byte-for-byte.
//   - If the key is absent (or entry is nil), the expected value must be empty.
//
// The first violation found is reported as a codes.FailedPrecondition status
// error. Map iteration order decides which key is reported when several differ.
func validateUpdateEntryPreconditions(entry *filer.Entry, expectedExtended map[string][]byte) error {
	var current map[string][]byte
	if entry != nil {
		current = entry.Extended
	}

	for key, want := range expectedExtended {
		got, present := current[key]
		changed := (present && !bytes.Equal(got, want)) || (!present && len(want) > 0)
		if changed {
			return status.Errorf(codes.FailedPrecondition, "extended attribute %q changed", key)
		}
	}
	return nil
}
// cleanupChunks computes the chunk list to store for newEntry and the chunks
// that become garbage once it is stored.
//
// Steps:
//   - garbage starts with the chunks of existingEntry (if any) that newEntry
//     no longer references.
//   - newEntry's non-manifest chunks are compacted; chunks fully covered by
//     later writes are added to garbage.
//   - When newEntry has attributes, the compacted chunks are merged into
//     manifest chunks where applicable. This step is best-effort: on failure
//     it is logged and the compacted chunks are kept as-is.
//
// Only a MinusChunks failure is returned as an error.
func (fs *FilerServer) cleanupChunks(ctx context.Context, fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
	// remove old chunks if not included in the new ones
	if existingEntry != nil {
		garbage, err = filer.MinusChunks(ctx, fs.lookupFileId, existingEntry.GetChunks(), newEntry.GetChunks())
		if err != nil {
			return newEntry.GetChunks(), nil, fmt.Errorf("MinusChunks: %w", err)
		}
	}

	// files with manifest chunks are usually large and append only, skip calculating covered chunks
	manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(newEntry.GetChunks())
	chunks, coveredChunks := filer.CompactFileChunks(ctx, fs.lookupFileId, nonManifestChunks)
	garbage = append(garbage, coveredChunks...)

	if newEntry.GetAttributes() != nil {
		so, _ := fs.detectStorageOption(ctx, fullpath,
			"",
			"",
			newEntry.Attributes.TtlSec,
			"",
			"",
			"",
			"",
		) // ignore readonly error for capacity needed to manifestize

		// Manifestizing is an optimization. Use a local error so a failure is
		// not leaked through the named result (which would fail the caller's
		// whole create/update), and keep the un-manifestized chunks.
		manifestized, manifestErr := filer.MaybeManifestize(fs.saveAsChunk(ctx, so), chunks)
		if manifestErr != nil {
			// not good, but should be ok
			glog.V(0).InfofCtx(ctx, "MaybeManifestize: %v", manifestErr)
		} else {
			chunks = manifestized
		}
	}

	chunks = append(manifestChunks, chunks...)
	return chunks, garbage, nil
}
// AppendToEntry appends req.Chunks to the end of the entry at
// req.Directory/req.EntryName. When no entry exists it creates a new 0644
// file entry.
//
// The Offset of each appended chunk is rewritten in place so the chunks
// follow the entry's current total size. A short-lived cluster lock keyed by
// the full path is held for the whole call (presumably to serialize
// concurrent appends to the same path — confirm).
//
// A lookup failure other than "not found" is returned instead of falling
// through with a nil entry.
func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) {
	glog.V(4).InfofCtx(ctx, "AppendToEntry %v", req)
	fullpath := util.NewFullPath(req.Directory, req.EntryName)

	lockClient := cluster.NewLockClient(fs.grpcDialOption, fs.option.Host)
	lock := lockClient.NewShortLivedLock(string(fullpath), string(fs.option.Host))
	defer lock.StopShortLivedLock()

	var offset int64
	entry, err := fs.filer.FindEntry(ctx, fullpath)
	switch {
	case errors.Is(err, filer_pb.ErrNotFound):
		now := time.Now()
		entry = &filer.Entry{
			FullPath: fullpath,
			Attr: filer.Attr{
				Crtime: now,
				Mtime:  now,
				Mode:   os.FileMode(0644),
				Uid:    OS_UID,
				Gid:    OS_GID,
			},
		}
	case err != nil:
		// Any other lookup failure leaves entry unusable (possibly nil).
		return &filer_pb.AppendToEntryResponse{}, fmt.Errorf("AppendToEntry find %s: %w", fullpath, err)
	default:
		offset = int64(filer.TotalSize(entry.GetChunks()))
	}

	for _, chunk := range req.Chunks {
		chunk.Offset = offset
		offset += int64(chunk.Size)
	}
	entry.Chunks = append(entry.GetChunks(), req.Chunks...)

	so, err := fs.detectStorageOption(ctx, string(fullpath), "", "", entry.TtlSec, "", "", "", "")
	if err != nil {
		glog.WarningfCtx(ctx, "detectStorageOption: %v", err)
		return &filer_pb.AppendToEntryResponse{}, err
	}

	entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), entry.GetChunks())
	if err != nil {
		// not good, but should be ok
		glog.V(0).InfofCtx(ctx, "MaybeManifestize: %v", err)
	}

	// NOTE(review): the write uses context.Background() rather than ctx,
	// presumably so a client cancellation cannot abort it mid-way — confirm.
	err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil, false, fs.filer.MaxFilenameLength)
	return &filer_pb.AppendToEntryResponse{}, err
}
// DeleteEntry removes req.Directory/req.Name (metadata and, per the request
// flags, data) and never returns a gRPC error.
//
// Failures are reported in resp.Error. Deleting a path that does not exist
// counts as success; in that case, and on real success, resp.MetadataEvent
// carries the last metadata event recorded during the call.
func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {
	glog.V(4).InfofCtx(ctx, "DeleteEntry %v", req)
	ctx, eventSink := filer.WithMetadataEventSink(ctx)

	target := util.JoinPath(req.Directory, req.Name)
	deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, target, req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures, req.IfNotModifiedAfter)

	resp = &filer_pb.DeleteEntryResponse{}
	switch {
	case deleteErr == nil, deleteErr == filer_pb.ErrNotFound:
		resp.MetadataEvent = eventSink.Last()
	default:
		resp.Error = deleteErr.Error()
	}
	return resp, nil
}
// AssignVolume asks the master for a new file id (or req.Count ids) using the
// storage options that apply to req.Path.
//
// req.DiskType defaults to the filer's configured disk type. Every failure is
// reported in the response's Error field with a nil gRPC error.
func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
	if req.DiskType == "" {
		req.DiskType = fs.option.DiskType
	}

	so, optionErr := fs.detectStorageOption(ctx, req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode)
	if optionErr != nil {
		glog.V(3).InfofCtx(ctx, "AssignVolume: %v", optionErr)
		return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", optionErr)}, nil
	}

	primary, alternate := so.ToAssignRequests(int(req.Count))
	result, assignErr := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, primary, alternate)
	if assignErr != nil {
		glog.V(3).InfofCtx(ctx, "AssignVolume: %v", assignErr)
		return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", assignErr)}, nil
	}
	if result.Error != "" {
		glog.V(3).InfofCtx(ctx, "AssignVolume error: %v", result.Error)
		return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume result: %v", result.Error)}, nil
	}

	location := &filer_pb.Location{
		Url:       result.Url,
		PublicUrl: result.PublicUrl,
		GrpcPort:  uint32(result.GrpcPort),
	}
	return &filer_pb.AssignVolumeResponse{
		FileId:      result.Fid,
		Count:       int32(result.Count),
		Location:    location,
		Auth:        string(result.Auth),
		Collection:  so.Collection,
		Replication: so.Replication,
	}, nil
}
// CollectionList forwards the collection listing to the master and returns
// the collection names it reports.
//
// The master call runs under the caller's ctx so that a cancelled or expired
// request also cancels the upstream RPC. Collections gathered before an error
// are still returned alongside that error.
func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.CollectionListRequest) (resp *filer_pb.CollectionListResponse, err error) {
	glog.V(4).InfofCtx(ctx, "CollectionList %v", req)
	resp = &filer_pb.CollectionListResponse{}
	err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		masterResp, listErr := client.CollectionList(ctx, &master_pb.CollectionListRequest{
			IncludeNormalVolumes: req.IncludeNormalVolumes,
			IncludeEcVolumes:     req.IncludeEcVolumes,
		})
		if listErr != nil {
			return listErr
		}
		for _, c := range masterResp.Collections {
			resp.Collections = append(resp.Collections, &filer_pb.Collection{Name: c.Name})
		}
		return nil
	})
	return resp, err
}
// DeleteCollection asks the filer to delete the named collection and returns
// an empty response together with whatever error that deletion reported.
func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) {
	glog.V(4).InfofCtx(ctx, "DeleteCollection %v", req)
	deleteErr := fs.filer.DoDeleteCollection(req.GetCollection())
	resp = &filer_pb.DeleteCollectionResponse{}
	return resp, deleteErr
}