refactoring

This commit is contained in:
Chris Lu
2021-02-22 00:28:42 -08:00
parent 151c281f36
commit 1c233ad986
11 changed files with 63 additions and 98 deletions

View File

@@ -1,7 +1,6 @@
package shell
import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -71,36 +70,33 @@ func volumeServerEvacuate(commandEnv *CommandEnv, volumeServer string, skipNonMo
// 3. move to any other volume server as long as it satisfy the replication requirements
// list all the volumes
var resp *master_pb.VolumeListResponse
err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
if err := evacuateNormalVolumes(commandEnv, resp, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
if err := evacuateNormalVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
return err
}
if err := evacuateEcVolumes(commandEnv, resp, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
if err := evacuateEcVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
return err
}
return nil
}
func evacuateNormalVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListResponse, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
func evacuateNormalVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
// find this volume server
volumeServers := collectVolumeServersByDc(resp.TopologyInfo, "")
volumeServers := collectVolumeServersByDc(topologyInfo, "")
thisNode, otherNodes := nodesOtherThan(volumeServers, volumeServer)
if thisNode == nil {
return fmt.Errorf("%s is not found in this cluster", volumeServer)
}
// move away normal volumes
volumeReplicas, _ := collectVolumeReplicaLocations(resp)
volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
for _, diskInfo := range thisNode.info.DiskInfos {
for _, vol := range diskInfo.VolumeInfos {
hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
@@ -120,9 +116,9 @@ func evacuateNormalVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListRes
return nil
}
func evacuateEcVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListResponse, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
func evacuateEcVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
// find this ec volume server
ecNodes, _ := collectEcVolumeServersByDc(resp.TopologyInfo, "")
ecNodes, _ := collectEcVolumeServersByDc(topologyInfo, "")
thisNode, otherNodes := ecNodesOtherThan(ecNodes, volumeServer)
if thisNode == nil {
return fmt.Errorf("%s is not found in this cluster\n", volumeServer)