refactor
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
@@ -50,13 +49,8 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
|
||||
ret.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.Balancer.Brokers, request.PartitionCount)
|
||||
|
||||
// save the topic configuration on filer
|
||||
topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, request.Topic.Namespace, request.Topic.Name)
|
||||
if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
var buf bytes.Buffer
|
||||
filer.ProtoToText(&buf, ret)
|
||||
return filer.SaveInsideFiler(client, topicDir, "topic.conf", buf.Bytes())
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("create topic %s: %v", topicDir, err)
|
||||
if err := b.saveTopicConfToFiler(request.Topic, ret); err != nil {
|
||||
return nil, fmt.Errorf("configure topic: %v", err)
|
||||
}
|
||||
|
||||
b.Balancer.OnPartitionChange(request.Topic, ret.BrokerPartitionAssignments)
|
||||
|
||||
Reference in New Issue
Block a user