Fix sftp performances and add seaweedfs all-in-one deployment (#6792)
* improve perfs & fix rclone & refactoring Signed-off-by: Mohamed Sekour <mohamed.sekour@exfo.com> * improve perfs on download + add seaweedfs all-in-one deployment Signed-off-by: Mohamed Sekour <mohamed.sekour@exfo.com> * use helper for topologySpreadConstraints and fix create home dir of sftp users Signed-off-by: Mohamed Sekour <mohamed.sekour@exfo.com> * fix helm lint Signed-off-by: Mohamed Sekour <mohamed.sekour@exfo.com> * add missing ctx param Signed-off-by: Mohamed Sekour <mohamed.sekour@exfo.com> --------- Signed-off-by: Mohamed Sekour <mohamed.sekour@exfo.com>
This commit is contained in:
@@ -2,7 +2,6 @@
|
||||
package sftpd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/json"
|
||||
@@ -15,7 +14,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pkg/sftp"
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
filer_pb "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
@@ -48,6 +46,9 @@ func (fs *SftpServer) getEntry(p string) (*filer_pb.Entry, error) {
|
||||
err := fs.callWithClient(false, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||
r, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{Directory: dir, Name: name})
|
||||
if err != nil {
|
||||
if isNotExistError(err) {
|
||||
return os.ErrNotExist
|
||||
}
|
||||
return err
|
||||
}
|
||||
if r.Entry == nil {
|
||||
@@ -57,11 +58,21 @@ func (fs *SftpServer) getEntry(p string) (*filer_pb.Entry, error) {
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
if isNotExistError(err) {
|
||||
return nil, os.ErrNotExist
|
||||
}
|
||||
return nil, fmt.Errorf("lookup %s: %w", p, err)
|
||||
}
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
func isNotExistError(err error) bool {
|
||||
return strings.Contains(err.Error(), "not found") ||
|
||||
strings.Contains(err.Error(), "no entry is found") ||
|
||||
strings.Contains(err.Error(), "file does not exist") ||
|
||||
err == os.ErrNotExist
|
||||
}
|
||||
|
||||
// updateEntry sends an UpdateEntryRequest for the given entry.
|
||||
func (fs *SftpServer) updateEntry(dir string, entry *filer_pb.Entry) error {
|
||||
return fs.callWithClient(false, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||
@@ -116,94 +127,30 @@ func (fs *SftpServer) readFile(r *sftp.Request) (io.ReaderAt, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &SeaweedFileReaderAt{fs: fs, entry: entry}, nil
|
||||
}
|
||||
|
||||
// putFile uploads a file to the filer and sets ownership metadata.
|
||||
func (fs *SftpServer) putFile(filepath string, data []byte, user *user.User) error {
|
||||
dir, filename := util.FullPath(filepath).DirAndName()
|
||||
uploadUrl := fmt.Sprintf("http://%s%s", fs.filerAddr, filepath)
|
||||
|
||||
// Create a reader from our buffered data and calculate MD5 hash
|
||||
hash := md5.New()
|
||||
reader := bytes.NewReader(data)
|
||||
body := io.TeeReader(reader, hash)
|
||||
fileSize := int64(len(data))
|
||||
|
||||
// Create and execute HTTP request
|
||||
proxyReq, err := http.NewRequest(http.MethodPut, uploadUrl, body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create request: %v", err)
|
||||
}
|
||||
proxyReq.ContentLength = fileSize
|
||||
proxyReq.Header.Set("Content-Type", "application/octet-stream")
|
||||
|
||||
client := &http.Client{}
|
||||
resp, err := client.Do(proxyReq)
|
||||
if err != nil {
|
||||
return fmt.Errorf("upload to filer: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Process response
|
||||
respBody, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read response: %v", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
|
||||
return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(respBody))
|
||||
}
|
||||
|
||||
var result weed_server.FilerPostResult
|
||||
if err := json.Unmarshal(respBody, &result); err != nil {
|
||||
return fmt.Errorf("parse response: %v", err)
|
||||
}
|
||||
|
||||
if result.Error != "" {
|
||||
return fmt.Errorf("filer error: %s", result.Error)
|
||||
}
|
||||
|
||||
// Update file ownership using the same pattern as other functions
|
||||
if user != nil {
|
||||
err := fs.callWithClient(false, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||
// Look up the file to get its current entry
|
||||
lookupResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
|
||||
Directory: dir,
|
||||
Name: filename,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("lookup file for attribute update: %v", err)
|
||||
}
|
||||
|
||||
if lookupResp.Entry == nil {
|
||||
return fmt.Errorf("file not found after upload: %s/%s", dir, filename)
|
||||
}
|
||||
|
||||
// Update the entry with new uid/gid
|
||||
entry := lookupResp.Entry
|
||||
entry.Attributes.Uid = user.Uid
|
||||
entry.Attributes.Gid = user.Gid
|
||||
|
||||
// Update the entry in the filer
|
||||
_, err = client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
|
||||
Directory: dir,
|
||||
Entry: entry,
|
||||
})
|
||||
return err
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
// Log the error but don't fail the whole operation
|
||||
glog.Errorf("Failed to update file ownership for %s: %v", filepath, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return NewSeaweedFileReaderAt(fs, entry), nil
|
||||
}
|
||||
|
||||
func (fs *SftpServer) newFileWriter(r *sftp.Request) (io.WriterAt, error) {
|
||||
return &filerFileWriter{fs: *fs, req: r, permissions: 0644, uid: fs.user.Uid, gid: fs.user.Gid}, nil
|
||||
dir, _ := util.FullPath(r.Filepath).DirAndName()
|
||||
if err := fs.checkFilePermission(dir, "write"); err != nil {
|
||||
glog.Errorf("Permission denied for %s", dir)
|
||||
return nil, err
|
||||
}
|
||||
// Create a temporary file to buffer writes
|
||||
tmpFile, err := os.CreateTemp("", "sftp-upload-*")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create temp file: %v", err)
|
||||
}
|
||||
|
||||
return &SeaweedSftpFileWriter{
|
||||
fs: *fs,
|
||||
req: r,
|
||||
tmpFile: tmpFile,
|
||||
permissions: 0644,
|
||||
uid: fs.user.Uid,
|
||||
gid: fs.user.Gid,
|
||||
offset: 0,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (fs *SftpServer) removeEntry(r *sftp.Request) error {
|
||||
@@ -317,7 +264,7 @@ func (fs *SftpServer) makeDir(r *sftp.Request) error {
|
||||
return fmt.Errorf("cannot create directory: no user info")
|
||||
}
|
||||
dir, name := util.FullPath(r.Filepath).DirAndName()
|
||||
if err := fs.checkFilePermission(dir, "mkdir"); err != nil {
|
||||
if err := fs.checkFilePermission(r.Filepath, "mkdir"); err != nil {
|
||||
return err
|
||||
}
|
||||
// default mode and ownership
|
||||
@@ -345,6 +292,81 @@ func (fs *SftpServer) removeDir(r *sftp.Request) error {
|
||||
return fs.deleteEntry(r.Filepath, false)
|
||||
}
|
||||
|
||||
func (fs *SftpServer) putFile(filepath string, reader io.Reader, user *user.User) error {
|
||||
dir, filename := util.FullPath(filepath).DirAndName()
|
||||
uploadUrl := fmt.Sprintf("http://%s%s", fs.filerAddr, filepath)
|
||||
|
||||
// Compute MD5 while uploading
|
||||
hash := md5.New()
|
||||
body := io.TeeReader(reader, hash)
|
||||
|
||||
// We can skip ContentLength if unknown (chunked transfer encoding)
|
||||
req, err := http.NewRequest(http.MethodPut, uploadUrl, body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create request: %v", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/octet-stream")
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("upload to filer: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
respBody, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read response: %v", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
|
||||
return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(respBody))
|
||||
}
|
||||
|
||||
var result weed_server.FilerPostResult
|
||||
if err := json.Unmarshal(respBody, &result); err != nil {
|
||||
return fmt.Errorf("parse response: %v", err)
|
||||
}
|
||||
if result.Error != "" {
|
||||
return fmt.Errorf("filer error: %s", result.Error)
|
||||
}
|
||||
// Update file ownership using the same pattern as other functions
|
||||
if user != nil {
|
||||
err := fs.callWithClient(false, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
|
||||
// Look up the file to get its current entry
|
||||
lookupResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
|
||||
Directory: dir,
|
||||
Name: filename,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("lookup file for attribute update: %v", err)
|
||||
}
|
||||
|
||||
if lookupResp.Entry == nil {
|
||||
return fmt.Errorf("file not found after upload: %s/%s", dir, filename)
|
||||
}
|
||||
|
||||
// Update the entry with new uid/gid
|
||||
entry := lookupResp.Entry
|
||||
entry.Attributes.Uid = user.Uid
|
||||
entry.Attributes.Gid = user.Gid
|
||||
|
||||
// Update the entry in the filer
|
||||
_, err = client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
|
||||
Directory: dir,
|
||||
Entry: entry,
|
||||
})
|
||||
return err
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
// Log the error but don't fail the whole operation
|
||||
glog.Errorf("Failed to update file ownership for %s: %v", filepath, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ==================== Common Arguments Helpers ====================
|
||||
|
||||
func FileInfoFromEntry(e *filer_pb.Entry) FileInfo {
|
||||
@@ -390,73 +412,6 @@ func (fi *EnhancedFileInfo) Owner() (uid, gid int) {
|
||||
return int(fi.uid), int(fi.gid)
|
||||
}
|
||||
|
||||
// SeaweedFileReaderAt implements io.ReaderAt for SeaweedFS files
|
||||
|
||||
type SeaweedFileReaderAt struct {
|
||||
fs *SftpServer
|
||||
entry *filer_pb.Entry
|
||||
}
|
||||
|
||||
func (ra *SeaweedFileReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
// Create a new reader for each ReadAt call
|
||||
reader := filer.NewFileReader(ra.fs, ra.entry)
|
||||
if reader == nil {
|
||||
return 0, fmt.Errorf("failed to create file reader")
|
||||
}
|
||||
|
||||
// Check if we're reading past the end of the file
|
||||
fileSize := int64(ra.entry.Attributes.FileSize)
|
||||
if off >= fileSize {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
// Seek to the offset
|
||||
if seeker, ok := reader.(io.Seeker); ok {
|
||||
_, err = seeker.Seek(off, io.SeekStart)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("seek error: %v", err)
|
||||
}
|
||||
} else {
|
||||
// If the reader doesn't implement Seek, we need to read and discard bytes
|
||||
toSkip := off
|
||||
skipBuf := make([]byte, 8192)
|
||||
for toSkip > 0 {
|
||||
skipSize := int64(len(skipBuf))
|
||||
if skipSize > toSkip {
|
||||
skipSize = toSkip
|
||||
}
|
||||
read, err := reader.Read(skipBuf[:skipSize])
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("skip error: %v", err)
|
||||
}
|
||||
if read == 0 {
|
||||
return 0, fmt.Errorf("unable to skip to offset %d", off)
|
||||
}
|
||||
toSkip -= int64(read)
|
||||
}
|
||||
}
|
||||
|
||||
// Adjust read length if it would go past EOF
|
||||
readLen := len(p)
|
||||
remaining := fileSize - off
|
||||
if int64(readLen) > remaining {
|
||||
readLen = int(remaining)
|
||||
if readLen == 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
}
|
||||
|
||||
// Read the data
|
||||
n, err = io.ReadFull(reader, p[:readLen])
|
||||
|
||||
// Handle EOF correctly
|
||||
if err == io.ErrUnexpectedEOF || (err == nil && n < len(p)) {
|
||||
err = io.EOF
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (fs *SftpServer) checkFilePermission(filepath string, permissions string) error {
|
||||
return fs.authManager.CheckPermission(fs.user, filepath, permissions)
|
||||
return fs.CheckFilePermission(filepath, permissions)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user