Merge branch 'master' into slices.SortFunc

This commit is contained in:
leyou240
2022-04-18 10:39:29 +08:00
committed by GitHub
39 changed files with 2656 additions and 862 deletions

View File

@@ -7,10 +7,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/cluster"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"io"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
)
func init() {
@@ -48,7 +47,10 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
emptyDiskTypeDiskInfo, emptyDiskTypeFound := topologyInfo.DiskInfos[""]
hddDiskTypeDiskInfo, hddDiskTypeFound := topologyInfo.DiskInfos["hdd"]
if !emptyDiskTypeFound && !hddDiskTypeFound || emptyDiskTypeDiskInfo.VolumeCount == 0 && hddDiskTypeDiskInfo.VolumeCount == 0 {
if !emptyDiskTypeFound && !hddDiskTypeFound {
return fmt.Errorf("Need to a hdd disk type!")
}
if emptyDiskTypeFound && emptyDiskTypeDiskInfo.VolumeCount == 0 || hddDiskTypeFound && hddDiskTypeDiskInfo.VolumeCount == 0 {
return fmt.Errorf("Need to a hdd disk type!")
}
@@ -95,15 +97,16 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
for _, volumeServer := range volumeServers {
fmt.Fprintf(writer, "checking master %s to volume server %s ... ", string(master), string(volumeServer))
err := pb.WithMasterClient(false, master, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error {
_, err := client.Ping(context.Background(), &master_pb.PingRequest{
pong, err := client.Ping(context.Background(), &master_pb.PingRequest{
Target: string(volumeServer),
TargetType: cluster.VolumeServerType,
})
if err == nil {
printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
}
return err
})
if err == nil {
fmt.Fprintf(writer, "ok\n")
} else {
if err != nil {
fmt.Fprintf(writer, "%v\n", err)
}
}
@@ -117,15 +120,16 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
}
fmt.Fprintf(writer, "checking master %s to %s ... ", string(sourceMaster), string(targetMaster))
err := pb.WithMasterClient(false, sourceMaster, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error {
_, err := client.Ping(context.Background(), &master_pb.PingRequest{
pong, err := client.Ping(context.Background(), &master_pb.PingRequest{
Target: string(targetMaster),
TargetType: cluster.MasterType,
})
if err == nil {
printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
}
return err
})
if err == nil {
fmt.Fprintf(writer, "ok\n")
} else {
if err != nil {
fmt.Fprintf(writer, "%v\n", err)
}
}
@@ -136,15 +140,16 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
for _, master := range masters {
fmt.Fprintf(writer, "checking volume server %s to master %s ... ", string(volumeServer), string(master))
err := pb.WithVolumeServerClient(false, volumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
pong, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
Target: string(master),
TargetType: cluster.MasterType,
})
if err == nil {
printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
}
return err
})
if err == nil {
fmt.Fprintf(writer, "ok\n")
} else {
if err != nil {
fmt.Fprintf(writer, "%v\n", err)
}
}
@@ -155,15 +160,16 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
for _, master := range masters {
fmt.Fprintf(writer, "checking filer %s to master %s ... ", string(filer), string(master))
err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.Ping(context.Background(), &filer_pb.PingRequest{
pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
Target: string(master),
TargetType: cluster.MasterType,
})
if err == nil {
printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
}
return err
})
if err == nil {
fmt.Fprintf(writer, "ok\n")
} else {
if err != nil {
fmt.Fprintf(writer, "%v\n", err)
}
}
@@ -174,15 +180,16 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
for _, volumeServer := range volumeServers {
fmt.Fprintf(writer, "checking filer %s to volume server %s ... ", string(filer), string(volumeServer))
err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.Ping(context.Background(), &filer_pb.PingRequest{
pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
Target: string(volumeServer),
TargetType: cluster.VolumeServerType,
})
if err == nil {
printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
}
return err
})
if err == nil {
fmt.Fprintf(writer, "ok\n")
} else {
if err != nil {
fmt.Fprintf(writer, "%v\n", err)
}
}
@@ -196,15 +203,16 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
}
fmt.Fprintf(writer, "checking volume server %s to %s ... ", string(sourceVolumeServer), string(targetVolumeServer))
err := pb.WithVolumeServerClient(false, sourceVolumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
pong, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
Target: string(targetVolumeServer),
TargetType: cluster.VolumeServerType,
})
if err == nil {
printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
}
return err
})
if err == nil {
fmt.Fprintf(writer, "ok\n")
} else {
if err != nil {
fmt.Fprintf(writer, "%v\n", err)
}
}
@@ -215,15 +223,16 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
for _, targetFiler := range filers {
fmt.Fprintf(writer, "checking filer %s to %s ... ", string(sourceFiler), string(targetFiler))
err := pb.WithFilerClient(false, sourceFiler, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.Ping(context.Background(), &filer_pb.PingRequest{
pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
Target: string(targetFiler),
TargetType: cluster.FilerType,
})
if err == nil {
printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
}
return err
})
if err == nil {
fmt.Fprintf(writer, "ok\n")
} else {
if err != nil {
fmt.Fprintf(writer, "%v\n", err)
}
}
@@ -231,3 +240,9 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
return nil
}
func printTiming(writer io.Writer, startNs, remoteNs, stopNs int64) {
roundTripTimeMs := float32(stopNs-startNs) / 1000000
deltaTimeMs := float32(remoteNs-(startNs+stopNs)/2) / 1000000
fmt.Fprintf(writer, "ok round trip %.3fms clock delta %.3fms\n", roundTripTimeMs, deltaTimeMs)
}

