* fix(filer): apply default disk type after location-prefix resolution in gRPC AssignVolume The gRPC AssignVolume path was applying the filer's default DiskType to the request before calling detectStorageOption. This caused the default to shadow any disk type configured via a filer location-prefix rule, diverging from the HTTP write path which applies the default only when no rule matches. Extract resolveAssignStorageOption to apply the filer default disk type after detectStorageOption, so location-prefix rules take precedence. * fix(filer): apply default disk type after location-prefix resolution in TUS upload path Same class of bug as the gRPC AssignVolume fix: the TUS tusWriteData handler called detectStorageOption0 but never applied the filer's default DiskType when no location-prefix rule matched. This made TUS uploads ignore the -disk flag entirely.
468 lines
15 KiB
Go
468 lines
15 KiB
Go
package weed_server
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/cluster"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/operation"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
|
|
|
|
glog.V(4).InfofCtx(ctx, "LookupDirectoryEntry %s", filepath.Join(req.Directory, req.Name))
|
|
|
|
entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
|
|
if err == filer_pb.ErrNotFound {
|
|
return &filer_pb.LookupDirectoryEntryResponse{}, err
|
|
}
|
|
if err != nil {
|
|
glog.V(3).InfofCtx(ctx, "LookupDirectoryEntry %s: %+v, ", filepath.Join(req.Directory, req.Name), err)
|
|
return nil, err
|
|
}
|
|
|
|
return &filer_pb.LookupDirectoryEntryResponse{
|
|
Entry: entry.ToProtoEntry(),
|
|
}, nil
|
|
}
|
|
|
|
func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) (err error) {
|
|
|
|
glog.V(4).Infof("ListEntries %v", req)
|
|
|
|
limit := int(req.Limit)
|
|
if limit == 0 {
|
|
limit = fs.option.DirListingLimit
|
|
}
|
|
|
|
paginationLimit := filer.PaginationSize
|
|
if limit < paginationLimit {
|
|
paginationLimit = limit
|
|
}
|
|
|
|
lastFileName := req.StartFromFileName
|
|
includeLastFile := req.InclusiveStartFrom
|
|
snapshotTsNs := req.SnapshotTsNs
|
|
if snapshotTsNs == 0 {
|
|
snapshotTsNs = time.Now().UnixNano()
|
|
}
|
|
sentSnapshot := false
|
|
var listErr error
|
|
for limit > 0 {
|
|
var hasEntries bool
|
|
lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) (bool, error) {
|
|
hasEntries = true
|
|
resp := &filer_pb.ListEntriesResponse{
|
|
Entry: entry.ToProtoEntry(),
|
|
}
|
|
if !sentSnapshot {
|
|
resp.SnapshotTsNs = snapshotTsNs
|
|
sentSnapshot = true
|
|
}
|
|
if err = stream.Send(resp); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
limit--
|
|
if limit == 0 {
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
})
|
|
|
|
if listErr != nil {
|
|
return listErr
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !hasEntries {
|
|
break
|
|
}
|
|
|
|
includeLastFile = false
|
|
|
|
}
|
|
|
|
// For empty directories we intentionally do NOT send a snapshot-only
|
|
// response (Entry == nil). Many consumers (Java FilerClient, S3 listing,
|
|
// etc.) treat any received response as an entry. The Go client-side
|
|
// DoSeaweedListWithSnapshot generates a client-side cutoff when the
|
|
// server sends no snapshot, so snapshot consistency is preserved
|
|
// without a server-side send.
|
|
|
|
return nil
|
|
}
|
|
|
|
func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) {
|
|
|
|
resp := &filer_pb.LookupVolumeResponse{
|
|
LocationsMap: make(map[string]*filer_pb.Locations),
|
|
}
|
|
|
|
// Use master client's lookup with fallback - it handles cache and master query
|
|
vidLocations, err := fs.filer.MasterClient.LookupVolumeIdsWithFallback(ctx, req.VolumeIds)
|
|
|
|
// Convert wdclient.Location to filer_pb.Location
|
|
// Return partial results even if there was an error
|
|
for vidString, locations := range vidLocations {
|
|
resp.LocationsMap[vidString] = &filer_pb.Locations{
|
|
Locations: wdclientLocationsToPb(locations),
|
|
}
|
|
}
|
|
|
|
return resp, err
|
|
}
|
|
|
|
func wdclientLocationsToPb(locations []wdclient.Location) []*filer_pb.Location {
|
|
locs := make([]*filer_pb.Location, 0, len(locations))
|
|
for _, loc := range locations {
|
|
locs = append(locs, &filer_pb.Location{
|
|
Url: loc.Url,
|
|
PublicUrl: loc.PublicUrl,
|
|
GrpcPort: uint32(loc.GrpcPort),
|
|
DataCenter: loc.DataCenter,
|
|
})
|
|
}
|
|
return locs
|
|
}
|
|
|
|
func (fs *FilerServer) lookupFileId(ctx context.Context, fileId string) (targetUrls []string, err error) {
|
|
fid, err := needle.ParseFileIdFromString(fileId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
locations, found := fs.filer.MasterClient.GetLocations(uint32(fid.VolumeId))
|
|
if !found || len(locations) == 0 {
|
|
return nil, fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId)
|
|
}
|
|
for _, loc := range locations {
|
|
targetUrls = append(targetUrls, fmt.Sprintf("http://%s/%s", loc.Url, fileId))
|
|
}
|
|
return
|
|
}
|
|
|
|
func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {
|
|
|
|
glog.V(4).InfofCtx(ctx, "CreateEntry %v/%v", req.Directory, req.Entry.Name)
|
|
if len(req.Entry.HardLinkId) > 0 {
|
|
glog.V(4).InfofCtx(ctx, "CreateEntry %s/%s with HardLinkId %x counter=%d", req.Directory, req.Entry.Name, req.Entry.HardLinkId, req.Entry.HardLinkCounter)
|
|
}
|
|
|
|
resp = &filer_pb.CreateEntryResponse{}
|
|
|
|
chunks, garbage, err2 := fs.cleanupChunks(ctx, util.Join(req.Directory, req.Entry.Name), nil, req.Entry)
|
|
if err2 != nil {
|
|
return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2)
|
|
}
|
|
|
|
so, err := fs.detectStorageOption(ctx, string(util.NewFullPath(req.Directory, req.Entry.Name)), "", "", 0, "", "", "", "")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
newEntry := filer.FromPbEntry(req.Directory, req.Entry)
|
|
newEntry.Chunks = chunks
|
|
// Don't apply TTL to remote entries - they're managed by remote storage
|
|
if newEntry.Remote == nil {
|
|
if newEntry.TtlSec == 0 {
|
|
newEntry.TtlSec = so.TtlSeconds
|
|
}
|
|
} else {
|
|
newEntry.TtlSec = 0
|
|
}
|
|
|
|
ctx, eventSink := filer.WithMetadataEventSink(ctx)
|
|
createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures, req.SkipCheckParentDirectory, so.MaxFileNameLength)
|
|
|
|
if createErr == nil {
|
|
fs.filer.DeleteChunksNotRecursive(garbage)
|
|
resp.MetadataEvent = eventSink.Last()
|
|
} else {
|
|
glog.V(3).InfofCtx(ctx, "CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr)
|
|
resp.Error = createErr.Error()
|
|
switch {
|
|
case errors.Is(createErr, filer_pb.ErrEntryNameTooLong):
|
|
resp.ErrorCode = filer_pb.FilerError_ENTRY_NAME_TOO_LONG
|
|
case errors.Is(createErr, filer_pb.ErrParentIsFile):
|
|
resp.ErrorCode = filer_pb.FilerError_PARENT_IS_FILE
|
|
case errors.Is(createErr, filer_pb.ErrExistingIsDirectory):
|
|
resp.ErrorCode = filer_pb.FilerError_EXISTING_IS_DIRECTORY
|
|
case errors.Is(createErr, filer_pb.ErrExistingIsFile):
|
|
resp.ErrorCode = filer_pb.FilerError_EXISTING_IS_FILE
|
|
case errors.Is(createErr, filer_pb.ErrEntryAlreadyExists):
|
|
resp.ErrorCode = filer_pb.FilerError_ENTRY_ALREADY_EXISTS
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
|
|
|
|
glog.V(4).InfofCtx(ctx, "UpdateEntry %v", req)
|
|
if len(req.Entry.HardLinkId) > 0 {
|
|
glog.V(4).InfofCtx(ctx, "UpdateEntry %s/%s with HardLinkId %x counter=%d", req.Directory, req.Entry.Name, req.Entry.HardLinkId, req.Entry.HardLinkCounter)
|
|
}
|
|
|
|
fullpath := util.Join(req.Directory, req.Entry.Name)
|
|
entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath))
|
|
if err != nil {
|
|
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
|
|
}
|
|
if err := validateUpdateEntryPreconditions(entry, req.ExpectedExtended); err != nil {
|
|
return &filer_pb.UpdateEntryResponse{}, err
|
|
}
|
|
|
|
chunks, garbage, err2 := fs.cleanupChunks(ctx, fullpath, entry, req.Entry)
|
|
if err2 != nil {
|
|
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2)
|
|
}
|
|
|
|
newEntry := filer.FromPbEntry(req.Directory, req.Entry)
|
|
newEntry.Chunks = chunks
|
|
|
|
// Don't apply TTL to remote entries - they're managed by remote storage
|
|
if newEntry.Remote != nil {
|
|
newEntry.TtlSec = 0
|
|
}
|
|
|
|
if filer.EqualEntry(entry, newEntry) {
|
|
return &filer_pb.UpdateEntryResponse{}, err
|
|
}
|
|
|
|
ctx, eventSink := filer.WithMetadataEventSink(ctx)
|
|
resp := &filer_pb.UpdateEntryResponse{}
|
|
if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
|
|
fs.filer.DeleteChunksNotRecursive(garbage)
|
|
|
|
fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster, req.Signatures)
|
|
resp.MetadataEvent = eventSink.Last()
|
|
|
|
} else {
|
|
glog.V(3).InfofCtx(ctx, "UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err)
|
|
}
|
|
|
|
return resp, err
|
|
}
|
|
|
|
func validateUpdateEntryPreconditions(entry *filer.Entry, expectedExtended map[string][]byte) error {
|
|
if len(expectedExtended) == 0 {
|
|
return nil
|
|
}
|
|
|
|
for key, expectedValue := range expectedExtended {
|
|
var actualValue []byte
|
|
var ok bool
|
|
if entry != nil {
|
|
actualValue, ok = entry.Extended[key]
|
|
}
|
|
if ok {
|
|
if !bytes.Equal(actualValue, expectedValue) {
|
|
return status.Errorf(codes.FailedPrecondition, "extended attribute %q changed", key)
|
|
}
|
|
continue
|
|
}
|
|
if len(expectedValue) > 0 {
|
|
return status.Errorf(codes.FailedPrecondition, "extended attribute %q changed", key)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (fs *FilerServer) cleanupChunks(ctx context.Context, fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
|
|
|
|
// remove old chunks if not included in the new ones
|
|
if existingEntry != nil {
|
|
garbage, err = filer.MinusChunks(ctx, fs.lookupFileId, existingEntry.GetChunks(), newEntry.GetChunks())
|
|
if err != nil {
|
|
return newEntry.GetChunks(), nil, fmt.Errorf("MinusChunks: %w", err)
|
|
}
|
|
}
|
|
|
|
// files with manifest chunks are usually large and append only, skip calculating covered chunks
|
|
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(newEntry.GetChunks())
|
|
|
|
chunks, coveredChunks := filer.CompactFileChunks(ctx, fs.lookupFileId, nonManifestChunks)
|
|
garbage = append(garbage, coveredChunks...)
|
|
|
|
if newEntry.Attributes != nil {
|
|
so, _ := fs.detectStorageOption(ctx, fullpath,
|
|
"",
|
|
"",
|
|
newEntry.Attributes.TtlSec,
|
|
"",
|
|
"",
|
|
"",
|
|
"",
|
|
) // ignore readonly error for capacity needed to manifestize
|
|
chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), chunks)
|
|
if err != nil {
|
|
// not good, but should be ok
|
|
glog.V(0).InfofCtx(ctx, "MaybeManifestize: %v", err)
|
|
}
|
|
}
|
|
|
|
chunks = append(manifestChunks, chunks...)
|
|
|
|
return
|
|
}
|
|
|
|
func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) {
|
|
|
|
glog.V(4).InfofCtx(ctx, "AppendToEntry %v", req)
|
|
fullpath := util.NewFullPath(req.Directory, req.EntryName)
|
|
|
|
lockClient := cluster.NewLockClient(fs.grpcDialOption, fs.option.Host)
|
|
lock := lockClient.NewShortLivedLock(string(fullpath), string(fs.option.Host))
|
|
defer lock.StopShortLivedLock()
|
|
|
|
var offset int64 = 0
|
|
entry, err := fs.filer.FindEntry(ctx, fullpath)
|
|
if err == filer_pb.ErrNotFound {
|
|
entry = &filer.Entry{
|
|
FullPath: fullpath,
|
|
Attr: filer.Attr{
|
|
Crtime: time.Now(),
|
|
Mtime: time.Now(),
|
|
Mode: os.FileMode(0644),
|
|
Uid: OS_UID,
|
|
Gid: OS_GID,
|
|
},
|
|
}
|
|
} else {
|
|
offset = int64(filer.TotalSize(entry.GetChunks()))
|
|
}
|
|
|
|
for _, chunk := range req.Chunks {
|
|
chunk.Offset = offset
|
|
offset += int64(chunk.Size)
|
|
}
|
|
|
|
entry.Chunks = append(entry.GetChunks(), req.Chunks...)
|
|
so, err := fs.detectStorageOption(ctx, string(fullpath), "", "", entry.TtlSec, "", "", "", "")
|
|
if err != nil {
|
|
glog.WarningfCtx(ctx, "detectStorageOption: %v", err)
|
|
return &filer_pb.AppendToEntryResponse{}, err
|
|
}
|
|
entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), entry.GetChunks())
|
|
if err != nil {
|
|
// not good, but should be ok
|
|
glog.V(0).InfofCtx(ctx, "MaybeManifestize: %v", err)
|
|
}
|
|
|
|
err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil, false, fs.filer.MaxFilenameLength)
|
|
|
|
return &filer_pb.AppendToEntryResponse{}, err
|
|
}
|
|
|
|
func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {
|
|
|
|
glog.V(4).InfofCtx(ctx, "DeleteEntry %v", req)
|
|
|
|
ctx, eventSink := filer.WithMetadataEventSink(ctx)
|
|
err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures, req.IfNotModifiedAfter)
|
|
resp = &filer_pb.DeleteEntryResponse{}
|
|
if err != nil && err != filer_pb.ErrNotFound {
|
|
resp.Error = err.Error()
|
|
} else {
|
|
resp.MetadataEvent = eventSink.Last()
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
|
|
|
|
so, err := fs.resolveAssignStorageOption(ctx, req)
|
|
if err != nil {
|
|
glog.V(3).InfofCtx(ctx, "AssignVolume: %v", err)
|
|
return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
|
|
}
|
|
|
|
assignRequest, altRequest := so.ToAssignRequests(int(req.Count))
|
|
|
|
assignResult, err := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
|
|
if err != nil {
|
|
glog.V(3).InfofCtx(ctx, "AssignVolume: %v", err)
|
|
return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
|
|
}
|
|
if assignResult.Error != "" {
|
|
glog.V(3).InfofCtx(ctx, "AssignVolume error: %v", assignResult.Error)
|
|
return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume result: %v", assignResult.Error)}, nil
|
|
}
|
|
|
|
return &filer_pb.AssignVolumeResponse{
|
|
FileId: assignResult.Fid,
|
|
Count: int32(assignResult.Count),
|
|
Location: &filer_pb.Location{
|
|
Url: assignResult.Url,
|
|
PublicUrl: assignResult.PublicUrl,
|
|
GrpcPort: uint32(assignResult.GrpcPort),
|
|
},
|
|
Auth: string(assignResult.Auth),
|
|
Collection: so.Collection,
|
|
Replication: so.Replication,
|
|
}, nil
|
|
}
|
|
|
|
func (fs *FilerServer) resolveAssignStorageOption(ctx context.Context, req *filer_pb.AssignVolumeRequest) (*operation.StorageOption, error) {
|
|
so, err := fs.detectStorageOption(ctx, req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Mirror the HTTP write path: only apply the filer's default disk when the
|
|
// matched locationPrefix rule did not already select one.
|
|
if so.DiskType == "" {
|
|
so.DiskType = fs.option.DiskType
|
|
}
|
|
|
|
return so, nil
|
|
}
|
|
|
|
func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.CollectionListRequest) (resp *filer_pb.CollectionListResponse, err error) {
|
|
|
|
glog.V(4).InfofCtx(ctx, "CollectionList %v", req)
|
|
resp = &filer_pb.CollectionListResponse{}
|
|
|
|
err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
|
|
masterResp, err := client.CollectionList(context.Background(), &master_pb.CollectionListRequest{
|
|
IncludeNormalVolumes: req.IncludeNormalVolumes,
|
|
IncludeEcVolumes: req.IncludeEcVolumes,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, c := range masterResp.Collections {
|
|
resp.Collections = append(resp.Collections, &filer_pb.Collection{Name: c.Name})
|
|
}
|
|
return nil
|
|
})
|
|
|
|
return
|
|
}
|
|
|
|
func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) {
|
|
|
|
glog.V(4).InfofCtx(ctx, "DeleteCollection %v", req)
|
|
|
|
err = fs.filer.DoDeleteCollection(req.GetCollection())
|
|
|
|
return &filer_pb.DeleteCollectionResponse{}, err
|
|
}
|