read from disk if not in memory
This commit is contained in:
@@ -52,14 +52,15 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
|
||||
glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
|
||||
isConnected := true
|
||||
sleepIntervalCount := 0
|
||||
|
||||
var counter int64
|
||||
defer func() {
|
||||
isConnected = false
|
||||
localTopicPartition.Subscribers.RemoveSubscriber(clientName)
|
||||
glog.V(0).Infof("Subscriber %s on %v %v disconnected", clientName, t, partition)
|
||||
glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
|
||||
}()
|
||||
|
||||
var startPosition log_buffer.MessagePosition
|
||||
var inMemoryOnly bool
|
||||
if req.GetInit()!=nil && req.GetInit().GetPartitionOffset() != nil {
|
||||
offset := req.GetInit().GetPartitionOffset()
|
||||
if offset.StartTsNs != 0 {
|
||||
@@ -69,19 +70,10 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
|
||||
startPosition = log_buffer.NewMessagePosition(1, -2)
|
||||
} else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST {
|
||||
startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -2)
|
||||
} else if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY {
|
||||
inMemoryOnly = true
|
||||
for !localTopicPartition.HasData() {
|
||||
time.Sleep(337 * time.Millisecond)
|
||||
}
|
||||
memPosition := localTopicPartition.GetEarliestInMemoryMessagePosition()
|
||||
if startPosition.Before(memPosition.Time) {
|
||||
startPosition = memPosition
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
localTopicPartition.Subscribe(clientName, startPosition, inMemoryOnly, func() bool {
|
||||
return localTopicPartition.Subscribe(clientName, startPosition, func() bool {
|
||||
if !isConnected {
|
||||
return false
|
||||
}
|
||||
@@ -121,8 +113,8 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
|
||||
glog.Errorf("Error sending setup response: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
counter++
|
||||
return nil
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user