shell: add ec.decode command

This commit is contained in:
Chris Lu
2019-12-23 12:48:20 -08:00
parent dda5c6d3cb
commit 09ca936c78
42 changed files with 1052 additions and 396 deletions

View File

@@ -0,0 +1,198 @@
package erasure_coding
import (
"fmt"
"io"
"os"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
// write .idx file from .ecx and .ecj files
func WriteIdxFileFromEcIndex(baseFileName string) (err error) {
ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
if openErr != nil {
return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
}
defer ecxFile.Close()
idxFile, openErr := os.OpenFile(baseFileName+".idx", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if openErr != nil {
return fmt.Errorf("cannot open %s.idx: %v", baseFileName, openErr)
}
defer idxFile.Close()
io.Copy(idxFile, ecxFile)
err = iterateEcjFile(baseFileName, func(key types.NeedleId) error {
bytes := needle_map.ToBytes(key, types.Offset{}, types.TombstoneFileSize)
idxFile.Write(bytes)
return nil
})
return err
}
// FindDatFileSize calculate .dat file size from max offset entry
// there may be extra deletions after that entry
// but they are deletions anyway
func FindDatFileSize(baseFileName string) (datSize int64, err error) {
version, err := readEcVolumeVersion(baseFileName)
if err != nil {
return 0, fmt.Errorf("read ec volume %s version: %v", baseFileName, err)
}
err = iterateEcxFile(baseFileName, func(key types.NeedleId, offset types.Offset, size uint32) error {
if size == types.TombstoneFileSize {
return nil
}
entryStopOffset := offset.ToAcutalOffset() + needle.GetActualSize(size, version)
if datSize < entryStopOffset {
datSize = entryStopOffset
}
return nil
})
return
}
func readEcVolumeVersion(baseFileName string) (version needle.Version, err error) {
// find volume version
datFile, err := os.OpenFile(baseFileName+".ec00", os.O_RDONLY, 0644)
if err != nil {
return 0, fmt.Errorf("open ec volume %s superblock: %v", baseFileName, err)
}
datBackend := backend.NewDiskFile(datFile)
superBlock, err := super_block.ReadSuperBlock(datBackend)
datBackend.Close()
if err != nil {
return 0, fmt.Errorf("read ec volume %s superblock: %v", baseFileName, err)
}
return superBlock.Version, nil
}
func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size uint32) error) error {
ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
if openErr != nil {
return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
}
defer ecxFile.Close()
buf := make([]byte, types.NeedleMapEntrySize)
for {
n, err := ecxFile.Read(buf)
if n != types.NeedleMapEntrySize {
if err == io.EOF {
return nil
}
return err
}
key, offset, size := idx.IdxFileEntry(buf)
if processNeedleFn != nil {
err = processNeedleFn(key, offset, size)
}
if err != nil {
if err != io.EOF {
return err
}
return nil
}
}
}
func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error {
ecjFile, openErr := os.OpenFile(baseFileName+".ecj", os.O_RDONLY, 0644)
if openErr != nil {
return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
}
defer ecjFile.Close()
buf := make([]byte, types.NeedleIdSize)
for {
n, err := ecjFile.Read(buf)
if n != types.NeedleIdSize {
if err == io.EOF {
return nil
}
return err
}
if processNeedleFn != nil {
err = processNeedleFn(types.BytesToNeedleId(buf))
}
if err != nil {
if err == io.EOF {
return nil
}
return err
}
}
}
// WriteDatFile generates .dat from from .ec00 ~ .ec09 files
func WriteDatFile(baseFileName string, datFileSize int64) error {
datFile, openErr := os.OpenFile(baseFileName+".dat", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if openErr != nil {
return fmt.Errorf("cannot write volume %s.dat: %v", baseFileName, openErr)
}
defer datFile.Close()
inputFiles := make([]*os.File, DataShardsCount)
for shardId := 0; shardId < DataShardsCount; shardId++ {
shardFileName := baseFileName + ToExt(shardId)
inputFiles[shardId], openErr = os.OpenFile(shardFileName, os.O_RDONLY, 0)
if openErr != nil {
return openErr
}
defer inputFiles[shardId].Close()
}
for datFileSize >= DataShardsCount*ErasureCodingLargeBlockSize {
for shardId := 0; shardId < DataShardsCount; shardId++ {
w, err := io.CopyN(datFile, inputFiles[shardId], ErasureCodingLargeBlockSize)
if w != ErasureCodingLargeBlockSize {
return fmt.Errorf("copy %s large block %d: %v", baseFileName, shardId, err)
}
datFileSize -= ErasureCodingLargeBlockSize
}
}
for datFileSize > 0 {
for shardId := 0; shardId < DataShardsCount; shardId++ {
toRead := min(datFileSize, ErasureCodingSmallBlockSize)
w, err := io.CopyN(datFile, inputFiles[shardId], toRead)
if w != toRead {
return fmt.Errorf("copy %s small block %d: %v", baseFileName, shardId, err)
}
datFileSize -= toRead
}
}
return nil
}
func min(x, y int64) int64 {
if x > y {
return y
}
return x
}

View File

@@ -49,7 +49,7 @@ func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) {
return nil
}
// WriteEcFiles generates .ec01 ~ .ec14 files
// WriteEcFiles generates .ec00 ~ .ec13 files
func WriteEcFiles(baseFileName string) error {
return generateEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
}

