subscribe metadata between a range

This commit is contained in:
chrislu
2022-05-30 15:04:19 -07:00
parent 5b9347c938
commit a2b101a737
16 changed files with 273 additions and 242 deletions

View File

@@ -164,7 +164,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
// println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name)
chunkedFileReader := filer.NewChunkStreamReader(broker, hourMinuteEntry.Chunks)
defer chunkedFileReader.Close()
if _, err := filer.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
if _, err := filer.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, 0, eachLogEntryFn); err != nil {
chunkedFileReader.Close()
if err == io.EOF {
return err