Admin UI: Add message queue to admin UI (#6958)
* add a menu item "Message Queue" * add a menu item "Message Queue" * move the "brokers" link under it. * add "topics", "subscribers". Add pages for them. * refactor * show topic details * admin display publisher and subscriber info * remove publisher and subscribers from the topic row pull down * collecting more stats from publishers and subscribers * fix layout * fix publisher name * add local listeners for mq broker and agent * render consumer group offsets * remove subscribers from left menu * topic with retention * support editing topic retention * show retention when listing topics * create bucket * Update s3_buckets_templ.go * embed the static assets into the binary fix https://github.com/seaweedfs/seaweedfs/issues/6964
This commit is contained in:
@@ -62,6 +62,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
|
||||
}
|
||||
resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.PubBalancer.Brokers, request.PartitionCount)
|
||||
resp.RecordType = request.RecordType
|
||||
resp.Retention = request.Retention
|
||||
|
||||
// save the topic configuration on filer
|
||||
if err := b.fca.SaveTopicConfToFiler(t, resp); err != nil {
|
||||
|
||||
@@ -3,9 +3,13 @@ package broker
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
||||
)
|
||||
@@ -50,27 +54,259 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List
|
||||
}
|
||||
|
||||
ret := &mq_pb.ListTopicsResponse{}
|
||||
knownTopics := make(map[string]struct{})
|
||||
for brokerStatsItem := range b.PubBalancer.Brokers.IterBuffered() {
|
||||
_, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
|
||||
for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
|
||||
topicPartitionStat := topicPartitionStatsItem.Val
|
||||
topic := &schema_pb.Topic{
|
||||
Namespace: topicPartitionStat.TopicPartition.Namespace,
|
||||
Name: topicPartitionStat.TopicPartition.Name,
|
||||
|
||||
// Scan the filer directory structure to find all topics
|
||||
err = b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
// List all namespaces under /topics
|
||||
stream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
|
||||
Directory: filer.TopicsDir,
|
||||
Limit: 1000,
|
||||
})
|
||||
if err != nil {
|
||||
glog.V(0).Infof("list namespaces in %s: %v", filer.TopicsDir, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Process each namespace
|
||||
for {
|
||||
resp, err := stream.Recv()
|
||||
if err != nil {
|
||||
if err.Error() == "EOF" {
|
||||
break
|
||||
}
|
||||
return err
|
||||
}
|
||||
topicKey := fmt.Sprintf("%s/%s", topic.Namespace, topic.Name)
|
||||
if _, found := knownTopics[topicKey]; found {
|
||||
|
||||
if !resp.Entry.IsDirectory {
|
||||
continue
|
||||
}
|
||||
knownTopics[topicKey] = struct{}{}
|
||||
ret.Topics = append(ret.Topics, topic)
|
||||
|
||||
namespaceName := resp.Entry.Name
|
||||
namespacePath := fmt.Sprintf("%s/%s", filer.TopicsDir, namespaceName)
|
||||
|
||||
// List all topics in this namespace
|
||||
topicStream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
|
||||
Directory: namespacePath,
|
||||
Limit: 1000,
|
||||
})
|
||||
if err != nil {
|
||||
glog.V(0).Infof("list topics in namespace %s: %v", namespacePath, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Process each topic in the namespace
|
||||
for {
|
||||
topicResp, err := topicStream.Recv()
|
||||
if err != nil {
|
||||
if err.Error() == "EOF" {
|
||||
break
|
||||
}
|
||||
glog.V(0).Infof("error reading topic stream in namespace %s: %v", namespaceName, err)
|
||||
break
|
||||
}
|
||||
|
||||
if !topicResp.Entry.IsDirectory {
|
||||
continue
|
||||
}
|
||||
|
||||
topicName := topicResp.Entry.Name
|
||||
|
||||
// Check if topic.conf exists
|
||||
topicPath := fmt.Sprintf("%s/%s", namespacePath, topicName)
|
||||
confResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
|
||||
Directory: topicPath,
|
||||
Name: filer.TopicConfFile,
|
||||
})
|
||||
if err != nil {
|
||||
glog.V(0).Infof("lookup topic.conf in %s: %v", topicPath, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if confResp.Entry != nil {
|
||||
// This is a valid topic
|
||||
topic := &schema_pb.Topic{
|
||||
Namespace: namespaceName,
|
||||
Name: topicName,
|
||||
}
|
||||
ret.Topics = append(ret.Topics, topic)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
glog.V(0).Infof("list topics from filer: %v", err)
|
||||
// Return empty response on error
|
||||
return &mq_pb.ListTopicsResponse{}, nil
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// GetTopicConfiguration returns the complete configuration of a topic including schema and partition assignments
|
||||
func (b *MessageQueueBroker) GetTopicConfiguration(ctx context.Context, request *mq_pb.GetTopicConfigurationRequest) (resp *mq_pb.GetTopicConfigurationResponse, err error) {
|
||||
if !b.isLockOwner() {
|
||||
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
|
||||
resp, err = client.GetTopicConfiguration(ctx, request)
|
||||
return nil
|
||||
})
|
||||
if proxyErr != nil {
|
||||
return nil, proxyErr
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
t := topic.FromPbTopic(request.Topic)
|
||||
var conf *mq_pb.ConfigureTopicResponse
|
||||
var createdAtNs, modifiedAtNs int64
|
||||
|
||||
if conf, createdAtNs, modifiedAtNs, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
|
||||
glog.V(0).Infof("get topic configuration %s: %v", request.Topic, err)
|
||||
return nil, fmt.Errorf("failed to read topic configuration: %v", err)
|
||||
}
|
||||
|
||||
// Ensure topic assignments are active
|
||||
err = b.ensureTopicActiveAssignments(t, conf)
|
||||
if err != nil {
|
||||
glog.V(0).Infof("ensure topic active assignments %s: %v", request.Topic, err)
|
||||
return nil, fmt.Errorf("failed to ensure topic assignments: %v", err)
|
||||
}
|
||||
|
||||
// Build the response with complete configuration including metadata
|
||||
ret := &mq_pb.GetTopicConfigurationResponse{
|
||||
Topic: request.Topic,
|
||||
PartitionCount: int32(len(conf.BrokerPartitionAssignments)),
|
||||
RecordType: conf.RecordType,
|
||||
BrokerPartitionAssignments: conf.BrokerPartitionAssignments,
|
||||
CreatedAtNs: createdAtNs,
|
||||
LastUpdatedNs: modifiedAtNs,
|
||||
Retention: conf.Retention,
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// GetTopicPublishers returns the active publishers for a topic
|
||||
func (b *MessageQueueBroker) GetTopicPublishers(ctx context.Context, request *mq_pb.GetTopicPublishersRequest) (resp *mq_pb.GetTopicPublishersResponse, err error) {
|
||||
if !b.isLockOwner() {
|
||||
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
|
||||
resp, err = client.GetTopicPublishers(ctx, request)
|
||||
return nil
|
||||
})
|
||||
if proxyErr != nil {
|
||||
return nil, proxyErr
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
t := topic.FromPbTopic(request.Topic)
|
||||
var publishers []*mq_pb.TopicPublisher
|
||||
|
||||
// Get topic configuration to find partition assignments
|
||||
var conf *mq_pb.ConfigureTopicResponse
|
||||
if conf, _, _, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
|
||||
glog.V(0).Infof("get topic configuration for publishers %s: %v", request.Topic, err)
|
||||
return nil, fmt.Errorf("failed to read topic configuration: %v", err)
|
||||
}
|
||||
|
||||
// Collect publishers from each partition that is hosted on this broker
|
||||
for _, assignment := range conf.BrokerPartitionAssignments {
|
||||
// Only collect from partitions where this broker is the leader
|
||||
if assignment.LeaderBroker == b.option.BrokerAddress().String() {
|
||||
partition := topic.FromPbPartition(assignment.Partition)
|
||||
if localPartition := b.localTopicManager.GetLocalPartition(t, partition); localPartition != nil {
|
||||
// Get publisher information from local partition
|
||||
localPartition.Publishers.ForEachPublisher(func(clientName string, publisher *topic.LocalPublisher) {
|
||||
connectTimeNs, lastSeenTimeNs := publisher.GetTimestamps()
|
||||
lastPublishedOffset, lastAckedOffset := publisher.GetOffsets()
|
||||
publishers = append(publishers, &mq_pb.TopicPublisher{
|
||||
PublisherName: clientName,
|
||||
ClientId: clientName, // For now, client name is used as client ID
|
||||
Partition: assignment.Partition,
|
||||
ConnectTimeNs: connectTimeNs,
|
||||
LastSeenTimeNs: lastSeenTimeNs,
|
||||
Broker: assignment.LeaderBroker,
|
||||
IsActive: true,
|
||||
LastPublishedOffset: lastPublishedOffset,
|
||||
LastAckedOffset: lastAckedOffset,
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &mq_pb.GetTopicPublishersResponse{
|
||||
Publishers: publishers,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetTopicSubscribers returns the active subscribers for a topic
|
||||
func (b *MessageQueueBroker) GetTopicSubscribers(ctx context.Context, request *mq_pb.GetTopicSubscribersRequest) (resp *mq_pb.GetTopicSubscribersResponse, err error) {
|
||||
if !b.isLockOwner() {
|
||||
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
|
||||
resp, err = client.GetTopicSubscribers(ctx, request)
|
||||
return nil
|
||||
})
|
||||
if proxyErr != nil {
|
||||
return nil, proxyErr
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
t := topic.FromPbTopic(request.Topic)
|
||||
var subscribers []*mq_pb.TopicSubscriber
|
||||
|
||||
// Get topic configuration to find partition assignments
|
||||
var conf *mq_pb.ConfigureTopicResponse
|
||||
if conf, _, _, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
|
||||
glog.V(0).Infof("get topic configuration for subscribers %s: %v", request.Topic, err)
|
||||
return nil, fmt.Errorf("failed to read topic configuration: %v", err)
|
||||
}
|
||||
|
||||
// Collect subscribers from each partition that is hosted on this broker
|
||||
for _, assignment := range conf.BrokerPartitionAssignments {
|
||||
// Only collect from partitions where this broker is the leader
|
||||
if assignment.LeaderBroker == b.option.BrokerAddress().String() {
|
||||
partition := topic.FromPbPartition(assignment.Partition)
|
||||
if localPartition := b.localTopicManager.GetLocalPartition(t, partition); localPartition != nil {
|
||||
// Get subscriber information from local partition
|
||||
localPartition.Subscribers.ForEachSubscriber(func(clientName string, subscriber *topic.LocalSubscriber) {
|
||||
// Parse client name to extract consumer group and consumer ID
|
||||
// Format is typically: "consumerGroup/consumerID"
|
||||
consumerGroup := "default"
|
||||
consumerID := clientName
|
||||
if idx := strings.Index(clientName, "/"); idx != -1 {
|
||||
consumerGroup = clientName[:idx]
|
||||
consumerID = clientName[idx+1:]
|
||||
}
|
||||
|
||||
connectTimeNs, lastSeenTimeNs := subscriber.GetTimestamps()
|
||||
lastReceivedOffset, lastAckedOffset := subscriber.GetOffsets()
|
||||
|
||||
subscribers = append(subscribers, &mq_pb.TopicSubscriber{
|
||||
ConsumerGroup: consumerGroup,
|
||||
ConsumerId: consumerID,
|
||||
ClientId: clientName, // Full client name as client ID
|
||||
Partition: assignment.Partition,
|
||||
ConnectTimeNs: connectTimeNs,
|
||||
LastSeenTimeNs: lastSeenTimeNs,
|
||||
Broker: assignment.LeaderBroker,
|
||||
IsActive: true,
|
||||
CurrentOffset: lastAckedOffset, // for compatibility
|
||||
LastReceivedOffset: lastReceivedOffset,
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &mq_pb.GetTopicSubscribersResponse{
|
||||
Subscribers: subscribers,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (b *MessageQueueBroker) isLockOwner() bool {
|
||||
return b.lockAsBalancer.LockOwner() == b.option.BrokerAddress().String()
|
||||
}
|
||||
|
||||
@@ -3,15 +3,16 @@ package broker
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
"google.golang.org/grpc/peer"
|
||||
"io"
|
||||
"math/rand"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
"google.golang.org/grpc/peer"
|
||||
)
|
||||
|
||||
// PUB
|
||||
@@ -69,6 +70,11 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
|
||||
var receivedSequence, acknowledgedSequence int64
|
||||
var isClosed bool
|
||||
|
||||
// process each published messages
|
||||
clientName := fmt.Sprintf("%v-%4d", findClientAddress(stream.Context()), rand.Intn(10000))
|
||||
publisher := topic.NewLocalPublisher()
|
||||
localTopicPartition.Publishers.AddPublisher(clientName, publisher)
|
||||
|
||||
// start sending ack to publisher
|
||||
ackInterval := int64(1)
|
||||
if initMessage.AckInterval > 0 {
|
||||
@@ -90,6 +96,8 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
|
||||
if err := stream.Send(response); err != nil {
|
||||
glog.Errorf("Error sending response %v: %v", response, err)
|
||||
}
|
||||
// Update acknowledged offset for this publisher
|
||||
publisher.UpdateAckedOffset(acknowledgedSequence)
|
||||
// println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName)
|
||||
lastAckTime = time.Now()
|
||||
} else {
|
||||
@@ -98,10 +106,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
|
||||
}
|
||||
}()
|
||||
|
||||
// process each published messages
|
||||
clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
|
||||
localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
|
||||
|
||||
defer func() {
|
||||
// remove the publisher
|
||||
localTopicPartition.Publishers.RemovePublisher(clientName)
|
||||
@@ -143,6 +147,9 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
|
||||
if err = localTopicPartition.Publish(dataMessage); err != nil {
|
||||
return fmt.Errorf("topic %v partition %v publish error: %v", initMessage.Topic, initMessage.Partition, err)
|
||||
}
|
||||
|
||||
// Update published offset and last seen time for this publisher
|
||||
publisher.UpdatePublishedOffset(dataMessage.TsNs)
|
||||
}
|
||||
|
||||
glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName)
|
||||
|
||||
@@ -4,6 +4,9 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||
@@ -12,8 +15,6 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
|
||||
"io"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error {
|
||||
@@ -40,7 +41,8 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
|
||||
return getOrGenErr
|
||||
}
|
||||
|
||||
localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
|
||||
subscriber := topic.NewLocalSubscriber()
|
||||
localTopicPartition.Subscribers.AddSubscriber(clientName, subscriber)
|
||||
glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
|
||||
isConnected := true
|
||||
sleepIntervalCount := 0
|
||||
@@ -115,7 +117,10 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
|
||||
continue
|
||||
}
|
||||
imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence)
|
||||
|
||||
currentLastOffset := imt.GetOldestAckedTimestamp()
|
||||
// Update acknowledged offset and last seen time for this subscriber when it sends an ack
|
||||
subscriber.UpdateAckedOffset(currentLastOffset)
|
||||
// fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset)
|
||||
if subscribeFollowMeStream != nil && currentLastOffset > lastOffset {
|
||||
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
|
||||
@@ -211,6 +216,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Update received offset and last seen time for this subscriber
|
||||
subscriber.UpdateReceivedOffset(logEntry.TsNs)
|
||||
|
||||
counter++
|
||||
return false, nil
|
||||
})
|
||||
|
||||
@@ -1,20 +1,61 @@
|
||||
package topic
|
||||
|
||||
import "sync"
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type LocalPartitionPublishers struct {
|
||||
publishers map[string]*LocalPublisher
|
||||
publishersLock sync.RWMutex
|
||||
}
|
||||
type LocalPublisher struct {
|
||||
connectTimeNs int64 // accessed atomically
|
||||
lastSeenTimeNs int64 // accessed atomically
|
||||
lastPublishedOffset int64 // accessed atomically - offset of last message published
|
||||
lastAckedOffset int64 // accessed atomically - offset of last message acknowledged by broker
|
||||
}
|
||||
|
||||
func NewLocalPublisher() *LocalPublisher {
|
||||
return &LocalPublisher{}
|
||||
now := time.Now().UnixNano()
|
||||
publisher := &LocalPublisher{}
|
||||
atomic.StoreInt64(&publisher.connectTimeNs, now)
|
||||
atomic.StoreInt64(&publisher.lastSeenTimeNs, now)
|
||||
atomic.StoreInt64(&publisher.lastPublishedOffset, 0)
|
||||
atomic.StoreInt64(&publisher.lastAckedOffset, 0)
|
||||
return publisher
|
||||
}
|
||||
func (p *LocalPublisher) SignalShutdown() {
|
||||
}
|
||||
|
||||
// UpdateLastSeen updates the last activity time for this publisher
|
||||
func (p *LocalPublisher) UpdateLastSeen() {
|
||||
atomic.StoreInt64(&p.lastSeenTimeNs, time.Now().UnixNano())
|
||||
}
|
||||
|
||||
// UpdatePublishedOffset updates the offset of the last message published by this publisher
|
||||
func (p *LocalPublisher) UpdatePublishedOffset(offset int64) {
|
||||
atomic.StoreInt64(&p.lastPublishedOffset, offset)
|
||||
atomic.StoreInt64(&p.lastSeenTimeNs, time.Now().UnixNano())
|
||||
}
|
||||
|
||||
// UpdateAckedOffset updates the offset of the last message acknowledged by the broker for this publisher
|
||||
func (p *LocalPublisher) UpdateAckedOffset(offset int64) {
|
||||
atomic.StoreInt64(&p.lastAckedOffset, offset)
|
||||
atomic.StoreInt64(&p.lastSeenTimeNs, time.Now().UnixNano())
|
||||
}
|
||||
|
||||
// GetTimestamps returns the connect and last seen timestamps safely
|
||||
func (p *LocalPublisher) GetTimestamps() (connectTimeNs, lastSeenTimeNs int64) {
|
||||
return atomic.LoadInt64(&p.connectTimeNs), atomic.LoadInt64(&p.lastSeenTimeNs)
|
||||
}
|
||||
|
||||
// GetOffsets returns the published and acknowledged offsets safely
|
||||
func (p *LocalPublisher) GetOffsets() (lastPublishedOffset, lastAckedOffset int64) {
|
||||
return atomic.LoadInt64(&p.lastPublishedOffset), atomic.LoadInt64(&p.lastAckedOffset)
|
||||
}
|
||||
|
||||
func NewLocalPartitionPublishers() *LocalPartitionPublishers {
|
||||
return &LocalPartitionPublishers{
|
||||
publishers: make(map[string]*LocalPublisher),
|
||||
@@ -50,3 +91,25 @@ func (p *LocalPartitionPublishers) Size() int {
|
||||
|
||||
return len(p.publishers)
|
||||
}
|
||||
|
||||
// GetPublisherNames returns the names of all publishers
|
||||
func (p *LocalPartitionPublishers) GetPublisherNames() []string {
|
||||
p.publishersLock.RLock()
|
||||
defer p.publishersLock.RUnlock()
|
||||
|
||||
names := make([]string, 0, len(p.publishers))
|
||||
for name := range p.publishers {
|
||||
names = append(names, name)
|
||||
}
|
||||
return names
|
||||
}
|
||||
|
||||
// ForEachPublisher iterates over all publishers
|
||||
func (p *LocalPartitionPublishers) ForEachPublisher(fn func(name string, publisher *LocalPublisher)) {
|
||||
p.publishersLock.RLock()
|
||||
defer p.publishersLock.RUnlock()
|
||||
|
||||
for name, publisher := range p.publishers {
|
||||
fn(name, publisher)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,24 +1,70 @@
|
||||
package topic
|
||||
|
||||
import "sync"
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type LocalPartitionSubscribers struct {
|
||||
Subscribers map[string]*LocalSubscriber
|
||||
SubscribersLock sync.RWMutex
|
||||
}
|
||||
type LocalSubscriber struct {
|
||||
stopCh chan struct{}
|
||||
connectTimeNs int64 // accessed atomically
|
||||
lastSeenTimeNs int64 // accessed atomically
|
||||
lastReceivedOffset int64 // accessed atomically - offset of last message received
|
||||
lastAckedOffset int64 // accessed atomically - offset of last message acknowledged
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
func NewLocalSubscriber() *LocalSubscriber {
|
||||
return &LocalSubscriber{
|
||||
now := time.Now().UnixNano()
|
||||
subscriber := &LocalSubscriber{
|
||||
stopCh: make(chan struct{}, 1),
|
||||
}
|
||||
atomic.StoreInt64(&subscriber.connectTimeNs, now)
|
||||
atomic.StoreInt64(&subscriber.lastSeenTimeNs, now)
|
||||
atomic.StoreInt64(&subscriber.lastReceivedOffset, 0)
|
||||
atomic.StoreInt64(&subscriber.lastAckedOffset, 0)
|
||||
return subscriber
|
||||
}
|
||||
func (p *LocalSubscriber) SignalShutdown() {
|
||||
close(p.stopCh)
|
||||
}
|
||||
|
||||
// UpdateLastSeen updates the last activity time for this subscriber
|
||||
func (p *LocalSubscriber) UpdateLastSeen() {
|
||||
atomic.StoreInt64(&p.lastSeenTimeNs, time.Now().UnixNano())
|
||||
}
|
||||
|
||||
// UpdateReceivedOffset updates the offset of the last message received by this subscriber
|
||||
func (p *LocalSubscriber) UpdateReceivedOffset(offset int64) {
|
||||
atomic.StoreInt64(&p.lastReceivedOffset, offset)
|
||||
atomic.StoreInt64(&p.lastSeenTimeNs, time.Now().UnixNano())
|
||||
}
|
||||
|
||||
// UpdateAckedOffset updates the offset of the last message acknowledged by this subscriber
|
||||
func (p *LocalSubscriber) UpdateAckedOffset(offset int64) {
|
||||
atomic.StoreInt64(&p.lastAckedOffset, offset)
|
||||
atomic.StoreInt64(&p.lastSeenTimeNs, time.Now().UnixNano())
|
||||
}
|
||||
|
||||
// GetTimestamps returns the connect and last seen timestamps safely
|
||||
func (p *LocalSubscriber) GetTimestamps() (connectTimeNs, lastSeenTimeNs int64) {
|
||||
return atomic.LoadInt64(&p.connectTimeNs), atomic.LoadInt64(&p.lastSeenTimeNs)
|
||||
}
|
||||
|
||||
// GetOffsets returns the received and acknowledged offsets safely
|
||||
func (p *LocalSubscriber) GetOffsets() (lastReceivedOffset, lastAckedOffset int64) {
|
||||
return atomic.LoadInt64(&p.lastReceivedOffset), atomic.LoadInt64(&p.lastAckedOffset)
|
||||
}
|
||||
|
||||
// GetCurrentOffset returns the acknowledged offset (for compatibility)
|
||||
func (p *LocalSubscriber) GetCurrentOffset() int64 {
|
||||
return atomic.LoadInt64(&p.lastAckedOffset)
|
||||
}
|
||||
|
||||
func NewLocalPartitionSubscribers() *LocalPartitionSubscribers {
|
||||
return &LocalPartitionSubscribers{
|
||||
Subscribers: make(map[string]*LocalSubscriber),
|
||||
@@ -54,3 +100,25 @@ func (p *LocalPartitionSubscribers) Size() int {
|
||||
|
||||
return len(p.Subscribers)
|
||||
}
|
||||
|
||||
// GetSubscriberNames returns the names of all subscribers
|
||||
func (p *LocalPartitionSubscribers) GetSubscriberNames() []string {
|
||||
p.SubscribersLock.RLock()
|
||||
defer p.SubscribersLock.RUnlock()
|
||||
|
||||
names := make([]string, 0, len(p.Subscribers))
|
||||
for name := range p.Subscribers {
|
||||
names = append(names, name)
|
||||
}
|
||||
return names
|
||||
}
|
||||
|
||||
// ForEachSubscriber iterates over all subscribers
|
||||
func (p *LocalPartitionSubscribers) ForEachSubscriber(fn func(name string, subscriber *LocalSubscriber)) {
|
||||
p.SubscribersLock.RLock()
|
||||
defer p.SubscribersLock.RUnlock()
|
||||
|
||||
for name, subscriber := range p.Subscribers {
|
||||
fn(name, subscriber)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,8 +2,10 @@ package topic
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
@@ -60,6 +62,38 @@ func (t Topic) ReadConfFile(client filer_pb.SeaweedFilerClient) (*mq_pb.Configur
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
// ReadConfFileWithMetadata reads the topic configuration and returns it along with file metadata
|
||||
func (t Topic) ReadConfFileWithMetadata(client filer_pb.SeaweedFilerClient) (*mq_pb.ConfigureTopicResponse, int64, int64, error) {
|
||||
// Use LookupDirectoryEntry to get both content and metadata
|
||||
request := &filer_pb.LookupDirectoryEntryRequest{
|
||||
Directory: t.Dir(),
|
||||
Name: filer.TopicConfFile,
|
||||
}
|
||||
|
||||
resp, err := filer_pb.LookupEntry(context.Background(), client, request)
|
||||
if err != nil {
|
||||
if errors.Is(err, filer_pb.ErrNotFound) {
|
||||
return nil, 0, 0, err
|
||||
}
|
||||
return nil, 0, 0, fmt.Errorf("lookup topic.conf of %v: %v", t, err)
|
||||
}
|
||||
|
||||
// Get file metadata
|
||||
var createdAtNs, modifiedAtNs int64
|
||||
if resp.Entry.Attributes != nil {
|
||||
createdAtNs = resp.Entry.Attributes.Crtime * 1e9 // convert seconds to nanoseconds
|
||||
modifiedAtNs = resp.Entry.Attributes.Mtime * 1e9 // convert seconds to nanoseconds
|
||||
}
|
||||
|
||||
// Parse the configuration
|
||||
conf := &mq_pb.ConfigureTopicResponse{}
|
||||
if err = jsonpb.Unmarshal(resp.Entry.Content, conf); err != nil {
|
||||
return nil, 0, 0, fmt.Errorf("unmarshal topic %v conf: %v", t, err)
|
||||
}
|
||||
|
||||
return conf, createdAtNs, modifiedAtNs, nil
|
||||
}
|
||||
|
||||
func (t Topic) WriteConfFile(client filer_pb.SeaweedFilerClient, conf *mq_pb.ConfigureTopicResponse) error {
|
||||
var buf bytes.Buffer
|
||||
filer.ProtoToText(&buf, conf)
|
||||
|
||||
Reference in New Issue
Block a user