save updated lastTsNs
This commit is contained in:
@@ -58,7 +58,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
|
|||||||
}
|
}
|
||||||
|
|
||||||
util.RetryForever("followMetaUpdates", func() error {
|
util.RetryForever("followMetaUpdates", func() error {
|
||||||
return pb.WithFilerClientFollowMetadata(client, "mount", dir, lastTsNs, selfSignature, processEventFn, true)
|
return pb.WithFilerClientFollowMetadata(client, "mount", dir, &lastTsNs, selfSignature, processEventFn, true)
|
||||||
}, func(err error) bool {
|
}, func(err error) bool {
|
||||||
glog.Errorf("follow metadata updates: %v", err)
|
glog.Errorf("follow metadata updates: %v", err)
|
||||||
return true
|
return true
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption,
|
|||||||
processEventFn ProcessMetadataFunc, fatalOnError bool) error {
|
processEventFn ProcessMetadataFunc, fatalOnError bool) error {
|
||||||
|
|
||||||
err := WithFilerClient(filerAddress, grpcDialOption, makeFunc(clientName,
|
err := WithFilerClient(filerAddress, grpcDialOption, makeFunc(clientName,
|
||||||
pathPrefix, additionalPathPrefixes, lastTsNs, selfSignature, processEventFn, fatalOnError))
|
pathPrefix, additionalPathPrefixes, &lastTsNs, selfSignature, processEventFn, fatalOnError))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("subscribing filer meta change: %v", err)
|
return fmt.Errorf("subscribing filer meta change: %v", err)
|
||||||
}
|
}
|
||||||
@@ -25,7 +25,7 @@ func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient,
|
func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient,
|
||||||
clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
|
clientName string, pathPrefix string, lastTsNs *int64, selfSignature int32,
|
||||||
processEventFn ProcessMetadataFunc, fatalOnError bool) error {
|
processEventFn ProcessMetadataFunc, fatalOnError bool) error {
|
||||||
|
|
||||||
err := filerClient.WithFilerClient(makeFunc(clientName,
|
err := filerClient.WithFilerClient(makeFunc(clientName,
|
||||||
@@ -37,7 +37,7 @@ func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeFunc(clientName string, pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, selfSignature int32,
|
func makeFunc(clientName string, pathPrefix string, additionalPathPrefixes []string, lastTsNs *int64, selfSignature int32,
|
||||||
processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error {
|
processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error {
|
||||||
return func(client filer_pb.SeaweedFilerClient) error {
|
return func(client filer_pb.SeaweedFilerClient) error {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
@@ -46,7 +46,7 @@ func makeFunc(clientName string, pathPrefix string, additionalPathPrefixes []str
|
|||||||
ClientName: clientName,
|
ClientName: clientName,
|
||||||
PathPrefix: pathPrefix,
|
PathPrefix: pathPrefix,
|
||||||
PathPrefixes: additionalPathPrefixes,
|
PathPrefixes: additionalPathPrefixes,
|
||||||
SinceNs: lastTsNs,
|
SinceNs: *lastTsNs,
|
||||||
Signature: selfSignature,
|
Signature: selfSignature,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -69,7 +69,7 @@ func makeFunc(clientName string, pathPrefix string, additionalPathPrefixes []str
|
|||||||
glog.Errorf("process %v: %v", resp, err)
|
glog.Errorf("process %v: %v", resp, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
lastTsNs = resp.TsNs
|
*lastTsNs = resp.TsNs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la
|
|||||||
}
|
}
|
||||||
|
|
||||||
util.RetryForever("followIamChanges", func() error {
|
util.RetryForever("followIamChanges", func() error {
|
||||||
return pb.WithFilerClientFollowMetadata(s3a, clientName, prefix, lastTsNs, 0, processEventFn, true)
|
return pb.WithFilerClientFollowMetadata(s3a, clientName, prefix, &lastTsNs, 0, processEventFn, true)
|
||||||
}, func(err error) bool {
|
}, func(err error) bool {
|
||||||
glog.V(0).Infof("iam follow metadata changes: %v", err)
|
glog.V(0).Infof("iam follow metadata changes: %v", err)
|
||||||
return true
|
return true
|
||||||
|
|||||||
Reference in New Issue
Block a user