allocate brokers to serve segments

This commit is contained in:
chrislu
2022-07-11 00:20:27 -07:00
parent bb01b68fa0
commit 9f479aab98
12 changed files with 1311 additions and 254 deletions

View File

@@ -49,15 +49,15 @@ func newClusterNodeGroups() *ClusterNodeGroups {
}
}
func (g *ClusterNodeGroups) getGroupMembers(filerGroup FilerGroupName, createIfNotFound bool) *GroupMembers {
filers, found := g.groupMembers[filerGroup]
members, found := g.groupMembers[filerGroup]
if !found && createIfNotFound {
filers = &GroupMembers{
members = &GroupMembers{
members: make(map[pb.ServerAddress]*ClusterNode),
leaders: &Leaders{},
}
g.groupMembers[filerGroup] = filers
g.groupMembers[filerGroup] = members
}
return filers
return members
}
func (m *GroupMembers) addMember(dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) *ClusterNode {
@@ -131,6 +131,16 @@ func (g *ClusterNodeGroups) IsOneLeader(filerGroup FilerGroupName, address pb.Se
}
return m.leaders.isOneLeader(address)
}
func (g *ClusterNodeGroups) ListClusterNodeLeaders(filerGroup FilerGroupName) (nodes []pb.ServerAddress) {
g.Lock()
defer g.Unlock()
m := g.getGroupMembers(filerGroup, false)
if m == nil {
return nil
}
return m.leaders.GetLeaders()
}
func NewCluster() *Cluster {
return &Cluster{
filerGroups: newClusterNodeGroups(),
@@ -201,6 +211,17 @@ func (cluster *Cluster) ListClusterNode(filerGroup FilerGroupName, nodeType stri
return
}
func (cluster *Cluster) ListClusterNodeLeaders(filerGroup FilerGroupName, nodeType string) (nodes []pb.ServerAddress) {
switch nodeType {
case FilerType:
return cluster.filerGroups.ListClusterNodeLeaders(filerGroup)
case BrokerType:
return cluster.brokerGroups.ListClusterNodeLeaders(filerGroup)
case MasterType:
}
return
}
func (cluster *Cluster) IsOneLeader(filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) bool {
switch nodeType {
case FilerType: