persist consumer group offset

1. use one follower
2. read and write consumer group offset
This commit is contained in:
chrislu
2024-05-19 00:46:12 -07:00
parent 8d5bb7420d
commit b1871427c3
18 changed files with 609 additions and 538 deletions

View File

@@ -36,7 +36,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
// if is leader, notify the followers to drain existing topic partition subscriptions
if request.IsLeader {
for _, brokerPartition := range request.BrokerPartitionAssignments {
for _, follower := range brokerPartition.FollowerBrokers {
if follower := brokerPartition.FollowerBroker; follower != "" {
err := pb.WithBrokerGrpcClient(false, follower, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.AssignTopicPartitions(context.Background(), request)
return err

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
@@ -51,12 +52,42 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}
}()
var startPosition log_buffer.MessagePosition
if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
startPosition := b.getRequestPosition(req.GetInit())
// connect to the follower
var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient
glog.V(0).Infof("follower broker: %v", req.GetInit().FollowerBroker)
if req.GetInit().FollowerBroker != "" {
follower := req.GetInit().FollowerBroker
if followerGrpcConnection, err := pb.GrpcDial(ctx, follower, true, b.grpcDialOption); err != nil {
return fmt.Errorf("fail to dial %s: %v", follower, err)
} else {
defer func() {
println("closing SubscribeFollowMe connection", follower)
followerGrpcConnection.Close()
}()
followerClient := mq_pb.NewSeaweedMessagingClient(followerGrpcConnection)
if subscribeFollowMeStream, err = followerClient.SubscribeFollowMe(ctx); err != nil {
return fmt.Errorf("fail to subscribe to %s: %v", follower, err)
} else {
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
Message: &mq_pb.SubscribeFollowMeRequest_Init{
Init: &mq_pb.SubscribeFollowMeRequest_InitMessage{
Topic: req.GetInit().Topic,
Partition: req.GetInit().GetPartitionOffset().Partition,
ConsumerGroup: req.GetInit().ConsumerGroup,
},
},
}); err != nil {
return fmt.Errorf("fail to send init to %s: %v", follower, err)
}
}
}
glog.V(0).Infof("follower %s connected", follower)
}
go func() {
var lastOffset int64
for {
ack, err := stream.Recv()
if err != nil {
@@ -66,7 +97,34 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
glog.V(0).Infof("topic %v partition %v subscriber %s error: %v", t, partition, clientName, err)
break
}
println(clientName, "ack =>", ack.GetAck().Sequence)
lastOffset = ack.GetAck().Sequence
if subscribeFollowMeStream != nil {
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
Message: &mq_pb.SubscribeFollowMeRequest_Ack{
Ack: &mq_pb.SubscribeFollowMeRequest_AckMessage{
TsNs: lastOffset,
},
},
}); err != nil {
glog.Errorf("Error sending ack to follower: %v", err)
break
}
println("forwarding ack", lastOffset)
}
}
if lastOffset > 0 {
if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil {
glog.Errorf("saveConsumerGroupOffset: %v", err)
}
if subscribeFollowMeStream != nil {
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
Message: &mq_pb.SubscribeFollowMeRequest_Close{
Close: &mq_pb.SubscribeFollowMeRequest_CloseMessage{},
},
}); err != nil {
glog.Errorf("Error sending close to follower: %v", err)
}
}
}
}()
@@ -115,10 +173,20 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
})
}
func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer.MessagePosition) {
func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (startPosition log_buffer.MessagePosition) {
if initMessage == nil {
return
}
offset := initMessage.GetPartitionOffset()
if offset.StartTsNs != 0 {
startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
return
}
if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil{
startPosition = log_buffer.NewMessagePosition(storedOffset, -2)
return
}
if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST {
startPosition = log_buffer.NewMessagePosition(1, -3)
} else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST {

View File

@@ -25,11 +25,7 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub
}
// create an in-memory offset
subscriberProgress := &SubscriberProgress{
Topic: topic.FromPbTopic(initMessage.Topic),
Partition: topic.FromPbPartition(initMessage.Partition),
ConsumerGroup: "consumer_group",
}
var lastOffset int64
// follow each published messages
for {
@@ -46,8 +42,8 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub
// Process the received message
if ackMessage := req.GetAck(); ackMessage != nil {
subscriberProgress.Offset = ackMessage.TsNs
println("offset", subscriberProgress.Offset)
lastOffset = ackMessage.TsNs
println("offset", lastOffset)
} else if closeMessage := req.GetClose(); closeMessage != nil {
glog.V(0).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
return nil
@@ -58,19 +54,47 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub
t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop)
offsetFileName := fmt.Sprintf("%s.offset", subscriberProgress.ConsumerGroup)
err = b.saveConsumerGroupOffset(t, p, initMessage.ConsumerGroup, lastOffset)
offsetBytes := make([]byte, 8)
util.Uint64toBytes(offsetBytes, uint64(subscriberProgress.Offset))
err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.SaveInsideFiler(client, partitionDir, offsetFileName, offsetBytes)
})
glog.V(0).Infof("shut down follower for %v %v", t, p)
glog.V(0).Infof("shut down follower for %v", initMessage)
return err
}
// readConsumerGroupOffset loads the offset previously persisted for the
// consumer group named in initMessage.
//
// The offset is read from the filer file "<consumerGroup>.offset" located in
// the partition directory
// "<TopicsDir>/<namespace>/<topic>/<generation>/<rangeStart>-<rangeStop>",
// which is the same location saveConsumerGroupOffset writes to.
//
// An error is returned when the init message lacks a topic or partition, when
// the file cannot be read, or when its content is not exactly 8 bytes. The
// returned offset is 0 whenever err is non-nil.
func (b *MessageQueueBroker) readConsumerGroupOffset(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (offset int64, err error) {
	// Guard against incomplete init messages: the conversions below read the
	// topic and partition fields directly and would otherwise panic on nil.
	if initMessage == nil || initMessage.Topic == nil {
		return 0, fmt.Errorf("no topic in subscribe init message")
	}
	partitionOffset := initMessage.GetPartitionOffset()
	if partitionOffset == nil || partitionOffset.Partition == nil {
		return 0, fmt.Errorf("no partition in subscribe init message")
	}

	t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(partitionOffset.Partition)

	topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
	partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
	partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop)
	offsetFileName := fmt.Sprintf("%s.offset", initMessage.ConsumerGroup)

	err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		data, readErr := filer.ReadInsideFiler(client, partitionDir, offsetFileName)
		if readErr != nil {
			return readErr
		}
		// The offset is stored as a fixed 8-byte value; anything else is not
		// a usable offset.
		if len(data) != 8 {
			return fmt.Errorf("no offset found: %s/%s has %d bytes, expected 8", partitionDir, offsetFileName, len(data))
		}
		offset = int64(util.BytesToUint64(data))
		return nil
	})
	if err != nil {
		return 0, err
	}
	return offset, nil
}
// saveConsumerGroupOffset persists offset for consumerGroup on the given
// topic partition.
//
// The value is encoded into 8 bytes and written through the filer to the file
// "<consumerGroup>.offset" inside the partition directory
// "<TopicsDir>/<namespace>/<topic>/<generation>/<rangeStart>-<rangeStop>".
// Any error from obtaining the filer client or from the write is returned.
func (b *MessageQueueBroker) saveConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string, offset int64) error {
	// Encode the offset as a fixed-size 8-byte payload.
	payload := make([]byte, 8)
	util.Uint64toBytes(payload, uint64(offset))

	// Resolve where the offset file lives for this partition generation.
	generation := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
	topicPath := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
	dir := fmt.Sprintf("%s/%s/%04d-%04d", topicPath, generation, p.RangeStart, p.RangeStop)
	name := fmt.Sprintf("%s.offset", consumerGroup)

	// The log line is emitted only once a filer client is available.
	persist := func(client filer_pb.SeaweedFilerClient) error {
		glog.V(0).Infof("saving topic %s partition %v consumer group %s offset %d", t, p, consumerGroup, offset)
		return filer.SaveInsideFiler(client, dir, name, payload)
	}
	return b.WithFilerClient(false, persist)
}

View File

@@ -103,7 +103,7 @@ func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_p
eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
if len(entry.Content) > 0 {
glog.Warningf("this should not happen. unexpected content in %s/%s", partitionDir, entry.Name)
// skip .offset files
return
}
var urlStrings []string

View File

@@ -1,10 +0,0 @@
package broker
import "github.com/seaweedfs/seaweedfs/weed/mq/topic"
// SubscriberProgress groups a topic, a partition, a consumer group name and
// an offset into a single record.
// NOTE(review): appears to be the in-memory progress record for a subscriber
// on one topic partition — confirm against its users.
type SubscriberProgress struct {
// Embedded topic; its fields are promoted onto SubscriberProgress.
topic.Topic
// Embedded partition; its fields are promoted onto SubscriberProgress.
topic.Partition
// ConsumerGroup is the name of the consumer group this record belongs to.
ConsumerGroup string
// Offset is the recorded position.
// NOTE(review): presumably the TsNs of the last acknowledged message — confirm.
Offset int64
}