View File

@@ -0,0 +1,59 @@
package shell
import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"io"
)
func init() {
Commands = append(Commands, &commandRaftServerAdd{})
}
type commandRaftServerAdd struct {
}
func (c *commandRaftServerAdd) Name() string {
return "cluster.raft.add"
}
func (c *commandRaftServerAdd) Help() string {
return `add a server to the raft cluster
Example:
cluster.raft.add -id <server_name> -address <server_host:port> -voter
`
}
func (c *commandRaftServerAdd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
raftServerAddCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
serverId := raftServerAddCommand.String("id", "", "server id")
serverAddress := raftServerAddCommand.String("address", "", "server grpc address")
serverVoter := raftServerAddCommand.Bool("voter", true, "assign it a vote")
if err = raftServerAddCommand.Parse(args); err != nil {
return nil
}
if *serverId == "" || *serverAddress == "" {
return fmt.Errorf("empty server id or address")
}
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.RaftAddServer(context.Background(), &master_pb.RaftAddServerRequest{
Id: *serverId,
Address: *serverAddress,
Voter: *serverVoter,
})
if err != nil {
return fmt.Errorf("raft add server: %v", err)
}
println("added server", *serverId)
return nil
})
return err
}

View File

@@ -0,0 +1,51 @@
package shell
import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"io"
)
func init() {
Commands = append(Commands, &commandRaftClusterPs{})
}
type commandRaftClusterPs struct {
}
func (c *commandRaftClusterPs) Name() string {
return "cluster.raft.ps"
}
func (c *commandRaftClusterPs) Help() string {
return `check current raft cluster status
cluster.raft.ps
`
}
func (c *commandRaftClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
raftClusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
if err = raftClusterPsCommand.Parse(args); err != nil {
return nil
}
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err := client.RaftListClusterServers(context.Background(), &master_pb.RaftListClusterServersRequest{})
if err != nil {
return fmt.Errorf("raft list cluster: %v", err)
}
fmt.Fprintf(writer, "the raft cluster has %d servers\n", len(resp.ClusterServers))
for _, server := range resp.ClusterServers {
fmt.Fprintf(writer, " * %s %s (%s)\n", server.Id, server.Address, server.Suffrage)
}
return nil
})
return err
}

View File

@@ -0,0 +1,56 @@
package shell
import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"io"
)
func init() {
Commands = append(Commands, &commandRaftServerRemove{})
}
type commandRaftServerRemove struct {
}
func (c *commandRaftServerRemove) Name() string {
return "cluster.raft.remove"
}
func (c *commandRaftServerRemove) Help() string {
return `remove a server from the raft cluster
Example:
cluster.raft.remove -id <server_name>
`
}
func (c *commandRaftServerRemove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
raftServerAddCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
serverId := raftServerAddCommand.String("id", "", "server id")
if err = raftServerAddCommand.Parse(args); err != nil {
return nil
}
if *serverId == "" {
return fmt.Errorf("empty server id")
}
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
Id: *serverId,
Force: true,
})
if err != nil {
return fmt.Errorf("raft remove server: %v", err)
}
println("removed server", *serverId)
return nil
})
return err
}

View File

@@ -3,6 +3,7 @@ package shell
import (
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"io"
"math"
@@ -39,6 +40,8 @@ func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer
return nil
}
signingKey := util.GetViper().GetString("jwt.signing.key")
var filerBucketsPath string
filerBucketsPath, err = readFilerBucketsPath(commandEnv)
if err != nil {
@@ -55,14 +58,16 @@ func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer
}
for _, bucket := range buckets {
c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo)
if err := c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo, signingKey); err != nil {
fmt.Fprintf(writer, fmt.Sprintf("failed cleanup uploads for backet %s: %v", bucket, err))
}
}
return err
}
func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io.Writer, filerBucketsPath string, bucket string, timeAgo time.Duration) error {
func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io.Writer, filerBucketsPath string, bucket string, timeAgo time.Duration, signingKey string) error {
uploadsDir := filerBucketsPath + "/" + bucket + "/.uploads"
var staleUploads []string
now := time.Now()
@@ -77,12 +82,17 @@ func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io
return fmt.Errorf("list uploads under %v: %v", uploadsDir, err)
}
var encodedJwt security.EncodedJwt
if signingKey != "" {
encodedJwt = security.GenJwtForFilerServer(security.SigningKey(signingKey), 15*60)
}
for _, staleUpload := range staleUploads {
deleteUrl := fmt.Sprintf("http://%s%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerAddress.ToHttpAddress(), uploadsDir, staleUpload)
fmt.Fprintf(writer, "purge %s\n", deleteUrl)
err = util.Delete(deleteUrl, "")
if err != nil {
err = util.Delete(deleteUrl, string(encodedJwt))
if err != nil && err.Error() != "" {
return fmt.Errorf("purge %s/%s: %v", uploadsDir, staleUpload, err)
}
}