View File

@@ -81,6 +81,15 @@ func (b ShardBits) ShardIds() (ret []ShardId) {
return
}
func (b ShardBits) ToUint32Slice() (ret []uint32) {
for i := uint32(0); i < TotalShardsCount; i++ {
if b.HasShardId(ShardId(i)) {
ret = append(ret, i)
}
}
return
}
func (b ShardBits) ShardIdCount() (count int) {
for count = 0; b > 0; count++ {
b &= b - 1

View File

@@ -10,6 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -61,7 +62,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di
return
}
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement)
rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
}
@@ -102,7 +103,7 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) {
}
return ret
}
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
@@ -229,7 +230,7 @@ func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uin
err = fmt.Errorf("volume %d is read only", i)
return
}
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.version)) {
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.Version())) {
_, size, isUnchanged, err = v.writeNeedle(n)
} else {
err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
@@ -246,7 +247,7 @@ func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32,
if v.noWriteOrDelete {
return 0, fmt.Errorf("volume %d is read only", i)
}
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(0, v.version)) {
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(0, v.Version())) {
return v.deleteNeedle(n)
} else {
return 0, fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())

View File

@@ -8,6 +8,8 @@ import (
"sync"
"time"
"github.com/klauspost/reedsolomon"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -15,8 +17,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/klauspost/reedsolomon"
)
func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
@@ -169,7 +171,7 @@ func (s *Store) readEcVolumeVersion(ctx context.Context, vid needle.VolumeId, ec
interval := erasure_coding.Interval{
BlockIndex: 0,
InnerBlockOffset: 0,
Size: _SuperBlockSize,
Size: super_block.SuperBlockSize,
IsLargeBlock: true, // it could be large block, but ok in this place
LargeBlockRowsCount: 0,
}

View File

@@ -1,4 +1,4 @@
package storage
package super_block
import (
"errors"

View File

@@ -1,4 +1,4 @@
package storage
package super_block
import (
"testing"

View File

@@ -0,0 +1,69 @@
package super_block
import (
"github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
const (
SuperBlockSize = 8
)
/*
* Super block currently has 8 bytes allocated for each volume.
* Byte 0: version, 1 or 2
* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
* Byte 2 and byte 3: Time to live. See TTL for definition
* Byte 4 and byte 5: The number of times the volume has been compacted.
* Rest bytes: Reserved
*/
type SuperBlock struct {
Version needle.Version
ReplicaPlacement *ReplicaPlacement
Ttl *needle.TTL
CompactionRevision uint16
Extra *master_pb.SuperBlockExtra
ExtraSize uint16
}
func (s *SuperBlock) BlockSize() int {
switch s.Version {
case needle.Version2, needle.Version3:
return SuperBlockSize + int(s.ExtraSize)
}
return SuperBlockSize
}
func (s *SuperBlock) Bytes() []byte {
header := make([]byte, SuperBlockSize)
header[0] = byte(s.Version)
header[1] = s.ReplicaPlacement.Byte()
s.Ttl.ToBytes(header[2:4])
util.Uint16toBytes(header[4:6], s.CompactionRevision)
if s.Extra != nil {
extraData, err := proto.Marshal(s.Extra)
if err != nil {
glog.Fatalf("cannot marshal super block extra %+v: %v", s.Extra, err)
}
extraSize := len(extraData)
if extraSize > 256*256-2 {
// reserve a couple of bits for future extension
glog.Fatalf("super block extra size is %d bigger than %d", extraSize, 256*256-2)
}
s.ExtraSize = uint16(extraSize)
util.Uint16toBytes(header[6:8], s.ExtraSize)
header = append(header, extraData...)
}
return header
}
func (s *SuperBlock) Initialized() bool {
return s.ReplicaPlacement != nil && s.Ttl != nil
}

View File

@@ -0,0 +1,44 @@
package super_block
import (
"fmt"
"github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
// ReadSuperBlock reads from data file and load it into volume's super block
func ReadSuperBlock(datBackend backend.BackendStorageFile) (superBlock SuperBlock, err error) {
header := make([]byte, SuperBlockSize)
if _, e := datBackend.ReadAt(header, 0); e != nil {
err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.Name(), e)
return
}
superBlock.Version = needle.Version(header[0])
if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
err = fmt.Errorf("cannot read replica type: %s", err.Error())
return
}
superBlock.Ttl = needle.LoadTTLFromBytes(header[2:4])
superBlock.CompactionRevision = util.BytesToUint16(header[4:6])
superBlock.ExtraSize = util.BytesToUint16(header[6:8])
if superBlock.ExtraSize > 0 {
// read more
extraData := make([]byte, int(superBlock.ExtraSize))
superBlock.Extra = &master_pb.SuperBlockExtra{}
err = proto.Unmarshal(extraData, superBlock.Extra)
if err != nil {
err = fmt.Errorf("cannot read volume %s super block extra: %v", datBackend.Name(), err)
return
}
}
return
}

View File

@@ -1,4 +1,4 @@
package storage
package super_block
import (
"testing"
@@ -10,7 +10,7 @@ func TestSuperBlockReadWrite(t *testing.T) {
rp, _ := NewReplicaPlacementFromByte(byte(001))
ttl, _ := needle.ReadTTL("15d")
s := &SuperBlock{
version: needle.CurrentVersion,
Version: needle.CurrentVersion,
ReplicaPlacement: rp,
Ttl: ttl,
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -28,7 +29,7 @@ type Volume struct {
noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
MemoryMapMaxSizeMb uint32
SuperBlock
super_block.SuperBlock
dataFileAccessLock sync.RWMutex
lastModifiedTsSeconds uint64 //unix time in seconds
@@ -42,10 +43,10 @@ type Volume struct {
volumeTierInfo *volume_server_pb.VolumeTierInfo
}
func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
// if replicaPlacement is nil, the superblock will be loaded from disk
v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb}
v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
v.needleMapKind = needleMapKind
e = v.load(true, true, needleMapKind, preallocate)
return
@@ -68,7 +69,7 @@ func (v *Volume) FileName() (fileName string) {
}
func (v *Volume) Version() needle.Version {
return v.SuperBlock.Version()
return v.SuperBlock.Version
}
func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time) {

View File

@@ -6,12 +6,14 @@ import (
"io"
"os"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"google.golang.org/grpc"
)
func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse {
@@ -108,7 +110,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
}
// add to needle map
return ScanVolumeFileFrom(v.version, v.DataBackend, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v})
return ScanVolumeFileFrom(v.Version(), v.DataBackend, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v})
}
@@ -154,11 +156,11 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) {
func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset())
n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset())
if err != nil {
return 0, fmt.Errorf("ReadNeedleHeader: %v", err)
}
_, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength)
_, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength)
if err != nil {
return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err)
}
@@ -244,7 +246,7 @@ type VolumeFileScanner4GenIdx struct {
v *Volume
}
func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock SuperBlock) error {
func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock super_block.SuperBlock) error {
return nil
}

