adjust proto

This commit is contained in:
chrislu
2023-08-26 13:39:21 -07:00
parent c45665eb97
commit 905911853d
5 changed files with 658 additions and 169 deletions

View File

@@ -93,12 +93,12 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
}
} else if initMessage := req.GetInit(); initMessage != nil {
localTopicPartition = broker.localTopicManager.GetTopicPartition(
topic.NewTopic(topic.Namespace(initMessage.Segment.Namespace), initMessage.Segment.Topic),
topic.FromPbPartition(initMessage.Segment.Partition),
topic.FromPbTopic(initMessage.Topic),
topic.FromPbPartition(initMessage.Partition),
)
if localTopicPartition == nil {
response.Error = fmt.Sprintf("topic partition %v not found", initMessage.Segment)
glog.Errorf("topic partition %v not found", initMessage.Segment)
response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
glog.Errorf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
}
}
if err := stream.Send(response); err != nil {

View File

@@ -14,7 +14,7 @@ type LocalPartition struct {
logBuffer *log_buffer.LogBuffer
}
func (p LocalPartition) Publish(message *mq_pb.PublishRequest_DataMessage) {
func (p LocalPartition) Publish(message *mq_pb.DataMessage) {
p.logBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
}