Block RPC write operations on volume servers when maintenance mode is enabled (#8115)
* Bootstrap persistent state for volume servers. This PR implements logic to load/save persistent state information for storages associated with volume servers, and to report state changes back to masters via heartbeat messages. More work ensues! See https://github.com/seaweedfs/seaweedfs/issues/7977 for details. * Block RPC operations writing to volume servers when maintenance mode is on.
This commit is contained in:
@@ -4994,13 +4994,14 @@ func (*VolumeServerStatusRequest) Descriptor() ([]byte, []int) {
|
||||
}
|
||||
|
||||
type VolumeServerStatusResponse struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
DiskStatuses []*DiskStatus `protobuf:"bytes,1,rep,name=disk_statuses,json=diskStatuses,proto3" json:"disk_statuses,omitempty"`
|
||||
MemoryStatus *MemStatus `protobuf:"bytes,2,opt,name=memory_status,json=memoryStatus,proto3" json:"memory_status,omitempty"`
|
||||
Version string `protobuf:"bytes,3,opt,name=version,proto3" json:"version,omitempty"`
|
||||
DataCenter string `protobuf:"bytes,4,opt,name=data_center,json=dataCenter,proto3" json:"data_center,omitempty"`
|
||||
Rack string `protobuf:"bytes,5,opt,name=rack,proto3" json:"rack,omitempty"`
|
||||
State *VolumeServerState `protobuf:"bytes,6,opt,name=state,proto3" json:"state,omitempty"`
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
// TODO(issues/7977): add volume server state to response
|
||||
DiskStatuses []*DiskStatus `protobuf:"bytes,1,rep,name=disk_statuses,json=diskStatuses,proto3" json:"disk_statuses,omitempty"`
|
||||
MemoryStatus *MemStatus `protobuf:"bytes,2,opt,name=memory_status,json=memoryStatus,proto3" json:"memory_status,omitempty"`
|
||||
Version string `protobuf:"bytes,3,opt,name=version,proto3" json:"version,omitempty"`
|
||||
DataCenter string `protobuf:"bytes,4,opt,name=data_center,json=dataCenter,proto3" json:"data_center,omitempty"`
|
||||
Rack string `protobuf:"bytes,5,opt,name=rack,proto3" json:"rack,omitempty"`
|
||||
State *VolumeServerState `protobuf:"bytes,6,opt,name=state,proto3" json:"state,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
|
||||
@@ -39,9 +39,12 @@ func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_pb.AllocateVolumeRequest) (*volume_server_pb.AllocateVolumeResponse, error) {
|
||||
|
||||
resp := &volume_server_pb.AllocateVolumeResponse{}
|
||||
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
err := vs.store.AddVolume(
|
||||
needle.VolumeId(req.VolumeId),
|
||||
req.Collection,
|
||||
@@ -98,9 +101,12 @@ func (vs *VolumeServer) VolumeUnmount(ctx context.Context, req *volume_server_pb
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.VolumeDeleteRequest) (*volume_server_pb.VolumeDeleteResponse, error) {
|
||||
|
||||
resp := &volume_server_pb.VolumeDeleteResponse{}
|
||||
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId), req.OnlyEmpty)
|
||||
|
||||
if err != nil {
|
||||
@@ -114,9 +120,12 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) VolumeConfigure(ctx context.Context, req *volume_server_pb.VolumeConfigureRequest) (*volume_server_pb.VolumeConfigureResponse, error) {
|
||||
|
||||
resp := &volume_server_pb.VolumeConfigureResponse{}
|
||||
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// check replication format
|
||||
if _, err := super_block.NewReplicaPlacementFromString(req.Replication); err != nil {
|
||||
resp.Error = fmt.Sprintf("volume configure replication %v: %v", req, err)
|
||||
@@ -154,9 +163,12 @@ func (vs *VolumeServer) VolumeConfigure(ctx context.Context, req *volume_server_
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
|
||||
|
||||
resp := &volume_server_pb.VolumeMarkReadonlyResponse{}
|
||||
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v == nil {
|
||||
return nil, fmt.Errorf("volume %d not found", req.VolumeId)
|
||||
@@ -210,9 +222,12 @@ func (vs *VolumeServer) notifyMasterVolumeReadonly(v *storage.Volume, isReadOnly
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
|
||||
|
||||
resp := &volume_server_pb.VolumeMarkWritableResponse{}
|
||||
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v == nil {
|
||||
return nil, fmt.Errorf("volume %d not found", req.VolumeId)
|
||||
|
||||
@@ -11,9 +11,12 @@ import (
|
||||
)
|
||||
|
||||
func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) {
|
||||
|
||||
resp := &volume_server_pb.BatchDeleteResponse{}
|
||||
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
now := uint64(time.Now().Unix())
|
||||
|
||||
for _, fid := range req.FileIds {
|
||||
|
||||
@@ -26,6 +26,9 @@ const BufferSizeLimit = 1024 * 1024 * 2
|
||||
|
||||
// VolumeCopy copy the .idx .dat .vif files, and mount the volume
|
||||
func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stream volume_server_pb.VolumeServer_VolumeCopyServer) error {
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v != nil {
|
||||
@@ -446,6 +449,10 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
|
||||
|
||||
// ReceiveFile receives a file stream from client and writes it to storage
|
||||
func (vs *VolumeServer) ReceiveFile(stream volume_server_pb.VolumeServer_ReceiveFileServer) error {
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var fileInfo *volume_server_pb.ReceiveFileInfo
|
||||
var targetFile *os.File
|
||||
var filePath string
|
||||
|
||||
@@ -40,6 +40,9 @@ Steps to apply erasure coding to .dat .idx files
|
||||
|
||||
// VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files
|
||||
func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
glog.V(0).Infof("VolumeEcShardsGenerate: %v", req)
|
||||
|
||||
@@ -131,9 +134,11 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
|
||||
|
||||
// VolumeEcShardsRebuild regenerates any of the missing .ec00 ~ .ec13 files
|
||||
func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
glog.V(0).Infof("VolumeEcShardsRebuild: %v", req)
|
||||
|
||||
baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
|
||||
|
||||
var rebuiltShardIds []uint32
|
||||
@@ -173,6 +178,9 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s
|
||||
|
||||
// VolumeEcShardsCopy copy the .ecx and some ec data slices
|
||||
func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
glog.V(0).Infof("VolumeEcShardsCopy: %v", req)
|
||||
|
||||
@@ -249,6 +257,9 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
|
||||
// VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed
|
||||
// the shard should not be mounted before calling this.
|
||||
func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
|
||||
|
||||
@@ -445,6 +456,9 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) {
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
glog.V(0).Infof("VolumeEcBlobDelete: %v", req)
|
||||
|
||||
@@ -475,6 +489,9 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv
|
||||
|
||||
// VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec00 ~ .ec13 files
|
||||
func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) {
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
glog.V(0).Infof("VolumeEcShardsToVolume: %v", req)
|
||||
|
||||
|
||||
@@ -55,7 +55,12 @@ func (vs *VolumeServer) ReadNeedleMeta(ctx context.Context, req *volume_server_p
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) WriteNeedleBlob(ctx context.Context, req *volume_server_pb.WriteNeedleBlobRequest) (resp *volume_server_pb.WriteNeedleBlobResponse, err error) {
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp = &volume_server_pb.WriteNeedleBlobResponse{}
|
||||
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v == nil {
|
||||
return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
|
||||
|
||||
@@ -15,6 +15,10 @@ import (
|
||||
)
|
||||
|
||||
func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_server_pb.FetchAndWriteNeedleRequest) (resp *volume_server_pb.FetchAndWriteNeedleResponse, err error) {
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp = &volume_server_pb.FetchAndWriteNeedleResponse{}
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v == nil {
|
||||
|
||||
@@ -16,7 +16,6 @@ import (
|
||||
)
|
||||
|
||||
func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error {
|
||||
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v == nil {
|
||||
return fmt.Errorf("not found volume id %d", req.VolumeId)
|
||||
|
||||
@@ -12,6 +12,9 @@ import (
|
||||
|
||||
// VolumeTierMoveDatToRemote copy dat file to a remote tier
|
||||
func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTierMoveDatToRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatToRemoteServer) error {
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// find existing volume
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
|
||||
@@ -34,6 +34,10 @@ func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_serve
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
defer func(start time.Time) {
|
||||
stats.VolumeServerVacuumingHistogram.WithLabelValues("compact").Observe(time.Since(start).Seconds())
|
||||
@@ -76,6 +80,10 @@ func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCo
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_server_pb.VacuumVolumeCommitRequest) (*volume_server_pb.VacuumVolumeCommitResponse, error) {
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
defer func(start time.Time) {
|
||||
stats.VolumeServerVacuumingHistogram.WithLabelValues("commit").Observe(time.Since(start).Seconds())
|
||||
@@ -98,9 +106,11 @@ func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_serv
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_server_pb.VacuumVolumeCleanupRequest) (*volume_server_pb.VacuumVolumeCleanupResponse, error) {
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp := &volume_server_pb.VacuumVolumeCleanupResponse{}
|
||||
|
||||
err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId))
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -171,3 +172,19 @@ func (vs *VolumeServer) Reload() {
|
||||
v := util.GetViper()
|
||||
vs.guard.UpdateWhiteList(append(vs.whiteList, util.StringSplit(v.GetString("guard.white_list"), ",")...))
|
||||
}
|
||||
|
||||
// Returns whether a volume server is in maintenance (i.e. read-only) mode.
|
||||
func (vs *VolumeServer) MaintenanceMode() bool {
|
||||
if vs.store == nil {
|
||||
return false
|
||||
}
|
||||
return vs.store.State.Pb.GetMaintenance()
|
||||
}
|
||||
|
||||
// Checks if a volume server is in maintenance mode, and returns an error explaining why.
|
||||
func (vs *VolumeServer) CheckMaintenanceMode() error {
|
||||
if !vs.MaintenanceMode() {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("volume server %s is in maintenance mode", vs.store.Id)
|
||||
}
|
||||
|
||||
72
weed/server/volume_server_test.go
Normal file
72
weed/server/volume_server_test.go
Normal file
@@ -0,0 +1,72 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage"
|
||||
)
|
||||
|
||||
func TestMaintenanceMode(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
pb *volume_server_pb.VolumeServerState
|
||||
want bool
|
||||
wantCheckErr string
|
||||
}{
|
||||
{
|
||||
name: "non-initialized state",
|
||||
pb: nil,
|
||||
want: false,
|
||||
wantCheckErr: "",
|
||||
},
|
||||
{
|
||||
name: "maintenance mode disabled",
|
||||
pb: &volume_server_pb.VolumeServerState{
|
||||
Maintenance: false,
|
||||
},
|
||||
want: false,
|
||||
wantCheckErr: "",
|
||||
},
|
||||
{
|
||||
name: "maintenance mode enabled",
|
||||
pb: &volume_server_pb.VolumeServerState{
|
||||
Maintenance: true,
|
||||
},
|
||||
want: true,
|
||||
wantCheckErr: "volume server test_1234 is in maintenance mode",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
vs := VolumeServer{
|
||||
store: &storage.Store{
|
||||
Id: "test_1234",
|
||||
State: &storage.State{
|
||||
FilePath: "/some/path.pb",
|
||||
Pb: tc.pb,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if got, want := vs.MaintenanceMode(), tc.want; got != want {
|
||||
t.Errorf("MaintenanceMode() returned %v, want %v", got, want)
|
||||
}
|
||||
|
||||
err, wantErrStr := vs.CheckMaintenanceMode(), tc.wantCheckErr
|
||||
if err != nil {
|
||||
if wantErrStr == "" {
|
||||
t.Errorf("CheckMaintenanceMode() returned error %v, want nil", err)
|
||||
}
|
||||
if errStr := err.Error(); errStr != wantErrStr {
|
||||
t.Errorf("CheckMaintenanceMode() returned error %q, want %q", errStr, wantErrStr)
|
||||
}
|
||||
} else {
|
||||
if wantErrStr != "" {
|
||||
t.Errorf("CheckMaintenanceMode() returned no error, want %q", wantErrStr)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user