Give the ScrubVolume() RPC an option to flag found broken volumes as read-only. (#8360)
* Give the `ScrubVolume()` RPC an option to flag found broken volumes as read-only. Also exposes this option in the shell `volume.scrub` command. * Remove redundant test in `TestVolumeMarkReadonlyWritableErrorPaths`. 417051bb slightly rearranges the logic for `VolumeMarkReadonly()` and `VolumeMarkWritable()`, so calling them for invalid volume IDs will actually yield that error, instead of checking maintnenance mode first.
This commit is contained in:
@@ -123,16 +123,6 @@ func TestVolumeMarkReadonlyWritableErrorPaths(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("SetState maintenance=true failed: %v", err)
|
||||
}
|
||||
|
||||
_, err = grpcClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{VolumeId: 1, Persist: true})
|
||||
if err == nil || !strings.Contains(err.Error(), "maintenance mode") {
|
||||
t.Fatalf("VolumeMarkReadonly maintenance error mismatch: %v", err)
|
||||
}
|
||||
|
||||
_, err = grpcClient.VolumeMarkWritable(ctx, &volume_server_pb.VolumeMarkWritableRequest{VolumeId: 1})
|
||||
if err == nil || !strings.Contains(err.Error(), "maintenance mode") {
|
||||
t.Fatalf("VolumeMarkWritable maintenance error mismatch: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteCollectionRemovesVolumeAndIsIdempotent(t *testing.T) {
|
||||
|
||||
@@ -656,6 +656,7 @@ message ScrubVolumeRequest {
|
||||
VolumeScrubMode mode = 1;
|
||||
// optional list of volume IDs to scrub. if empty, all volumes for the server are scrubbed.
|
||||
repeated uint32 volume_ids = 2;
|
||||
bool mark_broken_volumes_readonly = 3;
|
||||
}
|
||||
message ScrubVolumeResponse {
|
||||
uint64 total_volumes = 1;
|
||||
|
||||
@@ -7,12 +7,13 @@
|
||||
package volume_server_pb
|
||||
|
||||
import (
|
||||
remote_pb "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
unsafe "unsafe"
|
||||
|
||||
remote_pb "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -5579,6 +5580,7 @@ type ScrubVolumeRequest struct {
|
||||
Mode VolumeScrubMode `protobuf:"varint,1,opt,name=mode,proto3,enum=volume_server_pb.VolumeScrubMode" json:"mode,omitempty"`
|
||||
// optional list of volume IDs to scrub. if empty, all volumes for the server are scrubbed.
|
||||
VolumeIds []uint32 `protobuf:"varint,2,rep,packed,name=volume_ids,json=volumeIds,proto3" json:"volume_ids,omitempty"`
|
||||
MarkBrokenVolumesReadonly bool `protobuf:"varint,3,opt,name=mark_broken_volumes_readonly,json=markBrokenVolumesReadonly,proto3" json:"mark_broken_volumes_readonly,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
@@ -5627,6 +5629,13 @@ func (x *ScrubVolumeRequest) GetVolumeIds() []uint32 {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *ScrubVolumeRequest) GetMarkBrokenVolumesReadonly() bool {
|
||||
if x != nil {
|
||||
return x.MarkBrokenVolumesReadonly
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type ScrubVolumeResponse struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
TotalVolumes uint64 `protobuf:"varint,1,opt,name=total_volumes,json=totalVolumes,proto3" json:"total_volumes,omitempty"`
|
||||
@@ -7146,11 +7155,12 @@ const file_volume_server_proto_rawDesc = "" +
|
||||
"public_url\x18\x02 \x01(\tR\tpublicUrl\x12\x1b\n" +
|
||||
"\tgrpc_port\x18\x03 \x01(\x05R\bgrpcPort\"2\n" +
|
||||
"\x1bFetchAndWriteNeedleResponse\x12\x13\n" +
|
||||
"\x05e_tag\x18\x01 \x01(\tR\x04eTag\"j\n" +
|
||||
"\x05e_tag\x18\x01 \x01(\tR\x04eTag\"\xab\x01\n" +
|
||||
"\x12ScrubVolumeRequest\x125\n" +
|
||||
"\x04mode\x18\x01 \x01(\x0e2!.volume_server_pb.VolumeScrubModeR\x04mode\x12\x1d\n" +
|
||||
"\n" +
|
||||
"volume_ids\x18\x02 \x03(\rR\tvolumeIds\"\xa1\x01\n" +
|
||||
"volume_ids\x18\x02 \x03(\rR\tvolumeIds\x12?\n" +
|
||||
"\x1cmark_broken_volumes_readonly\x18\x03 \x01(\bR\x19markBrokenVolumesReadonly\"\xa1\x01\n" +
|
||||
"\x13ScrubVolumeResponse\x12#\n" +
|
||||
"\rtotal_volumes\x18\x01 \x01(\x04R\ftotalVolumes\x12\x1f\n" +
|
||||
"\vtotal_files\x18\x02 \x01(\x04R\n" +
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||
// versions:
|
||||
// - protoc-gen-go-grpc v1.5.1
|
||||
// - protoc v6.33.4
|
||||
// - protoc-gen-go-grpc v1.6.1
|
||||
// - protoc v3.21.12
|
||||
// source: volume_server.proto
|
||||
|
||||
package volume_server_pb
|
||||
@@ -781,148 +781,148 @@ type VolumeServerServer interface {
|
||||
type UnimplementedVolumeServerServer struct{}
|
||||
|
||||
func (UnimplementedVolumeServerServer) BatchDelete(context.Context, *BatchDeleteRequest) (*BatchDeleteResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method BatchDelete not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method BatchDelete not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VacuumVolumeCheck(context.Context, *VacuumVolumeCheckRequest) (*VacuumVolumeCheckResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VacuumVolumeCheck not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VacuumVolumeCheck not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VacuumVolumeCompact(*VacuumVolumeCompactRequest, grpc.ServerStreamingServer[VacuumVolumeCompactResponse]) error {
|
||||
return status.Errorf(codes.Unimplemented, "method VacuumVolumeCompact not implemented")
|
||||
return status.Error(codes.Unimplemented, "method VacuumVolumeCompact not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VacuumVolumeCommit(context.Context, *VacuumVolumeCommitRequest) (*VacuumVolumeCommitResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VacuumVolumeCommit not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VacuumVolumeCommit not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VacuumVolumeCleanup(context.Context, *VacuumVolumeCleanupRequest) (*VacuumVolumeCleanupResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VacuumVolumeCleanup not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VacuumVolumeCleanup not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) DeleteCollection(context.Context, *DeleteCollectionRequest) (*DeleteCollectionResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method DeleteCollection not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method DeleteCollection not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) AllocateVolume(context.Context, *AllocateVolumeRequest) (*AllocateVolumeResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method AllocateVolume not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method AllocateVolume not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeSyncStatus(context.Context, *VolumeSyncStatusRequest) (*VolumeSyncStatusResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeSyncStatus not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeSyncStatus not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeIncrementalCopy(*VolumeIncrementalCopyRequest, grpc.ServerStreamingServer[VolumeIncrementalCopyResponse]) error {
|
||||
return status.Errorf(codes.Unimplemented, "method VolumeIncrementalCopy not implemented")
|
||||
return status.Error(codes.Unimplemented, "method VolumeIncrementalCopy not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeMount(context.Context, *VolumeMountRequest) (*VolumeMountResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeMount not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeMount not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeUnmount(context.Context, *VolumeUnmountRequest) (*VolumeUnmountResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeUnmount not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeUnmount not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeDelete(context.Context, *VolumeDeleteRequest) (*VolumeDeleteResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeDelete not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeDelete not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeMarkReadonly(context.Context, *VolumeMarkReadonlyRequest) (*VolumeMarkReadonlyResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeMarkReadonly not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeMarkReadonly not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeMarkWritable(context.Context, *VolumeMarkWritableRequest) (*VolumeMarkWritableResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeMarkWritable not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeMarkWritable not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeConfigure(context.Context, *VolumeConfigureRequest) (*VolumeConfigureResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeConfigure not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeConfigure not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeStatus(context.Context, *VolumeStatusRequest) (*VolumeStatusResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeStatus not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeStatus not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) GetState(context.Context, *GetStateRequest) (*GetStateResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method GetState not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method GetState not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) SetState(context.Context, *SetStateRequest) (*SetStateResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method SetState not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method SetState not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeCopy(*VolumeCopyRequest, grpc.ServerStreamingServer[VolumeCopyResponse]) error {
|
||||
return status.Errorf(codes.Unimplemented, "method VolumeCopy not implemented")
|
||||
return status.Error(codes.Unimplemented, "method VolumeCopy not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) ReadVolumeFileStatus(context.Context, *ReadVolumeFileStatusRequest) (*ReadVolumeFileStatusResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method ReadVolumeFileStatus not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method ReadVolumeFileStatus not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) CopyFile(*CopyFileRequest, grpc.ServerStreamingServer[CopyFileResponse]) error {
|
||||
return status.Errorf(codes.Unimplemented, "method CopyFile not implemented")
|
||||
return status.Error(codes.Unimplemented, "method CopyFile not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) ReceiveFile(grpc.ClientStreamingServer[ReceiveFileRequest, ReceiveFileResponse]) error {
|
||||
return status.Errorf(codes.Unimplemented, "method ReceiveFile not implemented")
|
||||
return status.Error(codes.Unimplemented, "method ReceiveFile not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) ReadNeedleBlob(context.Context, *ReadNeedleBlobRequest) (*ReadNeedleBlobResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method ReadNeedleBlob not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method ReadNeedleBlob not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) ReadNeedleMeta(context.Context, *ReadNeedleMetaRequest) (*ReadNeedleMetaResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method ReadNeedleMeta not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method ReadNeedleMeta not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) WriteNeedleBlob(context.Context, *WriteNeedleBlobRequest) (*WriteNeedleBlobResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method WriteNeedleBlob not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method WriteNeedleBlob not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) ReadAllNeedles(*ReadAllNeedlesRequest, grpc.ServerStreamingServer[ReadAllNeedlesResponse]) error {
|
||||
return status.Errorf(codes.Unimplemented, "method ReadAllNeedles not implemented")
|
||||
return status.Error(codes.Unimplemented, "method ReadAllNeedles not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeTailSender(*VolumeTailSenderRequest, grpc.ServerStreamingServer[VolumeTailSenderResponse]) error {
|
||||
return status.Errorf(codes.Unimplemented, "method VolumeTailSender not implemented")
|
||||
return status.Error(codes.Unimplemented, "method VolumeTailSender not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeTailReceiver(context.Context, *VolumeTailReceiverRequest) (*VolumeTailReceiverResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeTailReceiver not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeTailReceiver not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardsGenerate(context.Context, *VolumeEcShardsGenerateRequest) (*VolumeEcShardsGenerateResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsGenerate not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsGenerate not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardsRebuild(context.Context, *VolumeEcShardsRebuildRequest) (*VolumeEcShardsRebuildResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsRebuild not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsRebuild not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardsCopy(context.Context, *VolumeEcShardsCopyRequest) (*VolumeEcShardsCopyResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsCopy not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsCopy not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardsDelete(context.Context, *VolumeEcShardsDeleteRequest) (*VolumeEcShardsDeleteResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsDelete not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsDelete not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardsMount(context.Context, *VolumeEcShardsMountRequest) (*VolumeEcShardsMountResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsMount not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsMount not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardsUnmount(context.Context, *VolumeEcShardsUnmountRequest) (*VolumeEcShardsUnmountResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsUnmount not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsUnmount not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardRead(*VolumeEcShardReadRequest, grpc.ServerStreamingServer[VolumeEcShardReadResponse]) error {
|
||||
return status.Errorf(codes.Unimplemented, "method VolumeEcShardRead not implemented")
|
||||
return status.Error(codes.Unimplemented, "method VolumeEcShardRead not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcBlobDelete(context.Context, *VolumeEcBlobDeleteRequest) (*VolumeEcBlobDeleteResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcBlobDelete not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcBlobDelete not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardsToVolume(context.Context, *VolumeEcShardsToVolumeRequest) (*VolumeEcShardsToVolumeResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsToVolume not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsToVolume not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardsInfo(context.Context, *VolumeEcShardsInfoRequest) (*VolumeEcShardsInfoResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsInfo not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsInfo not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeTierMoveDatToRemote(*VolumeTierMoveDatToRemoteRequest, grpc.ServerStreamingServer[VolumeTierMoveDatToRemoteResponse]) error {
|
||||
return status.Errorf(codes.Unimplemented, "method VolumeTierMoveDatToRemote not implemented")
|
||||
return status.Error(codes.Unimplemented, "method VolumeTierMoveDatToRemote not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeTierMoveDatFromRemote(*VolumeTierMoveDatFromRemoteRequest, grpc.ServerStreamingServer[VolumeTierMoveDatFromRemoteResponse]) error {
|
||||
return status.Errorf(codes.Unimplemented, "method VolumeTierMoveDatFromRemote not implemented")
|
||||
return status.Error(codes.Unimplemented, "method VolumeTierMoveDatFromRemote not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeServerStatus(context.Context, *VolumeServerStatusRequest) (*VolumeServerStatusResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeServerStatus not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeServerStatus not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeServerLeave(context.Context, *VolumeServerLeaveRequest) (*VolumeServerLeaveResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeServerLeave not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeServerLeave not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) FetchAndWriteNeedle(context.Context, *FetchAndWriteNeedleRequest) (*FetchAndWriteNeedleResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method FetchAndWriteNeedle not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method FetchAndWriteNeedle not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) ScrubVolume(context.Context, *ScrubVolumeRequest) (*ScrubVolumeResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method ScrubVolume not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method ScrubVolume not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) ScrubEcVolume(context.Context, *ScrubEcVolumeRequest) (*ScrubEcVolumeResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method ScrubEcVolume not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method ScrubEcVolume not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) Query(*QueryRequest, grpc.ServerStreamingServer[QueriedStripe]) error {
|
||||
return status.Errorf(codes.Unimplemented, "method Query not implemented")
|
||||
return status.Error(codes.Unimplemented, "method Query not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeNeedleStatus(context.Context, *VolumeNeedleStatusRequest) (*VolumeNeedleStatusResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeNeedleStatus not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeNeedleStatus not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) Ping(context.Context, *PingRequest) (*PingResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented")
|
||||
return nil, status.Error(codes.Unimplemented, "method Ping not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) mustEmbedUnimplementedVolumeServerServer() {}
|
||||
func (UnimplementedVolumeServerServer) testEmbeddedByValue() {}
|
||||
@@ -935,7 +935,7 @@ type UnsafeVolumeServerServer interface {
|
||||
}
|
||||
|
||||
func RegisterVolumeServerServer(s grpc.ServiceRegistrar, srv VolumeServerServer) {
|
||||
// If the following call pancis, it indicates UnimplementedVolumeServerServer was
|
||||
// If the following call panics, it indicates UnimplementedVolumeServerServer was
|
||||
// embedded by pointer and is nil. This will cause panics if an
|
||||
// unimplemented method is ever invoked, so we test this at initialization
|
||||
// time to prevent it from happening at runtime later due to I/O.
|
||||
|
||||
@@ -162,44 +162,56 @@ func (vs *VolumeServer) VolumeConfigure(ctx context.Context, req *volume_server_
|
||||
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
|
||||
resp := &volume_server_pb.VolumeMarkReadonlyResponse{}
|
||||
|
||||
func (vs *VolumeServer) makeVolumeReadonly(ctx context.Context, v *storage.Volume, persist bool) error {
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v == nil {
|
||||
return nil, fmt.Errorf("volume %d not found", req.VolumeId)
|
||||
return err
|
||||
}
|
||||
|
||||
// step 1: stop master from redirecting traffic here
|
||||
if err := vs.notifyMasterVolumeReadonly(v, true); err != nil {
|
||||
return resp, err
|
||||
if err := vs.notifyMasterVolumeReadonly(ctx, v, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// rare case 1.5: it will be unlucky if heartbeat happened between step 1 and 2.
|
||||
|
||||
// step 2: mark local volume as readonly
|
||||
err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId), req.GetPersist())
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("volume mark readonly %v: %v", req, err)
|
||||
if err := vs.store.MarkVolumeReadonly(v.Id, persist); err != nil {
|
||||
glog.Errorf("mark volume %d readonly: %v", v.Id, err)
|
||||
return err
|
||||
} else {
|
||||
glog.V(2).Infof("volume mark readonly %v", req)
|
||||
glog.V(2).Infof("volume %d marked readonly", v.Id)
|
||||
}
|
||||
|
||||
// step 3: tell master from redirecting traffic here again, to prevent rare case 1.5
|
||||
if err := vs.notifyMasterVolumeReadonly(v, true); err != nil {
|
||||
return resp, err
|
||||
if err := vs.notifyMasterVolumeReadonly(ctx, v, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return resp, err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) notifyMasterVolumeReadonly(v *storage.Volume, isReadOnly bool) error {
|
||||
if grpcErr := pb.WithMasterClient(false, vs.GetMaster(context.Background()), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
|
||||
func (vs *VolumeServer) makeVolumeWritable(ctx context.Context, v *storage.Volume) error {
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := vs.store.MarkVolumeWritable(v.Id); err != nil {
|
||||
glog.Errorf("mark volume %d writable: %v", v.Id, err)
|
||||
return err
|
||||
} else {
|
||||
glog.V(2).Infof("volume %d marked writable", v.Id)
|
||||
}
|
||||
|
||||
// enable master to redirect traffic here
|
||||
if err := vs.notifyMasterVolumeReadonly(ctx, v, false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) notifyMasterVolumeReadonly(ctx context.Context, v *storage.Volume, isReadOnly bool) error {
|
||||
if grpcErr := pb.WithMasterClient(false, vs.GetMaster(ctx), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
|
||||
_, err := client.VolumeMarkReadonly(context.Background(), &master_pb.VolumeMarkReadonlyRequest{
|
||||
Ip: vs.store.Ip,
|
||||
Port: uint32(vs.store.Port),
|
||||
@@ -221,34 +233,36 @@ func (vs *VolumeServer) notifyMasterVolumeReadonly(v *storage.Volume, isReadOnly
|
||||
return nil
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
|
||||
resp := &volume_server_pb.VolumeMarkWritableResponse{}
|
||||
|
||||
if err := vs.CheckMaintenanceMode(); err != nil {
|
||||
return resp, err
|
||||
}
|
||||
func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
|
||||
resp := &volume_server_pb.VolumeMarkReadonlyResponse{}
|
||||
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v == nil {
|
||||
return nil, fmt.Errorf("volume %d not found", req.VolumeId)
|
||||
return resp, fmt.Errorf("volume %d not found", req.VolumeId)
|
||||
}
|
||||
|
||||
err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId))
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("volume mark writable %v: %v", req, err)
|
||||
} else {
|
||||
glog.V(2).Infof("volume mark writable %v", req)
|
||||
}
|
||||
|
||||
// enable master to redirect traffic here
|
||||
if err := vs.notifyMasterVolumeReadonly(v, false); err != nil {
|
||||
if err := vs.makeVolumeReadonly(ctx, v, req.GetPersist()); err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
|
||||
resp := &volume_server_pb.VolumeMarkWritableResponse{}
|
||||
|
||||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
|
||||
if v == nil {
|
||||
return resp, fmt.Errorf("volume %d not found", req.VolumeId)
|
||||
}
|
||||
|
||||
if err := vs.makeVolumeWritable(ctx, v); err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.VolumeStatusRequest) (*volume_server_pb.VolumeStatusResponse, error) {
|
||||
|
||||
resp := &volume_server_pb.VolumeStatusResponse{}
|
||||
|
||||
@@ -2,9 +2,11 @@ package weed_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
)
|
||||
|
||||
@@ -22,6 +24,7 @@ func (vs *VolumeServer) ScrubVolume(ctx context.Context, req *volume_server_pb.S
|
||||
|
||||
var details []string
|
||||
var totalVolumes, totalFiles uint64
|
||||
var brokenVolumes []*storage.Volume
|
||||
var brokenVolumeIds []uint32
|
||||
for _, vid := range vids {
|
||||
v := vs.store.GetVolume(vid)
|
||||
@@ -46,13 +49,29 @@ func (vs *VolumeServer) ScrubVolume(ctx context.Context, req *volume_server_pb.S
|
||||
totalVolumes += 1
|
||||
totalFiles += uint64(files)
|
||||
if len(serrs) != 0 {
|
||||
brokenVolumeIds = append(brokenVolumeIds, uint32(vid))
|
||||
brokenVolumes = append(brokenVolumes, v)
|
||||
brokenVolumeIds = append(brokenVolumeIds, uint32(v.Id))
|
||||
for _, err := range serrs {
|
||||
details = append(details, err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
errs := []error{}
|
||||
if req.GetMarkBrokenVolumesReadonly() {
|
||||
for _, v := range brokenVolumes {
|
||||
if err := vs.makeVolumeReadonly(ctx, v, true); err != nil {
|
||||
errs = append(errs, err)
|
||||
details = append(details, err.Error())
|
||||
} else {
|
||||
details = append(details, fmt.Sprintf("volume %d is now read-only", v.Id))
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(errs) != 0 {
|
||||
return nil, errors.Join(errs...)
|
||||
}
|
||||
|
||||
res := &volume_server_pb.ScrubVolumeResponse{
|
||||
TotalVolumes: totalVolumes,
|
||||
TotalFiles: totalFiles,
|
||||
@@ -102,7 +121,7 @@ func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb
|
||||
totalVolumes += 1
|
||||
totalFiles += uint64(files)
|
||||
if len(serrs) != 0 || len(shardInfos) != 0 {
|
||||
brokenVolumeIds = append(brokenVolumeIds, uint32(vid))
|
||||
brokenVolumeIds = append(brokenVolumeIds, uint32(v.VolumeId))
|
||||
brokenShardInfos = append(brokenShardInfos, shardInfos...)
|
||||
for _, err := range serrs {
|
||||
details = append(details, err.Error())
|
||||
|
||||
@@ -51,6 +51,7 @@ func (c *commandVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer io
|
||||
nodesStr := volScrubCommand.String("node", "", "comma-separated list of volume server <host>:<port> (optional)")
|
||||
volumeIDsStr := volScrubCommand.String("volumeId", "", "comma-separated volume IDs to process (optional)")
|
||||
mode := volScrubCommand.String("mode", "full", "scrubbing mode (index/local/full)")
|
||||
markBrokenReadonly := volScrubCommand.Bool("markBrokenReadonly", false, "whether to flag volumes with scrub failures as read-only")
|
||||
maxParallelization := volScrubCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
|
||||
|
||||
if err = volScrubCommand.Parse(args); err != nil {
|
||||
@@ -103,10 +104,10 @@ func (c *commandVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer io
|
||||
fmt.Fprintf(writer, "using %s mode\n", c.mode.String())
|
||||
c.env = commandEnv
|
||||
|
||||
return c.scrubVolumes(writer, *maxParallelization)
|
||||
return c.scrubVolumes(writer, *maxParallelization, *markBrokenReadonly)
|
||||
}
|
||||
|
||||
func (c *commandVolumeScrub) scrubVolumes(writer io.Writer, maxParallelization int) error {
|
||||
func (c *commandVolumeScrub) scrubVolumes(writer io.Writer, maxParallelization int, markBrokenReadonly bool) error {
|
||||
var brokenVolumesStr []string
|
||||
var details []string
|
||||
var totalVolumes, brokenVolumes, totalFiles uint64
|
||||
@@ -125,6 +126,7 @@ func (c *commandVolumeScrub) scrubVolumes(writer io.Writer, maxParallelization i
|
||||
res, err := volumeServerClient.ScrubVolume(context.Background(), &volume_server_pb.ScrubVolumeRequest{
|
||||
Mode: c.mode,
|
||||
VolumeIds: c.volumeIDs,
|
||||
MarkBrokenVolumesReadonly: markBrokenReadonly,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
Reference in New Issue
Block a user