subscribe with partition offset

This commit is contained in:
chrislu
2024-01-05 15:24:14 -08:00
parent 531f854af2
commit e8611ed85d
3 changed files with 142 additions and 198 deletions

View File

@@ -13,7 +13,7 @@ import (
func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error {
t := topic.FromPbTopic(req.GetInit().Topic)
partition := topic.FromPbPartition(req.GetInit().Partition)
partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
localTopicPartition := b.localTopicManager.GetTopicPartition(t, partition)
if localTopicPartition == nil {
stream.Send(&mq_pb.SubscribeMessageResponse{
@@ -39,7 +39,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
ctx := stream.Context()
startTime := time.Now()
if startTs := req.GetInit().GetStartTimestampNs(); startTs > 0 {
if startTs := req.GetInit().GetPartitionOffset().GetTsNs(); startTs > 0 {
startTime = time.Unix(0, startTs)
}