stop when on disk log is done

This commit is contained in:
chrislu
2022-05-30 15:20:51 -07:00
parent a2b101a737
commit aece35a64f
2 changed files with 15 additions and 7 deletions

View File

@@ -39,15 +39,19 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
var processedTsNs int64
var readPersistedLogErr error
var readInMemoryLogErr error
var isDone bool
for {
glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
if readPersistedLogErr != nil {
return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
}
if isDone {
return nil
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
@@ -98,15 +102,19 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
var processedTsNs int64
var readPersistedLogErr error
var readInMemoryLogErr error
var isDone bool
for {
// println("reading from persisted logs ...")
glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
if readPersistedLogErr != nil {
glog.V(0).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr)
return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
}
if isDone {
return nil
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)