volume: automatically trim out unreachable entries
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/storage/backend"
|
"github.com/chrislusf/seaweedfs/weed/storage/backend"
|
||||||
"github.com/chrislusf/seaweedfs/weed/storage/idx"
|
"github.com/chrislusf/seaweedfs/weed/storage/idx"
|
||||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||||
@@ -11,17 +12,28 @@ import (
|
|||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, e error) {
|
func CheckAndFixVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) {
|
||||||
var indexSize int64
|
var indexSize int64
|
||||||
if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil {
|
if indexSize, err = verifyIndexFileIntegrity(indexFile); err != nil {
|
||||||
return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e)
|
return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), err)
|
||||||
}
|
}
|
||||||
if indexSize == 0 {
|
if indexSize == 0 {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
for i := 1; i <= 10; i++ {
|
||||||
|
// check and fix last 10 entries
|
||||||
|
lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*NeedleMapEntrySize)
|
||||||
|
if err != ErrorSizeMismatch {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (lastAppendAtNs uint64, err error) {
|
||||||
var lastIdxEntry []byte
|
var lastIdxEntry []byte
|
||||||
if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleMapEntrySize); e != nil {
|
if lastIdxEntry, err = readIndexEntryAtOffset(indexFile, indexOffset); err != nil {
|
||||||
return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
|
return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), err)
|
||||||
}
|
}
|
||||||
key, offset, size := idx.IdxFileEntry(lastIdxEntry)
|
key, offset, size := idx.IdxFileEntry(lastIdxEntry)
|
||||||
if offset.IsZero() {
|
if offset.IsZero() {
|
||||||
@@ -29,15 +41,15 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin
|
|||||||
}
|
}
|
||||||
if size < 0 {
|
if size < 0 {
|
||||||
// read the deletion entry
|
// read the deletion entry
|
||||||
if lastAppendAtNs, e = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); e != nil {
|
if lastAppendAtNs, err = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); err != nil {
|
||||||
return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
|
return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if lastAppendAtNs, e = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); e != nil {
|
if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); err != nil {
|
||||||
return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
|
return lastAppendAtNs, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return
|
return lastAppendAtNs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {
|
func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {
|
||||||
@@ -60,18 +72,37 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err
|
|||||||
}
|
}
|
||||||
|
|
||||||
func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size Size) (lastAppendAtNs uint64, err error) {
|
func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size Size) (lastAppendAtNs uint64, err error) {
|
||||||
n := new(needle.Needle)
|
n, _, _, err := needle.ReadNeedleHeader(datFile, v, offset)
|
||||||
// case: node total memory 8g, set volumeLimitSize=2048 , save 10 files, every file size 2.2g or more , when we restart the volume server , while see out of memory error
|
if err != nil {
|
||||||
// fix: When the size of the last file exceeds 10M, consider directly returning the last modify time
|
return 0, fmt.Errorf("read %s at %d", datFile.Name(), offset)
|
||||||
if size > 10 * 1024 * 1024 {
|
}
|
||||||
bytes , err := needle.ReadNeedleBlob(datFile, offset+int64(size), 0, v);
|
if n.Size != size {
|
||||||
if err == nil {
|
return 0, ErrorSizeMismatch
|
||||||
if v == needle.Version3 {
|
}
|
||||||
tsOffset := NeedleHeaderSize + 0 + needle.NeedleChecksumSize
|
if v == needle.Version3 {
|
||||||
n.AppendAtNs = util.BytesToUint64(bytes[tsOffset : tsOffset+TimestampSize])
|
bytes := make([]byte, TimestampSize)
|
||||||
}
|
_, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("check %s entry offset %d size %d: %v", datFile.Name(), offset, size, err)
|
||||||
}
|
}
|
||||||
return n.AppendAtNs, err
|
n.AppendAtNs = util.BytesToUint64(bytes)
|
||||||
|
fileTailOffset := offset + needle.GetActualSize(size, v)
|
||||||
|
fileSize, _, err := datFile.GetStat()
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("stat file %s: %v", datFile.Name(), err)
|
||||||
|
}
|
||||||
|
if fileSize == fileTailOffset {
|
||||||
|
return n.AppendAtNs, nil
|
||||||
|
}
|
||||||
|
if fileSize > fileTailOffset {
|
||||||
|
glog.Warningf("Truncate %s from %d bytes to %d bytes!", datFile.Name(), fileSize, fileTailOffset)
|
||||||
|
err = datFile.Truncate(fileTailOffset)
|
||||||
|
if err == nil {
|
||||||
|
return n.AppendAtNs, nil
|
||||||
|
}
|
||||||
|
return n.AppendAtNs, fmt.Errorf("truncate file %s: %v", datFile.Name(), err)
|
||||||
|
}
|
||||||
|
glog.Warningf("data file %s has %d bytes, less than expected %d bytes!", datFile.Name(), fileSize, fileTailOffset)
|
||||||
}
|
}
|
||||||
if err = n.ReadData(datFile, offset, size, v); err != nil {
|
if err = n.ReadData(datFile, offset, size, v); err != nil {
|
||||||
return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", offset, offset+int64(size), err)
|
return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", offset, offset+int64(size), err)
|
||||||
|
|||||||
@@ -89,7 +89,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
|
|||||||
return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, err)
|
return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if v.lastAppendAtNs, err = CheckVolumeDataIntegrity(v, indexFile); err != nil {
|
if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil {
|
||||||
v.noWriteOrDelete = true
|
v.noWriteOrDelete = true
|
||||||
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err)
|
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user