Merge branch 'master' into a
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) {
|
||||
@@ -43,28 +44,40 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
|
||||
}
|
||||
|
||||
func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (resp *filer_pb.PingResponse, pingErr error) {
|
||||
resp = &filer_pb.PingResponse{}
|
||||
resp = &filer_pb.PingResponse{
|
||||
StartTimeNs: time.Now().UnixNano(),
|
||||
}
|
||||
if req.TargetType == cluster.FilerType {
|
||||
pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||
_, err := client.Ping(ctx, &filer_pb.PingRequest{})
|
||||
pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
|
||||
if pingResp != nil {
|
||||
resp.RemoteTimeNs = pingResp.StartTimeNs
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
if req.TargetType == cluster.VolumeServerType {
|
||||
pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
||||
_, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
|
||||
pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
|
||||
if pingResp != nil {
|
||||
resp.RemoteTimeNs = pingResp.StartTimeNs
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
if req.TargetType == cluster.MasterType {
|
||||
pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client master_pb.SeaweedClient) error {
|
||||
_, err := client.Ping(ctx, &master_pb.PingRequest{})
|
||||
pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
|
||||
if pingResp != nil {
|
||||
resp.RemoteTimeNs = pingResp.StartTimeNs
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
if pingErr != nil {
|
||||
pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr)
|
||||
}
|
||||
resp.StopTimeNs = time.Now().UnixNano()
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -4,10 +4,10 @@ import (
|
||||
"bytes"
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"golang.org/x/exp/slices"
|
||||
"hash"
|
||||
"io"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -130,11 +130,9 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
|
||||
fs.filer.DeleteChunks(fileChunks)
|
||||
return nil, md5Hash, 0, uploadErr, nil
|
||||
}
|
||||
|
||||
sort.Slice(fileChunks, func(i, j int) bool {
|
||||
return fileChunks[i].Offset < fileChunks[j].Offset
|
||||
slices.SortFunc(fileChunks, func(a, b *filer_pb.FileChunk) bool {
|
||||
return a.Offset < b.Offset
|
||||
})
|
||||
|
||||
return fileChunks, md5Hash, chunkOffset, nil, smallContent
|
||||
}
|
||||
|
||||
|
||||
@@ -148,27 +148,39 @@ func (ms *MasterServer) ReleaseAdminToken(ctx context.Context, req *master_pb.Re
|
||||
}
|
||||
|
||||
func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (resp *master_pb.PingResponse, pingErr error) {
|
||||
resp = &master_pb.PingResponse{}
|
||||
resp = &master_pb.PingResponse{
|
||||
StartTimeNs: time.Now().UnixNano(),
|
||||
}
|
||||
if req.TargetType == cluster.FilerType {
|
||||
pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||
_, err := client.Ping(ctx, &filer_pb.PingRequest{})
|
||||
pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
|
||||
if pingResp != nil {
|
||||
resp.RemoteTimeNs = pingResp.StartTimeNs
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
if req.TargetType == cluster.VolumeServerType {
|
||||
pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
||||
_, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
|
||||
pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
|
||||
if pingResp != nil {
|
||||
resp.RemoteTimeNs = pingResp.StartTimeNs
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
if req.TargetType == cluster.MasterType {
|
||||
pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client master_pb.SeaweedClient) error {
|
||||
_, err := client.Ping(ctx, &master_pb.PingRequest{})
|
||||
pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
|
||||
if pingResp != nil {
|
||||
resp.RemoteTimeNs = pingResp.StartTimeNs
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
if pingErr != nil {
|
||||
pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr)
|
||||
}
|
||||
resp.StopTimeNs = time.Now().UnixNano()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -268,7 +268,7 @@ func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumV
|
||||
|
||||
resp := &master_pb.VacuumVolumeResponse{}
|
||||
|
||||
ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), ms.preallocateSize)
|
||||
ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), req.VolumeId, req.Collection, ms.preallocateSize)
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@@ -64,7 +64,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
|
||||
}
|
||||
}
|
||||
// glog.Infoln("garbageThreshold =", gcThreshold)
|
||||
ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.preallocateSize)
|
||||
ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, 0, "", ms.preallocateSize)
|
||||
ms.dirStatusHandler(w, r)
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
|
||||
@@ -253,27 +254,39 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequest) (resp *volume_server_pb.PingResponse, pingErr error) {
|
||||
resp = &volume_server_pb.PingResponse{}
|
||||
resp = &volume_server_pb.PingResponse{
|
||||
StartTimeNs: time.Now().UnixNano(),
|
||||
}
|
||||
if req.TargetType == cluster.FilerType {
|
||||
pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||
_, err := client.Ping(ctx, &filer_pb.PingRequest{})
|
||||
pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
|
||||
if pingResp != nil {
|
||||
resp.RemoteTimeNs = pingResp.StartTimeNs
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
if req.TargetType == cluster.VolumeServerType {
|
||||
pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
||||
_, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
|
||||
pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
|
||||
if pingResp != nil {
|
||||
resp.RemoteTimeNs = pingResp.StartTimeNs
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
if req.TargetType == cluster.MasterType {
|
||||
pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client master_pb.SeaweedClient) error {
|
||||
_, err := client.Ping(ctx, &master_pb.PingRequest{})
|
||||
pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
|
||||
if pingResp != nil {
|
||||
resp.RemoteTimeNs = pingResp.StartTimeNs
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
if pingErr != nil {
|
||||
pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr)
|
||||
}
|
||||
resp.StopTimeNs = time.Now().UnixNano()
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user