Add message queue agent (#6463)
* scaffold message queue agent * adjust proto, add mq_agent * add agent client implementation * remove unused function * agent publish server implementation * adding agent
This commit is contained in:
61
weed/mq/agent/agent_grpc_pub_session.go
Normal file
61
weed/mq/agent/agent_grpc_pub_session.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
|
||||
"log/slog"
|
||||
"math/rand/v2"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (a *MessageQueueAgent) StartPublishSession(ctx context.Context, req *mq_agent_pb.StartPublishSessionRequest) (*mq_agent_pb.StartPublishSessionResponse, error) {
|
||||
sessionId := rand.Int64()
|
||||
|
||||
topicPublisher := pub_client.NewTopicPublisher(
|
||||
&pub_client.PublisherConfiguration{
|
||||
Topic: topic.NewTopic(req.Topic.Namespace, req.Topic.Name),
|
||||
PartitionCount: req.PartitionCount,
|
||||
Brokers: a.brokersList(),
|
||||
PublisherName: req.PublisherName,
|
||||
RecordType: req.RecordType,
|
||||
})
|
||||
|
||||
a.publishersLock.Lock()
|
||||
// remove inactive publishers to avoid memory leak
|
||||
for k, entry := range a.publishers {
|
||||
if entry.lastActiveTsNs == 0 {
|
||||
// this is an active session
|
||||
continue
|
||||
}
|
||||
if time.Unix(0, entry.lastActiveTsNs).Add(10 * time.Hour).Before(time.Now()) {
|
||||
delete(a.publishers, k)
|
||||
}
|
||||
}
|
||||
a.publishers[SessionId(sessionId)] = &SessionEntry[*pub_client.TopicPublisher]{
|
||||
entry: topicPublisher,
|
||||
}
|
||||
a.publishersLock.Unlock()
|
||||
|
||||
return &mq_agent_pb.StartPublishSessionResponse{
|
||||
SessionId: sessionId,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (a *MessageQueueAgent) ClosePublishSession(ctx context.Context, req *mq_agent_pb.ClosePublishSessionRequest) (*mq_agent_pb.ClosePublishSessionResponse, error) {
|
||||
var finishErr string
|
||||
a.publishersLock.Lock()
|
||||
publisherEntry, found := a.publishers[SessionId(req.SessionId)]
|
||||
if found {
|
||||
if err := publisherEntry.entry.FinishPublish(); err != nil {
|
||||
finishErr = err.Error()
|
||||
slog.Warn("failed to finish publish", "error", err)
|
||||
}
|
||||
delete(a.publishers, SessionId(req.SessionId))
|
||||
}
|
||||
a.publishersLock.Unlock()
|
||||
return &mq_agent_pb.ClosePublishSessionResponse{
|
||||
Error: finishErr,
|
||||
}, nil
|
||||
}
|
||||
43
weed/mq/agent/agent_grpc_publish.go
Normal file
43
weed/mq/agent/agent_grpc_publish.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (a *MessageQueueAgent) PublishRecordRequest(stream mq_agent_pb.SeaweedMessagingAgent_PublishRecordServer) error {
|
||||
m, err := stream.Recv()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
a.publishersLock.RLock()
|
||||
publisherEntry, found := a.publishers[SessionId(m.SessionId)]
|
||||
a.publishersLock.RUnlock()
|
||||
if !found {
|
||||
return fmt.Errorf("publish session id %d not found", m.SessionId)
|
||||
}
|
||||
defer func() {
|
||||
publisherEntry.lastActiveTsNs = time.Now().UnixNano()
|
||||
}()
|
||||
publisherEntry.lastActiveTsNs = 0
|
||||
|
||||
if m.Value != nil {
|
||||
if err := publisherEntry.entry.PublishRecord(m.Key, m.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
m, err := stream.Recv()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if m.Value == nil {
|
||||
continue
|
||||
}
|
||||
if err := publisherEntry.entry.PublishRecord(m.Key, m.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
57
weed/mq/agent/agent_grpc_sub_session.go
Normal file
57
weed/mq/agent/agent_grpc_sub_session.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"math/rand/v2"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (a *MessageQueueAgent) StartSubscribeSession(ctx context.Context, req *mq_agent_pb.StartSubscribeSessionRequest) (*mq_agent_pb.StartSubscribeSessionResponse, error) {
|
||||
sessionId := rand.Int64()
|
||||
|
||||
subscriberConfig := &sub_client.SubscriberConfiguration{
|
||||
ConsumerGroup: req.ConsumerGroup,
|
||||
ConsumerGroupInstanceId: req.ConsumerGroupInstanceId,
|
||||
GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
MaxPartitionCount: req.MaxSubscribedPartitions,
|
||||
SlidingWindowSize: req.SlidingWindowSize,
|
||||
}
|
||||
|
||||
contentConfig := &sub_client.ContentConfiguration{
|
||||
Topic: topic.FromPbTopic(req.Topic),
|
||||
Filter: req.Filter,
|
||||
PartitionOffsets: req.PartitionOffsets,
|
||||
}
|
||||
|
||||
topicSubscriber := sub_client.NewTopicSubscriber(
|
||||
a.brokersList(),
|
||||
subscriberConfig,
|
||||
contentConfig,
|
||||
make(chan sub_client.KeyedOffset, 1024),
|
||||
)
|
||||
|
||||
a.subscribersLock.Lock()
|
||||
// remove inactive publishers to avoid memory leak
|
||||
for k, entry := range a.subscribers {
|
||||
if entry.lastActiveTsNs == 0 {
|
||||
// this is an active session
|
||||
continue
|
||||
}
|
||||
if time.Unix(0, entry.lastActiveTsNs).Add(10 * time.Hour).Before(time.Now()) {
|
||||
delete(a.subscribers, k)
|
||||
}
|
||||
}
|
||||
a.subscribers[SessionId(sessionId)] = &SessionEntry[*sub_client.TopicSubscriber]{
|
||||
entry: topicSubscriber,
|
||||
}
|
||||
a.subscribersLock.Unlock()
|
||||
|
||||
return &mq_agent_pb.StartSubscribeSessionResponse{
|
||||
SessionId: sessionId,
|
||||
}, nil
|
||||
}
|
||||
75
weed/mq/agent/agent_grpc_subscribe.go
Normal file
75
weed/mq/agent/agent_grpc_subscribe.go
Normal file
@@ -0,0 +1,75 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (a *MessageQueueAgent) SubscribeRecordRequest(stream mq_agent_pb.SeaweedMessagingAgent_SubscribeRecordServer) error {
|
||||
// the first message is the subscribe request
|
||||
// it should only contain the session id
|
||||
m, err := stream.Recv()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
a.subscribersLock.RLock()
|
||||
subscriberEntry, found := a.subscribers[SessionId(m.SessionId)]
|
||||
a.subscribersLock.RUnlock()
|
||||
if !found {
|
||||
return fmt.Errorf("subscribe session id %d not found", m.SessionId)
|
||||
}
|
||||
defer func() {
|
||||
subscriberEntry.lastActiveTsNs = time.Now().UnixNano()
|
||||
}()
|
||||
subscriberEntry.lastActiveTsNs = 0
|
||||
|
||||
var lastErr error
|
||||
subscriberEntry.entry.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
|
||||
record := &schema_pb.RecordValue{}
|
||||
err := proto.Unmarshal(m.Data.Value, record)
|
||||
if err != nil {
|
||||
if lastErr == nil {
|
||||
lastErr = err
|
||||
}
|
||||
return
|
||||
}
|
||||
if sendErr := stream.Send(&mq_agent_pb.SubscribeRecordResponse{
|
||||
Key: m.Data.Key,
|
||||
Value: record,
|
||||
TsNs: m.Data.TsNs,
|
||||
}); sendErr != nil {
|
||||
if lastErr == nil {
|
||||
lastErr = sendErr
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
go func() {
|
||||
subErr := subscriberEntry.entry.Subscribe()
|
||||
if subErr != nil {
|
||||
glog.V(0).Infof("subscriber %d subscribe: %v", m.SessionId, subErr)
|
||||
if lastErr == nil {
|
||||
lastErr = subErr
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
m, err := stream.Recv()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if m != nil {
|
||||
subscriberEntry.entry.PartitionOffsetChan <- sub_client.KeyedOffset{
|
||||
Key: m.AckKey,
|
||||
Offset: m.AckSequence,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
52
weed/mq/agent/agent_server.go
Normal file
52
weed/mq/agent/agent_server.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
|
||||
"google.golang.org/grpc"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type SessionId int64
|
||||
type SessionEntry[T any] struct {
|
||||
entry T
|
||||
lastActiveTsNs int64
|
||||
}
|
||||
|
||||
type MessageQueueAgentOptions struct {
|
||||
SeedBrokers []pb.ServerAddress
|
||||
}
|
||||
|
||||
type MessageQueueAgent struct {
|
||||
mq_agent_pb.UnimplementedSeaweedMessagingAgentServer
|
||||
option *MessageQueueAgentOptions
|
||||
brokers []pb.ServerAddress
|
||||
grpcDialOption grpc.DialOption
|
||||
publishers map[SessionId]*SessionEntry[*pub_client.TopicPublisher]
|
||||
publishersLock sync.RWMutex
|
||||
subscribers map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]
|
||||
subscribersLock sync.RWMutex
|
||||
}
|
||||
|
||||
func NewMessageQueueAgent(option *MessageQueueAgentOptions, grpcDialOption grpc.DialOption) *MessageQueueAgent {
|
||||
|
||||
// check masters to list all brokers
|
||||
|
||||
return &MessageQueueAgent{
|
||||
option: option,
|
||||
brokers: []pb.ServerAddress{},
|
||||
grpcDialOption: grpcDialOption,
|
||||
publishers: make(map[SessionId]*SessionEntry[*pub_client.TopicPublisher]),
|
||||
subscribers: make(map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]),
|
||||
}
|
||||
}
|
||||
|
||||
func (a *MessageQueueAgent) brokersList() []string {
|
||||
var brokers []string
|
||||
for _, broker := range a.brokers {
|
||||
brokers = append(brokers, broker.String())
|
||||
}
|
||||
return brokers
|
||||
}
|
||||
Reference in New Issue
Block a user