shell: add command volume.move
This commit is contained in:
@@ -72,29 +72,39 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
|
||||
}
|
||||
}
|
||||
|
||||
glog.V(1).Infof("master received heartbeat %s", heartbeat.String())
|
||||
message := &master_pb.VolumeLocation{
|
||||
Url: dn.Url(),
|
||||
PublicUrl: dn.PublicUrl,
|
||||
}
|
||||
if len(heartbeat.NewVids) > 0 || len(heartbeat.DeletedVids) > 0 {
|
||||
if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 {
|
||||
// process delta volume ids if exists for fast volume id updates
|
||||
message.NewVids = append(message.NewVids, heartbeat.NewVids...)
|
||||
message.DeletedVids = append(message.DeletedVids, heartbeat.DeletedVids...)
|
||||
for _, volInfo := range heartbeat.NewVolumes{
|
||||
message.NewVids = append(message.NewVids, volInfo.Id)
|
||||
}
|
||||
for _, volInfo := range heartbeat.DeletedVolumes{
|
||||
message.DeletedVids = append(message.DeletedVids, volInfo.Id)
|
||||
}
|
||||
// update master internal volume layouts
|
||||
t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
|
||||
} else {
|
||||
// process heartbeat.Volumes
|
||||
newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
|
||||
|
||||
for _, v := range newVolumes {
|
||||
glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url())
|
||||
message.NewVids = append(message.NewVids, uint32(v.Id))
|
||||
}
|
||||
for _, v := range deletedVolumes {
|
||||
glog.V(0).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url())
|
||||
message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
|
||||
}
|
||||
}
|
||||
|
||||
if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
|
||||
ms.clientChansLock.RLock()
|
||||
for _, ch := range ms.clientChans {
|
||||
for host, ch := range ms.clientChans {
|
||||
glog.V(0).Infof("master send to %s: %s", host, message.String())
|
||||
ch <- message
|
||||
}
|
||||
ms.clientChansLock.RUnlock()
|
||||
|
||||
@@ -97,23 +97,30 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA
|
||||
|
||||
for {
|
||||
select {
|
||||
case vid := <-vs.store.NewVolumeIdChan:
|
||||
case volumeMessage := <-vs.store.NewVolumesChan:
|
||||
deltaBeat := &master_pb.Heartbeat{
|
||||
NewVids: []uint32{uint32(vid)},
|
||||
NewVolumes:[]*master_pb.VolumeShortInformationMessage{
|
||||
&volumeMessage,
|
||||
},
|
||||
}
|
||||
glog.V(1).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
|
||||
if err = stream.Send(deltaBeat); err != nil {
|
||||
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
|
||||
return "", err
|
||||
}
|
||||
case vid := <-vs.store.DeletedVolumeIdChan:
|
||||
case volumeMessage := <-vs.store.DeletedVolumesChan:
|
||||
deltaBeat := &master_pb.Heartbeat{
|
||||
DeletedVids: []uint32{uint32(vid)},
|
||||
DeletedVolumes:[]*master_pb.VolumeShortInformationMessage{
|
||||
&volumeMessage,
|
||||
},
|
||||
}
|
||||
glog.V(1).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
|
||||
if err = stream.Send(deltaBeat); err != nil {
|
||||
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
|
||||
return "", err
|
||||
}
|
||||
case <-tickChan:
|
||||
glog.V(1).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
|
||||
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
|
||||
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
|
||||
return "", err
|
||||
|
||||
@@ -19,11 +19,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
||||
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v != nil {
|
||||
// unmount the volume
|
||||
err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to unmount volume %d: %v", req.VolumeId, err)
|
||||
}
|
||||
return nil, fmt.Errorf("volume %d already exists", req.VolumeId)
|
||||
}
|
||||
|
||||
location := vs.store.FindFreeLocation()
|
||||
@@ -31,8 +27,6 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
||||
return nil, fmt.Errorf("no space left")
|
||||
}
|
||||
|
||||
volumeFileName := storage.VolumeFileName(req.Collection, location.Directory, int(req.VolumeId))
|
||||
|
||||
// the master will not start compaction for read-only volumes, so it is safe to just copy files directly
|
||||
// copy .dat and .idx files
|
||||
// read .idx .dat file size and timestamp
|
||||
@@ -40,8 +34,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
||||
// send .dat file
|
||||
// confirm size and timestamp
|
||||
var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
|
||||
datFileName := volumeFileName + ".dat"
|
||||
idxFileName := volumeFileName + ".idx"
|
||||
var volumeFileName, idxFileName, datFileName string
|
||||
err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
||||
var err error
|
||||
volFileInfoResp, err = client.ReadVolumeFileStatus(ctx,
|
||||
@@ -52,6 +45,8 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
||||
return fmt.Errorf("read volume file status failed, %v", err)
|
||||
}
|
||||
|
||||
volumeFileName = storage.VolumeFileName(volFileInfoResp.Collection, location.Directory, int(req.VolumeId))
|
||||
|
||||
// println("source:", volFileInfoResp.String())
|
||||
|
||||
copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
|
||||
@@ -64,6 +59,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
||||
return fmt.Errorf("failed to start copying volume %d idx file: %v", req.VolumeId, err)
|
||||
}
|
||||
|
||||
idxFileName = volumeFileName + ".idx"
|
||||
err = writeToFile(copyFileClient, idxFileName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to copy volume %d idx file: %v", req.VolumeId, err)
|
||||
@@ -79,6 +75,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
||||
return fmt.Errorf("failed to start copying volume %d dat file: %v", req.VolumeId, err)
|
||||
}
|
||||
|
||||
datFileName = volumeFileName + ".dat"
|
||||
err = writeToFile(copyFileClient, datFileName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to copy volume %d dat file: %v", req.VolumeId, err)
|
||||
@@ -86,9 +83,13 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
os.Remove(idxFileName)
|
||||
os.Remove(datFileName)
|
||||
if err != nil && volumeFileName != "" {
|
||||
if idxFileName != "" {
|
||||
os.Remove(idxFileName)
|
||||
}
|
||||
if datFileName != "" {
|
||||
os.Remove(datFileName)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -168,6 +169,7 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se
|
||||
resp.IdxFileTimestampSeconds = uint64(modTime.Unix())
|
||||
resp.FileCount = v.FileCount()
|
||||
resp.CompactionRevision = uint32(v.CompactionRevision)
|
||||
resp.Collection = v.Collection
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -1,16 +1,18 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||
)
|
||||
|
||||
func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stream volume_server_pb.VolumeServer_VolumeTailServer) error {
|
||||
func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error {
|
||||
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v == nil {
|
||||
@@ -50,7 +52,7 @@ func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stre
|
||||
|
||||
}
|
||||
|
||||
func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
|
||||
func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
|
||||
|
||||
foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs)
|
||||
if err != nil {
|
||||
@@ -61,7 +63,7 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *
|
||||
|
||||
if isLastOne {
|
||||
// need to heart beat to the client to ensure the connection health
|
||||
sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{IsLastChunk: true})
|
||||
sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true})
|
||||
return lastTimestampNs, sendErr
|
||||
}
|
||||
|
||||
@@ -78,7 +80,7 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *
|
||||
stopOffset = len(needleBody)
|
||||
}
|
||||
|
||||
sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{
|
||||
sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{
|
||||
NeedleHeader: needleHeader,
|
||||
NeedleBody: needleBody[i:stopOffset],
|
||||
IsLastChunk: isLastChunk,
|
||||
@@ -96,3 +98,21 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_server_pb.VolumeTailReceiverRequest) (*volume_server_pb.VolumeTailReceiverResponse, error) {
|
||||
|
||||
resp := &volume_server_pb.VolumeTailReceiverResponse{}
|
||||
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v == nil {
|
||||
return resp, fmt.Errorf("receiver not found volume id %d", req.VolumeId)
|
||||
}
|
||||
|
||||
defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
|
||||
|
||||
return resp, operation.TailVolumeFromServer(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.DrainingSeconds), func(n *needle.Needle) error {
|
||||
_, err := vs.store.Write(v.Id, n)
|
||||
return err
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user