support read

This commit is contained in:
chrislu
2022-02-13 22:50:44 -08:00
parent f3c1e00521
commit 2b955c1713
18 changed files with 1257 additions and 24 deletions

View File

@@ -0,0 +1,115 @@
package page_writer
import "math"
// ChunkWrittenInterval mark one written interval within one page chunk
type ChunkWrittenInterval struct {
StartOffset int64
stopOffset int64
prev *ChunkWrittenInterval
next *ChunkWrittenInterval
}
func (interval *ChunkWrittenInterval) Size() int64 {
return interval.stopOffset - interval.StartOffset
}
func (interval *ChunkWrittenInterval) isComplete(chunkSize int64) bool {
return interval.stopOffset-interval.StartOffset == chunkSize
}
// ChunkWrittenIntervalList mark written intervals within one page chunk
type ChunkWrittenIntervalList struct {
head *ChunkWrittenInterval
tail *ChunkWrittenInterval
}
func newChunkWrittenIntervalList() *ChunkWrittenIntervalList {
list := &ChunkWrittenIntervalList{
head: &ChunkWrittenInterval{
StartOffset: -1,
stopOffset: -1,
},
tail: &ChunkWrittenInterval{
StartOffset: math.MaxInt64,
stopOffset: math.MaxInt64,
},
}
list.head.next = list.tail
list.tail.prev = list.head
return list
}
func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) {
interval := &ChunkWrittenInterval{
StartOffset: startOffset,
stopOffset: stopOffset,
}
list.addInterval(interval)
}
func (list *ChunkWrittenIntervalList) IsComplete(chunkSize int64) bool {
return list.size() == 1 && list.head.next.isComplete(chunkSize)
}
func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) {
for t := list.head; t != nil; t = t.next {
writtenByteCount += t.Size()
}
return
}
func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) {
p := list.head
for ; p.next != nil && p.next.StartOffset <= interval.StartOffset; p = p.next {
}
q := list.tail
for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev {
}
if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset {
// merge p and q together
p.stopOffset = q.stopOffset
unlinkNodesBetween(p, q.next)
return
}
if interval.StartOffset <= p.stopOffset {
// merge new interval into p
p.stopOffset = interval.stopOffset
unlinkNodesBetween(p, q)
return
}
if q.StartOffset <= interval.stopOffset {
// merge new interval into q
q.StartOffset = interval.StartOffset
unlinkNodesBetween(p, q)
return
}
// add the new interval between p and q
unlinkNodesBetween(p, q)
p.next = interval
interval.prev = p
q.prev = interval
interval.next = q
}
// unlinkNodesBetween remove all nodes after start and before stop, exclusive
func unlinkNodesBetween(start *ChunkWrittenInterval, stop *ChunkWrittenInterval) {
if start.next == stop {
return
}
start.next.prev = nil
start.next = stop
stop.prev.next = nil
stop.prev = start
}
func (list *ChunkWrittenIntervalList) size() int {
var count int
for t := list.head; t != nil; t = t.next {
count++
}
return count - 2
}

View File

@@ -0,0 +1,49 @@
package page_writer
import (
"github.com/stretchr/testify/assert"
"testing"
)
func Test_PageChunkWrittenIntervalList(t *testing.T) {
list := newChunkWrittenIntervalList()
assert.Equal(t, 0, list.size(), "empty list")
list.MarkWritten(0, 5)
assert.Equal(t, 1, list.size(), "one interval")
list.MarkWritten(0, 5)
assert.Equal(t, 1, list.size(), "duplicated interval2")
list.MarkWritten(95, 100)
assert.Equal(t, 2, list.size(), "two intervals")
list.MarkWritten(50, 60)
assert.Equal(t, 3, list.size(), "three intervals")
list.MarkWritten(50, 55)
assert.Equal(t, 3, list.size(), "three intervals merge")
list.MarkWritten(40, 50)
assert.Equal(t, 3, list.size(), "three intervals grow forward")
list.MarkWritten(50, 65)
assert.Equal(t, 3, list.size(), "three intervals grow backward")
list.MarkWritten(70, 80)
assert.Equal(t, 4, list.size(), "four intervals")
list.MarkWritten(60, 70)
assert.Equal(t, 3, list.size(), "three intervals merged")
list.MarkWritten(59, 71)
assert.Equal(t, 3, list.size(), "covered three intervals")
list.MarkWritten(5, 59)
assert.Equal(t, 2, list.size(), "covered two intervals")
list.MarkWritten(70, 99)
assert.Equal(t, 1, list.size(), "covered one intervals")
}

