add dc and rack

This commit is contained in:
chrislu
2022-07-03 00:29:25 -07:00
parent 0f5d7ed242
commit 9f20d3ebd1
21 changed files with 1052 additions and 1030 deletions

View File

@@ -24,6 +24,15 @@ type Leaders struct {
leaders [3]pb.ServerAddress
}
type DataCenter string
type Rack string
type DataCenterBrokers struct {
brokers map[Rack]*RackBrokers
}
type RackBrokers struct {
brokers map[pb.ServerAddress]*ClusterNode
}
type ClusterNode struct {
Address pb.ServerAddress
Version string
@@ -34,14 +43,14 @@ type ClusterNode struct {
type Cluster struct {
filerGroup2filers map[FilerGroup]*Filers
filersLock sync.RWMutex
brokers map[pb.ServerAddress]*ClusterNode
brokers map[DataCenter]*DataCenterBrokers
brokersLock sync.RWMutex
}
func NewCluster() *Cluster {
return &Cluster{
filerGroup2filers: make(map[FilerGroup]*Filers),
brokers: make(map[pb.ServerAddress]*ClusterNode),
brokers: make(map[DataCenter]*DataCenterBrokers),
}
}
@@ -57,7 +66,7 @@ func (cluster *Cluster) getFilers(filerGroup FilerGroup, createIfNotFound bool)
return filers
}
func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
func (cluster *Cluster) AddClusterNode(ns, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
filerGroup := FilerGroup(ns)
switch nodeType {
case FilerType:
@@ -78,11 +87,24 @@ func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAdd
case BrokerType:
cluster.brokersLock.Lock()
defer cluster.brokersLock.Unlock()
if existingNode, found := cluster.brokers[address]; found {
existingNode.counter++
existingDataCenterBrokers, foundDataCenter := cluster.brokers[dataCenter]
if !foundDataCenter {
existingDataCenterBrokers = &DataCenterBrokers{
brokers: make(map[Rack]*RackBrokers),
}
}
existingRackBrokers, foundRack := existingDataCenterBrokers.brokers[rack]
if !foundRack {
existingRackBrokers = &RackBrokers{
brokers: make(map[pb.ServerAddress]*ClusterNode),
}
}
if existingBroker, found := existingRackBrokers.brokers[address]; found {
existingBroker.counter++
return nil
}
cluster.brokers[address] = &ClusterNode{
existingRackBrokers.brokers[address] = &ClusterNode{
Address: address,
Version: version,
counter: 1,
@@ -111,7 +133,7 @@ func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAdd
return nil
}
func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
filerGroup := FilerGroup(ns)
switch nodeType {
case FilerType:
@@ -133,23 +155,35 @@ func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb
case BrokerType:
cluster.brokersLock.Lock()
defer cluster.brokersLock.Unlock()
if existingNode, found := cluster.brokers[address]; !found {
existingDataCenterBrokers, foundDataCenter := cluster.brokers[dataCenter]
if !foundDataCenter {
return nil
} else {
existingNode.counter--
if existingNode.counter <= 0 {
delete(cluster.brokers, address)
return []*master_pb.KeepConnectedResponse{
{
ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
NodeType: nodeType,
Address: string(address),
IsAdd: false,
},
}
existingRackBrokers, foundRack := existingDataCenterBrokers.brokers[Rack(rack)]
if !foundRack {
return nil
}
existingBroker, found := existingRackBrokers.brokers[address]
if !found {
return nil
}
existingBroker.counter--
if existingBroker.counter <= 0 {
delete(existingRackBrokers.brokers, address)
return []*master_pb.KeepConnectedResponse{
{
ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
NodeType: nodeType,
Address: string(address),
IsAdd: false,
},
}
},
}
}
return nil
case MasterType:
return []*master_pb.KeepConnectedResponse{
{
@@ -179,8 +213,12 @@ func (cluster *Cluster) ListClusterNode(filerGroup FilerGroup, nodeType string)
case BrokerType:
cluster.brokersLock.RLock()
defer cluster.brokersLock.RUnlock()
for _, node := range cluster.brokers {
nodes = append(nodes, node)
for _, dcNodes := range cluster.brokers {
for _, rackNodes := range dcNodes.brokers {
for _, node := range rackNodes.brokers {
nodes = append(nodes, node)
}
}
}
case MasterType:
}

View File

@@ -11,24 +11,24 @@ import (
func TestClusterAddRemoveNodes(t *testing.T) {
c := NewCluster()
c.AddClusterNode("", "filer", pb.ServerAddress("111:1"), "23.45")
c.AddClusterNode("", "filer", pb.ServerAddress("111:2"), "23.45")
c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:1"), "23.45")
c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:2"), "23.45")
assert.Equal(t, []pb.ServerAddress{
pb.ServerAddress("111:1"),
pb.ServerAddress("111:2"),
}, c.getFilers("", false).leaders.GetLeaders())
c.AddClusterNode("", "filer", pb.ServerAddress("111:3"), "23.45")
c.AddClusterNode("", "filer", pb.ServerAddress("111:4"), "23.45")
c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:3"), "23.45")
c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:4"), "23.45")
assert.Equal(t, []pb.ServerAddress{
pb.ServerAddress("111:1"),
pb.ServerAddress("111:2"),
pb.ServerAddress("111:3"),
}, c.getFilers("", false).leaders.GetLeaders())
c.AddClusterNode("", "filer", pb.ServerAddress("111:5"), "23.45")
c.AddClusterNode("", "filer", pb.ServerAddress("111:6"), "23.45")
c.RemoveClusterNode("", "filer", pb.ServerAddress("111:4"))
c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:5"), "23.45")
c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:6"), "23.45")
c.RemoveClusterNode("", "filer", "", "", pb.ServerAddress("111:4"))
assert.Equal(t, []pb.ServerAddress{
pb.ServerAddress("111:1"),
pb.ServerAddress("111:2"),
@@ -36,7 +36,7 @@ func TestClusterAddRemoveNodes(t *testing.T) {
}, c.getFilers("", false).leaders.GetLeaders())
// remove oldest
c.RemoveClusterNode("", "filer", pb.ServerAddress("111:1"))
c.RemoveClusterNode("", "filer", "", "", pb.ServerAddress("111:1"))
assert.Equal(t, []pb.ServerAddress{
pb.ServerAddress("111:6"),
pb.ServerAddress("111:2"),
@@ -44,7 +44,7 @@ func TestClusterAddRemoveNodes(t *testing.T) {
}, c.getFilers("", false).leaders.GetLeaders())
// remove oldest
c.RemoveClusterNode("", "filer", pb.ServerAddress("111:1"))
c.RemoveClusterNode("", "filer", "", "", pb.ServerAddress("111:1"))
}
@@ -56,7 +56,7 @@ func TestConcurrentAddRemoveNodes(t *testing.T) {
go func(i int) {
defer wg.Done()
address := strconv.Itoa(i)
c.AddClusterNode("", "filer", pb.ServerAddress(address), "23.45")
c.AddClusterNode("", "filer", "", "", pb.ServerAddress(address), "23.45")
}(i)
}
wg.Wait()
@@ -66,7 +66,7 @@ func TestConcurrentAddRemoveNodes(t *testing.T) {
go func(i int) {
defer wg.Done()
address := strconv.Itoa(i)
node := c.RemoveClusterNode("", "filer", pb.ServerAddress(address))
node := c.RemoveClusterNode("", "filer", "", "", pb.ServerAddress(address))
if len(node) == 0 {
t.Errorf("TestConcurrentAddRemoveNodes: node[%s] not found", address)