Merge branch 'upstream_master' into store_s3cred

Konstantin Lebedev
2020-12-01 16:03:34 +05:00
79 changed files with 1496 additions and 1011 deletions


@@ -22,7 +22,7 @@ func (c *commandCollectionDelete) Name() string {
 func (c *commandCollectionDelete) Help() string {
 	return `delete specified collection
-	collection.delete -collectin <collection_name> -force
+	collection.delete -collection <collection_name> -force
 `
 }


@@ -85,7 +85,7 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
 	}
 	// generate a normal volume
-	err = generateNormalVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation)
+	err = generateNormalVolume(commandEnv.option.GrpcDialOption, vid, collection, targetNodeLocation)
 	if err != nil {
 		return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
 	}
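This hunk, like the doEcEncode hunk below, only drops a redundant conversion: vid already has the needle.VolumeId type, so needle.VolumeId(vid) converts a value to its own type. A minimal sketch of why that is a no-op, assuming the named type is a uint32 the way needle.VolumeId is:

package main

import "fmt"

// VolumeId stands in for needle.VolumeId, assumed here to be a named uint32.
type VolumeId uint32

func main() {
	var vid VolumeId = 42
	// Converting a VolumeId to its own type yields the identical value,
	// so the explicit conversion can simply be dropped.
	fmt.Println(vid == VolumeId(vid)) // true
}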


@@ -99,13 +99,13 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId)
 	// fmt.Printf("found ec %d shards on %v\n", vid, locations)
 	// mark the volume as readonly
-	err = markVolumeReadonly(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
+	err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations)
 	if err != nil {
 		return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
 	}
 	// generate ec shards
-	err = generateEcShards(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url)
+	err = generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, locations[0].Url)
 	if err != nil {
 		return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
 	}


@@ -107,7 +107,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
 		if err != nil {
 			return fmt.Errorf("parse replication %s: %v", *replication, err)
 		}
-		if *volumeGrowthCount % rp.GetCopyCount() != 0 {
+		if *volumeGrowthCount%rp.GetCopyCount() != 0 {
 			return fmt.Errorf("volumeGrowthCount %d should be devided by replication copy count %d", *volumeGrowthCount, rp.GetCopyCount())
 		}
 	}
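This hunk only reformats the modulo expression, but the check it touches is worth a gloss: volume growth happens in whole replica sets, so the growth count has to divide evenly by the copy count of the chosen replication. A minimal sketch of the same divisibility test, assuming GetCopyCount simply adds up the three replication digits plus one (e.g. "100" -> 2 copies, "200" -> 3):

package main

import "fmt"

// copyCount approximates what super_block.ReplicaPlacement.GetCopyCount is
// assumed to return: the total number of replicas implied by the digits.
func copyCount(diffDC, diffRack, sameRack int) int {
	return diffDC + diffRack + sameRack + 1
}

func main() {
	volumeGrowthCount := 3
	cc := copyCount(1, 0, 0) // replication "100" -> 2 copies

	// Growth allocates whole replica sets, so 3 cannot be honored with 2 copies each.
	if volumeGrowthCount%cc != 0 {
		fmt.Printf("volumeGrowthCount %d should be divisible by replication copy count %d\n", volumeGrowthCount, cc)
	}
}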


@@ -72,8 +72,9 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W
 	bytes, _ := proto.Marshal(respLookupEntry.Entry)
 	gzippedBytes, _ := util.GzipData(bytes)
-	zstdBytes, _ := util.ZstdData(bytes)
-	fmt.Fprintf(writer, "chunks %d meta size: %d gzip:%d zstd:%d\n", len(respLookupEntry.Entry.Chunks), len(bytes), len(gzippedBytes), len(zstdBytes))
+	// zstdBytes, _ := util.ZstdData(bytes)
+	// fmt.Fprintf(writer, "chunks %d meta size: %d gzip:%d zstd:%d\n", len(respLookupEntry.Entry.Chunks), len(bytes), len(gzippedBytes), len(zstdBytes))
+	fmt.Fprintf(writer, "chunks %d meta size: %d gzip:%d\n", len(respLookupEntry.Entry.Chunks), len(bytes), len(gzippedBytes))
 	return nil
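With the zstd comparison commented out, fs.meta.cat now reports only the raw and gzip sizes of the marshaled entry. A minimal sketch of measuring a gzip-compressed size in memory with the standard library, which is roughly what a helper like util.GzipData is assumed to do here:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// gzipSize compresses data in memory and reports the compressed length,
// which is all the size comparison in fs.meta.cat needs.
func gzipSize(data []byte) (int, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(data); err != nil {
		return 0, err
	}
	if err := w.Close(); err != nil {
		return 0, err
	}
	return buf.Len(), nil
}

func main() {
	data := bytes.Repeat([]byte("seaweedfs metadata "), 100)
	n, err := gzipSize(data)
	if err != nil {
		panic(err)
	}
	fmt.Printf("raw: %d gzip: %d\n", len(data), n)
}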


@@ -306,16 +306,16 @@ func isGoodMove(placement *super_block.ReplicaPlacement, existingReplicas []*Vol
 	dcs[targetNode.dc] = true
 	racks[fmt.Sprintf("%s %s", targetNode.dc, targetNode.rack)]++
-	if len(dcs) > placement.DiffDataCenterCount+1 {
+	if len(dcs) != placement.DiffDataCenterCount+1 {
 		return false
 	}
-	if len(racks) > placement.DiffRackCount+placement.DiffDataCenterCount+1 {
+	if len(racks) != placement.DiffRackCount+placement.DiffDataCenterCount+1 {
 		return false
 	}
 	for _, sameRackCount := range racks {
-		if sameRackCount > placement.SameRackCount+1 {
+		if sameRackCount != placement.SameRackCount+1 {
 			return false
 		}
 	}
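Switching from > to != tightens isGoodMove: a candidate move is only good when the resulting replicas match the placement exactly, not merely stay within its limits. With replication "100" (DiffDataCenterCount = 1), two replicas must end up in exactly two data centers; a move that collapses both copies into one data center slipped past the old > check and is now rejected, which is what the new test case in the next hunk exercises. A minimal sketch of the data-center part of that check, using made-up dc names:

package main

import "fmt"

// goodDCSpread mirrors the tightened check in isGoodMove: with replication
// "100" the replicas must span exactly DiffDataCenterCount+1 = 2 data centers.
func goodDCSpread(dcs map[string]bool, diffDataCenterCount int) bool {
	return len(dcs) == diffDataCenterCount+1
}

func main() {
	// Move dn1 (dc1) onto dn3 (dc2) while dn2 already lives in dc2:
	// both replicas land in dc2, so only one data center remains.
	afterMove := map[string]bool{"dc2": true}

	// false: the old "len(dcs) > 2" test would not have rejected this move.
	fmt.Println(goodDCSpread(afterMove, 1))
}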


@@ -20,6 +20,22 @@ func TestIsGoodMove(t *testing.T) {
 	var tests = []testMoveCase{
+		{
+			name: "test 100 move to wrong data centers",
+			replication: "100",
+			replicas: []*VolumeReplica{
+				{
+					location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+				},
+				{
+					location: &location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+				},
+			},
+			sourceLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+			targetLocation: location{"dc2", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+			expected: false,
+		},
 		{
 			name: "test 100 move to spread into proper data centers",
 			replication: "100",


@@ -157,12 +157,12 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId
 		IgnoreSourceFileNotFound: false,
 	})
 	if err != nil {
-		return fmt.Errorf("failed to start copying volume %d.idx: %v", volumeId, err)
+		return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err)
 	}
 	err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, volumeId))
 	if err != nil {
-		return fmt.Errorf("failed to copy %d.idx from %s: %v", volumeId, vinfo.server, err)
+		return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, vinfo.server, err)
 	}
 	return nil
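The only change here is that the error messages report the extension actually being copied instead of a hard-coded .idx, since the helper handles more than one volume file. A tiny, hypothetical illustration of what the %d%s formatting produces (the volume id, extension, and server below are made up):

package main

import (
	"errors"
	"fmt"
)

func main() {
	volumeId, ext, server := 5, ".idx", "volume-1:8080"
	cause := errors.New("connection refused")

	// %d%s stitches the volume id and extension into "5.idx" in the message.
	err := fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, server, cause)
	fmt.Println(err) // failed to copy 5.idx from volume-1:8080: connection refused
}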


@@ -0,0 +1,53 @@
+package shell
+
+import (
+	"context"
+	"flag"
+	"io"
+
+	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+)
+
+func init() {
+	Commands = append(Commands, &commandVacuum{})
+}
+
+type commandVacuum struct {
+}
+
+func (c *commandVacuum) Name() string {
+	return "volume.vacuum"
+}
+
+func (c *commandVacuum) Help() string {
+	return `compact volumes if deleted entries are more than the limit
+
+	volume.vacuum [-garbageThreshold=0.3]
+
+`
+}
+
+func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+	if err = commandEnv.confirmIsLocked(); err != nil {
+		return
+	}
+
+	volumeVacuumCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+	garbageThreshold := volumeVacuumCommand.Float64("garbageThreshold", 0.3, "vacuum when garbage is more than this limit")
+	if err = volumeVacuumCommand.Parse(args); err != nil {
+		return nil
+	}
+
+	err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+		_, err = client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{
+			GarbageThreshold: float32(*garbageThreshold),
+		})
+		return err
+	})
+	if err != nil {
+		return
+	}
+
+	return nil
+}
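The new volume.vacuum command simply forwards a garbage threshold to the master's VacuumVolume RPC; the compaction decision itself lives server-side and is not shown in this commit. A minimal sketch of that decision as commonly described, assuming the garbage ratio is deleted bytes over live content bytes:

package main

import "fmt"

// shouldVacuum sketches the decision the master is assumed to make per volume:
// compact it when the share of deleted bytes crosses the threshold.
func shouldVacuum(deletedBytes, contentBytes uint64, garbageThreshold float64) bool {
	if contentBytes == 0 {
		return false
	}
	return float64(deletedBytes)/float64(contentBytes) > garbageThreshold
}

func main() {
	// With the default threshold of 0.3, a volume that is 40% garbage is compacted.
	fmt.Println(shouldVacuum(40<<20, 100<<20, 0.3)) // true
	fmt.Println(shouldVacuum(10<<20, 100<<20, 0.3)) // false
}

From the weed shell this is run as volume.vacuum, or with an explicit limit such as volume.vacuum -garbageThreshold=0.4, matching the help text above.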