ec: generate and copy .vif file
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/idx"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||
@@ -56,6 +57,12 @@ func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolu
|
||||
return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err)
|
||||
}
|
||||
|
||||
// read volume info
|
||||
ev.Version = needle.Version3
|
||||
if volumeInfo, found := pb.MaybeLoadVolumeInfo(baseFileName + ".vif"); found {
|
||||
ev.Version = needle.Version(volumeInfo.Version)
|
||||
}
|
||||
|
||||
ev.ShardLocations = make(map[ShardId][]string)
|
||||
|
||||
return
|
||||
|
||||
@@ -17,7 +17,6 @@ import (
|
||||
"github.com/chrislusf/seaweedfs/weed/stats"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/types"
|
||||
)
|
||||
|
||||
@@ -121,20 +120,7 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
|
||||
for _, location := range s.Locations {
|
||||
if localEcVolume, found := location.FindEcVolume(vid); found {
|
||||
|
||||
// read the volume version
|
||||
readCounter := 0
|
||||
for localEcVolume.Version == 0 {
|
||||
err := s.readEcVolumeVersion(ctx, vid, localEcVolume)
|
||||
readCounter++
|
||||
if readCounter > 10 && err != nil {
|
||||
return 0, fmt.Errorf("fail to read ec volume %d: %v", vid, err)
|
||||
}
|
||||
time.Sleep(1357 * time.Millisecond)
|
||||
glog.V(0).Infof("ReadEcShardNeedle vid %d version:%v: %v", vid, localEcVolume.Version, err)
|
||||
}
|
||||
version := localEcVolume.Version
|
||||
|
||||
offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n.Id, version)
|
||||
offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n.Id, localEcVolume.Version)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("locate in local ec volume: %v", err)
|
||||
}
|
||||
@@ -155,7 +141,7 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
|
||||
return 0, fmt.Errorf("ec entry %s is deleted", n.Id)
|
||||
}
|
||||
|
||||
err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, version)
|
||||
err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, localEcVolume.Version)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("readbytes: %v", err)
|
||||
}
|
||||
@@ -166,22 +152,6 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
|
||||
return 0, fmt.Errorf("ec shard %d not found", vid)
|
||||
}
|
||||
|
||||
func (s *Store) readEcVolumeVersion(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume) (err error) {
|
||||
|
||||
interval := erasure_coding.Interval{
|
||||
BlockIndex: 0,
|
||||
InnerBlockOffset: 0,
|
||||
Size: super_block.SuperBlockSize,
|
||||
IsLargeBlock: true, // it could be large block, but ok in this place
|
||||
LargeBlockRowsCount: 0,
|
||||
}
|
||||
data, _, err := s.readEcShardIntervals(ctx, vid, 0, ecVolume, []erasure_coding.Interval{interval})
|
||||
if err == nil {
|
||||
ecVolume.Version = needle.Version(data[0])
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
|
||||
|
||||
if err = s.cachedLookupEcShardLocations(ctx, ecVolume); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user