add SubscribeLocalMetadata without checking persisted meta logs

This commit is contained in:
Chris Lu
2020-07-05 15:50:07 -07:00
parent 55e40b08fc
commit 70d8a3a1d3
4 changed files with 141 additions and 38 deletions

View File

@@ -48,6 +48,32 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
}
func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeLocalMetadataServer) error {
peerAddress := findClientAddress(stream.Context(), 0)
clientName := fs.addClient(req.ClientName, peerAddress)
defer fs.deleteClient(clientName)
lastReadTime := time.Unix(0, req.SinceNs)
glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName)
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
fs.listenersLock.Unlock()
return true
}, eachLogEntryFn)
return err
}
func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) func(logEntry *filer_pb.LogEntry) error {
return func(logEntry *filer_pb.LogEntry) error {
event := &filer_pb.SubscribeMetadataResponse{}