Merge branch 'master' into mq
This commit is contained in:
@@ -166,8 +166,10 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
|
||||
s3err.PostLog(r, http.StatusCreated, s3err.ErrNone)
|
||||
case "200":
|
||||
s3err.WriteEmptyResponse(w, r, http.StatusOK)
|
||||
case "204":
|
||||
s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
|
||||
default:
|
||||
writeSuccessResponseEmpty(w, r)
|
||||
s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -292,6 +292,12 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
|
||||
_, err := stream.Recv()
|
||||
if err != nil {
|
||||
glog.V(2).Infof("- client %v: %v", clientName, err)
|
||||
go func() {
|
||||
// consume message chan to avoid deadlock, go routine exit when message chan is closed
|
||||
for range messageChan {
|
||||
// no op
|
||||
}
|
||||
}()
|
||||
close(stopChan)
|
||||
return
|
||||
}
|
||||
@@ -367,6 +373,8 @@ func (ms *MasterServer) addClient(filerGroup, clientType string, clientAddress p
|
||||
func (ms *MasterServer) deleteClient(clientName string) {
|
||||
glog.V(0).Infof("- client %v", clientName)
|
||||
ms.clientChansLock.Lock()
|
||||
// close message chan, so that the KeepConnected go routine can exit
|
||||
close(ms.clientChans[clientName])
|
||||
delete(ms.clientChans, clientName)
|
||||
ms.clientChansLock.Unlock()
|
||||
}
|
||||
|
||||
@@ -304,6 +304,10 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri
|
||||
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
|
||||
for _, diskInfo := range dn.DiskInfos {
|
||||
for _, v := range diskInfo.VolumeInfos {
|
||||
// ignore remote volumes
|
||||
if v.RemoteStorageName != "" && v.RemoteStorageKey != "" {
|
||||
continue
|
||||
}
|
||||
if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
|
||||
if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 {
|
||||
vidMap[v.Id] = true
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"io"
|
||||
"math"
|
||||
"net/http"
|
||||
@@ -137,32 +138,46 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
|
||||
return fmt.Errorf("read filer buckets path: %v", err)
|
||||
}
|
||||
|
||||
collectCutoffFromAtNs := time.Now().Add(-*cutoffTimeAgo).UnixNano()
|
||||
var collectCutoffFromAtNs int64 = 0
|
||||
if cutoffTimeAgo.Seconds() != 0 {
|
||||
collectCutoffFromAtNs = time.Now().Add(-*cutoffTimeAgo).UnixNano()
|
||||
}
|
||||
var collectModifyFromAtNs int64 = 0
|
||||
if modifyTimeAgo.Seconds() != 0 {
|
||||
collectModifyFromAtNs = time.Now().Add(-*modifyTimeAgo).UnixNano()
|
||||
}
|
||||
// collect each volume file ids
|
||||
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
|
||||
for volumeId, vinfo := range volumeIdToVInfo {
|
||||
if len(c.volumeIds) > 0 {
|
||||
if _, ok := c.volumeIds[volumeId]; !ok {
|
||||
eg, gCtx := errgroup.WithContext(context.Background())
|
||||
_ = gCtx
|
||||
for _dataNodeId, _volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
|
||||
dataNodeId, volumeIdToVInfo := _dataNodeId, _volumeIdToVInfo
|
||||
eg.Go(func() error {
|
||||
for volumeId, vinfo := range volumeIdToVInfo {
|
||||
if len(c.volumeIds) > 0 {
|
||||
if _, ok := c.volumeIds[volumeId]; !ok {
|
||||
delete(volumeIdToVInfo, volumeId)
|
||||
continue
|
||||
}
|
||||
}
|
||||
if *c.collection != "" && vinfo.collection != *c.collection {
|
||||
delete(volumeIdToVInfo, volumeId)
|
||||
continue
|
||||
}
|
||||
err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
|
||||
}
|
||||
}
|
||||
if *c.collection != "" && vinfo.collection != *c.collection {
|
||||
delete(volumeIdToVInfo, volumeId)
|
||||
continue
|
||||
if *c.verbose {
|
||||
fmt.Fprintf(c.writer, "dn %+v filtred %d volumes and locations.\n", dataNodeId, len(dataNodeVolumeIdToVInfo[dataNodeId]))
|
||||
}
|
||||
err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
|
||||
}
|
||||
}
|
||||
if *c.verbose {
|
||||
fmt.Fprintf(c.writer, "dn %+v filtred %d volumes and locations.\n", dataNodeId, len(dataNodeVolumeIdToVInfo[dataNodeId]))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
err = eg.Wait()
|
||||
if err != nil {
|
||||
fmt.Fprintf(c.writer, "got error: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if *c.findMissingChunksInFiler {
|
||||
@@ -416,7 +431,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId
|
||||
}
|
||||
buf.Write(resp.FileContent)
|
||||
}
|
||||
if !vinfo.isReadOnly {
|
||||
if !vinfo.isReadOnly && (modifyFrom != 0 || cutoffFrom != 0) {
|
||||
index, err := idx.FirstInvalidIndex(buf.Bytes(),
|
||||
func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
|
||||
resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
|
||||
@@ -428,7 +443,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("read needle meta with id %d from volume %d: %v", key, volumeId, err)
|
||||
}
|
||||
if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (resp.AppendAtNs <= cutoffFrom) {
|
||||
if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
|
||||
@@ -139,6 +139,12 @@ func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, vol
|
||||
KeepLocalDatFile: keepLocalDatFile,
|
||||
})
|
||||
|
||||
if stream == nil && copyErr == nil {
|
||||
// when the volume is already uploaded, VolumeTierMoveDatToRemote will return nil stream and nil error
|
||||
// so we should directly return in this case
|
||||
fmt.Fprintf(writer, "volume %v already uploaded", volumeId)
|
||||
return nil
|
||||
}
|
||||
var lastProcessed int64
|
||||
for {
|
||||
resp, recvErr := stream.Recv()
|
||||
|
||||
17
weed/stats/disk_common.go
Normal file
17
weed/stats/disk_common.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package stats
|
||||
|
||||
import "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
|
||||
func calculateDiskRemaining(disk *volume_server_pb.DiskStatus) {
|
||||
disk.Used = disk.All - disk.Free
|
||||
|
||||
if disk.All > 0 {
|
||||
disk.PercentFree = float32((float64(disk.Free) / float64(disk.All)) * 100)
|
||||
disk.PercentUsed = float32((float64(disk.Used) / float64(disk.All)) * 100)
|
||||
} else {
|
||||
disk.PercentFree = 0
|
||||
disk.PercentUsed = 0
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
//go:build netbsd || plan9 || solaris
|
||||
// +build netbsd plan9 solaris
|
||||
//go:build netbsd || plan9
|
||||
// +build netbsd plan9
|
||||
|
||||
package stats
|
||||
|
||||
|
||||
@@ -15,10 +15,10 @@ func fillInDiskStatus(disk *volume_server_pb.DiskStatus) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
disk.All = fs.F_blocks * uint64(fs.F_bsize)
|
||||
disk.Free = fs.F_bfree * uint64(fs.F_bsize)
|
||||
disk.Used = disk.All - disk.Free
|
||||
disk.PercentFree = float32((float64(disk.Free) / float64(disk.All)) * 100)
|
||||
disk.PercentUsed = float32((float64(disk.Used) / float64(disk.All)) * 100)
|
||||
calculateDiskRemaining(disk)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
24
weed/stats/disk_solaris.go
Normal file
24
weed/stats/disk_solaris.go
Normal file
@@ -0,0 +1,24 @@
|
||||
//go:build solaris
|
||||
// +build solaris
|
||||
|
||||
package stats
|
||||
|
||||
import (
|
||||
"golang.org/x/sys/unix"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
)
|
||||
|
||||
func fillInDiskStatus(disk *volume_server_pb.DiskStatus) {
|
||||
var stat unix.Statvfs_t
|
||||
err := unix.Statvfs(disk.Dir, &stat)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
disk.All = stat.Blocks * uint64(stat.Bsize)
|
||||
disk.Free = stat.Bfree * uint64(stat.Bsize)
|
||||
calculateDiskRemaining(disk)
|
||||
|
||||
return
|
||||
}
|
||||
@@ -39,9 +39,7 @@ func fillInDiskStatus(disk *volume_server_pb.DiskStatus) {
|
||||
|
||||
return
|
||||
}
|
||||
disk.Used = disk.All - disk.Free
|
||||
disk.PercentFree = float32((float64(disk.Free) / float64(disk.All)) * 100)
|
||||
disk.PercentUsed = float32((float64(disk.Used) / float64(disk.All)) * 100)
|
||||
calculateDiskRemaining(disk)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ func NewSortedFileNeedleMap(indexBaseFileName string, indexFile *os.File) (m *So
|
||||
}
|
||||
glog.V(1).Infof("Opening %s...", fileName)
|
||||
|
||||
if m.dbFile, err = os.Open(indexBaseFileName + ".sdx"); err != nil {
|
||||
if m.dbFile, err = os.OpenFile(indexBaseFileName+".sdx", os.O_RDWR, 0); err != nil {
|
||||
return
|
||||
}
|
||||
dbStat, _ := m.dbFile.Stat()
|
||||
|
||||
Reference in New Issue
Block a user