View File

@@ -0,0 +1,30 @@
package page_writer
type DirtyPages interface {
AddPage(offset int64, data []byte)
FlushData() error
ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
GetStorageOptions() (collection, replication string)
Destroy()
LockForRead(startOffset, stopOffset int64)
UnlockForRead(startOffset, stopOffset int64)
}
func max(x, y int64) int64 {
if x > y {
return x
}
return y
}
func min(x, y int64) int64 {
if x < y {
return x
}
return y
}
func minInt(x, y int) int {
if x < y {
return x
}
return y
}

View File

@@ -0,0 +1,16 @@
package page_writer
import (
"io"
)
type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func())
type PageChunk interface {
FreeResource()
WriteDataAt(src []byte, offset int64) (n int)
ReadDataAt(p []byte, off int64) (maxStop int64)
IsComplete() bool
WrittenSize() int64
SaveContent(saveFn SaveToStorageFunc)
}

View File

@@ -0,0 +1,69 @@
package page_writer
import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/mem"
)
var (
_ = PageChunk(&MemChunk{})
)
type MemChunk struct {
buf []byte
usage *ChunkWrittenIntervalList
chunkSize int64
logicChunkIndex LogicChunkIndex
}
func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
return &MemChunk{
logicChunkIndex: logicChunkIndex,
chunkSize: chunkSize,
buf: mem.Allocate(int(chunkSize)),
usage: newChunkWrittenIntervalList(),
}
}
func (mc *MemChunk) FreeResource() {
mem.Free(mc.buf)
}
func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) {
innerOffset := offset % mc.chunkSize
n = copy(mc.buf[innerOffset:], src)
mc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
return
}
func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
memChunkBaseOffset := int64(mc.logicChunkIndex) * mc.chunkSize
for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset)
logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
if logicStart < logicStop {
copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
maxStop = max(maxStop, logicStop)
}
}
return
}
func (mc *MemChunk) IsComplete() bool {
return mc.usage.IsComplete(mc.chunkSize)
}
func (mc *MemChunk) WrittenSize() int64 {
return mc.usage.WrittenSize()
}
func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
if saveFn == nil {
return
}
for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset])
saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), func() {
})
}
}

View File

@@ -0,0 +1,121 @@
package page_writer
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/mem"
"os"
)
var (
_ = PageChunk(&SwapFileChunk{})
)
type ActualChunkIndex int
type SwapFile struct {
dir string
file *os.File
logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
chunkSize int64
}
type SwapFileChunk struct {
swapfile *SwapFile
usage *ChunkWrittenIntervalList
logicChunkIndex LogicChunkIndex
actualChunkIndex ActualChunkIndex
}
func NewSwapFile(dir string, chunkSize int64) *SwapFile {
return &SwapFile{
dir: dir,
file: nil,
logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
chunkSize: chunkSize,
}
}
func (sf *SwapFile) FreeResource() {
if sf.file != nil {
sf.file.Close()
os.Remove(sf.file.Name())
}
}
func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
if sf.file == nil {
var err error
sf.file, err = os.CreateTemp(sf.dir, "")
if err != nil {
glog.Errorf("create swap file: %v", err)
return nil
}
}
actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
if !found {
actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
}
return &SwapFileChunk{
swapfile: sf,
usage: newChunkWrittenIntervalList(),
logicChunkIndex: logicChunkIndex,
actualChunkIndex: actualChunkIndex,
}
}
func (sc *SwapFileChunk) FreeResource() {
}
func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
innerOffset := offset % sc.swapfile.chunkSize
var err error
n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
if err == nil {
sc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
} else {
glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
}
return
}
func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize
for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
logicStart := max(off, chunkStartOffset+t.StartOffset)
logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
if logicStart < logicStop {
actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
break
}
maxStop = max(maxStop, logicStop)
}
}
return
}
func (sc *SwapFileChunk) IsComplete() bool {
return sc.usage.IsComplete(sc.swapfile.chunkSize)
}
func (sc *SwapFileChunk) WrittenSize() int64 {
return sc.usage.WrittenSize()
}
func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
if saveFn == nil {
return
}
for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
data := mem.Allocate(int(t.Size()))
sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
reader := util.NewBytesReader(data)
saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() {
})
mem.Free(data)
}
sc.usage = newChunkWrittenIntervalList()
}

