Rclone storage backend (#4402)
* Add Rclone storage backend * Support templating the name of files stored via Rclone * Enable Rclone accounting * Remove redundant type conversion * Provide progress information for Rclone download/upload operations * Log error when Rclone can't instantiate filesystem * Remove filename templating functionality for Rclone storage To (maybe) be later reintroduced as a generic functionality for all storage backends. * Remove S3 specific check * Move Rclone config initialisation to init() method
This commit is contained in:
270
weed/storage/backend/rclone_backend/rclone_backend.go
Normal file
270
weed/storage/backend/rclone_backend/rclone_backend.go
Normal file
@@ -0,0 +1,270 @@
|
||||
package rclone_backend
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/rclone/rclone/fs/config/configfile"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
_ "github.com/rclone/rclone/backend/all"
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/accounting"
|
||||
"github.com/rclone/rclone/fs/object"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
||||
)
|
||||
|
||||
func init() {
|
||||
backend.BackendStorageFactories["rclone"] = &RcloneBackendFactory{}
|
||||
configfile.Install()
|
||||
}
|
||||
|
||||
type RcloneBackendFactory struct {
|
||||
}
|
||||
|
||||
func (factory *RcloneBackendFactory) StorageType() backend.StorageType {
|
||||
return "rclone"
|
||||
}
|
||||
|
||||
func (factory *RcloneBackendFactory) BuildStorage(configuration backend.StringProperties, configPrefix string, id string) (backend.BackendStorage, error) {
|
||||
return newRcloneBackendStorage(configuration, configPrefix, id)
|
||||
}
|
||||
|
||||
type RcloneBackendStorage struct {
|
||||
id string
|
||||
remoteName string
|
||||
fs fs.Fs
|
||||
}
|
||||
|
||||
func newRcloneBackendStorage(configuration backend.StringProperties, configPrefix string, id string) (s *RcloneBackendStorage, err error) {
|
||||
s = &RcloneBackendStorage{}
|
||||
s.id = id
|
||||
s.remoteName = configuration.GetString(configPrefix + "remote_name")
|
||||
|
||||
ctx := context.TODO()
|
||||
accounting.Start(ctx)
|
||||
|
||||
fsPath := fmt.Sprintf("%s:", s.remoteName)
|
||||
s.fs, err = fs.NewFs(ctx, fsPath)
|
||||
if err != nil {
|
||||
glog.Errorf("failed to instantiate Rclone filesystem: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(0).Infof("created backend storage rclone.%s for remote name %s", s.id, s.remoteName)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *RcloneBackendStorage) ToProperties() map[string]string {
|
||||
m := make(map[string]string)
|
||||
m["remote_name"] = s.remoteName
|
||||
return m
|
||||
}
|
||||
|
||||
func (s *RcloneBackendStorage) NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) backend.BackendStorageFile {
|
||||
f := &RcloneBackendStorageFile{
|
||||
backendStorage: s,
|
||||
key: key,
|
||||
tierInfo: tierInfo,
|
||||
}
|
||||
|
||||
return f
|
||||
}
|
||||
|
||||
func (s *RcloneBackendStorage) CopyFile(f *os.File, fn func(progressed int64, percentage float32) error) (key string, size int64, err error) {
|
||||
randomUuid, err := uuid.NewRandom()
|
||||
if err != nil {
|
||||
return key, 0, err
|
||||
}
|
||||
key = randomUuid.String()
|
||||
|
||||
glog.V(1).Infof("copy dat file of %s to remote rclone.%s as %s", f.Name(), s.id, key)
|
||||
|
||||
util.Retry("upload via Rclone", func() error {
|
||||
size, err = uploadViaRclone(s.fs, f.Name(), key, fn)
|
||||
return err
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func uploadViaRclone(rfs fs.Fs, filename string, key string, fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
|
||||
ctx := context.TODO()
|
||||
|
||||
file, err := os.Open(filename)
|
||||
defer func(file *os.File) {
|
||||
err := file.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}(file)
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
stat, err := file.Stat()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
info := object.NewStaticObjectInfo(key, stat.ModTime(), stat.Size(), true, nil, rfs)
|
||||
|
||||
tr := accounting.NewStats(ctx).NewTransfer(info)
|
||||
defer tr.Done(ctx, err)
|
||||
acc := tr.Account(ctx, file)
|
||||
pr := ProgressReader{acc: acc, tr: tr, fn: fn}
|
||||
|
||||
obj, err := rfs.Put(ctx, &pr, info)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return obj.Size(), err
|
||||
}
|
||||
|
||||
func (s *RcloneBackendStorage) DownloadFile(filename string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error) {
|
||||
glog.V(1).Infof("download dat file of %s from remote rclone.%s as %s", filename, s.id, key)
|
||||
|
||||
util.Retry("download via Rclone", func() error {
|
||||
size, err = downloadViaRclone(s.fs, filename, key, fn)
|
||||
return err
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func downloadViaRclone(fs fs.Fs, filename string, key string, fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
|
||||
ctx := context.TODO()
|
||||
|
||||
obj, err := fs.NewObject(ctx, key)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
rc, err := obj.Open(ctx)
|
||||
defer func(rc io.ReadCloser) {
|
||||
err := rc.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}(rc)
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
file, err := os.Create(filename)
|
||||
defer func(file *os.File) {
|
||||
err := file.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}(file)
|
||||
|
||||
tr := accounting.NewStats(ctx).NewTransfer(obj)
|
||||
defer tr.Done(ctx, err)
|
||||
acc := tr.Account(ctx, rc)
|
||||
pr := ProgressReader{acc: acc, tr: tr, fn: fn}
|
||||
|
||||
written, err := io.Copy(file, &pr)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return written, nil
|
||||
}
|
||||
|
||||
func (s *RcloneBackendStorage) DeleteFile(key string) (err error) {
|
||||
glog.V(1).Infof("delete dat file %s from remote", key)
|
||||
|
||||
util.Retry("delete via Rclone", func() error {
|
||||
err = deleteViaRclone(s.fs, key)
|
||||
return err
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func deleteViaRclone(fs fs.Fs, key string) (err error) {
|
||||
ctx := context.TODO()
|
||||
|
||||
obj, err := fs.NewObject(ctx, key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return obj.Remove(ctx)
|
||||
}
|
||||
|
||||
type RcloneBackendStorageFile struct {
|
||||
backendStorage *RcloneBackendStorage
|
||||
key string
|
||||
tierInfo *volume_server_pb.VolumeInfo
|
||||
}
|
||||
|
||||
func (rcloneBackendStorageFile RcloneBackendStorageFile) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
ctx := context.TODO()
|
||||
|
||||
obj, err := rcloneBackendStorageFile.backendStorage.fs.NewObject(ctx, rcloneBackendStorageFile.key)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
opt := fs.RangeOption{Start: off, End: off + int64(len(p)) - 1}
|
||||
|
||||
rc, err := obj.Open(ctx, &opt)
|
||||
defer func(rc io.ReadCloser) {
|
||||
err := rc.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}(rc)
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return io.ReadFull(rc, p)
|
||||
}
|
||||
|
||||
func (rcloneBackendStorageFile RcloneBackendStorageFile) WriteAt(p []byte, off int64) (n int, err error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (rcloneBackendStorageFile RcloneBackendStorageFile) Truncate(off int64) error {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (rcloneBackendStorageFile RcloneBackendStorageFile) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rcloneBackendStorageFile RcloneBackendStorageFile) GetStat() (datSize int64, modTime time.Time, err error) {
|
||||
files := rcloneBackendStorageFile.tierInfo.GetFiles()
|
||||
|
||||
if len(files) == 0 {
|
||||
err = fmt.Errorf("remote file info not found")
|
||||
return
|
||||
}
|
||||
|
||||
datSize = int64(files[0].FileSize)
|
||||
modTime = time.Unix(int64(files[0].ModifiedTime), 0)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (rcloneBackendStorageFile RcloneBackendStorageFile) Name() string {
|
||||
return rcloneBackendStorageFile.key
|
||||
}
|
||||
|
||||
func (rcloneBackendStorageFile RcloneBackendStorageFile) Sync() error {
|
||||
return nil
|
||||
}
|
||||
19
weed/storage/backend/rclone_backend/rclone_progress.go
Normal file
19
weed/storage/backend/rclone_backend/rclone_progress.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package rclone_backend
|
||||
|
||||
import "github.com/rclone/rclone/fs/accounting"
|
||||
|
||||
type ProgressReader struct {
|
||||
acc *accounting.Account
|
||||
tr *accounting.Transfer
|
||||
fn func(progressed int64, percentage float32) error
|
||||
}
|
||||
|
||||
func (pr *ProgressReader) Read(p []byte) (n int, err error) {
|
||||
n, err = pr.acc.Read(p)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
snap := pr.tr.Snapshot()
|
||||
err = pr.fn(snap.Bytes, 100*float32(snap.Bytes)/float32(snap.Size))
|
||||
return
|
||||
}
|
||||
Reference in New Issue
Block a user