avoid looping forever if there are no more metadata updates
This commit is contained in:
@@ -49,11 +49,6 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
|
|||||||
|
|
||||||
if processedTsNs != 0 {
|
if processedTsNs != 0 {
|
||||||
lastReadTime = time.Unix(0, processedTsNs)
|
lastReadTime = time.Unix(0, processedTsNs)
|
||||||
} else {
|
|
||||||
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
|
|
||||||
time.Sleep(1127 * time.Millisecond)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
|
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
|
||||||
@@ -66,6 +61,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
|
|||||||
}, eachLogEntryFn)
|
}, eachLogEntryFn)
|
||||||
if readInMemoryLogErr != nil {
|
if readInMemoryLogErr != nil {
|
||||||
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
|
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
|
||||||
|
time.Sleep(1127 * time.Millisecond)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
|
glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
|
||||||
|
|||||||
@@ -189,7 +189,11 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
|
|||||||
defer m.RUnlock()
|
defer m.RUnlock()
|
||||||
|
|
||||||
if !m.lastFlushTime.IsZero() && m.lastFlushTime.After(lastReadTime) {
|
if !m.lastFlushTime.IsZero() && m.lastFlushTime.After(lastReadTime) {
|
||||||
return nil, ResumeFromDiskError
|
if time.Now().Sub(m.lastFlushTime) < m.flushInterval * 2 {
|
||||||
|
diff := m.lastFlushTime.Sub(lastReadTime)
|
||||||
|
glog.V(4).Infof("lastFlush:%v lastRead:%v diff:%v", m.lastFlushTime, lastReadTime, diff)
|
||||||
|
return nil, ResumeFromDiskError
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|||||||
Reference in New Issue
Block a user