* fix(worker): pass compaction revision and file sizes in EC volume copy

  The worker EC task was sending CopyFile requests without the current
  compaction revision (defaulting to 0) and with StopOffset set to
  math.MaxInt64. After a vacuum compaction this caused the volume server to
  reject the copy or return stale data. Read the volume file status first and
  forward the compaction revision and actual file sizes so the copy is
  consistent with the compacted volume.

* propagate erasure coding task context

* fix(worker): validate volume file status and detect short copies

  Reject a zero dat file size from ReadVolumeFileStatus — a zero-sized
  snapshot would produce 0-byte copies and broken EC shards. After streaming,
  verify totalBytes matches the expected stopOffset and return an error on
  short copies instead of logging success.

* fix(worker): reject zero idx file size in volume status validation

  A non-empty dat with a zero idx indicates an empty or corrupt volume.
  Without this guard, copyFileFromSource gets stopOffset=0, produces a
  0-byte .idx, passes the short-copy check, and generateEcShardsLocally runs
  against a volume with no index.

* fix fake plugin volume file status

* fix plugin volume balance test fixtures
161 lines
5.4 KiB
Go
package volume_balance_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
pluginworkers "github.com/seaweedfs/seaweedfs/test/plugin_workers"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
|
|
pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker"
|
|
"github.com/stretchr/testify/require"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
// testVolumeDatSize is the size, in bytes, of the synthetic .dat file written
// for each test volume by pluginworkers.WriteTestVolumeFiles (1 MiB).
const testVolumeDatSize = 1 * 1024 * 1024
|
|
|
|
func TestVolumeBalanceExecutionIntegration(t *testing.T) {
|
|
volumeID := uint32(303)
|
|
|
|
dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
|
|
handler := pluginworker.NewVolumeBalanceHandler(dialOption)
|
|
harness := pluginworkers.NewHarness(t, pluginworkers.HarnessConfig{
|
|
WorkerOptions: pluginworker.WorkerOptions{
|
|
GrpcDialOption: dialOption,
|
|
},
|
|
Handlers: []pluginworker.JobHandler{handler},
|
|
})
|
|
harness.WaitForJobType("volume_balance")
|
|
|
|
source := pluginworkers.NewVolumeServer(t, "")
|
|
target := pluginworkers.NewVolumeServer(t, "")
|
|
pluginworkers.WriteTestVolumeFiles(t, source.BaseDir(), volumeID, testVolumeDatSize)
|
|
|
|
job := &plugin_pb.JobSpec{
|
|
JobId: fmt.Sprintf("balance-job-%d", volumeID),
|
|
JobType: "volume_balance",
|
|
Parameters: map[string]*plugin_pb.ConfigValue{
|
|
"volume_id": {
|
|
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(volumeID)},
|
|
},
|
|
"collection": {
|
|
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "balance"},
|
|
},
|
|
"source_server": {
|
|
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: source.Address()},
|
|
},
|
|
"target_server": {
|
|
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: target.Address()},
|
|
},
|
|
},
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
result, err := harness.Plugin().ExecuteJob(ctx, job, nil, 1)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, result)
|
|
require.True(t, result.Success)
|
|
|
|
require.GreaterOrEqual(t, source.MarkReadonlyCount(), 1)
|
|
require.GreaterOrEqual(t, len(source.DeleteRequests()), 1)
|
|
|
|
copyCalls, _, tailCalls := target.BalanceStats()
|
|
require.GreaterOrEqual(t, copyCalls, 1)
|
|
require.GreaterOrEqual(t, tailCalls, 1)
|
|
}
|
|
|
|
func TestVolumeBalanceBatchExecutionIntegration(t *testing.T) {
|
|
dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
|
|
handler := pluginworker.NewVolumeBalanceHandler(dialOption)
|
|
harness := pluginworkers.NewHarness(t, pluginworkers.HarnessConfig{
|
|
WorkerOptions: pluginworker.WorkerOptions{
|
|
GrpcDialOption: dialOption,
|
|
},
|
|
Handlers: []pluginworker.JobHandler{handler},
|
|
})
|
|
harness.WaitForJobType("volume_balance")
|
|
|
|
// Create one source and one target fake volume server.
|
|
source := pluginworkers.NewVolumeServer(t, "")
|
|
target := pluginworkers.NewVolumeServer(t, "")
|
|
|
|
// Build a batch job with 3 volume moves from source → target.
|
|
volumeIDs := []uint32{401, 402, 403}
|
|
for _, vid := range volumeIDs {
|
|
pluginworkers.WriteTestVolumeFiles(t, source.BaseDir(), vid, testVolumeDatSize)
|
|
}
|
|
moves := make([]*worker_pb.BalanceMoveSpec, len(volumeIDs))
|
|
for i, vid := range volumeIDs {
|
|
moves[i] = &worker_pb.BalanceMoveSpec{
|
|
VolumeId: vid,
|
|
SourceNode: source.Address(),
|
|
TargetNode: target.Address(),
|
|
Collection: "batch-test",
|
|
}
|
|
}
|
|
|
|
params := &worker_pb.TaskParams{
|
|
TaskId: "batch-balance-test",
|
|
TaskParams: &worker_pb.TaskParams_BalanceParams{
|
|
BalanceParams: &worker_pb.BalanceTaskParams{
|
|
MaxConcurrentMoves: 2,
|
|
Moves: moves,
|
|
},
|
|
},
|
|
}
|
|
paramBytes, err := proto.Marshal(params)
|
|
require.NoError(t, err)
|
|
|
|
job := &plugin_pb.JobSpec{
|
|
JobId: "batch-balance-test",
|
|
JobType: "volume_balance",
|
|
Parameters: map[string]*plugin_pb.ConfigValue{
|
|
"task_params_pb": {
|
|
Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: paramBytes},
|
|
},
|
|
},
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
result, err := harness.Plugin().ExecuteJob(ctx, job, nil, 1)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, result)
|
|
require.True(t, result.Success, "batch balance job should succeed; result: %+v", result)
|
|
|
|
// Each of the 3 moves should have marked the source readonly and deleted.
|
|
require.Equal(t, len(volumeIDs), source.MarkReadonlyCount(),
|
|
"each move should mark source volume readonly")
|
|
require.Equal(t, len(volumeIDs), len(source.DeleteRequests()),
|
|
"each move should delete the source volume")
|
|
|
|
// Verify delete requests reference the expected volume IDs.
|
|
deletedVols := make(map[uint32]bool)
|
|
for _, req := range source.DeleteRequests() {
|
|
deletedVols[req.VolumeId] = true
|
|
}
|
|
for _, vid := range volumeIDs {
|
|
require.True(t, deletedVols[vid], "volume %d should have been deleted from source", vid)
|
|
}
|
|
|
|
// Each move reads source status once before copy and once inside the
|
|
// target's fake VolumeCopy implementation, then reads target status once
|
|
// before deleting the source.
|
|
require.Equal(t, len(volumeIDs)*2, source.ReadFileStatusCount(),
|
|
"each move should read source volume status before copy and during target copy")
|
|
require.Equal(t, len(volumeIDs), target.ReadFileStatusCount(),
|
|
"each move should read target volume status before delete")
|
|
|
|
// Target should have received copy and tail calls for all 3 volumes.
|
|
copyCalls, _, tailCalls := target.BalanceStats()
|
|
require.Equal(t, len(volumeIDs), copyCalls, "target should receive one copy per volume")
|
|
require.Equal(t, len(volumeIDs), tailCalls, "target should receive one tail per volume")
|
|
}
|