able to read from chan and write to chan
This commit is contained in:
@@ -3,6 +3,7 @@ package broker
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||
@@ -94,6 +95,9 @@ func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient
|
||||
|
||||
for _, filer := range broker.option.Filers {
|
||||
if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil {
|
||||
if err == io.EOF {
|
||||
return
|
||||
}
|
||||
glog.V(0).Infof("fail to connect to %s: %v", filer, err)
|
||||
} else {
|
||||
break
|
||||
|
||||
@@ -10,6 +10,10 @@ func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messagin
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
// DeleteTopic is not yet implemented; calling it panics.
// TODO(review): replace the panic with a proper gRPC error once implemented —
// a panic here will take down the broker process.
func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_pb.DeleteTopicRequest) (*messaging_pb.DeleteTopicResponse, error) {
	panic("implement me")
}
|
||||
|
||||
// GetTopicConfiguration is not yet implemented; calling it panics.
// TODO(review): replace the panic with a proper gRPC error once implemented —
// a panic here will take down the broker process.
func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) {
	panic("implement me")
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
@@ -25,12 +26,40 @@ If one of the pub or sub connects very late, and the system topo changed quite a
|
||||
|
||||
func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb.FindBrokerRequest) (*messaging_pb.FindBrokerResponse, error) {
|
||||
|
||||
panic("implement me")
|
||||
t := &messaging_pb.FindBrokerResponse{}
|
||||
var peers []string
|
||||
|
||||
targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition)
|
||||
|
||||
for _, filer := range broker.option.Filers {
|
||||
err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
|
||||
resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{
|
||||
Resource: targetTopicPartition,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if resp.Found && len(resp.Resources) > 0 {
|
||||
t.Broker = resp.Resources[0].GrpcAddresses
|
||||
return nil
|
||||
}
|
||||
for _, b := range resp.Resources {
|
||||
peers = append(peers, b.GrpcAddresses)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
t.Broker = PickMember(peers, []byte(targetTopicPartition))
|
||||
|
||||
return t, nil
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
func (broker *MessageBroker) checkPeers() {
|
||||
func (broker *MessageBroker) checkFilers() {
|
||||
|
||||
// contact a filer about masters
|
||||
var masters []string
|
||||
|
||||
@@ -47,24 +47,11 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
|
||||
tl := broker.topicLocks.RequestLock(tp, topicConfig, true)
|
||||
defer broker.topicLocks.ReleaseLock(tp, true)
|
||||
|
||||
updatesChan := make(chan int32)
|
||||
|
||||
go func() {
|
||||
for update := range updatesChan {
|
||||
if err := stream.Send(&messaging_pb.PublishResponse{
|
||||
Config: &messaging_pb.PublishResponse_ConfigMessage{
|
||||
PartitionCount: update,
|
||||
},
|
||||
}); err != nil {
|
||||
glog.V(0).Infof("err sending publish response: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// process each message
|
||||
for {
|
||||
// println("recv")
|
||||
in, err := stream.Recv()
|
||||
// glog.V(0).Infof("recieved %v err: %v", in, err)
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
@@ -86,5 +73,18 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
|
||||
|
||||
tl.logBuffer.AddToBuffer(in.Data.Key, data)
|
||||
|
||||
if in.Data.IsClose {
|
||||
// println("server received closing")
|
||||
break
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// send the close ack
|
||||
// println("server send ack closing")
|
||||
if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil {
|
||||
glog.V(0).Infof("err sending close response: %v", err)
|
||||
}
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
@@ -83,11 +83,17 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
|
||||
glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
|
||||
return err
|
||||
}
|
||||
if m.IsClose {
|
||||
// println("processed EOF")
|
||||
return io.EOF
|
||||
}
|
||||
processedTsNs = logEntry.TsNs
|
||||
messageCount++
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
|
||||
// println("stopping from persisted logs")
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -95,7 +101,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
|
||||
lastReadTime = time.Unix(0, processedTsNs)
|
||||
}
|
||||
|
||||
messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
|
||||
err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
|
||||
lock.Mutex.Lock()
|
||||
lock.cond.Wait()
|
||||
lock.Mutex.Unlock()
|
||||
@@ -124,7 +130,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
|
||||
return nil
|
||||
}
|
||||
}
|
||||
if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix){
|
||||
if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix) {
|
||||
return nil
|
||||
}
|
||||
// println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name)
|
||||
@@ -133,7 +139,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
|
||||
if err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
|
||||
chunkedFileReader.Close()
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err)
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio
|
||||
|
||||
messageBroker.topicLocks = NewTopicLocks(messageBroker)
|
||||
|
||||
messageBroker.checkPeers()
|
||||
messageBroker.checkFilers()
|
||||
|
||||
go messageBroker.keepConnectedToOneFiler()
|
||||
|
||||
@@ -53,6 +53,24 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
|
||||
glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
|
||||
return err
|
||||
}
|
||||
|
||||
initRequest := &filer_pb.KeepConnectedRequest{
|
||||
Name: broker.option.Ip,
|
||||
GrpcPort: uint32(broker.option.Port),
|
||||
}
|
||||
for _, tp := range broker.topicLocks.ListTopicPartitions() {
|
||||
initRequest.Resources = append(initRequest.Resources, tp.String())
|
||||
}
|
||||
if err := stream.Send(&filer_pb.KeepConnectedRequest{
|
||||
Name: broker.option.Ip,
|
||||
GrpcPort: uint32(broker.option.Port),
|
||||
}); err != nil {
|
||||
glog.V(0).Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO send events of adding/removing topics
|
||||
|
||||
glog.V(0).Infof("conntected with filer: %v", filer)
|
||||
for {
|
||||
if err := stream.Send(&filer_pb.KeepConnectedRequest{
|
||||
@@ -68,12 +86,12 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
|
||||
return err
|
||||
}
|
||||
// println("received reply")
|
||||
time.Sleep(11*time.Second)
|
||||
time.Sleep(11 * time.Second)
|
||||
// println("woke up")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
time.Sleep(3*time.Second)
|
||||
time.Sleep(3 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
38
weed/messaging/broker/consistent_distribution.go
Normal file
38
weed/messaging/broker/consistent_distribution.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"github.com/cespare/xxhash"
|
||||
"github.com/buraksezer/consistent"
|
||||
)
|
||||
|
||||
// Member is a broker address (e.g. "host:port") participating in
// consistent-hash placement; it implements the consistent package's
// Member interface.
type Member string

// String returns the member's address string.
func (m Member) String() string {
	return string(m)
}
|
||||
|
||||
// hasher adapts xxhash to the consistent package's Hasher interface.
type hasher struct{}

// Sum64 hashes data with xxhash's 64-bit hash function.
func (h hasher) Sum64(data []byte) uint64 {
	return xxhash.Sum64(data)
}
|
||||
|
||||
func PickMember(members []string, key []byte) string {
|
||||
cfg := consistent.Config{
|
||||
PartitionCount: 9791,
|
||||
ReplicationFactor: 2,
|
||||
Load: 1.25,
|
||||
Hasher: hasher{},
|
||||
}
|
||||
|
||||
cmembers := []consistent.Member{}
|
||||
for _, m := range members {
|
||||
cmembers = append(cmembers, Member(m))
|
||||
}
|
||||
|
||||
c := consistent.New(cmembers, cfg)
|
||||
|
||||
m := c.LocateKey(key)
|
||||
|
||||
return m.String()
|
||||
}
|
||||
32
weed/messaging/broker/consistent_distribution_test.go
Normal file
32
weed/messaging/broker/consistent_distribution_test.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestPickMember(t *testing.T) {
|
||||
|
||||
servers := []string{
|
||||
"s1:port",
|
||||
"s2:port",
|
||||
"s3:port",
|
||||
"s5:port",
|
||||
"s4:port",
|
||||
}
|
||||
|
||||
total := 1000
|
||||
|
||||
distribution := make(map[string]int)
|
||||
for i:=0;i<total;i++{
|
||||
tp := fmt.Sprintf("tp:%2d", i)
|
||||
m := PickMember(servers, []byte(tp))
|
||||
// println(tp, "=>", m)
|
||||
distribution[m]++
|
||||
}
|
||||
|
||||
for member, count := range distribution {
|
||||
fmt.Printf("member: %s, key count: %d load=%.2f\n", member, count, float64(count*100)/float64(total/len(servers)))
|
||||
}
|
||||
|
||||
}
|
||||
@@ -16,6 +16,13 @@ type TopicPartition struct {
|
||||
Topic string
|
||||
Partition int32
|
||||
}
|
||||
// TopicPartitionFmt lays out a partition identifier as
// "namespace/topic_NN"; it is shared by TopicPartition.String and the
// broker-discovery resource lookups so both sides agree on the key.
// NOTE(review): %2d space-pads the partition number; if this string ever
// becomes part of a file name, %02d may have been intended — confirm.
const (
	TopicPartitionFmt = "%s/%s_%2d"
)

// String renders the partition per TopicPartitionFmt.
func (tp *TopicPartition) String() string {
	return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition)
}
|
||||
|
||||
type TopicLock struct {
|
||||
sync.Mutex
|
||||
cond *sync.Cond
|
||||
@@ -101,3 +108,13 @@ func (tl *TopicLocks) ReleaseLock(partition TopicPartition, isPublisher bool) {
|
||||
delete(tl.locks, partition)
|
||||
}
|
||||
}
|
||||
|
||||
// ListTopicPartitions returns a snapshot of every topic partition that
// currently holds a lock. The locks map is read under tl's mutex; the
// returned slice is a copy of the keys and safe for the caller to use
// after the lock is released.
func (tl *TopicLocks) ListTopicPartitions() (tps []TopicPartition) {
	tl.Lock()
	defer tl.Unlock()

	for k := range tl.locks {
		tps = append(tps, k)
	}
	return
}
|
||||
|
||||
Reference in New Issue
Block a user