optimiz commitig compact (#3388)
* optimiz vacuuming volume * fix bugx * rename parameters * fix conflict * change copyDataBasedOnIndexFile to an instance method * close needlemap * optimiz commiting Vacuum volume for leveldb index * fix bugs * fix leveldb loading bugs * refactor * fix leveldb loading bug * add leveldb recovery * add test case for levelDB * modify test case to cover all the new branches * use one tmpNm instead of two instances * refactor * refactor * move setWatermark to the end * add test for watermark and updating leveldb * fix error logic * refactor, add test * check nil before close needlemapeer add test case fix metric bug * add tests, fix bugs * adjust log level remove wrong test case refactor * avoid duplicate updating metric for leveldb index
This commit is contained in:
@@ -19,7 +19,7 @@ import (
|
||||
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
)
|
||||
|
||||
//mark it every watermarkBatchSize operations
|
||||
// mark it every watermarkBatchSize operations
|
||||
const watermarkBatchSize = 10000
|
||||
|
||||
var watermarkKey = []byte("idx_entry_watermark")
|
||||
@@ -165,7 +165,7 @@ func getWatermark(db *leveldb.DB) uint64 {
|
||||
}
|
||||
|
||||
func setWatermark(db *leveldb.DB, watermark uint64) error {
|
||||
glog.V(1).Infof("set watermark %d", watermark)
|
||||
glog.V(3).Infof("set watermark %d", watermark)
|
||||
var wmBytes = make([]byte, 8)
|
||||
util.Uint64toBytes(wmBytes, watermark)
|
||||
if err := db.Put(watermarkKey, wmBytes, nil); err != nil {
|
||||
@@ -215,12 +215,14 @@ func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
|
||||
}
|
||||
|
||||
func (m *LevelDbNeedleMap) Close() {
|
||||
indexFileName := m.indexFile.Name()
|
||||
if err := m.indexFile.Sync(); err != nil {
|
||||
glog.Warningf("sync file %s failed: %v", indexFileName, err)
|
||||
}
|
||||
if err := m.indexFile.Close(); err != nil {
|
||||
glog.Warningf("close index file %s failed: %v", indexFileName, err)
|
||||
if m.indexFile != nil {
|
||||
indexFileName := m.indexFile.Name()
|
||||
if err := m.indexFile.Sync(); err != nil {
|
||||
glog.Warningf("sync file %s failed: %v", indexFileName, err)
|
||||
}
|
||||
if err := m.indexFile.Close(); err != nil {
|
||||
glog.Warningf("close index file %s failed: %v", indexFileName, err)
|
||||
}
|
||||
}
|
||||
|
||||
if m.db != nil {
|
||||
@@ -235,3 +237,99 @@ func (m *LevelDbNeedleMap) Destroy() error {
|
||||
os.Remove(m.indexFile.Name())
|
||||
return os.RemoveAll(m.dbFileName)
|
||||
}
|
||||
|
||||
func (m *LevelDbNeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error {
|
||||
if v.nm != nil {
|
||||
v.nm.Close()
|
||||
v.nm = nil
|
||||
}
|
||||
defer func() {
|
||||
if v.tmpNm != nil {
|
||||
v.tmpNm.Close()
|
||||
v.tmpNm = nil
|
||||
}
|
||||
}()
|
||||
levelDbFile := v.FileName(".ldb")
|
||||
m.indexFile = indexFile
|
||||
err := os.RemoveAll(levelDbFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = os.Rename(v.FileName(".cpldb"), levelDbFile); err != nil {
|
||||
return fmt.Errorf("rename %s: %v", levelDbFile, err)
|
||||
}
|
||||
|
||||
db, err := leveldb.OpenFile(levelDbFile, opts)
|
||||
if err != nil {
|
||||
if errors.IsCorrupted(err) {
|
||||
db, err = leveldb.RecoverFile(levelDbFile, opts)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
m.db = db
|
||||
|
||||
stat, e := indexFile.Stat()
|
||||
if e != nil {
|
||||
glog.Fatalf("stat file %s: %v", indexFile.Name(), e)
|
||||
return e
|
||||
}
|
||||
m.indexFileOffset = stat.Size()
|
||||
m.recordCount = uint64(stat.Size() / types.NeedleMapEntrySize)
|
||||
|
||||
//set watermark
|
||||
watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize
|
||||
err = setWatermark(db, uint64(watermark))
|
||||
if err != nil {
|
||||
glog.Fatalf("setting watermark failed %s: %v", indexFile.Name(), err)
|
||||
return err
|
||||
}
|
||||
v.nm = m
|
||||
v.tmpNm = nil
|
||||
return e
|
||||
}
|
||||
|
||||
func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) (err error) {
|
||||
glog.V(0).Infof("loading idx to leveldb from offset %d for file: %s", startFrom, indexFile.Name())
|
||||
dbFileName := v.FileName(".cpldb")
|
||||
db, dbErr := leveldb.OpenFile(dbFileName, nil)
|
||||
defer func() {
|
||||
if dbErr == nil {
|
||||
db.Close()
|
||||
}
|
||||
if err != nil {
|
||||
os.RemoveAll(dbFileName)
|
||||
}
|
||||
|
||||
}()
|
||||
if dbErr != nil {
|
||||
if errors.IsCorrupted(err) {
|
||||
db, dbErr = leveldb.RecoverFile(dbFileName, nil)
|
||||
}
|
||||
if dbErr != nil {
|
||||
return dbErr
|
||||
}
|
||||
}
|
||||
|
||||
err = idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) (e error) {
|
||||
if !offset.IsZero() && size.IsValid() {
|
||||
e = levelDbWrite(db, key, offset, size, false, 0)
|
||||
} else {
|
||||
e = levelDbDelete(db, key)
|
||||
}
|
||||
return e
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if startFrom != 0 {
|
||||
return needleMapMetricFromIndexFile(indexFile, &m.mapMetric)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *LevelDbNeedleMap) UpdateNeedleMapMetric(indexFile *os.File) error {
|
||||
return needleMapMetricFromIndexFile(indexFile, &m.mapMetric)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user