pub sub initial tests

This commit is contained in:
chrislu
2023-08-27 17:50:59 -07:00
parent 4eb8e8624d
commit 4d6c18d86f
10 changed files with 245 additions and 136 deletions

View File

@@ -145,7 +145,7 @@ func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.
return true
}
for _, b := range oldAssignment.FollowerBrokers {
pb.WithBrokerClient(false, pb.ServerAddress(b), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
pb.WithBrokerGrpcClient(false, b, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.CheckTopicPartitionsStatus(context.Background(), &mq_pb.CheckTopicPartitionsStatusRequest{
Namespace: string(topic.Namespace),
Topic: topic.Name,

View File

@@ -73,6 +73,25 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
// 3. write to the filer
var localTopicPartition *topic.LocalPartition
req, err := stream.Recv()
if err != nil {
return err
}
response := &mq_pb.PublishResponse{}
// TODO check whether current broker should be the leader for the topic partition
if initMessage := req.GetInit(); initMessage != nil {
localTopicPartition = broker.localTopicManager.GetTopicPartition(
topic.FromPbTopic(initMessage.Topic),
topic.FromPbPartition(initMessage.Partition),
)
if localTopicPartition == nil {
response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
glog.Errorf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
return stream.Send(response)
}
}
// process each published messages
for {
req, err := stream.Recv()
if err != nil {
@@ -85,21 +104,8 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
AckSequence: sequence,
}
if dataMessage := req.GetData(); dataMessage != nil {
if localTopicPartition == nil {
response.Error = "topic partition not initialized"
glog.Errorf("topic partition not found")
} else {
localTopicPartition.Publish(dataMessage)
}
} else if initMessage := req.GetInit(); initMessage != nil {
localTopicPartition = broker.localTopicManager.GetTopicPartition(
topic.FromPbTopic(initMessage.Topic),
topic.FromPbPartition(initMessage.Partition),
)
if localTopicPartition == nil {
response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
glog.Errorf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
}
print('+')
localTopicPartition.Publish(dataMessage)
}
if err := stream.Send(response); err != nil {
glog.Errorf("Error sending setup response: %v", err)
@@ -121,7 +127,7 @@ func (broker *MessageQueueBroker) AssignTopicPartitions(c context.Context, reque
localPartiton)
if request.IsLeader {
for _, follower := range localPartiton.FollowerBrokers {
err := pb.WithBrokerClient(false, follower, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
err := pb.WithBrokerGrpcClient(false, follower.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.AssignTopicPartitions(context.Background(), request)
return err
})

View File

@@ -1,6 +1,7 @@
package broker
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -15,7 +16,7 @@ func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream
if localTopicPartition == nil {
stream.Send(&mq_pb.SubscribeResponse{
Message: &mq_pb.SubscribeResponse_Ctrl{
&mq_pb.SubscribeResponse_CtrlMessage{
Ctrl: &mq_pb.SubscribeResponse_CtrlMessage{
Error: "not found",
},
},
@@ -24,10 +25,11 @@ func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream
}
localTopicPartition.Subscribe("client", time.Now(), func(logEntry *filer_pb.LogEntry) error {
value := logEntry.GetData()
if err := stream.Send(&mq_pb.SubscribeResponse{Message: &mq_pb.SubscribeResponse_Data{
Data: &mq_pb.DataMessage{
Key: []byte("hello"),
Value: []byte("world"),
Key: []byte(fmt.Sprintf("key-%d", logEntry.PartitionKeyHash)),
Value: value,
},
}}); err != nil {
glog.Errorf("Error sending setup response: %v", err)

View File

@@ -112,7 +112,7 @@ func (broker *MessageQueueBroker) withMasterClient(streamingMode bool, master pb
func (broker *MessageQueueBroker) withBrokerClient(streamingMode bool, server pb.ServerAddress, fn func(client mq_pb.SeaweedMessagingClient) error) error {
return pb.WithBrokerClient(streamingMode, server, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
return pb.WithBrokerGrpcClient(streamingMode, server.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
return fn(client)
})