This commit is contained in:
Chris Lu
2021-08-08 22:30:36 -07:00
parent df85f7a1eb
commit c5f38c365d
12 changed files with 24 additions and 27 deletions

View File

@@ -107,12 +107,12 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3 * time.Second, func(counter int64, lastTsNs int64) error {
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
})
return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_" + dataSink.GetName(),
return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(),
sourcePath, startFrom.UnixNano(), 0, processEventFnWithOffset, false)
}

View File

@@ -189,7 +189,7 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
return nil
}
processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3 * time.Second, func(counter int64, lastTsNs int64) error {
processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
lastTime := time.Unix(0, lastTsNs)
glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, lastTime, float64(counter)/float64(3))
return metaBackup.setOffset(lastTime)

View File

@@ -252,9 +252,9 @@ func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer
entry.RemoteEntry = remoteEntry
return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
_, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
Directory: dir,
Entry: entry,
Directory: dir,
Entry: entry,
})
return err
})
}
}

View File

@@ -165,12 +165,12 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
return persistEventFn(resp)
}
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3 * time.Second, func(counter int64, lastTsNs int64) error {
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
})
return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_" + targetFiler,
return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+targetFiler,
sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
}

View File

@@ -27,5 +27,4 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
)
)