send is_close message to broker

This commit is contained in:
chrislu
2024-03-31 01:28:40 -07:00
parent c9df613b6b
commit 546ae87c39
5 changed files with 261 additions and 232 deletions

View File

@@ -36,6 +36,9 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) {
time.Sleep(time.Second)
// println("Published", string(key), string(value))
}
if err := publisher.FinishPublish(); err != nil {
fmt.Println(err)
}
elapsed := time.Since(startTime)
log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed)
}

View File

@@ -24,3 +24,16 @@ func (p *TopicPublisher) Publish(key, value []byte) error {
TsNs: time.Now().UnixNano(),
})
}
func (p *TopicPublisher) FinishPublish() error {
if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
for _, inputBuffer := range inputBuffers {
inputBuffer.Enqueue(&mq_pb.DataMessage{
IsClose: true,
TsNs: time.Now().UnixNano(),
})
}
}
return nil
}

View File

@@ -195,6 +195,10 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
publishCounter := 0
for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
if data.IsClose {
// need to set this before sending to brokers, to avoid timing issue
atomic.StoreInt32(&hasMoreData, 0)
}
if err := publishClient.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Data{
Data: data,
@@ -205,7 +209,6 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
publishCounter++
atomic.StoreInt64(&publishedTsNs, data.TsNs)
}
atomic.StoreInt32(&hasMoreData, 0)
if publishCounter > 0 {
wg.Wait()
} else {