* test: restore maintenance mode coverage in TestVolumeMarkReadonlyWritableErrorPaths

  PR #8360 removed the maintenance mode assertions because the refactored check
  ordering (volume lookup before maintenance check) caused the original test to hit
  "not found" instead of "maintenance mode"; the test used a non-existent volume ID.
  Restore coverage by allocating a real volume, then verifying:

  - existing volume in maintenance mode returns "maintenance mode"
  - non-existent volume in maintenance mode still returns "not found" (validating the
    new check ordering)

* test: add coverage for ScrubVolume MarkBrokenVolumesReadonly flag

  PR #8360 added the mark_broken_volumes_readonly field to ScrubVolumeRequest but no
  tests exercised the new logic paths. Add three integration tests:

  - HealthyVolume: flag is a no-op when scrub finds no broken volumes
  - CorruptVolume: corrupted .idx triggers broken detection; without the flag the
    volume stays writable, with the flag it becomes read-only
  - MaintenanceMode: makeVolumeReadonly fails under maintenance and ScrubVolume
    propagates the error via errors.Join

* refactor: extract CorruptIndexFile and EnableMaintenanceMode test helpers

  Move duplicated idx corruption and maintenance mode setup into
  framework.CorruptIndexFile() and framework.EnableMaintenanceMode() helpers. Use
  defer for file close in the corruption helper.
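The refactor bullet above references framework.CorruptIndexFile() and framework.EnableMaintenanceMode(), whose bodies are not part of this file. As rough orientation only, a minimal sketch of what CorruptIndexFile could look like follows, assuming the default-collection index file sits at <baseDir>/<volumeID>.idx; the actual helper in the framework package may resolve the path and damage the file differently.

```go
// Hypothetical sketch of the CorruptIndexFile helper described in the commit
// message; not the framework's actual implementation. It assumes the
// default-collection index file is <baseDir>/<volumeID>.idx.
package framework

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
	"testing"
)

// CorruptIndexFile appends garbage bytes to a volume's .idx file so an
// INDEX-mode scrub reports the volume as broken.
func CorruptIndexFile(t *testing.T, baseDir string, volumeID uint32) {
	t.Helper()

	idxPath := filepath.Join(baseDir, fmt.Sprintf("%d.idx", volumeID))
	f, err := os.OpenFile(idxPath, os.O_WRONLY, 0o644)
	if err != nil {
		t.Fatalf("open idx file %s: %v", idxPath, err)
	}
	defer f.Close() // per the commit message, close the file via defer

	// Write a short, misaligned garbage entry at the end of the index so
	// entry sizes and offsets no longer line up with the data file.
	if _, err := f.Seek(0, io.SeekEnd); err != nil {
		t.Fatalf("seek to end of idx file: %v", err)
	}
	if _, err := f.Write([]byte{0xDE, 0xAD, 0xBE, 0xEF, 0x00}); err != nil {
		t.Fatalf("corrupt idx file: %v", err)
	}
}
```

EnableMaintenanceMode is used the same way in the maintenance-mode test below, but it drives a server-side call whose request shape is not visible from this file, so it is not sketched here.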
package volume_server_grpc_test

import (
	"context"
	"io"
	"strings"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/test/volume_server/framework"
	"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)

func TestScrubVolumeIndexAndUnsupportedMode(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}

	clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
	conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
	defer conn.Close()

	const volumeID = uint32(61)
	framework.AllocateVolume(t, grpcClient, volumeID, "")

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	indexResp, err := grpcClient.ScrubVolume(ctx, &volume_server_pb.ScrubVolumeRequest{
		VolumeIds: []uint32{volumeID},
		Mode: volume_server_pb.VolumeScrubMode_INDEX,
	})
	if err != nil {
		t.Fatalf("ScrubVolume index mode failed: %v", err)
	}
	if indexResp.GetTotalVolumes() != 1 {
		t.Fatalf("ScrubVolume expected total_volumes=1, got %d", indexResp.GetTotalVolumes())
	}

	_, err = grpcClient.ScrubVolume(ctx, &volume_server_pb.ScrubVolumeRequest{
		VolumeIds: []uint32{volumeID},
		Mode: volume_server_pb.VolumeScrubMode(99),
	})
	if err == nil {
		t.Fatalf("ScrubVolume should fail for unsupported mode")
	}
	if !strings.Contains(err.Error(), "unsupported volume scrub mode") {
		t.Fatalf("ScrubVolume unsupported mode error mismatch: %v", err)
	}
}

func TestScrubEcVolumeMissingVolume(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}

	clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
	conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	_, err := grpcClient.ScrubEcVolume(ctx, &volume_server_pb.ScrubEcVolumeRequest{
		VolumeIds: []uint32{98765},
		Mode: volume_server_pb.VolumeScrubMode_INDEX,
	})
	if err == nil {
		t.Fatalf("ScrubEcVolume should fail for missing EC volume")
	}
	if !strings.Contains(err.Error(), "EC volume id") {
		t.Fatalf("ScrubEcVolume missing-volume error mismatch: %v", err)
	}
}

func TestScrubEcVolumeAutoSelectNoEcVolumes(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}

	clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
	conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	resp, err := grpcClient.ScrubEcVolume(ctx, &volume_server_pb.ScrubEcVolumeRequest{
		Mode: volume_server_pb.VolumeScrubMode_INDEX,
	})
	if err != nil {
		t.Fatalf("ScrubEcVolume auto-select failed: %v", err)
	}
	if resp.GetTotalVolumes() != 0 {
		t.Fatalf("ScrubEcVolume auto-select expected total_volumes=0 without EC data, got %d", resp.GetTotalVolumes())
	}
	if len(resp.GetBrokenVolumeIds()) != 0 {
		t.Fatalf("ScrubEcVolume auto-select expected no broken volumes, got %v", resp.GetBrokenVolumeIds())
	}
}

func TestQueryInvalidAndMissingFileIDPaths(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}

	clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
	conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	invalidStream, err := grpcClient.Query(ctx, &volume_server_pb.QueryRequest{
		FromFileIds: []string{"bad-fid"},
		Selections: []string{"name"},
		Filter: &volume_server_pb.QueryRequest_Filter{},
		InputSerialization: &volume_server_pb.QueryRequest_InputSerialization{
			JsonInput: &volume_server_pb.QueryRequest_InputSerialization_JSONInput{},
		},
	})
	if err == nil {
		_, err = invalidStream.Recv()
	}
	if err == nil {
		t.Fatalf("Query should fail for invalid file id")
	}

	missingFid := framework.NewFileID(98766, 1, 1)
	missingStream, err := grpcClient.Query(ctx, &volume_server_pb.QueryRequest{
		FromFileIds: []string{missingFid},
		Selections: []string{"name"},
		Filter: &volume_server_pb.QueryRequest_Filter{},
		InputSerialization: &volume_server_pb.QueryRequest_InputSerialization{
			JsonInput: &volume_server_pb.QueryRequest_InputSerialization_JSONInput{},
		},
	})
	if err == nil {
		_, err = missingStream.Recv()
	}
	if err == nil {
		t.Fatalf("Query should fail for missing file id volume")
	}
}

func TestScrubVolumeAutoSelectAndAllModes(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}

	clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
	conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
	defer conn.Close()

	const volumeIDA = uint32(62)
	const volumeIDB = uint32(63)
	framework.AllocateVolume(t, grpcClient, volumeIDA, "")
	framework.AllocateVolume(t, grpcClient, volumeIDB, "")

	// upload some data so index files are not zero-sized
	httpClient := framework.NewHTTPClient()
	framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), framework.NewFileID(volumeIDA, 1, 1), []byte("test data A"))
	framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), framework.NewFileID(volumeIDB, 2, 2), []byte("test data B"))

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	autoResp, err := grpcClient.ScrubVolume(ctx, &volume_server_pb.ScrubVolumeRequest{
		Mode: volume_server_pb.VolumeScrubMode_INDEX,
	})
	if err != nil {
		t.Fatalf("ScrubVolume auto-select failed: %v", err)
	}
	if autoResp.GetTotalVolumes() < 2 {
		t.Fatalf("ScrubVolume auto-select expected at least 2 volumes, got %d", autoResp.GetTotalVolumes())
	}

	localResp, err := grpcClient.ScrubVolume(ctx, &volume_server_pb.ScrubVolumeRequest{
		VolumeIds: []uint32{volumeIDA},
		Mode: volume_server_pb.VolumeScrubMode_LOCAL,
	})
	if err != nil {
		t.Fatalf("ScrubVolume local mode failed: %v", err)
	}
	if localResp.GetTotalVolumes() != 1 {
		t.Fatalf("ScrubVolume local mode expected total_volumes=1, got %d", localResp.GetTotalVolumes())
	}
	if len(localResp.GetBrokenVolumeIds()) != 0 {
		t.Fatalf("ScrubVolume local mode expected no broken volumes, got %v: %v", localResp.GetBrokenVolumeIds(), localResp.GetDetails())
	}

	fullResp, err := grpcClient.ScrubVolume(ctx, &volume_server_pb.ScrubVolumeRequest{
		VolumeIds: []uint32{volumeIDA},
		Mode: volume_server_pb.VolumeScrubMode_FULL,
	})
	if err != nil {
		t.Fatalf("ScrubVolume full mode failed: %v", err)
	}
	if fullResp.GetTotalVolumes() != 1 {
		t.Fatalf("ScrubVolume full mode expected total_volumes=1, got %d", fullResp.GetTotalVolumes())
	}
	if len(fullResp.GetBrokenVolumeIds()) != 0 {
		t.Fatalf("ScrubVolume full mode expected no broken volumes, got %v: %v", fullResp.GetBrokenVolumeIds(), fullResp.GetDetails())
	}
}

func TestQueryJsonSuccessAndCsvNoOutput(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}

	clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
	conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
	defer conn.Close()

	const volumeID = uint32(64)
	const needleID = uint64(777001)
	const cookie = uint32(0xAABBCCDD)
	framework.AllocateVolume(t, grpcClient, volumeID, "")

	jsonLines := []byte("{\"score\":3}\n{\"score\":12}\n{\"score\":18}\n")
	httpClient := framework.NewHTTPClient()
	fid := framework.NewFileID(volumeID, needleID, cookie)
	uploadResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), fid, jsonLines)
	_ = framework.ReadAllAndClose(t, uploadResp)
	if uploadResp.StatusCode != 201 {
		t.Fatalf("upload expected 201, got %d", uploadResp.StatusCode)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	queryStream, err := grpcClient.Query(ctx, &volume_server_pb.QueryRequest{
		FromFileIds: []string{fid},
		Selections: []string{"score"},
		Filter: &volume_server_pb.QueryRequest_Filter{
			Field: "score",
			Operand: ">",
			Value: "10",
		},
		InputSerialization: &volume_server_pb.QueryRequest_InputSerialization{
			JsonInput: &volume_server_pb.QueryRequest_InputSerialization_JSONInput{Type: "LINES"},
		},
	})
	if err != nil {
		t.Fatalf("Query json start failed: %v", err)
	}

	firstStripe, err := queryStream.Recv()
	if err != nil {
		t.Fatalf("Query json recv failed: %v", err)
	}
	records := string(firstStripe.GetRecords())
	if !strings.Contains(records, "score:12") || !strings.Contains(records, "score:18") {
		t.Fatalf("Query json records missing expected filtered scores: %q", records)
	}
	if strings.Contains(records, "score:3") {
		t.Fatalf("Query json records should not include filtered-out score: %q", records)
	}
	_, err = queryStream.Recv()
	if err != io.EOF {
		t.Fatalf("Query json expected EOF after first stripe, got: %v", err)
	}

	csvStream, err := grpcClient.Query(ctx, &volume_server_pb.QueryRequest{
		FromFileIds: []string{fid},
		Selections: []string{"score"},
		Filter: &volume_server_pb.QueryRequest_Filter{},
		InputSerialization: &volume_server_pb.QueryRequest_InputSerialization{
			CsvInput: &volume_server_pb.QueryRequest_InputSerialization_CSVInput{},
		},
	})
	if err != nil {
		t.Fatalf("Query csv start failed: %v", err)
	}
	_, err = csvStream.Recv()
	if err != io.EOF {
		t.Fatalf("Query csv expected EOF with no rows, got: %v", err)
	}
}

func TestQueryJsonNoMatchReturnsEmptyStripe(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}

	clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
	conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
	defer conn.Close()

	const volumeID = uint32(65)
	const needleID = uint64(777002)
	const cookie = uint32(0xABABCDCD)
	framework.AllocateVolume(t, grpcClient, volumeID, "")

	jsonLines := []byte("{\"score\":1}\n{\"score\":2}\n")
	httpClient := framework.NewHTTPClient()
	fid := framework.NewFileID(volumeID, needleID, cookie)
	uploadResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), fid, jsonLines)
	_ = framework.ReadAllAndClose(t, uploadResp)
	if uploadResp.StatusCode != 201 {
		t.Fatalf("upload expected 201, got %d", uploadResp.StatusCode)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	queryStream, err := grpcClient.Query(ctx, &volume_server_pb.QueryRequest{
		FromFileIds: []string{fid},
		Selections: []string{"score"},
		Filter: &volume_server_pb.QueryRequest_Filter{
			Field: "score",
			Operand: ">",
			Value: "100",
		},
		InputSerialization: &volume_server_pb.QueryRequest_InputSerialization{
			JsonInput: &volume_server_pb.QueryRequest_InputSerialization_JSONInput{Type: "LINES"},
		},
	})
	if err != nil {
		t.Fatalf("Query json no-match start failed: %v", err)
	}

	firstStripe, err := queryStream.Recv()
	if err != nil {
		t.Fatalf("Query json no-match recv failed: %v", err)
	}
	if len(firstStripe.GetRecords()) != 0 {
		t.Fatalf("Query json no-match expected empty records stripe, got: %q", string(firstStripe.GetRecords()))
	}

	_, err = queryStream.Recv()
	if err != io.EOF {
		t.Fatalf("Query json no-match expected EOF after first empty stripe, got: %v", err)
	}
}

func TestQueryCookieMismatchReturnsEOFNoResults(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}

	clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
	conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
	defer conn.Close()

	const volumeID = uint32(66)
	const needleID = uint64(777003)
	const cookie = uint32(0xCDCDABAB)
	framework.AllocateVolume(t, grpcClient, volumeID, "")

	jsonLines := []byte("{\"score\":7}\n{\"score\":8}\n")
	httpClient := framework.NewHTTPClient()
	fid := framework.NewFileID(volumeID, needleID, cookie)
	uploadResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), fid, jsonLines)
	_ = framework.ReadAllAndClose(t, uploadResp)
	if uploadResp.StatusCode != 201 {
		t.Fatalf("upload expected 201, got %d", uploadResp.StatusCode)
	}

	wrongCookieFid := framework.NewFileID(volumeID, needleID, cookie+1)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	stream, err := grpcClient.Query(ctx, &volume_server_pb.QueryRequest{
		FromFileIds: []string{wrongCookieFid},
		Selections: []string{"score"},
		Filter: &volume_server_pb.QueryRequest_Filter{
			Field: "score",
			Operand: ">",
			Value: "0",
		},
		InputSerialization: &volume_server_pb.QueryRequest_InputSerialization{
			JsonInput: &volume_server_pb.QueryRequest_InputSerialization_JSONInput{Type: "LINES"},
		},
	})
	if err != nil {
		t.Fatalf("Query start for cookie mismatch should not fail immediately, got: %v", err)
	}

	_, err = stream.Recv()
	if err != io.EOF {
		t.Fatalf("Query cookie mismatch expected EOF with no streamed records, got: %v", err)
	}
}

func TestScrubVolumeMarkBrokenReadonlyHealthyVolume(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}

	clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
	conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
	defer conn.Close()

	const volumeID = uint32(76)
	framework.AllocateVolume(t, grpcClient, volumeID, "")

	httpClient := framework.NewHTTPClient()
	framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), framework.NewFileID(volumeID, 1, 1), []byte("healthy data"))

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	resp, err := grpcClient.ScrubVolume(ctx, &volume_server_pb.ScrubVolumeRequest{
		VolumeIds: []uint32{volumeID},
		Mode: volume_server_pb.VolumeScrubMode_INDEX,
		MarkBrokenVolumesReadonly: true,
	})
	if err != nil {
		t.Fatalf("ScrubVolume with MarkBrokenVolumesReadonly on healthy volume failed: %v", err)
	}
	if len(resp.GetBrokenVolumeIds()) != 0 {
		t.Fatalf("expected no broken volumes, got %v", resp.GetBrokenVolumeIds())
	}

	statusResp, err := grpcClient.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{VolumeId: volumeID})
	if err != nil {
		t.Fatalf("VolumeStatus failed: %v", err)
	}
	if statusResp.GetIsReadOnly() {
		t.Fatalf("healthy volume should not be read-only after scrub with MarkBrokenVolumesReadonly")
	}
}

func TestScrubVolumeMarkBrokenReadonlyCorruptVolume(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}

	clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
	conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
	defer conn.Close()

	const volumeID = uint32(77)
	framework.AllocateVolume(t, grpcClient, volumeID, "")

	httpClient := framework.NewHTTPClient()
	framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), framework.NewFileID(volumeID, 1, 1), []byte("test data"))

	framework.CorruptIndexFile(t, clusterHarness.BaseDir(), volumeID)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// scrub without the flag: broken volume reported but not marked read-only
	resp, err := grpcClient.ScrubVolume(ctx, &volume_server_pb.ScrubVolumeRequest{
		VolumeIds: []uint32{volumeID},
		Mode: volume_server_pb.VolumeScrubMode_INDEX,
	})
	if err != nil {
		t.Fatalf("ScrubVolume without flag failed: %v", err)
	}
	if len(resp.GetBrokenVolumeIds()) == 0 {
		t.Fatalf("expected broken volume after corruption")
	}

	statusResp, err := grpcClient.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{VolumeId: volumeID})
	if err != nil {
		t.Fatalf("VolumeStatus after scrub without flag failed: %v", err)
	}
	if statusResp.GetIsReadOnly() {
		t.Fatalf("volume should not be read-only when MarkBrokenVolumesReadonly is false")
	}

	// scrub with the flag: broken volume should now be marked read-only
	resp, err = grpcClient.ScrubVolume(ctx, &volume_server_pb.ScrubVolumeRequest{
		VolumeIds: []uint32{volumeID},
		Mode: volume_server_pb.VolumeScrubMode_INDEX,
		MarkBrokenVolumesReadonly: true,
	})
	if err != nil {
		t.Fatalf("ScrubVolume with MarkBrokenVolumesReadonly failed: %v", err)
	}
	if len(resp.GetBrokenVolumeIds()) == 0 {
		t.Fatalf("expected broken volume after corruption with flag")
	}

	found := false
	for _, d := range resp.GetDetails() {
		if strings.Contains(d, "is now read-only") {
			found = true
			break
		}
	}
	if !found {
		t.Fatalf("expected 'is now read-only' in details, got: %v", resp.GetDetails())
	}

	statusResp, err = grpcClient.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{VolumeId: volumeID})
	if err != nil {
		t.Fatalf("VolumeStatus after MarkBrokenVolumesReadonly scrub failed: %v", err)
	}
	if !statusResp.GetIsReadOnly() {
		t.Fatalf("broken volume should be read-only after MarkBrokenVolumesReadonly scrub")
	}
}

func TestScrubVolumeMarkBrokenReadonlyInMaintenanceMode(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}

	clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
	conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
	defer conn.Close()

	const volumeID = uint32(78)
	framework.AllocateVolume(t, grpcClient, volumeID, "")

	httpClient := framework.NewHTTPClient()
	framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), framework.NewFileID(volumeID, 1, 1), []byte("test data"))

	framework.CorruptIndexFile(t, clusterHarness.BaseDir(), volumeID)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	framework.EnableMaintenanceMode(t, ctx, grpcClient)

	// scrub with the flag in maintenance mode: makeVolumeReadonly should fail
	// and ScrubVolume should propagate the error
	_, err := grpcClient.ScrubVolume(ctx, &volume_server_pb.ScrubVolumeRequest{
		VolumeIds: []uint32{volumeID},
		Mode: volume_server_pb.VolumeScrubMode_INDEX,
		MarkBrokenVolumesReadonly: true,
	})
	if err == nil || !strings.Contains(err.Error(), "maintenance mode") {
		t.Fatalf("ScrubVolume with MarkBrokenVolumesReadonly in maintenance mode error mismatch: %v", err)
	}
}