Merge branch 'master' into mq

This commit is contained in:
chrislu
2024-05-20 11:04:53 -07:00
54 changed files with 2301 additions and 1656 deletions

View File

@@ -35,7 +35,6 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
}
}
t := topic.FromPbTopic(request.Topic)
var readErr, assignErr error
resp, readErr = b.fca.ReadTopicConfFromFiler(t)

View File

@@ -69,7 +69,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
return stream.Send(response)
}
var receivedSequence, acknowledgedSequence int64
var receivedSequence, acknowledgedSequence int64
var isClosed bool
// start sending ack to publisher
@@ -85,7 +85,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
lastAckTime := time.Now()
for !isClosed {
receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs)
if acknowledgedSequence < receivedSequence && (receivedSequence - acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second){
if acknowledgedSequence < receivedSequence && (receivedSequence-acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second) {
acknowledgedSequence = receivedSequence
response := &mq_pb.PublishMessageResponse{
AckSequence: acknowledgedSequence,
@@ -101,7 +101,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
}
}()
// process each published messages
clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())

View File

@@ -13,10 +13,11 @@ import (
)
type memBuffer struct {
buf []byte
startTime time.Time
stopTime time.Time
buf []byte
startTime time.Time
stopTime time.Time
}
func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_PublishFollowMeServer) (err error) {
var req *mq_pb.PublishFollowMeRequest
req, err = stream.Recv()
@@ -84,7 +85,6 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
}
}
t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
logBuffer.ShutdownLogBuffer()
@@ -97,7 +97,6 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop)
// flush the remaining messages
inMemoryBuffers.CloseInput()
for mem, found := inMemoryBuffers.Dequeue(); found; mem, found = inMemoryBuffers.Dequeue() {

View File

@@ -45,7 +45,7 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par
b.accessLock.Lock()
defer b.accessLock.Unlock()
p := topic.FromPbPartition(partition)
if localPartition:=b.localTopicManager.GetLocalPartition(t, p); localPartition!=nil {
if localPartition := b.localTopicManager.GetLocalPartition(t, p); localPartition != nil {
localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
}