refactoring
This commit is contained in:
@@ -3,9 +3,10 @@ package operation
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||||
@@ -72,7 +73,7 @@ func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.Dia
|
|||||||
|
|
||||||
n := new(needle.Needle)
|
n := new(needle.Needle)
|
||||||
n.ParseNeedleHeader(needleHeader)
|
n.ParseNeedleHeader(needleHeader)
|
||||||
err = n.ReadNeedleBodyBytes(needleBody, needle.CurrentVersion)
|
err = n.ReadNeedleBodyBytes(needleBody, needle.GetCurrentVersion())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
|
|||||||
Rack: req.Rack,
|
Rack: req.Rack,
|
||||||
DataNode: req.DataNode,
|
DataNode: req.DataNode,
|
||||||
MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb,
|
MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb,
|
||||||
Version: uint32(needle.CurrentVersion),
|
Version: uint32(needle.GetCurrentVersion()),
|
||||||
}
|
}
|
||||||
|
|
||||||
if !ms.Topo.DataCenterExists(option.DataCenter) {
|
if !ms.Topo.DataCenterExists(option.DataCenter) {
|
||||||
|
|||||||
@@ -343,7 +343,7 @@ func (ms *MasterServer) VolumeGrow(ctx context.Context, req *master_pb.VolumeGro
|
|||||||
Rack: req.Rack,
|
Rack: req.Rack,
|
||||||
DataNode: req.DataNode,
|
DataNode: req.DataNode,
|
||||||
MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb,
|
MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb,
|
||||||
Version: uint32(needle.CurrentVersion),
|
Version: uint32(needle.GetCurrentVersion()),
|
||||||
}
|
}
|
||||||
volumeGrowRequest := topology.VolumeGrowRequest{
|
volumeGrowRequest := topology.VolumeGrowRequest{
|
||||||
Option: &volumeGrowOption,
|
Option: &volumeGrowOption,
|
||||||
|
|||||||
@@ -177,7 +177,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
|
|||||||
Rack: r.FormValue("rack"),
|
Rack: r.FormValue("rack"),
|
||||||
DataNode: r.FormValue("dataNode"),
|
DataNode: r.FormValue("dataNode"),
|
||||||
MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
|
MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
|
||||||
Version: uint32(needle.CurrentVersion),
|
Version: uint32(needle.GetCurrentVersion()),
|
||||||
}
|
}
|
||||||
return volumeGrowOption, nil
|
return volumeGrowOption, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -44,7 +44,7 @@ func TestPositioning(t *testing.T) {
|
|||||||
fmt.Printf("offset: %d size: %d\n", offset.ToActualOffset(), size)
|
fmt.Printf("offset: %d size: %d\n", offset.ToActualOffset(), size)
|
||||||
|
|
||||||
var shardEcdFileSize int64 = 1118830592 // 1024*1024*1024*3
|
var shardEcdFileSize int64 = 1118830592 // 1024*1024*1024*3
|
||||||
intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, shardEcdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, needle.CurrentVersion)))
|
intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, shardEcdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, needle.GetCurrentVersion())))
|
||||||
|
|
||||||
for _, interval := range intervals {
|
for _, interval := range intervals {
|
||||||
shardId, shardOffset := interval.ToShardIdAndOffset(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
|
shardId, shardOffset := interval.ToShardIdAndOffset(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
|
||||||
|
|||||||
@@ -59,7 +59,7 @@ func TestAppend(t *testing.T) {
|
|||||||
datBackend := backend.NewDiskFile(tempFile)
|
datBackend := backend.NewDiskFile(tempFile)
|
||||||
defer datBackend.Close()
|
defer datBackend.Close()
|
||||||
|
|
||||||
offset, _, _, _ := n.Append(datBackend, CurrentVersion)
|
offset, _, _, _ := n.Append(datBackend, GetCurrentVersion())
|
||||||
if offset != uint64(fileSize) {
|
if offset != uint64(fileSize) {
|
||||||
t.Errorf("Fail to Append Needle.")
|
t.Errorf("Fail to Append Needle.")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,8 +3,11 @@ package needle
|
|||||||
type Version uint8
|
type Version uint8
|
||||||
|
|
||||||
const (
|
const (
|
||||||
Version1 = Version(1)
|
Version1 = Version(1)
|
||||||
Version2 = Version(2)
|
Version2 = Version(2)
|
||||||
Version3 = Version(3)
|
Version3 = Version(3)
|
||||||
CurrentVersion = Version3
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func GetCurrentVersion() Version {
|
||||||
|
return Version3
|
||||||
|
}
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ func TestSuperBlockReadWrite(t *testing.T) {
|
|||||||
rp, _ := NewReplicaPlacementFromByte(byte(001))
|
rp, _ := NewReplicaPlacementFromByte(byte(001))
|
||||||
ttl, _ := needle.ReadTTL("15d")
|
ttl, _ := needle.ReadTTL("15d")
|
||||||
s := &SuperBlock{
|
s := &SuperBlock{
|
||||||
Version: needle.CurrentVersion,
|
Version: needle.GetCurrentVersion(),
|
||||||
ReplicaPlacement: rp,
|
ReplicaPlacement: rp,
|
||||||
Ttl: ttl,
|
Ttl: ttl,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ func (v *Volume) maybeWriteSuperBlock() error {
|
|||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
if datSize == 0 {
|
if datSize == 0 {
|
||||||
v.SuperBlock.Version = needle.CurrentVersion
|
v.SuperBlock.Version = needle.GetCurrentVersion()
|
||||||
_, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
|
_, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
|
||||||
if e != nil && os.IsPermission(e) {
|
if e != nil && os.IsPermission(e) {
|
||||||
//read-only, but zero length - recreate it!
|
//read-only, but zero length - recreate it!
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ package storage
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
||||||
@@ -10,7 +12,6 @@ import (
|
|||||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
|
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo {
|
func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo {
|
||||||
@@ -23,7 +24,7 @@ func (v *Volume) maybeLoadVolumeInfo() (found bool) {
|
|||||||
v.volumeInfo, v.hasRemoteFile, found, err = volume_info.MaybeLoadVolumeInfo(v.FileName(".vif"))
|
v.volumeInfo, v.hasRemoteFile, found, err = volume_info.MaybeLoadVolumeInfo(v.FileName(".vif"))
|
||||||
|
|
||||||
if v.volumeInfo.Version == 0 {
|
if v.volumeInfo.Version == 0 {
|
||||||
v.volumeInfo.Version = uint32(needle.CurrentVersion)
|
v.volumeInfo.Version = uint32(needle.GetCurrentVersion())
|
||||||
}
|
}
|
||||||
|
|
||||||
if v.hasRemoteFile {
|
if v.hasRemoteFile {
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
|
|||||||
DeletedByteCount: 34524,
|
DeletedByteCount: 34524,
|
||||||
ReadOnly: false,
|
ReadOnly: false,
|
||||||
ReplicaPlacement: uint32(0),
|
ReplicaPlacement: uint32(0),
|
||||||
Version: uint32(needle.CurrentVersion),
|
Version: uint32(needle.GetCurrentVersion()),
|
||||||
Ttl: 0,
|
Ttl: 0,
|
||||||
}
|
}
|
||||||
volumeMessages = append(volumeMessages, volumeMessage)
|
volumeMessages = append(volumeMessages, volumeMessage)
|
||||||
@@ -65,7 +65,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
|
|||||||
DeletedByteCount: 34524,
|
DeletedByteCount: 34524,
|
||||||
ReadOnly: false,
|
ReadOnly: false,
|
||||||
ReplicaPlacement: uint32(0),
|
ReplicaPlacement: uint32(0),
|
||||||
Version: uint32(needle.CurrentVersion),
|
Version: uint32(needle.GetCurrentVersion()),
|
||||||
Ttl: 0,
|
Ttl: 0,
|
||||||
DiskType: "ssd",
|
DiskType: "ssd",
|
||||||
}
|
}
|
||||||
@@ -94,7 +94,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
|
|||||||
DeletedByteCount: 345240,
|
DeletedByteCount: 345240,
|
||||||
ReadOnly: false,
|
ReadOnly: false,
|
||||||
ReplicaPlacement: uint32(0),
|
ReplicaPlacement: uint32(0),
|
||||||
Version: uint32(needle.CurrentVersion),
|
Version: uint32(needle.GetCurrentVersion()),
|
||||||
Ttl: 0,
|
Ttl: 0,
|
||||||
}
|
}
|
||||||
volumeMessages = append(volumeMessages, volumeMessage)
|
volumeMessages = append(volumeMessages, volumeMessage)
|
||||||
@@ -117,7 +117,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
|
|||||||
Id: uint32(3),
|
Id: uint32(3),
|
||||||
Collection: "",
|
Collection: "",
|
||||||
ReplicaPlacement: uint32(0),
|
ReplicaPlacement: uint32(0),
|
||||||
Version: uint32(needle.CurrentVersion),
|
Version: uint32(needle.GetCurrentVersion()),
|
||||||
Ttl: 0,
|
Ttl: 0,
|
||||||
}
|
}
|
||||||
topo.IncrementalSyncDataNodeRegistration(
|
topo.IncrementalSyncDataNodeRegistration(
|
||||||
@@ -191,7 +191,7 @@ func TestAddRemoveVolume(t *testing.T) {
|
|||||||
DeleteCount: 23,
|
DeleteCount: 23,
|
||||||
DeletedByteCount: 45,
|
DeletedByteCount: 45,
|
||||||
ReadOnly: false,
|
ReadOnly: false,
|
||||||
Version: needle.CurrentVersion,
|
Version: needle.GetCurrentVersion(),
|
||||||
ReplicaPlacement: &super_block.ReplicaPlacement{},
|
ReplicaPlacement: &super_block.ReplicaPlacement{},
|
||||||
Ttl: needle.EMPTY_TTL,
|
Ttl: needle.EMPTY_TTL,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -264,7 +264,7 @@ func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid
|
|||||||
Collection: option.Collection,
|
Collection: option.Collection,
|
||||||
ReplicaPlacement: option.ReplicaPlacement,
|
ReplicaPlacement: option.ReplicaPlacement,
|
||||||
Ttl: option.Ttl,
|
Ttl: option.Ttl,
|
||||||
Version: needle.CurrentVersion,
|
Version: needle.GetCurrentVersion(),
|
||||||
DiskType: option.DiskType.String(),
|
DiskType: option.DiskType.String(),
|
||||||
ModifiedAtSecond: time.Now().Unix(),
|
ModifiedAtSecond: time.Now().Unix(),
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -3,9 +3,10 @@ package topology
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/sequence"
|
"github.com/seaweedfs/seaweedfs/weed/sequence"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage"
|
"github.com/seaweedfs/seaweedfs/weed/storage"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||||
@@ -104,7 +105,7 @@ func setup(topologyLayout string) *Topology {
|
|||||||
vi := storage.VolumeInfo{
|
vi := storage.VolumeInfo{
|
||||||
Id: needle.VolumeId(int64(m["id"].(float64))),
|
Id: needle.VolumeId(int64(m["id"].(float64))),
|
||||||
Size: uint64(m["size"].(float64)),
|
Size: uint64(m["size"].(float64)),
|
||||||
Version: needle.CurrentVersion,
|
Version: needle.GetCurrentVersion(),
|
||||||
}
|
}
|
||||||
if mVal, ok := m["collection"]; ok {
|
if mVal, ok := m["collection"]; ok {
|
||||||
vi.Collection = mVal.(string)
|
vi.Collection = mVal.(string)
|
||||||
|
|||||||
@@ -265,7 +265,7 @@ func (vl *VolumeLayout) isCrowdedVolume(v *storage.VolumeInfo) bool {
|
|||||||
|
|
||||||
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
|
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
|
||||||
return !vl.isOversized(v) &&
|
return !vl.isOversized(v) &&
|
||||||
v.Version == needle.CurrentVersion &&
|
v.Version == needle.GetCurrentVersion() &&
|
||||||
!v.ReadOnly
|
!v.ReadOnly
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user