View File

@@ -0,0 +1,182 @@
package page_writer
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"sync"
"sync/atomic"
"time"
)
type LogicChunkIndex int
type UploadPipeline struct {
filepath util.FullPath
ChunkSize int64
writableChunks map[LogicChunkIndex]PageChunk
writableChunksLock sync.Mutex
sealedChunks map[LogicChunkIndex]*SealedChunk
sealedChunksLock sync.Mutex
uploaders *util.LimitedConcurrentExecutor
uploaderCount int32
uploaderCountCond *sync.Cond
saveToStorageFn SaveToStorageFunc
activeReadChunks map[LogicChunkIndex]int
activeReadChunksLock sync.Mutex
bufferChunkLimit int
}
type SealedChunk struct {
chunk PageChunk
referenceCounter int // track uploading or reading processes
}
func (sc *SealedChunk) FreeReference(messageOnFree string) {
sc.referenceCounter--
if sc.referenceCounter == 0 {
glog.V(4).Infof("Free sealed chunk: %s", messageOnFree)
sc.chunk.FreeResource()
}
}
func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline {
return &UploadPipeline{
ChunkSize: chunkSize,
writableChunks: make(map[LogicChunkIndex]PageChunk),
sealedChunks: make(map[LogicChunkIndex]*SealedChunk),
uploaders: writers,
uploaderCountCond: sync.NewCond(&sync.Mutex{}),
saveToStorageFn: saveToStorageFn,
activeReadChunks: make(map[LogicChunkIndex]int),
bufferChunkLimit: bufferChunkLimit,
}
}
func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
up.writableChunksLock.Lock()
defer up.writableChunksLock.Unlock()
logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
memChunk, found := up.writableChunks[logicChunkIndex]
if !found {
if len(up.writableChunks) < up.bufferChunkLimit {
memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
} else {
fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
for lci, mc := range up.writableChunks {
chunkFullness := mc.WrittenSize()
if fullness < chunkFullness {
fullestChunkIndex = lci
fullness = chunkFullness
}
}
up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
delete(up.writableChunks, fullestChunkIndex)
fmt.Printf("flush chunk %d with %d bytes written", logicChunkIndex, fullness)
memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
}
up.writableChunks[logicChunkIndex] = memChunk
}
n = memChunk.WriteDataAt(p, off)
up.maybeMoveToSealed(memChunk, logicChunkIndex)
return
}
func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
// read from sealed chunks first
up.sealedChunksLock.Lock()
sealedChunk, found := up.sealedChunks[logicChunkIndex]
if found {
sealedChunk.referenceCounter++
}
up.sealedChunksLock.Unlock()
if found {
maxStop = sealedChunk.chunk.ReadDataAt(p, off)
glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop)
sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex))
}
// read from writable chunks last
up.writableChunksLock.Lock()
defer up.writableChunksLock.Unlock()
writableChunk, found := up.writableChunks[logicChunkIndex]
if !found {
return
}
writableMaxStop := writableChunk.ReadDataAt(p, off)
glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop)
maxStop = max(maxStop, writableMaxStop)
return
}
func (up *UploadPipeline) FlushAll() {
up.writableChunksLock.Lock()
defer up.writableChunksLock.Unlock()
for logicChunkIndex, memChunk := range up.writableChunks {
up.moveToSealed(memChunk, logicChunkIndex)
}
up.waitForCurrentWritersToComplete()
}
func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
if memChunk.IsComplete() {
up.moveToSealed(memChunk, logicChunkIndex)
}
}
func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
atomic.AddInt32(&up.uploaderCount, 1)
glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount)
up.sealedChunksLock.Lock()
if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found {
oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex))
}
sealedChunk := &SealedChunk{
chunk: memChunk,
referenceCounter: 1, // default 1 is for uploading process
}
up.sealedChunks[logicChunkIndex] = sealedChunk
delete(up.writableChunks, logicChunkIndex)
up.sealedChunksLock.Unlock()
up.uploaders.Execute(func() {
// first add to the file chunks
sealedChunk.chunk.SaveContent(up.saveToStorageFn)
// notify waiting process
atomic.AddInt32(&up.uploaderCount, -1)
glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount)
// Lock and Unlock are not required,
// but it may signal multiple times during one wakeup,
// and the waiting goroutine may miss some of them!
up.uploaderCountCond.L.Lock()
up.uploaderCountCond.Broadcast()
up.uploaderCountCond.L.Unlock()
// wait for readers
for up.IsLocked(logicChunkIndex) {
time.Sleep(59 * time.Millisecond)
}
// then remove from sealed chunks
up.sealedChunksLock.Lock()
defer up.sealedChunksLock.Unlock()
delete(up.sealedChunks, logicChunkIndex)
sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex))
})
}
func (up *UploadPipeline) Shutdown() {
}

