add batch index for each memory buffer
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
"google.golang.org/grpc/codes"
|
||||
@@ -41,7 +40,7 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin
|
||||
}
|
||||
if receivedStats := req.GetStats(); receivedStats != nil {
|
||||
b.Balancer.OnBrokerStatsUpdated(initMessage.Broker, brokerStats, receivedStats)
|
||||
glog.V(4).Infof("received from %v: %+v", initMessage.Broker, receivedStats)
|
||||
// glog.V(4).Infof("received from %v: %+v", initMessage.Broker, receivedStats)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -38,12 +39,30 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
|
||||
}()
|
||||
|
||||
ctx := stream.Context()
|
||||
startTime := time.Now()
|
||||
if startTs := req.GetInit().GetPartitionOffset().GetTsNs(); startTs > 0 {
|
||||
startTime = time.Unix(0, startTs)
|
||||
var startPosition log_buffer.MessagePosition
|
||||
var inMemoryOnly bool
|
||||
if req.GetInit()!=nil && req.GetInit().GetPartitionOffset() != nil {
|
||||
offset := req.GetInit().GetPartitionOffset()
|
||||
if offset.StartTsNs != 0 {
|
||||
startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
|
||||
}
|
||||
if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST {
|
||||
startPosition = log_buffer.NewMessagePosition(1, -2)
|
||||
} else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST {
|
||||
startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -2)
|
||||
} else if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY {
|
||||
inMemoryOnly = true
|
||||
for !localTopicPartition.HasData() {
|
||||
time.Sleep(337 * time.Millisecond)
|
||||
}
|
||||
memPosition := localTopicPartition.GetEarliestInMemoryMessagePosition()
|
||||
if startPosition.Before(memPosition.Time) {
|
||||
startPosition = memPosition
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
localTopicPartition.Subscribe(clientName, startTime, func() bool {
|
||||
localTopicPartition.Subscribe(clientName, startPosition, inMemoryOnly, func() bool {
|
||||
if !isConnected {
|
||||
return false
|
||||
}
|
||||
@@ -51,7 +70,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
|
||||
if sleepIntervalCount > 10 {
|
||||
sleepIntervalCount = 10
|
||||
}
|
||||
time.Sleep(time.Duration(sleepIntervalCount) * 2339 * time.Millisecond)
|
||||
time.Sleep(time.Duration(sleepIntervalCount) * 337 * time.Millisecond)
|
||||
|
||||
// Check if the client has disconnected by monitoring the context
|
||||
select {
|
||||
|
||||
@@ -3,6 +3,7 @@ package main
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
@@ -40,13 +41,15 @@ func main() {
|
||||
brokers := strings.Split(*seedBrokers, ",")
|
||||
subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig)
|
||||
|
||||
counter := 0
|
||||
subscriber.SetEachMessageFunc(func(key, value []byte) (bool, error) {
|
||||
println(string(key), "=>", string(value))
|
||||
counter++
|
||||
println(string(key), "=>", string(value), counter)
|
||||
return true, nil
|
||||
})
|
||||
|
||||
subscriber.SetCompletionFunc(func() {
|
||||
println("done subscribing")
|
||||
glog.V(0).Infof("done recived %d messages", counter)
|
||||
})
|
||||
|
||||
if err := subscriber.Subscribe(); err != nil {
|
||||
|
||||
@@ -114,7 +114,8 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
|
||||
RangeStart: partition.RangeStart,
|
||||
RangeStop: partition.RangeStop,
|
||||
},
|
||||
TsNs: sub.alreadyProcessedTsNs,
|
||||
StartTsNs: sub.alreadyProcessedTsNs,
|
||||
StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
|
||||
},
|
||||
Filter: sub.ContentConfig.Filter,
|
||||
},
|
||||
|
||||
@@ -47,8 +47,20 @@ func (p *LocalPartition) Publish(message *mq_pb.DataMessage) {
|
||||
p.logBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
|
||||
}
|
||||
|
||||
func (p *LocalPartition) Subscribe(clientName string, startReadTime time.Time, onNoMessageFn func() bool, eachMessageFn OnEachMessageFn) {
|
||||
p.logBuffer.LoopProcessLogData(clientName, startReadTime, 0, onNoMessageFn, eachMessageFn)
|
||||
func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.MessagePosition, inMemoryOnly bool, onNoMessageFn func() bool, eachMessageFn OnEachMessageFn) {
|
||||
p.logBuffer.LoopProcessLogData(clientName, startPosition, inMemoryOnly, 0, onNoMessageFn, eachMessageFn)
|
||||
}
|
||||
|
||||
func (p *LocalPartition) GetEarliestMessageTimeInMemory() time.Time {
|
||||
return p.logBuffer.GetEarliestTime()
|
||||
}
|
||||
|
||||
func (p *LocalPartition) HasData() bool {
|
||||
return !p.logBuffer.GetEarliestTime().IsZero()
|
||||
}
|
||||
|
||||
func (p *LocalPartition) GetEarliestInMemoryMessagePosition() log_buffer.MessagePosition {
|
||||
return p.logBuffer.GetEarliestPosition()
|
||||
}
|
||||
|
||||
func FromPbBrokerPartitionAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionAssignment) *LocalPartition {
|
||||
|
||||
Reference in New Issue
Block a user