skip control messages
This commit is contained in:
@@ -53,9 +53,11 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub
|
|||||||
|
|
||||||
t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
|
t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
|
||||||
|
|
||||||
err = b.saveConsumerGroupOffset(t, p, initMessage.ConsumerGroup, lastOffset)
|
if lastOffset > 0 {
|
||||||
|
err = b.saveConsumerGroupOffset(t, p, initMessage.ConsumerGroup, lastOffset)
|
||||||
|
}
|
||||||
|
|
||||||
glog.V(0).Infof("shut down follower for %v", initMessage)
|
glog.V(0).Infof("shut down follower for %v offset %d", initMessage, lastOffset)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -93,6 +93,10 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
|
|||||||
}
|
}
|
||||||
switch m := resp.Message.(type) {
|
switch m := resp.Message.(type) {
|
||||||
case *mq_pb.SubscribeMessageResponse_Data:
|
case *mq_pb.SubscribeMessageResponse_Data:
|
||||||
|
if m.Data.Ctrl != nil {
|
||||||
|
glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.IsClose)
|
||||||
|
continue
|
||||||
|
}
|
||||||
executors.Execute(func() {
|
executors.Execute(func() {
|
||||||
processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
|
processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
|
||||||
if processErr == nil {
|
if processErr == nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user