View File

@@ -0,0 +1,63 @@
package page_writer
import (
"sync/atomic"
)
func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
if stopOffset%up.ChunkSize > 0 {
stopLogicChunkIndex += 1
}
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
if count, found := up.activeReadChunks[i]; found {
up.activeReadChunks[i] = count + 1
} else {
up.activeReadChunks[i] = 1
}
}
}
func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
if stopOffset%up.ChunkSize > 0 {
stopLogicChunkIndex += 1
}
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
if count, found := up.activeReadChunks[i]; found {
if count == 1 {
delete(up.activeReadChunks, i)
} else {
up.activeReadChunks[i] = count - 1
}
}
}
}
func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
if count, found := up.activeReadChunks[logicChunkIndex]; found {
return count > 0
}
return false
}
func (up *UploadPipeline) waitForCurrentWritersToComplete() {
up.uploaderCountCond.L.Lock()
t := int32(100)
for {
t = atomic.LoadInt32(&up.uploaderCount)
if t <= 0 {
break
}
up.uploaderCountCond.Wait()
}
up.uploaderCountCond.L.Unlock()
}

View File

@@ -0,0 +1,47 @@
package page_writer
import (
"github.com/chrislusf/seaweedfs/weed/util"
"testing"
)
func TestUploadPipeline(t *testing.T) {
uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil, 16)
writeRange(uploadPipeline, 0, 131072)
writeRange(uploadPipeline, 131072, 262144)
writeRange(uploadPipeline, 262144, 1025536)
confirmRange(t, uploadPipeline, 0, 1025536)
writeRange(uploadPipeline, 1025536, 1296896)
confirmRange(t, uploadPipeline, 1025536, 1296896)
writeRange(uploadPipeline, 1296896, 2162688)
confirmRange(t, uploadPipeline, 1296896, 2162688)
confirmRange(t, uploadPipeline, 1296896, 2162688)
}
// startOff and stopOff must be divided by 4
func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) {
p := make([]byte, 4)
for i := startOff / 4; i < stopOff/4; i += 4 {
util.Uint32toBytes(p, uint32(i))
uploadPipeline.SaveDataAt(p, i)
}
}
func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) {
p := make([]byte, 4)
for i := startOff; i < stopOff/4; i += 4 {
uploadPipeline.MaybeReadDataAt(p, i)
x := util.BytesToUint32(p)
if x != uint32(i) {
t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4)
}
}
}