Merge pull request #4 from chrislusf/master
This commit is contained in:
@@ -92,7 +92,7 @@ func main() {
|
|||||||
|
|
||||||
header := superBlock.Bytes()
|
header := superBlock.Bytes()
|
||||||
|
|
||||||
if n, e := datFile.WriteAt(header, 0); n == 0 || e != nil {
|
if n, e := datBackend.WriteAt(header, 0); n == 0 || e != nil {
|
||||||
glog.Fatalf("cannot write super block: %v", e)
|
glog.Fatalf("cannot write super block: %v", e)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
package backend
|
package backend
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
|
. "github.com/chrislusf/seaweedfs/weed/storage/types"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -12,12 +14,25 @@ var (
|
|||||||
type DiskFile struct {
|
type DiskFile struct {
|
||||||
File *os.File
|
File *os.File
|
||||||
fullFilePath string
|
fullFilePath string
|
||||||
|
fileSize int64
|
||||||
|
modTime time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDiskFile(f *os.File) *DiskFile {
|
func NewDiskFile(f *os.File) *DiskFile {
|
||||||
|
stat, err := f.Stat()
|
||||||
|
if err != nil {
|
||||||
|
glog.Fatalf("stat file %s: %v", f.Name(), err)
|
||||||
|
}
|
||||||
|
offset := stat.Size()
|
||||||
|
if offset%NeedlePaddingSize != 0 {
|
||||||
|
offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
|
||||||
|
}
|
||||||
|
|
||||||
return &DiskFile{
|
return &DiskFile{
|
||||||
fullFilePath: f.Name(),
|
fullFilePath: f.Name(),
|
||||||
File: f,
|
File: f,
|
||||||
|
fileSize: offset,
|
||||||
|
modTime: stat.ModTime(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -26,11 +41,28 @@ func (df *DiskFile) ReadAt(p []byte, off int64) (n int, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) {
|
func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) {
|
||||||
return df.File.WriteAt(p, off)
|
n, err = df.File.WriteAt(p, off)
|
||||||
|
if err == nil {
|
||||||
|
waterMark := off + int64(n)
|
||||||
|
if waterMark > df.fileSize {
|
||||||
|
df.fileSize = waterMark
|
||||||
|
df.modTime = time.Now()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (df *DiskFile) Append(p []byte) (n int, err error) {
|
||||||
|
return df.WriteAt(p, df.fileSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (df *DiskFile) Truncate(off int64) error {
|
func (df *DiskFile) Truncate(off int64) error {
|
||||||
return df.File.Truncate(off)
|
err := df.File.Truncate(off)
|
||||||
|
if err == nil {
|
||||||
|
df.fileSize = off
|
||||||
|
df.modTime = time.Now()
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (df *DiskFile) Close() error {
|
func (df *DiskFile) Close() error {
|
||||||
@@ -38,11 +70,7 @@ func (df *DiskFile) Close() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) {
|
func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) {
|
||||||
stat, e := df.File.Stat()
|
return df.fileSize, df.modTime, nil
|
||||||
if e == nil {
|
|
||||||
return stat.Size(), stat.ModTime(), nil
|
|
||||||
}
|
|
||||||
return 0, time.Time{}, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (df *DiskFile) Name() string {
|
func (df *DiskFile) Name() string {
|
||||||
|
|||||||
@@ -161,7 +161,15 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, versi
|
|||||||
dataSize := GetActualSize(size, version)
|
dataSize := GetActualSize(size, version)
|
||||||
dataSlice = make([]byte, int(dataSize))
|
dataSlice = make([]byte, int(dataSize))
|
||||||
|
|
||||||
_, err = r.ReadAt(dataSlice, offset)
|
var n int
|
||||||
|
n, err = r.ReadAt(dataSlice, offset)
|
||||||
|
if err != nil && int64(n) == dataSize {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
fileSize, _, _ := r.GetStat()
|
||||||
|
println("n",n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize)
|
||||||
|
}
|
||||||
return dataSlice, err
|
return dataSlice, err
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ func TestAppend(t *testing.T) {
|
|||||||
int64 : -9223372036854775808 to 9223372036854775807
|
int64 : -9223372036854775808 to 9223372036854775807
|
||||||
*/
|
*/
|
||||||
|
|
||||||
fileSize := int64(4294967295) + 10000
|
fileSize := int64(4294967296) + 10000
|
||||||
tempFile.Truncate(fileSize)
|
tempFile.Truncate(fileSize)
|
||||||
defer func() {
|
defer func() {
|
||||||
tempFile.Close()
|
tempFile.Close()
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -41,6 +40,7 @@ type baseNeedleMapper struct {
|
|||||||
|
|
||||||
indexFile *os.File
|
indexFile *os.File
|
||||||
indexFileAccessLock sync.Mutex
|
indexFileAccessLock sync.Mutex
|
||||||
|
indexFileOffset int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nm *baseNeedleMapper) IndexFileSize() uint64 {
|
func (nm *baseNeedleMapper) IndexFileSize() uint64 {
|
||||||
@@ -56,11 +56,10 @@ func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size
|
|||||||
|
|
||||||
nm.indexFileAccessLock.Lock()
|
nm.indexFileAccessLock.Lock()
|
||||||
defer nm.indexFileAccessLock.Unlock()
|
defer nm.indexFileAccessLock.Unlock()
|
||||||
if _, err := nm.indexFile.Seek(0, 2); err != nil {
|
written, err := nm.indexFile.WriteAt(bytes, nm.indexFileOffset)
|
||||||
return fmt.Errorf("cannot seek end of indexfile %s: %v",
|
if err == nil {
|
||||||
nm.indexFile.Name(), err)
|
nm.indexFileOffset += int64(written)
|
||||||
}
|
}
|
||||||
_, err := nm.indexFile.Write(bytes)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -31,6 +31,11 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Option
|
|||||||
generateLevelDbFile(dbFileName, indexFile)
|
generateLevelDbFile(dbFileName, indexFile)
|
||||||
glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
|
glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
|
||||||
}
|
}
|
||||||
|
if stat, err := indexFile.Stat(); err != nil {
|
||||||
|
glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
|
||||||
|
} else {
|
||||||
|
m.indexFileOffset = stat.Size()
|
||||||
|
}
|
||||||
glog.V(1).Infof("Opening %s...", dbFileName)
|
glog.V(1).Infof("Opening %s...", dbFileName)
|
||||||
|
|
||||||
if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil {
|
if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil {
|
||||||
|
|||||||
@@ -19,6 +19,11 @@ func NewCompactNeedleMap(file *os.File) *NeedleMap {
|
|||||||
m: needle_map.NewCompactMap(),
|
m: needle_map.NewCompactMap(),
|
||||||
}
|
}
|
||||||
nm.indexFile = file
|
nm.indexFile = file
|
||||||
|
stat, err := file.Stat()
|
||||||
|
if err != nil {
|
||||||
|
glog.Fatalf("stat file %s: %v", file.Name(), err)
|
||||||
|
}
|
||||||
|
nm.indexFileOffset = stat.Size()
|
||||||
return nm
|
return nm
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -286,7 +286,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err)
|
return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err)
|
||||||
}
|
}
|
||||||
dst.Write(needleBytes)
|
dstDatBackend.Append(needleBytes)
|
||||||
util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize))
|
util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize))
|
||||||
} else { //deleted needle
|
} else { //deleted needle
|
||||||
//fakeDelNeedle 's default Data field is nil
|
//fakeDelNeedle 's default Data field is nil
|
||||||
|
|||||||
Reference in New Issue
Block a user