metadata log: read from any timestamp

This commit is contained in:
Chris Lu
2020-04-27 23:49:46 -07:00
parent 47c4a62c5d
commit 5c57297bd1
3 changed files with 92 additions and 12 deletions

View File

@@ -21,10 +21,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
defer fs.deleteClient(clientName)
lastReadTime := time.Now()
if req.SinceNs > 0 {
lastReadTime = time.Unix(0, req.SinceNs)
}
lastReadTime := time.Unix(0, req.SinceNs)
eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
@@ -59,12 +56,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
return nil
}
_, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
fs.listenersLock.Unlock()
return true
}, func(logEntry *filer_pb.LogEntry) error {
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error {
event := &filer_pb.SubscribeMetadataResponse{}
if err := proto.Unmarshal(logEntry.Data, event); err != nil {
glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
@@ -76,7 +68,18 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
}
return nil
})
}
if err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn); err != nil {
return fmt.Errorf("reading from persisted logs: %v", err)
}
_, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
fs.listenersLock.Unlock()
return true
}, eachLogEntryFn)
return err