fix(filer.sync): initializing the offset is related to the path (#3450)

Co-authored-by: zhihao.qu <zhihao.qu@ly.com>
This commit is contained in:
qzh
2022-08-16 12:56:47 +08:00
committed by GitHub
parent fa4d0093e1
commit 400f0c3e5d

View File

@@ -130,7 +130,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
go func() { go func() {
// a->b // a->b
// set synchronization start timestamp to offset // set synchronization start timestamp to offset
initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs) initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs, getSignaturePrefixByPath(*syncOptions.aPath))
if initOffsetError != nil { if initOffsetError != nil {
glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError) glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError)
os.Exit(2) os.Exit(2)
@@ -165,7 +165,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
if !*syncOptions.isActivePassive { if !*syncOptions.isActivePassive {
// b->a // b->a
// set synchronization start timestamp to offset // set synchronization start timestamp to offset
initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs) initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs, getSignaturePrefixByPath(*syncOptions.bPath))
if initOffsetError != nil { if initOffsetError != nil {
glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError) glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError)
os.Exit(2) os.Exit(2)
@@ -205,14 +205,14 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
} }
// initOffsetFromTsMs Initialize offset // initOffsetFromTsMs Initialize offset
func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64) error { func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64, signaturePrefix string) error {
if fromTsMs <= 0 { if fromTsMs <= 0 {
return nil return nil
} }
// convert to nanosecond // convert to nanosecond
fromTsNs := fromTsMs * 1000_000 fromTsNs := fromTsMs * 1000_000
// If not successful, exit the program. // If not successful, exit the program.
setOffsetErr := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, fromTsNs) setOffsetErr := setOffset(grpcDialOption, targetFiler, signaturePrefix, sourceFilerSignature, fromTsNs)
if setOffsetErr != nil { if setOffsetErr != nil {
return setOffsetErr return setOffsetErr
} }