fsck: only check the appendNs of deleted needle (#5841)
increase fsck speed Co-authored-by: Yang Wang <yangwang@weride.ai>
This commit is contained in:
@@ -7,19 +7,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/operation"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
||||||
"golang.org/x/sync/errgroup"
|
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
@@ -31,7 +18,20 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/operation"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/storage"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||||
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
|
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@@ -164,7 +164,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
|
|||||||
delete(volumeIdToVInfo, volumeId)
|
delete(volumeIdToVInfo, volumeId)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs))
|
err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
|
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
|
||||||
}
|
}
|
||||||
@@ -199,7 +199,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
|
|||||||
return fmt.Errorf("failed to collect file ids from filer: %v", err)
|
return fmt.Errorf("failed to collect file ids from filer: %v", err)
|
||||||
}
|
}
|
||||||
// volume file ids subtract filer file ids
|
// volume file ids subtract filer file ids
|
||||||
if err = c.findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo, *applyPurging); err != nil {
|
if err = c.findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo, *applyPurging, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs)); err != nil {
|
||||||
return fmt.Errorf("findExtraChunksInVolumeServers: %v", err)
|
return fmt.Errorf("findExtraChunksInVolumeServers: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -289,7 +289,7 @@ func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInf
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, applyPurging bool) error {
|
func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, applyPurging bool, modifyFrom, cutoffFrom uint64) error {
|
||||||
|
|
||||||
var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
|
var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
|
||||||
volumeIdOrphanFileIds := make(map[uint32]map[string]bool)
|
volumeIdOrphanFileIds := make(map[uint32]map[string]bool)
|
||||||
@@ -299,7 +299,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
|
|||||||
serverReplicas := make(map[uint32][]pb.ServerAddress)
|
serverReplicas := make(map[uint32][]pb.ServerAddress)
|
||||||
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
|
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
|
||||||
for volumeId, vinfo := range volumeIdToVInfo {
|
for volumeId, vinfo := range volumeIdToVInfo {
|
||||||
inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, &vinfo)
|
inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, &vinfo, modifyFrom, cutoffFrom)
|
||||||
if checkErr != nil {
|
if checkErr != nil {
|
||||||
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
|
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
|
||||||
}
|
}
|
||||||
@@ -395,7 +395,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId uint32, vinfo VInfo, modifyFrom uint64, cutoffFrom uint64) error {
|
func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId uint32, vinfo VInfo) error {
|
||||||
|
|
||||||
if *c.verbose {
|
if *c.verbose {
|
||||||
fmt.Fprintf(c.writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
|
fmt.Fprintf(c.writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
|
||||||
@@ -432,29 +432,6 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId
|
|||||||
}
|
}
|
||||||
buf.Write(resp.FileContent)
|
buf.Write(resp.FileContent)
|
||||||
}
|
}
|
||||||
if !vinfo.isReadOnly && (modifyFrom != 0 || cutoffFrom != 0) {
|
|
||||||
index, err := idx.FirstInvalidIndex(buf.Bytes(),
|
|
||||||
func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
|
|
||||||
resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
|
|
||||||
VolumeId: volumeId,
|
|
||||||
NeedleId: uint64(key),
|
|
||||||
Offset: offset.ToActualOffset(),
|
|
||||||
Size: int32(size),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return false, fmt.Errorf("read needle meta with id %d from volume %d: %v", key, volumeId, err)
|
|
||||||
}
|
|
||||||
if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
return false, nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
fmt.Fprintf(c.writer, "Failed to search for last valid index on volume %d with error %v\n", volumeId, err)
|
|
||||||
} else {
|
|
||||||
buf.Truncate(index * types.NeedleMapEntrySize)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
idxFilename := getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId)
|
idxFilename := getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId)
|
||||||
err = writeToFile(buf.Bytes(), idxFilename)
|
err = writeToFile(buf.Bytes(), idxFilename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -570,7 +547,7 @@ func (c *commandVolumeFsck) httpDelete(path util.FullPath) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, vinfo *VInfo) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
|
func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, vinfo *VInfo, modifyFrom, cutoffFrom uint64) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
|
||||||
|
|
||||||
volumeFileIdDb := needle_map.NewMemDb()
|
volumeFileIdDb := needle_map.NewMemDb()
|
||||||
defer volumeFileIdDb.Close()
|
defer volumeFileIdDb.Close()
|
||||||
@@ -610,9 +587,30 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri
|
|||||||
if n.Size.IsDeleted() {
|
if n.Size.IsDeleted() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId))
|
if cutoffFrom > 0 || modifyFrom > 0 {
|
||||||
orphanFileCount++
|
return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption,
|
||||||
orphanDataSize += uint64(n.Size)
|
func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
|
resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
|
||||||
|
VolumeId: volumeId,
|
||||||
|
NeedleId: types.NeedleIdToUint64(n.Key),
|
||||||
|
Offset: n.Offset.ToActualOffset(),
|
||||||
|
Size: int32(n.Size),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("read needle meta with id %d from volume %d: %v", n.Key, volumeId, err)
|
||||||
|
}
|
||||||
|
if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) {
|
||||||
|
orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId))
|
||||||
|
orphanFileCount++
|
||||||
|
orphanDataSize += uint64(n.Size)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId))
|
||||||
|
orphanFileCount++
|
||||||
|
orphanDataSize += uint64(n.Size)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
err = fmt.Errorf("failed to AscendingVisit %+v", err)
|
err = fmt.Errorf("failed to AscendingVisit %+v", err)
|
||||||
|
|||||||
Reference in New Issue
Block a user