Merge branch 'master' into filerstore-tikv
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/remote_storage"
|
||||
"github.com/chrislusf/seaweedfs/weed/replication/source"
|
||||
"github.com/chrislusf/seaweedfs/weed/security"
|
||||
@@ -72,13 +73,6 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
|
||||
dir := *remoteSyncOptions.dir
|
||||
filerAddress := *remoteSyncOptions.filerAddress
|
||||
|
||||
// read filer remote storage mount mappings
|
||||
_, _, remoteStorageMountLocation, storageConf, detectErr := filer.DetectMountInfo(grpcDialOption, filerAddress, dir)
|
||||
if detectErr != nil {
|
||||
fmt.Printf("read mount info: %v", detectErr)
|
||||
return false
|
||||
}
|
||||
|
||||
filerSource := &source.FilerSource{}
|
||||
filerSource.DoInitialize(
|
||||
filerAddress,
|
||||
@@ -89,7 +83,7 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
|
||||
|
||||
fmt.Printf("synchronize %s to remote storage...\n", dir)
|
||||
util.RetryForever("filer.remote.sync "+dir, func() error {
|
||||
return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation)
|
||||
return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir)
|
||||
}, func(err error) bool {
|
||||
if err != nil {
|
||||
glog.Errorf("synchronize %s: %v", dir, err)
|
||||
@@ -100,7 +94,13 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error {
|
||||
func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error {
|
||||
|
||||
// read filer remote storage mount mappings
|
||||
_, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, *option.filerAddress, mountedDir)
|
||||
if detectErr != nil {
|
||||
return fmt.Errorf("read mount info: %v", detectErr)
|
||||
}
|
||||
|
||||
dirHash := util.HashStringToLong(mountedDir)
|
||||
|
||||
@@ -115,11 +115,15 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
|
||||
}
|
||||
|
||||
lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
|
||||
if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
|
||||
lastOffsetTs = time.Unix(0, lastOffsetTsNs)
|
||||
glog.V(0).Infof("resume from %v", lastOffsetTs)
|
||||
if mountedDirEntry != nil {
|
||||
if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
|
||||
lastOffsetTs = time.Unix(0, lastOffsetTsNs)
|
||||
glog.V(0).Infof("resume from %v", lastOffsetTs)
|
||||
} else {
|
||||
lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
|
||||
}
|
||||
} else {
|
||||
lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
|
||||
lastOffsetTs = time.Now()
|
||||
}
|
||||
} else {
|
||||
lastOffsetTs = time.Now().Add(-*option.timeAgo)
|
||||
@@ -160,6 +164,10 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
|
||||
if message.OldEntry != nil && message.NewEntry == nil {
|
||||
glog.V(2).Infof("delete: %+v", resp)
|
||||
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
|
||||
if message.OldEntry.IsDirectory {
|
||||
glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
|
||||
return client.RemoveDirectory(dest)
|
||||
}
|
||||
glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
|
||||
return client.DeleteFile(dest)
|
||||
}
|
||||
@@ -206,10 +214,10 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
|
||||
"filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
|
||||
}
|
||||
|
||||
func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation {
|
||||
func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {
|
||||
source := string(sourcePath[len(mountDir):])
|
||||
dest := util.FullPath(remoteMountLocation.Path).Child(source)
|
||||
return &filer_pb.RemoteStorageLocation{
|
||||
return &remote_pb.RemoteStorageLocation{
|
||||
Name: remoteMountLocation.Name,
|
||||
Bucket: remoteMountLocation.Bucket,
|
||||
Path: string(dest),
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
|
||||
_ "github.com/chrislusf/seaweedfs/weed/remote_storage/azure"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/remote_storage/gcs"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/remote_storage/hdfs"
|
||||
_ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3"
|
||||
|
||||
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
|
||||
|
||||
Reference in New Issue
Block a user