wait for reading threads to complete before dropping sealed chunks

This commit is contained in:
chrislu
2022-01-17 22:24:44 -08:00
parent 0a3f95ca01
commit f4ad63528a
15 changed files with 106 additions and 1086 deletions

View File

@@ -7,19 +7,24 @@ import (
"github.com/chrislusf/seaweedfs/weed/util/mem"
"sync"
"sync/atomic"
"time"
)
type LogicChunkIndex int
type UploadPipeline struct {
writableChunks map[LogicChunkIndex]*MemChunk
writableChunksLock sync.Mutex
sealedChunks map[LogicChunkIndex]*SealedChunk
sealedChunksLock sync.Mutex
ChunkSize int64
writers *util.LimitedConcurrentExecutor
activeWriterCond *sync.Cond
activeWriterCount int32
saveToStorageFn SaveToStorageFunc
filepath util.FullPath
filepath util.FullPath
ChunkSize int64
writers *util.LimitedConcurrentExecutor
writableChunks map[LogicChunkIndex]*MemChunk
writableChunksLock sync.Mutex
sealedChunks map[LogicChunkIndex]*SealedChunk
sealedChunksLock sync.Mutex
activeWriterCond *sync.Cond
activeWriterCount int32
activeReadChunks map[LogicChunkIndex]int
activeReadChunksLock sync.Mutex
saveToStorageFn SaveToStorageFunc
}
type SealedChunk struct {
@@ -44,6 +49,7 @@ func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentEx
activeWriterCond: sync.NewCond(&sync.Mutex{}),
saveToStorageFn: saveToStorageFn,
filepath: filepath,
activeReadChunks: make(map[LogicChunkIndex]int),
}
}
@@ -110,6 +116,51 @@ func (cw *UploadPipeline) FlushAll() {
cw.waitForCurrentWritersToComplete()
}
func (cw *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
startLogicChunkIndex := LogicChunkIndex(startOffset / cw.ChunkSize)
stopLogicChunkIndex := LogicChunkIndex(stopOffset / cw.ChunkSize)
if stopOffset%cw.ChunkSize > 0 {
stopLogicChunkIndex += 1
}
cw.activeReadChunksLock.Lock()
defer cw.activeReadChunksLock.Unlock()
for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
if count, found := cw.activeReadChunks[i]; found {
cw.activeReadChunks[i] = count + 1
} else {
cw.activeReadChunks[i] = 1
}
}
}
func (cw *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
startLogicChunkIndex := LogicChunkIndex(startOffset / cw.ChunkSize)
stopLogicChunkIndex := LogicChunkIndex(stopOffset / cw.ChunkSize)
if stopOffset%cw.ChunkSize > 0 {
stopLogicChunkIndex += 1
}
cw.activeReadChunksLock.Lock()
defer cw.activeReadChunksLock.Unlock()
for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
if count, found := cw.activeReadChunks[i]; found {
if count == 1 {
delete(cw.activeReadChunks, i)
} else {
cw.activeReadChunks[i] = count - 1
}
}
}
}
func (cw *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
cw.activeReadChunksLock.Lock()
defer cw.activeReadChunksLock.Unlock()
if count, found := cw.activeReadChunks[logicChunkIndex]; found {
return count > 0
}
return false
}
func (cw *UploadPipeline) waitForCurrentWritersToComplete() {
cw.activeWriterCond.L.Lock()
t := int32(100)
@@ -152,12 +203,7 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic
// first add to the file chunks
cw.saveOneChunk(sealedChunk.chunk, logicChunkIndex)
// then remove from sealed chunks
cw.sealedChunksLock.Lock()
defer cw.sealedChunksLock.Unlock()
delete(cw.sealedChunks, logicChunkIndex)
sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", cw.filepath, logicChunkIndex))
// notify waiting process
atomic.AddInt32(&cw.activeWriterCount, -1)
glog.V(4).Infof("%s activeWriterCount %d --> %d", cw.filepath, cw.activeWriterCount+1, cw.activeWriterCount)
// Lock and Unlock are not required,
@@ -166,6 +212,18 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic
cw.activeWriterCond.L.Lock()
cw.activeWriterCond.Broadcast()
cw.activeWriterCond.L.Unlock()
// wait for readers
for cw.IsLocked(logicChunkIndex) {
time.Sleep(59 * time.Millisecond)
}
// then remove from sealed chunks
cw.sealedChunksLock.Lock()
defer cw.sealedChunksLock.Unlock()
delete(cw.sealedChunks, logicChunkIndex)
sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", cw.filepath, logicChunkIndex))
})
}