View File

@@ -6,12 +6,13 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
type VolumeInfo struct {
Id needle.VolumeId
Size uint64
ReplicaPlacement *ReplicaPlacement
ReplicaPlacement *super_block.ReplicaPlacement
Ttl *needle.TTL
Collection string
Version needle.Version
@@ -40,7 +41,7 @@ func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err er
RemoteStorageName: m.RemoteStorageName,
RemoteStorageKey: m.RemoteStorageKey,
}
rp, e := NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
if e != nil {
return vi, e
}
@@ -55,7 +56,7 @@ func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi Volu
Collection: m.Collection,
Version: needle.Version(m.Version),
}
rp, e := NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
if e != nil {
return vi, e
}

View File

@@ -11,11 +11,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, err error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{}
v.SuperBlock = super_block.SuperBlock{}
v.needleMapKind = needleMapKind
err = v.load(false, false, needleMapKind, 0)
return
@@ -43,7 +44,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
v.noWriteOrDelete = true
}
v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
if fileSize >= _SuperBlockSize {
if fileSize >= super_block.SuperBlockSize {
alreadyHasSuperBlock = true
}
v.DataBackend = backend.NewDiskFile(dataFile)

View File

@@ -11,6 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -165,7 +166,7 @@ func (v *Volume) readNeedle(n *needle.Needle) (int, error) {
}
type VolumeFileScanner interface {
VisitSuperBlock(SuperBlock) error
VisitSuperBlock(super_block.SuperBlock) error
ReadNeedleBody() bool
VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
}

View File

@@ -1,79 +1,14 @@
package storage
import (
"fmt"
"os"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
const (
_SuperBlockSize = 8
)
/*
* Super block currently has 8 bytes allocated for each volume.
* Byte 0: version, 1 or 2
* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
* Byte 2 and byte 3: Time to live. See TTL for definition
* Byte 4 and byte 5: The number of times the volume has been compacted.
* Rest bytes: Reserved
*/
type SuperBlock struct {
version needle.Version
ReplicaPlacement *ReplicaPlacement
Ttl *needle.TTL
CompactionRevision uint16
Extra *master_pb.SuperBlockExtra
extraSize uint16
}
func (s *SuperBlock) BlockSize() int {
switch s.version {
case needle.Version2, needle.Version3:
return _SuperBlockSize + int(s.extraSize)
}
return _SuperBlockSize
}
func (s *SuperBlock) Version() needle.Version {
return s.version
}
func (s *SuperBlock) Bytes() []byte {
header := make([]byte, _SuperBlockSize)
header[0] = byte(s.version)
header[1] = s.ReplicaPlacement.Byte()
s.Ttl.ToBytes(header[2:4])
util.Uint16toBytes(header[4:6], s.CompactionRevision)
if s.Extra != nil {
extraData, err := proto.Marshal(s.Extra)
if err != nil {
glog.Fatalf("cannot marshal super block extra %+v: %v", s.Extra, err)
}
extraSize := len(extraData)
if extraSize > 256*256-2 {
// reserve a couple of bits for future extension
glog.Fatalf("super block extra size is %d bigger than %d", extraSize, 256*256-2)
}
s.extraSize = uint16(extraSize)
util.Uint16toBytes(header[6:8], s.extraSize)
header = append(header, extraData...)
}
return header
}
func (s *SuperBlock) Initialized() bool {
return s.ReplicaPlacement != nil && s.Ttl != nil
}
func (v *Volume) maybeWriteSuperBlock() error {
datSize, _, e := v.DataBackend.GetStat()
@@ -82,7 +17,7 @@ func (v *Volume) maybeWriteSuperBlock() error {
return e
}
if datSize == 0 {
v.SuperBlock.version = needle.CurrentVersion
v.SuperBlock.Version = needle.CurrentVersion
_, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
if e != nil && os.IsPermission(e) {
//read-only, but zero length - recreate it!
@@ -100,38 +35,6 @@ func (v *Volume) maybeWriteSuperBlock() error {
}
func (v *Volume) readSuperBlock() (err error) {
v.SuperBlock, err = ReadSuperBlock(v.DataBackend)
v.SuperBlock, err = super_block.ReadSuperBlock(v.DataBackend)
return err
}
// ReadSuperBlock reads from data file and load it into volume's super block
func ReadSuperBlock(datBackend backend.BackendStorageFile) (superBlock SuperBlock, err error) {
header := make([]byte, _SuperBlockSize)
if _, e := datBackend.ReadAt(header, 0); e != nil {
err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.Name(), e)
return
}
superBlock.version = needle.Version(header[0])
if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
err = fmt.Errorf("cannot read replica type: %s", err.Error())
return
}
superBlock.Ttl = needle.LoadTTLFromBytes(header[2:4])
superBlock.CompactionRevision = util.BytesToUint16(header[4:6])
superBlock.extraSize = util.BytesToUint16(header[6:8])
if superBlock.extraSize > 0 {
// read more
extraData := make([]byte, int(superBlock.extraSize))
superBlock.Extra = &master_pb.SuperBlockExtra{}
err = proto.Unmarshal(extraData, superBlock.Extra)
if err != nil {
err = fmt.Errorf("cannot read volume %s super block extra: %v", datBackend.Name(), err)
return
}
}
return
}

View File

@@ -11,6 +11,7 @@ import (
idx2 "github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -133,7 +134,7 @@ func (v *Volume) cleanupCompact() error {
}
func fetchCompactRevisionFromDatFile(datBackend backend.BackendStorageFile) (compactRevision uint16, err error) {
superBlock, err := ReadSuperBlock(datBackend)
superBlock, err := super_block.ReadSuperBlock(datBackend)
if err != nil {
return 0, err
}
@@ -277,8 +278,8 @@ type VolumeFileScanner4Vacuum struct {
writeThrottler *util.WriteThrottler
}
func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock SuperBlock) error {
scanner.version = superBlock.Version()
func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock super_block.SuperBlock) error {
scanner.version = superBlock.Version
superBlock.CompactionRevision++
_, err := scanner.dstBackend.WriteAt(superBlock.Bytes(), 0)
scanner.newOffset = int64(superBlock.BlockSize())

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -46,7 +47,7 @@ func TestMakeDiff(t *testing.T) {
v := new(Volume)
//lastCompactIndexOffset value is the index file size before step 4
v.lastCompactIndexOffset = 96
v.SuperBlock.version = 0x2
v.SuperBlock.Version = 0x2
/*
err := v.makeupDiff(
"/yourpath/1.cpd",
@@ -68,7 +69,7 @@ func TestCompaction(t *testing.T) {
}
defer os.RemoveAll(dir) // clean up
v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &ReplicaPlacement{}, &needle.TTL{}, 0, 0)
v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
if err != nil {
t.Fatalf("volume creation: %v", err)
}