* fix LevelDB panic on lazy reload Implemented a thread-safe reload mechanism using double-checked locking and a retry loop in Get, Put, and Delete. Added a concurrency test to verify the fix and prevent regressions. Fixes #8269 * refactor: use helper for leveldb fix and remove deprecated ioutil * fix: prevent deadlock by using getFromDb helper Extracted DB lookup to internal helper to avoid recursive RLock in Put/Delete methods. Updated Get to use the helper as well. * fix: resolve syntax error and commit deadlock prevention Fixed a duplicate function declaration syntax error. Verified that getFromDb helper correctly prevents recursive RLock scenarios. * refactor: remove redundant timeout checks Removed nested `if m.ldbTimeout > 0` checks in Get, Put, and Delete methods as suggested in PR review.
This commit is contained in:
@@ -132,15 +132,17 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
|
func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
|
||||||
bytes := make([]byte, NeedleIdSize)
|
|
||||||
if m.ldbTimeout > 0 {
|
if m.ldbTimeout > 0 {
|
||||||
m.ldbAccessLock.RLock()
|
if err := m.ensureLdbLoaded(); err != nil {
|
||||||
defer m.ldbAccessLock.RUnlock()
|
|
||||||
loadErr := reloadLdb(m)
|
|
||||||
if loadErr != nil {
|
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
defer m.ldbAccessLock.RUnlock()
|
||||||
}
|
}
|
||||||
|
return m.getFromDb(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *LevelDbNeedleMap) getFromDb(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
|
||||||
|
bytes := make([]byte, NeedleIdSize)
|
||||||
NeedleIdToBytes(bytes[0:NeedleIdSize], key)
|
NeedleIdToBytes(bytes[0:NeedleIdSize], key)
|
||||||
data, err := m.db.Get(bytes, nil)
|
data, err := m.db.Get(bytes, nil)
|
||||||
if err != nil || len(data) != OffsetSize+SizeSize {
|
if err != nil || len(data) != OffsetSize+SizeSize {
|
||||||
@@ -155,14 +157,12 @@ func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
|
|||||||
var oldSize Size
|
var oldSize Size
|
||||||
var watermark uint64
|
var watermark uint64
|
||||||
if m.ldbTimeout > 0 {
|
if m.ldbTimeout > 0 {
|
||||||
m.ldbAccessLock.RLock()
|
if err := m.ensureLdbLoaded(); err != nil {
|
||||||
defer m.ldbAccessLock.RUnlock()
|
return err
|
||||||
loadErr := reloadLdb(m)
|
|
||||||
if loadErr != nil {
|
|
||||||
return loadErr
|
|
||||||
}
|
}
|
||||||
|
defer m.ldbAccessLock.RUnlock()
|
||||||
}
|
}
|
||||||
if oldNeedle, ok := m.Get(key); ok {
|
if oldNeedle, ok := m.getFromDb(key); ok {
|
||||||
oldSize = oldNeedle.Size
|
oldSize = oldNeedle.Size
|
||||||
}
|
}
|
||||||
m.logPut(key, oldSize, size)
|
m.logPut(key, oldSize, size)
|
||||||
@@ -222,14 +222,12 @@ func levelDbDelete(db *leveldb.DB, key NeedleId) error {
|
|||||||
func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
|
func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
|
||||||
var watermark uint64
|
var watermark uint64
|
||||||
if m.ldbTimeout > 0 {
|
if m.ldbTimeout > 0 {
|
||||||
m.ldbAccessLock.RLock()
|
if err := m.ensureLdbLoaded(); err != nil {
|
||||||
defer m.ldbAccessLock.RUnlock()
|
return err
|
||||||
loadErr := reloadLdb(m)
|
|
||||||
if loadErr != nil {
|
|
||||||
return loadErr
|
|
||||||
}
|
}
|
||||||
|
defer m.ldbAccessLock.RUnlock()
|
||||||
}
|
}
|
||||||
oldNeedle, found := m.Get(key)
|
oldNeedle, found := m.getFromDb(key)
|
||||||
if !found || oldNeedle.Size.IsDeleted() {
|
if !found || oldNeedle.Size.IsDeleted() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -400,6 +398,24 @@ func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startF
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *LevelDbNeedleMap) ensureLdbLoaded() error {
|
||||||
|
for {
|
||||||
|
m.ldbAccessLock.RLock()
|
||||||
|
if m.db != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
m.ldbAccessLock.RUnlock()
|
||||||
|
m.ldbAccessLock.Lock()
|
||||||
|
if m.db == nil {
|
||||||
|
if err := reloadLdb(m); err != nil {
|
||||||
|
m.ldbAccessLock.Unlock()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
m.ldbAccessLock.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func reloadLdb(m *LevelDbNeedleMap) (err error) {
|
func reloadLdb(m *LevelDbNeedleMap) (err error) {
|
||||||
if m.db != nil {
|
if m.db != nil {
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
90
weed/storage/needle_map_leveldb_test.go
Normal file
90
weed/storage/needle_map_leveldb_test.go
Normal file
@@ -0,0 +1,90 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestLevelDbNeedleMap_Concurrency(t *testing.T) {
|
||||||
|
dir, err := os.MkdirTemp("", "test_leveldb_concurrency")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("temp dir: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
|
prefix := "test"
|
||||||
|
indexFile, err := os.Create(filepath.Join(dir, prefix+".idx"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("create index file: %v", err)
|
||||||
|
}
|
||||||
|
dbFileName := filepath.Join(dir, prefix+".ldb")
|
||||||
|
|
||||||
|
// Create and initialize map
|
||||||
|
m, err := NewLevelDbNeedleMap(dbFileName, indexFile, nil, 1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewLevelDbNeedleMap: %v", err)
|
||||||
|
}
|
||||||
|
defer m.Close()
|
||||||
|
|
||||||
|
// Pre-populate some data
|
||||||
|
key := types.NeedleId(1)
|
||||||
|
if err := m.Put(key, types.ToOffset(100), types.Size(200)); err != nil {
|
||||||
|
t.Fatalf("Put: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Force unload to start from nil state
|
||||||
|
if err := unloadLdb(m); err != nil {
|
||||||
|
t.Fatalf("unloadLdb: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
startCh := make(chan struct{})
|
||||||
|
errCh := make(chan error, 100)
|
||||||
|
|
||||||
|
// Spawn multiple goroutines to trigger the race
|
||||||
|
// Multiple readers will see m.db == nil and try to reload concurrently
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(id int) {
|
||||||
|
defer wg.Done()
|
||||||
|
<-startCh
|
||||||
|
|
||||||
|
// Try multiple times to increase chance of collision
|
||||||
|
for j := 0; j < 2; j++ {
|
||||||
|
_, ok := m.Get(key)
|
||||||
|
if !ok {
|
||||||
|
// Get failed, possibly due to race in reload.
|
||||||
|
// But we also put data concurrently, so maybe it's missing if deleted?
|
||||||
|
// In this test, we only Put, never Delete. So Key 1 should be there.
|
||||||
|
// However, if DB reload fails, Get returns false!
|
||||||
|
errCh <- fmt.Errorf("routine %d iter %d: Get returned false", id, j)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Also try Put concurrently
|
||||||
|
err := m.Put(types.NeedleId(2+id), types.ToOffset(100), types.Size(200))
|
||||||
|
if err != nil {
|
||||||
|
errCh <- fmt.Errorf("routine %d iter %d: Put failed: %v", id, j, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Manually unload occasionally to reset the state
|
||||||
|
if j%2 == 0 {
|
||||||
|
// This might fail if locked, but that's fine
|
||||||
|
unloadLdb(m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
close(startCh)
|
||||||
|
wg.Wait()
|
||||||
|
close(errCh)
|
||||||
|
|
||||||
|
for e := range errCh {
|
||||||
|
t.Errorf("Error encountered: %v", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user