Files
seaweedFS/weed/server/filer_grpc_server.go
Chris Lu e5ad5e8d4a fix(filer): apply default disk type after location-prefix resolution in gRPC AssignVolume (#8836)
* fix(filer): apply default disk type after location-prefix resolution in gRPC AssignVolume

The gRPC AssignVolume path was applying the filer's default DiskType to
the request before calling detectStorageOption. This caused the default
to shadow any disk type configured via a filer location-prefix rule,
diverging from the HTTP write path which applies the default only when
no rule matches.

Extract resolveAssignStorageOption to apply the filer default disk type
after detectStorageOption, so location-prefix rules take precedence.

* fix(filer): apply default disk type after location-prefix resolution in TUS upload path

Same class of bug as the gRPC AssignVolume fix: the TUS tusWriteData
handler called detectStorageOption0 but never applied the filer's
default DiskType when no location-prefix rule matched. This made TUS
uploads ignore the -disk flag entirely.
2026-03-29 14:18:24 -07:00

468 lines
15 KiB
Go

package weed_server
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"path/filepath"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// LookupDirectoryEntry resolves a single entry by directory and name.
// A missing entry is reported as an empty response paired with the
// filer_pb.ErrNotFound sentinel so callers can tell it apart from other
// lookup failures.
func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
	fullName := filepath.Join(req.Directory, req.Name)
	glog.V(4).InfofCtx(ctx, "LookupDirectoryEntry %s", fullName)

	entry, findErr := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
	switch {
	case findErr == filer_pb.ErrNotFound:
		return &filer_pb.LookupDirectoryEntryResponse{}, findErr
	case findErr != nil:
		glog.V(3).InfofCtx(ctx, "LookupDirectoryEntry %s: %+v, ", fullName, findErr)
		return nil, findErr
	}

	return &filer_pb.LookupDirectoryEntryResponse{Entry: entry.ToProtoEntry()}, nil
}
// ListEntries streams the entries of a directory to the client, paginating
// the underlying store reads and honoring an optional name prefix,
// start-from cursor, and total result limit.
func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) (err error) {
glog.V(4).Infof("ListEntries %v", req)
// A zero request limit means "use the filer's configured listing limit".
limit := int(req.Limit)
if limit == 0 {
limit = fs.option.DirListingLimit
}
// Read from the store in pages, never larger than the remaining limit.
paginationLimit := filer.PaginationSize
if limit < paginationLimit {
paginationLimit = limit
}
lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom
// Pin a snapshot timestamp so the whole listing observes one point in
// time; default to "now" when the client did not request one.
snapshotTsNs := req.SnapshotTsNs
if snapshotTsNs == 0 {
snapshotTsNs = time.Now().UnixNano()
}
// The snapshot timestamp is attached only to the first response sent.
sentSnapshot := false
var listErr error
for limit > 0 {
var hasEntries bool
lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) (bool, error) {
hasEntries = true
resp := &filer_pb.ListEntriesResponse{
Entry: entry.ToProtoEntry(),
}
if !sentSnapshot {
resp.SnapshotTsNs = snapshotTsNs
sentSnapshot = true
}
// A failed Send stops the store scan; the error surfaces via err below.
if err = stream.Send(resp); err != nil {
return false, err
}
limit--
if limit == 0 {
return false, nil
}
return true, nil
})
if listErr != nil {
return listErr
}
if err != nil {
return err
}
// An empty page means the directory is exhausted.
if !hasEntries {
break
}
// Later pages resume strictly after the last file already streamed.
includeLastFile = false
}
// For empty directories we intentionally do NOT send a snapshot-only
// response (Entry == nil). Many consumers (Java FilerClient, S3 listing,
// etc.) treat any received response as an entry. The Go client-side
// DoSeaweedListWithSnapshot generates a client-side cutoff when the
// server sends no snapshot, so snapshot consistency is preserved
// without a server-side send.
return nil
}
// LookupVolume maps each requested volume id to its currently known
// locations. Partial results are returned alongside any lookup error so
// callers can still use whatever was resolved.
func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) {
	// The master client consults its cache first and falls back to querying
	// the master for anything missing.
	vidLocations, lookupErr := fs.filer.MasterClient.LookupVolumeIdsWithFallback(ctx, req.VolumeIds)

	resp := &filer_pb.LookupVolumeResponse{
		LocationsMap: make(map[string]*filer_pb.Locations, len(vidLocations)),
	}
	for vid, locations := range vidLocations {
		resp.LocationsMap[vid] = &filer_pb.Locations{
			Locations: wdclientLocationsToPb(locations),
		}
	}
	return resp, lookupErr
}
// wdclientLocationsToPb converts master-client volume locations into their
// protobuf wire representation, preserving order.
func wdclientLocationsToPb(locations []wdclient.Location) []*filer_pb.Location {
	converted := make([]*filer_pb.Location, len(locations))
	for i, location := range locations {
		converted[i] = &filer_pb.Location{
			Url:        location.Url,
			PublicUrl:  location.PublicUrl,
			GrpcPort:   uint32(location.GrpcPort),
			DataCenter: location.DataCenter,
		}
	}
	return converted
}
// lookupFileId resolves a file id (e.g. "3,01637037d6") into the HTTP URLs
// of the volume servers that currently host its volume, using the master
// client's location cache.
func (fs *FilerServer) lookupFileId(ctx context.Context, fileId string) (targetUrls []string, err error) {
	fid, parseErr := needle.ParseFileIdFromString(fileId)
	if parseErr != nil {
		return nil, parseErr
	}

	locations, found := fs.filer.MasterClient.GetLocations(uint32(fid.VolumeId))
	if !found || len(locations) == 0 {
		return nil, fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId)
	}

	targetUrls = make([]string, 0, len(locations))
	for _, location := range locations {
		targetUrls = append(targetUrls, fmt.Sprintf("http://%s/%s", location.Url, fileId))
	}
	return targetUrls, nil
}
// CreateEntry creates a new entry under req.Directory, deduplicating and
// manifestizing its chunks first. Creation failures are reported through
// resp.Error/resp.ErrorCode; the gRPC call itself still succeeds in that case.
func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {
glog.V(4).InfofCtx(ctx, "CreateEntry %v/%v", req.Directory, req.Entry.Name)
if len(req.Entry.HardLinkId) > 0 {
glog.V(4).InfofCtx(ctx, "CreateEntry %s/%s with HardLinkId %x counter=%d", req.Directory, req.Entry.Name, req.Entry.HardLinkId, req.Entry.HardLinkCounter)
}
resp = &filer_pb.CreateEntryResponse{}
// Compact/manifestize the incoming chunks; "garbage" is whatever became
// covered by newer chunks and can be deleted once the create succeeds.
chunks, garbage, err2 := fs.cleanupChunks(ctx, util.Join(req.Directory, req.Entry.Name), nil, req.Entry)
if err2 != nil {
return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2)
}
// Resolve the path-based storage option (TTL, max filename length, ...).
so, err := fs.detectStorageOption(ctx, string(util.NewFullPath(req.Directory, req.Entry.Name)), "", "", 0, "", "", "", "")
if err != nil {
return nil, err
}
newEntry := filer.FromPbEntry(req.Directory, req.Entry)
newEntry.Chunks = chunks
// Don't apply TTL to remote entries - they're managed by remote storage
if newEntry.Remote == nil {
// Fall back to the path-rule TTL only when the entry has none of its own.
if newEntry.TtlSec == 0 {
newEntry.TtlSec = so.TtlSeconds
}
} else {
newEntry.TtlSec = 0
}
// Capture the metadata event emitted by the create so it can be echoed
// back to the caller in the response.
ctx, eventSink := filer.WithMetadataEventSink(ctx)
createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures, req.SkipCheckParentDirectory, so.MaxFileNameLength)
if createErr == nil {
// Only reclaim superseded chunks after the entry is durably created.
fs.filer.DeleteChunksNotRecursive(garbage)
resp.MetadataEvent = eventSink.Last()
} else {
glog.V(3).InfofCtx(ctx, "CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr)
resp.Error = createErr.Error()
// Map known sentinel errors to structured codes for protocol-aware clients.
switch {
case errors.Is(createErr, filer_pb.ErrEntryNameTooLong):
resp.ErrorCode = filer_pb.FilerError_ENTRY_NAME_TOO_LONG
case errors.Is(createErr, filer_pb.ErrParentIsFile):
resp.ErrorCode = filer_pb.FilerError_PARENT_IS_FILE
case errors.Is(createErr, filer_pb.ErrExistingIsDirectory):
resp.ErrorCode = filer_pb.FilerError_EXISTING_IS_DIRECTORY
case errors.Is(createErr, filer_pb.ErrExistingIsFile):
resp.ErrorCode = filer_pb.FilerError_EXISTING_IS_FILE
case errors.Is(createErr, filer_pb.ErrEntryAlreadyExists):
resp.ErrorCode = filer_pb.FilerError_ENTRY_ALREADY_EXISTS
}
}
return
}
// UpdateEntry replaces an existing entry's metadata and chunks, optionally
// guarded by compare-and-set preconditions on extended attributes. Chunks
// no longer referenced by the new entry are deleted after a successful
// update. An update that changes nothing is a no-op success.
func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
glog.V(4).InfofCtx(ctx, "UpdateEntry %v", req)
if len(req.Entry.HardLinkId) > 0 {
glog.V(4).InfofCtx(ctx, "UpdateEntry %s/%s with HardLinkId %x counter=%d", req.Directory, req.Entry.Name, req.Entry.HardLinkId, req.Entry.HardLinkCounter)
}
fullpath := util.Join(req.Directory, req.Entry.Name)
entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath))
if err != nil {
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
}
// Enforce optimistic-concurrency preconditions before touching anything.
if err := validateUpdateEntryPreconditions(entry, req.ExpectedExtended); err != nil {
return &filer_pb.UpdateEntryResponse{}, err
}
// Compute the garbage chunks (present in the old entry but not the new one)
// and compact/manifestize the new chunk list.
chunks, garbage, err2 := fs.cleanupChunks(ctx, fullpath, entry, req.Entry)
if err2 != nil {
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2)
}
newEntry := filer.FromPbEntry(req.Directory, req.Entry)
newEntry.Chunks = chunks
// Don't apply TTL to remote entries - they're managed by remote storage
if newEntry.Remote != nil {
newEntry.TtlSec = 0
}
// Nothing changed: skip the store write. err is nil here (checked above),
// so this reports success.
if filer.EqualEntry(entry, newEntry) {
return &filer_pb.UpdateEntryResponse{}, err
}
// Capture the metadata event emitted by the update for the response.
ctx, eventSink := filer.WithMetadataEventSink(ctx)
resp := &filer_pb.UpdateEntryResponse{}
if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
// Only reclaim superseded chunks after the update is durable.
fs.filer.DeleteChunksNotRecursive(garbage)
fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster, req.Signatures)
resp.MetadataEvent = eventSink.Last()
} else {
glog.V(3).InfofCtx(ctx, "UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err)
}
return resp, err
}
// validateUpdateEntryPreconditions enforces compare-and-set semantics on an
// entry's extended attributes: for every expected key, the entry's current
// value must equal the expected bytes. An empty expected value means the
// attribute must be absent (or empty). Any mismatch yields a gRPC
// FailedPrecondition error.
func validateUpdateEntryPreconditions(entry *filer.Entry, expectedExtended map[string][]byte) error {
	if len(expectedExtended) == 0 {
		return nil
	}
	for key, want := range expectedExtended {
		var got []byte
		var present bool
		if entry != nil {
			got, present = entry.Extended[key]
		}
		var mismatch bool
		if present {
			mismatch = !bytes.Equal(got, want)
		} else {
			// Absent key is acceptable only when the caller expected it empty.
			mismatch = len(want) > 0
		}
		if mismatch {
			return status.Errorf(codes.FailedPrecondition, "extended attribute %q changed", key)
		}
	}
	return nil
}
// cleanupChunks prepares the chunk list for a create/update: it computes
// which of the existing entry's chunks became garbage, compacts fully
// covered non-manifest chunks, and optionally folds long chunk lists into
// manifest chunks. It returns the final chunk list plus the garbage chunks
// the caller should delete once the entry write succeeds.
func (fs *FilerServer) cleanupChunks(ctx context.Context, fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
// remove old chunks if not included in the new ones
if existingEntry != nil {
garbage, err = filer.MinusChunks(ctx, fs.lookupFileId, existingEntry.GetChunks(), newEntry.GetChunks())
if err != nil {
return newEntry.GetChunks(), nil, fmt.Errorf("MinusChunks: %w", err)
}
}
// files with manifest chunks are usually large and append only, skip calculating covered chunks
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(newEntry.GetChunks())
chunks, coveredChunks := filer.CompactFileChunks(ctx, fs.lookupFileId, nonManifestChunks)
// Chunks fully covered by newer writes are garbage too.
garbage = append(garbage, coveredChunks...)
if newEntry.Attributes != nil {
// Resolve the storage option for the manifest chunk uploads; only the
// TTL from the new entry is carried through.
so, _ := fs.detectStorageOption(ctx, fullpath,
"",
"",
newEntry.Attributes.TtlSec,
"",
"",
"",
"",
) // ignore readonly error for capacity needed to manifestize
chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), chunks)
if err != nil {
// not good, but should be ok
glog.V(0).InfofCtx(ctx, "MaybeManifestize: %v", err)
}
}
// Manifest chunks pass through untouched, ahead of the compacted rest.
chunks = append(manifestChunks, chunks...)
return
}
// AppendToEntry appends the given chunks to the end of an entry, creating
// the entry with default attributes if it does not exist yet. A short-lived
// cluster lock on the full path serializes concurrent appenders so chunk
// offsets do not interleave.
func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) {
	glog.V(4).InfofCtx(ctx, "AppendToEntry %v", req)
	fullpath := util.NewFullPath(req.Directory, req.EntryName)

	// Serialize appenders cluster-wide for this path.
	lockClient := cluster.NewLockClient(fs.grpcDialOption, fs.option.Host)
	lock := lockClient.NewShortLivedLock(string(fullpath), string(fs.option.Host))
	defer lock.StopShortLivedLock()

	var offset int64 = 0
	entry, err := fs.filer.FindEntry(ctx, fullpath)
	if err == filer_pb.ErrNotFound {
		// First append: start a fresh file entry with default attributes.
		entry = &filer.Entry{
			FullPath: fullpath,
			Attr: filer.Attr{
				Crtime: time.Now(),
				Mtime:  time.Now(),
				Mode:   os.FileMode(0644),
				Uid:    OS_UID,
				Gid:    OS_GID,
			},
		}
	} else if err != nil {
		// Bug fix: previously any non-NotFound lookup error fell through to
		// the "existing entry" branch with a nil entry, which was then
		// dereferenced below. Surface the error instead.
		glog.WarningfCtx(ctx, "AppendToEntry FindEntry %s: %v", fullpath, err)
		return &filer_pb.AppendToEntryResponse{}, err
	} else {
		// Continue writing after the last existing chunk.
		offset = int64(filer.TotalSize(entry.GetChunks()))
	}

	// Assign sequential offsets to the incoming chunks.
	for _, chunk := range req.Chunks {
		chunk.Offset = offset
		offset += int64(chunk.Size)
	}
	entry.Chunks = append(entry.GetChunks(), req.Chunks...)

	so, err := fs.detectStorageOption(ctx, string(fullpath), "", "", entry.TtlSec, "", "", "", "")
	if err != nil {
		glog.WarningfCtx(ctx, "detectStorageOption: %v", err)
		return &filer_pb.AppendToEntryResponse{}, err
	}
	entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), entry.GetChunks())
	if err != nil {
		// not good, but should be ok
		glog.V(0).InfofCtx(ctx, "MaybeManifestize: %v", err)
	}

	// NOTE(review): uses context.Background() rather than ctx, presumably so
	// a client cancellation cannot abort the metadata write mid-append after
	// chunks were already uploaded — confirm before changing.
	err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil, false, fs.filer.MaxFilenameLength)
	return &filer_pb.AppendToEntryResponse{}, err
}
// DeleteEntry removes an entry's metadata and, optionally, its chunk data.
// Deleting a nonexistent entry counts as success. Other failures are
// reported through resp.Error while the gRPC call itself still succeeds.
func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (*filer_pb.DeleteEntryResponse, error) {
	glog.V(4).InfofCtx(ctx, "DeleteEntry %v", req)

	// Capture the metadata event emitted by the delete for the response.
	ctx, eventSink := filer.WithMetadataEventSink(ctx)
	deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures, req.IfNotModifiedAfter)

	resp := &filer_pb.DeleteEntryResponse{}
	if deleteErr == nil || deleteErr == filer_pb.ErrNotFound {
		resp.MetadataEvent = eventSink.Last()
	} else {
		resp.Error = deleteErr.Error()
	}
	return resp, nil
}
// AssignVolume asks the master to reserve file ids for an upcoming write,
// using the storage option resolved from the request's path and parameters.
// Assignment failures are reported through resp.Error; the gRPC call itself
// still succeeds in that case.
func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (*filer_pb.AssignVolumeResponse, error) {
	so, soErr := fs.resolveAssignStorageOption(ctx, req)
	if soErr != nil {
		glog.V(3).InfofCtx(ctx, "AssignVolume: %v", soErr)
		return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", soErr)}, nil
	}

	primaryRequest, alternateRequest := so.ToAssignRequests(int(req.Count))
	result, assignErr := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, primaryRequest, alternateRequest)
	if assignErr != nil {
		glog.V(3).InfofCtx(ctx, "AssignVolume: %v", assignErr)
		return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", assignErr)}, nil
	}
	// The master can report a failure inside an otherwise successful RPC.
	if result.Error != "" {
		glog.V(3).InfofCtx(ctx, "AssignVolume error: %v", result.Error)
		return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume result: %v", result.Error)}, nil
	}

	return &filer_pb.AssignVolumeResponse{
		FileId: result.Fid,
		Count:  int32(result.Count),
		Location: &filer_pb.Location{
			Url:       result.Url,
			PublicUrl: result.PublicUrl,
			GrpcPort:  uint32(result.GrpcPort),
		},
		Auth:        string(result.Auth),
		Collection:  so.Collection,
		Replication: so.Replication,
	}, nil
}
// resolveAssignStorageOption computes the storage option for an AssignVolume
// request. A disk type chosen by a matching filer location-prefix rule takes
// precedence; the filer-wide default disk type is applied only afterwards,
// mirroring the HTTP write path's precedence.
func (fs *FilerServer) resolveAssignStorageOption(ctx context.Context, req *filer_pb.AssignVolumeRequest) (*operation.StorageOption, error) {
	so, err := fs.detectStorageOption(ctx, req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode)
	if err != nil {
		return nil, err
	}
	// Fall back to the filer default only when no rule selected a disk type.
	if len(so.DiskType) == 0 {
		so.DiskType = fs.option.DiskType
	}
	return so, nil
}
// CollectionList proxies a collection listing request to the master and
// converts the result to the filer protobuf shape.
func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.CollectionListRequest) (resp *filer_pb.CollectionListResponse, err error) {
	glog.V(4).InfofCtx(ctx, "CollectionList %v", req)
	resp = &filer_pb.CollectionListResponse{}
	err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		// Propagate the caller's context (was context.Background()) so client
		// cancellation and deadlines reach the master RPC, matching the other
		// handlers in this file.
		masterResp, err := client.CollectionList(ctx, &master_pb.CollectionListRequest{
			IncludeNormalVolumes: req.IncludeNormalVolumes,
			IncludeEcVolumes:     req.IncludeEcVolumes,
		})
		if err != nil {
			return err
		}
		for _, c := range masterResp.Collections {
			resp.Collections = append(resp.Collections, &filer_pb.Collection{Name: c.Name})
		}
		return nil
	})
	return
}
// DeleteCollection drops an entire collection and all of its volumes.
func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (*filer_pb.DeleteCollectionResponse, error) {
	glog.V(4).InfofCtx(ctx, "DeleteCollection %v", req)
	deleteErr := fs.filer.DoDeleteCollection(req.GetCollection())
	return &filer_pb.DeleteCollectionResponse{}, deleteErr
}