Accumulated changes for message queue (#6600)

* rename

* set agent address

* refactor

* add agent sub

* pub messages

* grpc new client

* can publish records via agent

* send init message with session id

* fmt

* check cancelled request while waiting

* use sessionId

* handle possible nil stream

* subscriber process messages

* separate debug port

* use atomic int64

* less logs

* minor

* skip io.EOF

* rename

* remove unused

* use saved offsets

* do not reuse session, since the session id is always new after restart

remove last active ts from SessionEntry

* simplify printing

* purge unused

* just proxy the subscription, skipping the session step

* adjust offset types

* subscribe offset type and possible value

* start after the known tsns

* avoid wrongly setting startPosition

* move

* remove

* refactor

* typo

* fix

* fix changed path
Commit 02773a6107 (parent 14cb8a24c6)
Author: Chris Lu
Date: 2025-03-09 23:49:42 -07:00
Committed by: GitHub
43 changed files with 1086 additions and 1624 deletions

@@ -68,7 +68,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
 	} else {
 		defer func() {
 			println("closing SubscribeFollowMe connection", follower)
-			subscribeFollowMeStream.CloseSend()
+			if subscribeFollowMeStream != nil {
+				subscribeFollowMeStream.CloseSend()
+			}
 			// followerGrpcConnection.Close()
 		}()
 		followerClient := mq_pb.NewSeaweedMessagingClient(followerGrpcConnection)
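
The nil guard above matters because the follower stream is only opened when the partition actually has a follower; on the no-follower path the deferred cleanup used to call CloseSend on a nil stream value. A minimal standalone sketch of the pattern, with illustrative names (follower, followerStream, run are not SeaweedFS APIs):

package main

import "fmt"

// follower mimics the gRPC client stream type: CloseSend closes the
// send side of the stream.
type follower interface {
	CloseSend() error
}

type followerStream struct{}

func (f *followerStream) CloseSend() error {
	fmt.Println("CloseSend called")
	return nil
}

// run mirrors the broker's subscribe path: the stream variable stays a
// nil interface unless a follower is actually connected.
func run(connectFollower bool) {
	var stream follower
	defer func() {
		// Without this nil check, the no-follower path would panic:
		// calling a method on a nil interface value.
		if stream != nil {
			stream.CloseSend()
		}
	}()
	if connectFollower {
		stream = &followerStream{}
	}
	// ... serve the subscription ...
}

func main() {
	run(false) // no follower: deferred cleanup tolerates the nil stream
	run(true)  // follower present: CloseSend runs on exit
}
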
@@ -142,7 +144,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
 				Close: &mq_pb.SubscribeFollowMeRequest_CloseMessage{},
 			},
 		}); err != nil {
-			glog.Errorf("Error sending close to follower: %v", err)
+			if err != io.EOF {
+				glog.Errorf("Error sending close to follower: %v", err)
+			}
 		}
 	}
 }()
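
Skipping io.EOF here reflects how gRPC-Go client streams report a peer that is already gone: SendMsg returns io.EOF, and the real status would only come back from a subsequent receive. Sending a final close message to a departed follower is therefore benign and not worth an error log. A hedged sketch of the same filtering, with sendClose standing in for the broker's close-message send:

package main

import (
	"errors"
	"io"
	"log"
)

// sendClose invokes a send operation and logs only genuine failures,
// treating io.EOF (peer already closed the stream) as benign.
func sendClose(send func() error) {
	if err := send(); err != nil {
		if !errors.Is(err, io.EOF) {
			log.Printf("Error sending close to follower: %v", err)
		}
	}
}

func main() {
	sendClose(func() error { return io.EOF })                   // silently ignored
	sendClose(func() error { return errors.New("stream reset") }) // logged
}
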
@@ -178,6 +182,19 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
 		for imt.IsInflight(logEntry.Key) {
 			time.Sleep(137 * time.Millisecond)
+			// Check if the client has disconnected by monitoring the context
+			select {
+			case <-ctx.Done():
+				err := ctx.Err()
+				if err == context.Canceled {
+					// Client disconnected
+					return false, nil
+				}
+				glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
+				return false, nil
+			default:
+				// Continue processing the request
+			}
 		}
 		if logEntry.Key != nil {
 			imt.EnflightMessage(logEntry.Key, logEntry.TsNs)
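
The added select gives the in-flight wait loop a non-blocking cancellation check: ctx.Done() is polled once per sleep cycle, so a disconnected subscriber no longer leaves the loop spinning forever. A self-contained sketch of the same pattern, where waitInflight and isInflight are illustrative stand-ins for the broker's loop and imt.IsInflight:

package main

import (
	"context"
	"fmt"
	"time"
)

// waitInflight polls until the in-flight condition clears, but returns
// early once the subscriber's context is cancelled, mirroring the
// select-with-default shown above.
func waitInflight(ctx context.Context, isInflight func() bool) bool {
	for isInflight() {
		time.Sleep(137 * time.Millisecond)
		select {
		case <-ctx.Done():
			// Client disconnected or deadline exceeded: stop waiting.
			fmt.Println("subscriber gone:", ctx.Err())
			return false
		default:
			// Not cancelled yet: keep polling.
		}
	}
	return true
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	// The condition never clears, so cancellation is the only way out.
	fmt.Println("completed:", waitInflight(ctx, func() bool { return true }))
}
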
@@ -204,20 +221,35 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess
 		return
 	}
 	offset := initMessage.GetPartitionOffset()
-	if offset.StartTsNs != 0 {
+	offsetType := initMessage.OffsetType
+	// reset to earliest or latest
+	if offsetType == schema_pb.OffsetType_RESET_TO_EARLIEST {
+		startPosition = log_buffer.NewMessagePosition(1, -3)
+		return
+	}
+	if offsetType == schema_pb.OffsetType_RESET_TO_LATEST {
+		startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
+		return
+	}
+	// use the exact timestamp
+	if offsetType == schema_pb.OffsetType_EXACT_TS_NS {
 		startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
 		return
 	}
+	// try to resume
+	if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil {
+		glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)
+		startPosition = log_buffer.NewMessagePosition(storedOffset, -2)
+		return
+	}
-	if offset.StartType == schema_pb.PartitionOffsetStartType_EARLIEST {
-		startPosition = log_buffer.NewMessagePosition(1, -3)
-	} else if offset.StartType == schema_pb.PartitionOffsetStartType_LATEST {
-		startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
+	if offsetType == schema_pb.OffsetType_RESUME_OR_EARLIEST {
+		startPosition = log_buffer.NewMessagePosition(1, -5)
+	} else if offsetType == schema_pb.OffsetType_RESUME_OR_LATEST {
+		startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -6)
 	}
 	return
 }
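
The rewritten getRequestPosition resolves the start position in a fixed priority order: explicit RESET_TO_EARLIEST / RESET_TO_LATEST / EXACT_TS_NS requests win outright, then a saved consumer-group offset, and only then the RESUME_OR_EARLIEST / RESUME_OR_LATEST defaults apply. A compact sketch of that decision order, using a hypothetical OffsetType enum and readStoredOffset callback in place of the schema_pb constants and readConsumerGroupOffset:

package main

import (
	"errors"
	"fmt"
	"time"
)

// OffsetType is a hypothetical stand-in for the schema_pb.OffsetType enum.
type OffsetType int

const (
	ResetToEarliest OffsetType = iota
	ResetToLatest
	ExactTsNs
	ResumeOrEarliest
	ResumeOrLatest
)

// resolveStart reproduces the decision order: explicit resets and exact
// timestamps win outright, then a stored consumer-group offset, and only
// then the resume-or-* defaults.
func resolveStart(t OffsetType, startTsNs int64, readStoredOffset func() (int64, error)) int64 {
	switch t {
	case ResetToEarliest:
		return 1 // sentinel meaning "from the beginning"
	case ResetToLatest:
		return time.Now().UnixNano()
	case ExactTsNs:
		return startTsNs
	}
	if stored, err := readStoredOffset(); err == nil {
		return stored // resume from the saved offset when one exists
	}
	if t == ResumeOrEarliest {
		return 1
	}
	return time.Now().UnixNano() // ResumeOrLatest
}

func main() {
	noSaved := func() (int64, error) { return 0, errors.New("no saved offset") }
	saved := func() (int64, error) { return 42, nil }
	fmt.Println(resolveStart(ResumeOrEarliest, 0, noSaved)) // 1: fall back to earliest
	fmt.Println(resolveStart(ResumeOrEarliest, 0, saved))   // 42: resume wins
}
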