Merge branch 'new_master' into ydb
# Conflicts:
#	go.mod
#	go.sum
@@ -51,6 +51,7 @@ import (
 type FilerOption struct {
 	Masters            map[string]pb.ServerAddress
+	FilerGroup         string
 	Collection         string
 	DefaultReplication string
 	DisableDirListing  bool
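
This hunk and the ones that follow thread a filer group through the filer and master servers, so one master cluster can serve several independent groups of filers. A standalone sketch of the new field (the struct below mirrors FilerOption rather than importing the real weed/server type):

    package main

    import "fmt"

    // filerOption mirrors the FilerOption fields touched by this change;
    // a standalone sketch, not the real weed/server type.
    type filerOption struct {
        FilerGroup         string
        Collection         string
        DefaultReplication string
    }

    func main() {
        // Two filers under one master cluster can now belong to separate groups.
        a := filerOption{FilerGroup: "group-a", Collection: "pictures"}
        b := filerOption{FilerGroup: "group-b", Collection: "logs"}
        fmt.Println(a.FilerGroup, b.FilerGroup)
    }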
@@ -119,7 +120,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
 		glog.Fatal("master list is required!")
 	}
 
-	fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Collection, option.DefaultReplication, option.DataCenter, func() {
+	fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, func() {
 		fs.listenersCond.Broadcast()
 	})
 	fs.filer.Cipher = option.Cipher
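
The group then rides along into filer.NewFiler, slotting in between the host and the collection. A hedged stub of the assumed parameter order (simplified types, not the real API):

    package main

    import "fmt"

    // newFiler stubs the assumed parameter order of filer.NewFiler after this
    // change. Names and types are simplified stand-ins for illustration only.
    func newFiler(masters []string, host, filerGroup, collection, replication, dataCenter string, notifyFn func()) {
        fmt.Printf("filer %s joining group %q with masters %v\n", host, filerGroup, masters)
        notifyFn()
    }

    func main() {
        newFiler([]string{"localhost:9333"}, "localhost:8888", "group-a", "", "000", "", func() {})
    }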
@@ -201,13 +201,13 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServer) error {
 	// buffer by 1 so we don't end up getting stuck writing to stopChan forever
 	stopChan := make(chan bool, 1)
 
-	clientName, messageChan := ms.addClient(req.ClientType, peerAddress)
-	for _, update := range ms.Cluster.AddClusterNode(req.ClientType, peerAddress, req.Version) {
+	clientName, messageChan := ms.addClient(req.FilerGroup, req.ClientType, peerAddress)
+	for _, update := range ms.Cluster.AddClusterNode(req.FilerGroup, req.ClientType, peerAddress, req.Version) {
 		ms.broadcastToClients(update)
 	}
 
 	defer func() {
-		for _, update := range ms.Cluster.RemoveClusterNode(req.ClientType, peerAddress) {
+		for _, update := range ms.Cluster.RemoveClusterNode(req.FilerGroup, req.ClientType, peerAddress) {
 			ms.broadcastToClients(update)
 		}
 		ms.deleteClient(clientName)
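
With req.FilerGroup on the keep-connected request, the master can bucket cluster membership by group as well as client type, so groups never see each other's members. A minimal sketch of that bucketing idea (not the real cluster.Cluster implementation):

    package main

    import "fmt"

    // key scopes membership by (filer group, client type), the two values the
    // keep-connected request now carries alongside the peer address.
    type key struct {
        filerGroup string
        clientType string
    }

    func main() {
        nodes := map[key][]string{}
        add := func(filerGroup, clientType, address string) {
            k := key{filerGroup, clientType}
            nodes[k] = append(nodes[k], address)
        }
        add("group-a", "filer", "10.0.0.5:8888")
        add("group-b", "filer", "10.0.0.6:8888")
        fmt.Println(nodes[key{"group-a", "filer"}]) // only group-a's filers
    }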
@@ -276,8 +276,8 @@ func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedServer) error {
 	return nil
 }
 
-func (ms *MasterServer) addClient(clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.KeepConnectedResponse) {
-	clientName = clientType + "@" + string(clientAddress)
+func (ms *MasterServer) addClient(filerGroup, clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.KeepConnectedResponse) {
+	clientName = filerGroup + "." + clientType + "@" + string(clientAddress)
 	glog.V(0).Infof("+ client %v", clientName)
 
 	// we buffer this because otherwise we end up in a potential deadlock where
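
The group also becomes part of the client name, so the same address registered under two different groups yields two distinct entries. A runnable sketch mirroring the concatenation above:

    package main

    import "fmt"

    // clientName mirrors the concatenation in addClient: the group is joined
    // with a "." separator ahead of the client type.
    func clientName(filerGroup, clientType, clientAddress string) string {
        return filerGroup + "." + clientType + "@" + clientAddress
    }

    func main() {
        fmt.Println(clientName("group-a", "filer", "10.0.0.5:8888")) // group-a.filer@10.0.0.5:8888
        fmt.Println(clientName("", "master", "10.0.0.1:9333"))       // .master@10.0.0.1:9333
    }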
@@ -10,26 +10,26 @@ import (
 func (ms *MasterServer) ListClusterNodes(ctx context.Context, req *master_pb.ListClusterNodesRequest) (*master_pb.ListClusterNodesResponse, error) {
 	resp := &master_pb.ListClusterNodesResponse{}
 
-	clusterNodes := ms.Cluster.ListClusterNode(req.ClientType)
+	filerGroup := cluster.FilerGroup(req.FilerGroup)
+	clusterNodes := ms.Cluster.ListClusterNode(filerGroup, req.ClientType)
 
 	for _, node := range clusterNodes {
 		resp.ClusterNodes = append(resp.ClusterNodes, &master_pb.ListClusterNodesResponse_ClusterNode{
 			Address:  string(node.Address),
 			Version:  node.Version,
-			IsLeader: ms.Cluster.IsOneLeader(node.Address),
+			IsLeader: ms.Cluster.IsOneLeader(filerGroup, node.Address),
 		})
 	}
 	return resp, nil
 }
 
-func (ms *MasterServer) GetOneFiler() pb.ServerAddress {
+func (ms *MasterServer) GetOneFiler(filerGroup cluster.FilerGroup) pb.ServerAddress {
 
-	clusterNodes := ms.Cluster.ListClusterNode(cluster.FilerType)
+	clusterNodes := ms.Cluster.ListClusterNode(filerGroup, cluster.FilerType)
 
 	var filers []pb.ServerAddress
 	for _, node := range clusterNodes {
-		if ms.Cluster.IsOneLeader(node.Address) {
+		if ms.Cluster.IsOneLeader(filerGroup, node.Address) {
 			filers = append(filers, node.Address)
 		}
 	}
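
GetOneFiler now only considers leader filers inside the requested group. A standalone sketch of that filtering; the random pick at the end is an assumption about the part of the function the hunk cuts off:

    package main

    import (
        "fmt"
        "math/rand"
    )

    type node struct {
        address    string
        filerGroup string
        isLeader   bool
    }

    // getOneFiler sketches the per-group selection: only leader filers
    // belonging to the requested group are candidates.
    func getOneFiler(nodes []node, filerGroup string) string {
        var filers []string
        for _, n := range nodes {
            if n.filerGroup == filerGroup && n.isLeader {
                filers = append(filers, n.address)
            }
        }
        if len(filers) == 0 {
            return "" // callers treat "" as "no filer available", see startAdminScripts
        }
        return filers[rand.Intn(len(filers))]
    }

    func main() {
        nodes := []node{
            {"10.0.0.5:8888", "group-a", true},
            {"10.0.0.6:8888", "group-b", true},
        }
        fmt.Println(getOneFiler(nodes, "group-a")) // 10.0.0.5:8888
    }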
@@ -113,7 +113,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.ServerAddress) *MasterServer {
 		vgCh:           make(chan *topology.VolumeGrowRequest, 1<<6),
 		clientChans:    make(map[string]chan *master_pb.KeepConnectedResponse),
 		grpcDialOption: grpcDialOption,
-		MasterClient:   wdclient.NewMasterClient(grpcDialOption, cluster.MasterType, option.Master, "", peers),
+		MasterClient:   wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", peers),
 		adminLocks:     NewAdminLocks(),
 		Cluster:        cluster.NewCluster(),
 	}
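
The master registers its own MasterClient with an empty filer group, which is what the new second argument carries. A stub documenting the assumed parameter order (simplified types, not the real wdclient API):

    package main

    import "fmt"

    // newMasterClient stubs the assumed wdclient.NewMasterClient parameter
    // order after this change: the new second argument is the client's own
    // filer group. Masters pass "", since only filers belong to a group.
    func newMasterClient(filerGroup, clientType, clientHost, dataCenter string, peers map[string]string) string {
        return fmt.Sprintf("%s.%s@%s", filerGroup, clientType, clientHost)
    }

    func main() {
        fmt.Println(newMasterClient("", "master", "localhost:9333", "", nil)) // .master@localhost:9333
    }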
@@ -285,7 +285,7 @@ func (ms *MasterServer) startAdminScripts() {
 		for {
 			time.Sleep(time.Duration(sleepMinutes) * time.Minute)
 			if ms.Topo.IsLeader() {
-				shellOptions.FilerAddress = ms.GetOneFiler()
+				shellOptions.FilerAddress = ms.GetOneFiler(cluster.FilerGroup(*shellOptions.FilerGroup))
 				if shellOptions.FilerAddress == "" {
 					continue
 				}
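
shellOptions.FilerGroup is dereferenced here, so in the surrounding code it must be a *string, presumably a command-line flag. A hedged sketch of that shape (the flag name is an assumption):

    package main

    import (
        "flag"
        "fmt"
    )

    func main() {
        // A flag pointer reproduces the *string shape dereferenced above;
        // the flag name is an assumption for illustration.
        filerGroup := flag.String("filerGroup", "", "filer group the admin scripts should target")
        flag.Parse()
        fmt.Println("admin scripts target filer group:", *filerGroup)
    }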
@@ -27,7 +27,7 @@ func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTierMoveDatToRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatToRemoteServer) error {
 	// locate the disk file
 	diskFile, ok := v.DataBackend.(*backend.DiskFile)
 	if !ok {
-		return fmt.Errorf("volume %d is not on local disk", req.VolumeId)
+		return nil // already copied to remote. fmt.Errorf("volume %d is not on local disk", req.VolumeId)
 	}
 
 	// check valid storage backend type
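
This last change makes tier moves idempotent: if the volume's backend is no longer a local disk file, the data was presumably already copied to remote storage, so the call now succeeds instead of failing. A standalone sketch of the pattern:

    package main

    import "fmt"

    type backendFile interface{ name() string }

    type diskFile struct{ path string }

    func (d *diskFile) name() string { return d.path }

    type remoteFile struct{ url string }

    func (r *remoteFile) name() string { return r.url }

    // moveToRemote sketches the behavior change: a backend that is not a
    // local disk file means the volume was already tiered away, which is now
    // treated as success instead of an error, so retries are harmless.
    func moveToRemote(b backendFile) error {
        if _, ok := b.(*diskFile); !ok {
            return nil // already copied to remote; nothing left to do
        }
        fmt.Println("copying", b.name(), "to remote storage")
        return nil
    }

    func main() {
        _ = moveToRemote(&diskFile{path: "/data/1.dat"})
        _ = moveToRemote(&remoteFile{url: "s3://bucket/1.dat"}) // no-op, no error
    }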