wait 3 seconds before shutting down publish client, to wait for all messages to be received
This commit is contained in:
@@ -7,10 +7,10 @@ import (
|
|||||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||||
"google.golang.org/grpc/peer"
|
"google.golang.org/grpc/peer"
|
||||||
|
"io"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// PUB
|
// PUB
|
||||||
@@ -75,14 +75,17 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
|
|||||||
respChan := make(chan *mq_pb.PublishMessageResponse, 128)
|
respChan := make(chan *mq_pb.PublishMessageResponse, 128)
|
||||||
defer func() {
|
defer func() {
|
||||||
atomic.StoreInt32(&isStopping, 1)
|
atomic.StoreInt32(&isStopping, 1)
|
||||||
|
respChan <- &mq_pb.PublishMessageResponse{
|
||||||
|
AckSequence: ackSequence,
|
||||||
|
}
|
||||||
close(respChan)
|
close(respChan)
|
||||||
localTopicPartition.Publishers.RemovePublisher(clientName)
|
localTopicPartition.Publishers.RemovePublisher(clientName)
|
||||||
if localTopicPartition.MaybeShutdownLocalPartition() {
|
if localTopicPartition.MaybeShutdownLocalPartition() {
|
||||||
b.localTopicManager.RemoveTopicPartition(t, p)
|
b.localTopicManager.RemoveTopicPartition(t, p)
|
||||||
}
|
}
|
||||||
|
glog.V(0).Infof("topic %v partition %v published %d messges.", initMessage.Topic, initMessage.Partition, ackSequence)
|
||||||
}()
|
}()
|
||||||
go func() {
|
go func() {
|
||||||
ticker := time.NewTicker(1 * time.Second)
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case resp := <-respChan:
|
case resp := <-respChan:
|
||||||
@@ -93,15 +96,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
|
|||||||
} else {
|
} else {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
case <-ticker.C:
|
|
||||||
if atomic.LoadInt32(&isStopping) == 0 {
|
|
||||||
response := &mq_pb.PublishMessageResponse{
|
|
||||||
AckSequence: ackSequence,
|
|
||||||
}
|
|
||||||
respChan <- response
|
|
||||||
} else {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-localTopicPartition.StopPublishersCh:
|
case <-localTopicPartition.StopPublishersCh:
|
||||||
respChan <- &mq_pb.PublishMessageResponse{
|
respChan <- &mq_pb.PublishMessageResponse{
|
||||||
AckSequence: ackSequence,
|
AckSequence: ackSequence,
|
||||||
@@ -116,6 +110,10 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
|
|||||||
// receive a message
|
// receive a message
|
||||||
req, err := stream.Recv()
|
req, err := stream.Recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -158,7 +158,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
_, err := publishClient.Recv()
|
ackResp, err := publishClient.Recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e, ok := status.FromError(err)
|
e, ok := status.FromError(err)
|
||||||
if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
|
if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
|
||||||
@@ -168,9 +168,18 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
|
|||||||
fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
|
fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if ackResp.Error != "" {
|
||||||
|
publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
|
||||||
|
fmt.Printf("publish to %s error: %v\n", publishClient.Broker, ackResp.Error)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if ackResp.AckSequence > 0 {
|
||||||
|
log.Printf("ack %d", ackResp.AckSequence)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
publishCounter := 0
|
||||||
for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
|
for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
|
||||||
if err := publishClient.Send(&mq_pb.PublishMessageRequest{
|
if err := publishClient.Send(&mq_pb.PublishMessageRequest{
|
||||||
Message: &mq_pb.PublishMessageRequest_Data{
|
Message: &mq_pb.PublishMessageRequest_Data{
|
||||||
@@ -179,7 +188,17 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
|
|||||||
}); err != nil {
|
}); err != nil {
|
||||||
return fmt.Errorf("send publish data: %v", err)
|
return fmt.Errorf("send publish data: %v", err)
|
||||||
}
|
}
|
||||||
|
publishCounter++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := publishClient.CloseSend(); err != nil {
|
||||||
|
return fmt.Errorf("close send: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
|
||||||
|
log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user