multiple subscriber with same subscriberId shares the topic manager

rename topicControl to topicCursor
This commit is contained in:
Chris Lu
2020-05-12 21:26:02 -07:00
parent 2f243f5b0b
commit a7959c1c48
4 changed files with 95 additions and 13 deletions

View File

@@ -25,28 +25,29 @@ func (tp *TopicPartition) String() string {
return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition)
}
type TopicControl struct {
type TopicCursor struct {
sync.Mutex
cond *sync.Cond
subscriberCount int
publisherCount int
logBuffer *log_buffer.LogBuffer
subscriptions *TopicPartitionSubscriptions
}
type TopicManager struct {
sync.Mutex
topicControls map[TopicPartition]*TopicControl
topicControls map[TopicPartition]*TopicCursor
broker *MessageBroker
}
func NewTopicManager(messageBroker *MessageBroker) *TopicManager {
return &TopicManager{
topicControls: make(map[TopicPartition]*TopicControl),
topicControls: make(map[TopicPartition]*TopicCursor),
broker: messageBroker,
}
}
func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
func (tm *TopicManager) buildLogBuffer(tl *TopicCursor, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
flushFn := func(startTime, stopTime time.Time, buf []byte) {
@@ -68,21 +69,21 @@ func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topi
}
}
logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() {
tl.cond.Broadcast()
tl.subscriptions.NotifyAll()
})
return logBuffer
}
func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl {
func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicCursor {
tm.Lock()
defer tm.Unlock()
lock, found := tm.topicControls[partition]
if !found {
lock = &TopicControl{}
lock.cond = sync.NewCond(&lock.Mutex)
lock = &TopicCursor{}
tm.topicControls[partition] = lock
lock.subscriptions = NewTopicPartitionSubscriptions()
lock.logBuffer = tm.buildLogBuffer(lock, partition, topicConfig)
}
if isPublisher {