shell: volume.fix.replication adds collectionPattern with wildcard characters

fix https://github.com/chrislusf/seaweedfs/issues/1758
This commit is contained in:
Chris Lu
2021-01-26 22:30:37 -08:00
parent 2945a64fd6
commit b3f66199db

View File

@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
"io" "io"
"path/filepath"
"sort" "sort"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
@@ -19,6 +20,7 @@ func init() {
} }
type commandVolumeFixReplication struct { type commandVolumeFixReplication struct {
collectionPattern *string
} }
func (c *commandVolumeFixReplication) Name() string { func (c *commandVolumeFixReplication) Name() string {
@@ -33,8 +35,9 @@ func (c *commandVolumeFixReplication) Help() string {
This command also finds all under-replicated volumes, and finds volume servers with free slots. This command also finds all under-replicated volumes, and finds volume servers with free slots.
If the free slots satisfy the replication requirement, the volume content is copied over and mounted. If the free slots satisfy the replication requirement, the volume content is copied over and mounted.
volume.fix.replication -n # do not take action volume.fix.replication -n # do not take action
volume.fix.replication # actually deleting or copying the volume files and mount the volume volume.fix.replication # actually deleting or copying the volume files and mount the volume
volume.fix.replication -collectionPattern=important* # fix any collections with prefix "important"
Note: Note:
* each time this will only add back one replica for one volume id. If there are multiple replicas * each time this will only add back one replica for one volume id. If there are multiple replicas
@@ -52,6 +55,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
} }
volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes") skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes")
if err = volFixReplicationCommand.Parse(args); err != nil { if err = volFixReplicationCommand.Parse(args); err != nil {
return nil return nil
@@ -127,6 +131,17 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
replica := pickOneReplicaToDelete(replicas, replicaPlacement) replica := pickOneReplicaToDelete(replicas, replicaPlacement)
// check collection name pattern
if *c.collectionPattern != "" {
matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
if err != nil {
return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
}
if !matched {
break
}
}
fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id) fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)
if !takeAction { if !takeAction {
@@ -150,6 +165,17 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
for _, dst := range allLocations { for _, dst := range allLocations {
// check whether data nodes satisfy the constraints // check whether data nodes satisfy the constraints
if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
// check collection name pattern
if *c.collectionPattern != "" {
matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
if err != nil {
return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
}
if !matched {
break
}
}
// ask the volume server to replicate the volume // ask the volume server to replicate the volume
foundNewLocation = true foundNewLocation = true
fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id) fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)