Files
seaweedFS/test/plugin_workers/fake_volume_server.go
Chris Lu d074830016 fix(worker): pass compaction revision and file sizes in EC volume copy (#8835)
* fix(worker): pass compaction revision and file sizes in EC volume copy

The worker EC task was sending CopyFile requests without the current
compaction revision (defaulting to 0) and with StopOffset set to
math.MaxInt64.  After a vacuum compaction this caused the volume server
to reject the copy or return stale data.

Read the volume file status first and forward the compaction revision
and actual file sizes so the copy is consistent with the compacted
volume.

* propagate erasure coding task context

* fix(worker): validate volume file status and detect short copies

Reject zero dat file size from ReadVolumeFileStatus — a zero-sized
snapshot would produce 0-byte copies and broken EC shards.

After streaming, verify totalBytes matches the expected stopOffset
and return an error on short copies instead of logging success.

* fix(worker): reject zero idx file size in volume status validation

A non-empty dat with zero idx indicates an empty or corrupt volume.
Without this guard, copyFileFromSource gets stopOffset=0, produces a
0-byte .idx, passes the short-copy check, and generateEcShardsLocally
runs against a volume with no index.

* fix fake plugin volume file status

* fix plugin volume balance test fixtures
2026-03-29 18:47:15 -07:00

458 lines
12 KiB
Go

package pluginworkers
import (
"context"
"fmt"
"io"
"net"
"os"
"path/filepath"
"sync"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// VolumeServer provides a minimal volume server for erasure coding tests.
// It implements just enough of the volume_server_pb.VolumeServerServer
// interface for plugin worker tests, recording calls and storing received
// files under baseDir. All mutable state is guarded by mu.
type VolumeServer struct {
volume_server_pb.UnimplementedVolumeServerServer
t *testing.T
server *grpc.Server
listener net.Listener
// address is the dialable gRPC address, set once in NewVolumeServer.
address string
// baseDir is the directory where volume files (.dat/.idx/shards) live.
baseDir string
// mu guards every field below.
mu sync.Mutex
// receivedFiles maps destination file path -> bytes written via ReceiveFile.
receivedFiles map[string]uint64
mountRequests []*volume_server_pb.VolumeEcShardsMountRequest
deleteRequests []*volume_server_pb.VolumeDeleteRequest
markReadonlyCalls int
markWritableCalls int
readFileStatusCalls int
// vacuumGarbageRatio is the canned value returned by VacuumVolumeCheck.
vacuumGarbageRatio float64
vacuumCheckCalls int
vacuumCompactCalls int
vacuumCommitCalls int
vacuumCleanupCalls int
volumeCopyCalls int
volumeMountCalls int
tailReceiverCalls int
}
// NewVolumeServer starts a test volume server using the provided base directory.
// If baseDir is empty a test-scoped temporary directory is used. The server
// listens on a random loopback port and is stopped automatically via
// t.Cleanup when the test ends.
func NewVolumeServer(t *testing.T, baseDir string) *VolumeServer {
	t.Helper()
	if baseDir == "" {
		baseDir = t.TempDir()
	}
	if err := os.MkdirAll(baseDir, 0755); err != nil {
		t.Fatalf("create volume base dir: %v", err)
	}
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("listen volume server: %v", err)
	}
	grpcPort := listener.Addr().(*net.TCPAddr).Port
	server := pb.NewGrpcServer()
	vs := &VolumeServer{
		t:        t,
		server:   server,
		listener: listener,
		// Advertise the port we actually bound. The previous
		// "127.0.0.1:0.%d" format produced an undialable address
		// such as "127.0.0.1:0.12345".
		address:       fmt.Sprintf("127.0.0.1:%d", grpcPort),
		baseDir:       baseDir,
		receivedFiles: make(map[string]uint64),
	}
	volume_server_pb.RegisterVolumeServerServer(server, vs)
	go func() {
		_ = server.Serve(listener)
	}()
	t.Cleanup(func() {
		vs.Shutdown()
	})
	return vs
}
// Address returns the gRPC address of the volume server.
// The value is set once in NewVolumeServer and never mutated afterwards,
// so no locking is needed here.
func (v *VolumeServer) Address() string {
return v.address
}
// BaseDir returns the base directory used by the server.
// Like address, baseDir is fixed at construction time, so reading it
// without the mutex is safe.
func (v *VolumeServer) BaseDir() string {
return v.baseDir
}
// ReceivedFiles returns a snapshot of received files and byte counts.
// The returned map is a copy; callers may mutate it freely.
func (v *VolumeServer) ReceivedFiles() map[string]uint64 {
	v.mu.Lock()
	snapshot := make(map[string]uint64, len(v.receivedFiles))
	for name, size := range v.receivedFiles {
		snapshot[name] = size
	}
	v.mu.Unlock()
	return snapshot
}
// SetVacuumGarbageRatio sets the garbage ratio returned by VacuumVolumeCheck.
func (v *VolumeServer) SetVacuumGarbageRatio(ratio float64) {
	v.mu.Lock()
	v.vacuumGarbageRatio = ratio
	v.mu.Unlock()
}
// VacuumStats returns the vacuum RPC call counts.
func (v *VolumeServer) VacuumStats() (check, compact, commit, cleanup int) {
	v.mu.Lock()
	check = v.vacuumCheckCalls
	compact = v.vacuumCompactCalls
	commit = v.vacuumCommitCalls
	cleanup = v.vacuumCleanupCalls
	v.mu.Unlock()
	return
}
// BalanceStats returns the balance RPC call counts.
func (v *VolumeServer) BalanceStats() (copyCalls, mountCalls, tailCalls int) {
	v.mu.Lock()
	copyCalls = v.volumeCopyCalls
	mountCalls = v.volumeMountCalls
	tailCalls = v.tailReceiverCalls
	v.mu.Unlock()
	return
}
// MountRequests returns recorded mount requests.
// A copy of the slice is returned so callers cannot disturb internal state.
func (v *VolumeServer) MountRequests() []*volume_server_pb.VolumeEcShardsMountRequest {
	v.mu.Lock()
	defer v.mu.Unlock()
	// make with zero length keeps the result non-nil even when empty,
	// matching the original behavior of make+copy.
	snapshot := make([]*volume_server_pb.VolumeEcShardsMountRequest, 0, len(v.mountRequests))
	snapshot = append(snapshot, v.mountRequests...)
	return snapshot
}
// DeleteRequests returns recorded delete requests.
// A copy of the slice is returned so callers cannot disturb internal state.
func (v *VolumeServer) DeleteRequests() []*volume_server_pb.VolumeDeleteRequest {
	v.mu.Lock()
	defer v.mu.Unlock()
	// make with zero length keeps the result non-nil even when empty,
	// matching the original behavior of make+copy.
	snapshot := make([]*volume_server_pb.VolumeDeleteRequest, 0, len(v.deleteRequests))
	snapshot = append(snapshot, v.deleteRequests...)
	return snapshot
}
// MarkReadonlyCount returns the number of readonly calls.
func (v *VolumeServer) MarkReadonlyCount() int {
	v.mu.Lock()
	count := v.markReadonlyCalls
	v.mu.Unlock()
	return count
}
// MarkWritableCount returns the number of writable calls.
func (v *VolumeServer) MarkWritableCount() int {
	v.mu.Lock()
	count := v.markWritableCalls
	v.mu.Unlock()
	return count
}
// ReadFileStatusCount returns the number of ReadVolumeFileStatus calls.
func (v *VolumeServer) ReadFileStatusCount() int {
	v.mu.Lock()
	count := v.readFileStatusCalls
	v.mu.Unlock()
	return count
}
// Shutdown stops the volume server. It drains in-flight RPCs first via
// GracefulStop, then closes the listener. Safe to call on a partially
// constructed server.
func (v *VolumeServer) Shutdown() {
	if srv := v.server; srv != nil {
		srv.GracefulStop()
	}
	if lis := v.listener; lis != nil {
		_ = lis.Close()
	}
}
// filePath returns the on-disk path for a volume file, e.g. "<base>/7.dat".
func (v *VolumeServer) filePath(volumeID uint32, ext string) string {
	name := fmt.Sprintf("%d", volumeID) + ext
	return filepath.Join(v.baseDir, name)
}
// CopyFile streams the requested volume file back to the client in 64 KiB
// chunks, stopping after StopOffset bytes (a negative value after the
// uint64->int64 conversion means "copy until EOF", matching callers that
// pass math.MaxInt64-style sentinels).
func (v *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error {
	if req == nil {
		return fmt.Errorf("copy file request is nil")
	}
	path := v.filePath(req.VolumeId, req.Ext)
	file, err := os.Open(path)
	if err != nil {
		// Only a genuinely missing file may be ignored; other open
		// errors (e.g. permissions) must still surface. Previously any
		// open error was silently swallowed when the flag was set.
		if req.IgnoreSourceFileNotFound && os.IsNotExist(err) {
			return nil
		}
		return err
	}
	defer file.Close()
	buf := make([]byte, 64*1024)
	remaining := int64(req.GetStopOffset())
	for {
		if remaining == 0 {
			break
		}
		readBuf := buf
		// Trim the final read so we never send past StopOffset.
		if remaining > 0 && remaining < int64(len(buf)) {
			readBuf = buf[:remaining]
		}
		n, readErr := file.Read(readBuf)
		if n > 0 {
			if err := stream.Send(&volume_server_pb.CopyFileResponse{FileContent: readBuf[:n]}); err != nil {
				return err
			}
			if remaining > 0 {
				remaining -= int64(n)
			}
		}
		if readErr == io.EOF {
			break
		}
		if readErr != nil {
			return readErr
		}
	}
	return nil
}
// ReceiveFile implements the client-streaming upload RPC. The first message
// must carry ReceiveFileInfo (volume id + extension); later messages carry
// raw content chunks written to the matching file under baseDir. On EOF the
// total byte count is recorded in receivedFiles and returned to the client.
func (v *VolumeServer) ReceiveFile(stream volume_server_pb.VolumeServer_ReceiveFileServer) error {
var (
info *volume_server_pb.ReceiveFileInfo
file *os.File
bytesWritten uint64
filePath string
)
// Close the destination file on every exit path (success or error).
defer func() {
if file != nil {
_ = file.Close()
}
}()
for {
req, err := stream.Recv()
if err == io.EOF {
// Stream ended before an info header arrived: report it in the
// response rather than failing the RPC, mirroring the real server.
if info == nil {
return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{Error: "missing file info"})
}
v.mu.Lock()
v.receivedFiles[filePath] = bytesWritten
v.mu.Unlock()
return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{BytesWritten: bytesWritten})
}
if err != nil {
return err
}
if reqInfo := req.GetInfo(); reqInfo != nil {
// Header message: create (truncate) the target file.
info = reqInfo
filePath = v.filePath(info.VolumeId, info.Ext)
if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil {
return err
}
file, err = os.Create(filePath)
if err != nil {
return err
}
continue
}
chunk := req.GetFileContent()
if len(chunk) == 0 {
continue
}
// Content before any header is a protocol violation.
if file == nil {
return fmt.Errorf("file info not received")
}
n, writeErr := file.Write(chunk)
if writeErr != nil {
return writeErr
}
bytesWritten += uint64(n)
}
}
// VolumeEcShardsMount records the mount request and reports success.
func (v *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.mountRequests = append(v.mountRequests, req)
	return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
}
// VolumeDelete records the request and best-effort removes the volume's
// .dat and .idx files from disk.
func (v *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.VolumeDeleteRequest) (*volume_server_pb.VolumeDeleteResponse, error) {
	v.mu.Lock()
	v.deleteRequests = append(v.deleteRequests, req)
	v.mu.Unlock()
	if req != nil {
		for _, ext := range []string{".dat", ".idx"} {
			// Removal errors are ignored: the files may not exist.
			_ = os.Remove(v.filePath(req.VolumeId, ext))
		}
	}
	return &volume_server_pb.VolumeDeleteResponse{}, nil
}
// VolumeMarkReadonly counts the call and reports success.
func (v *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.markReadonlyCalls++
	return &volume_server_pb.VolumeMarkReadonlyResponse{}, nil
}
// VolumeMarkWritable counts the call and reports success.
func (v *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.markWritableCalls++
	return &volume_server_pb.VolumeMarkWritableResponse{}, nil
}
// ReadVolumeFileStatus reports the actual on-disk sizes of the volume's
// .dat and .idx files, failing if either is missing. FileCount is a fixed
// placeholder for the fake.
func (v *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
	v.mu.Lock()
	v.readFileStatusCalls++
	v.mu.Unlock()
	sizeOf := func(ext string) (uint64, error) {
		info, statErr := os.Stat(v.filePath(req.VolumeId, ext))
		if statErr != nil {
			return 0, statErr
		}
		return uint64(info.Size()), nil
	}
	datSize, err := sizeOf(".dat")
	if err != nil {
		return nil, err
	}
	idxSize, err := sizeOf(".idx")
	if err != nil {
		return nil, err
	}
	return &volume_server_pb.ReadVolumeFileStatusResponse{
		VolumeId:    req.VolumeId,
		DatFileSize: datSize,
		IdxFileSize: idxSize,
		FileCount:   1,
	}, nil
}
// VacuumVolumeCheck counts the call and returns the configured garbage ratio.
func (v *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_server_pb.VacuumVolumeCheckRequest) (*volume_server_pb.VacuumVolumeCheckResponse, error) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.vacuumCheckCalls++
	return &volume_server_pb.VacuumVolumeCheckResponse{GarbageRatio: v.vacuumGarbageRatio}, nil
}
// VacuumVolumeCompact counts the call and streams back a single canned
// progress message.
func (v *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
	v.mu.Lock()
	v.vacuumCompactCalls++
	v.mu.Unlock()
	progress := &volume_server_pb.VacuumVolumeCompactResponse{ProcessedBytes: 1024}
	return stream.Send(progress)
}
// VacuumVolumeCommit counts the call and reports success.
func (v *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_server_pb.VacuumVolumeCommitRequest) (*volume_server_pb.VacuumVolumeCommitResponse, error) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.vacuumCommitCalls++
	return &volume_server_pb.VacuumVolumeCommitResponse{}, nil
}
// VacuumVolumeCleanup counts the call and reports success.
func (v *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_server_pb.VacuumVolumeCleanupRequest) (*volume_server_pb.VacuumVolumeCleanupResponse, error) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.vacuumCleanupCalls++
	return &volume_server_pb.VacuumVolumeCleanupResponse{}, nil
}
// VolumeCopy copies a volume from the source data node: it first reads the
// source's file status to learn the .dat/.idx sizes, copies both files with
// those sizes as stop offsets, then streams back a processed-bytes message
// followed by a final last-append timestamp.
func (v *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stream volume_server_pb.VolumeServer_VolumeCopyServer) error {
	v.mu.Lock()
	v.volumeCopyCalls++
	v.mu.Unlock()
	dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
	ctx := stream.Context()
	var status *volume_server_pb.ReadVolumeFileStatusResponse
	err := operation.WithVolumeServerClient(false, pb.ServerAddress(req.SourceDataNode), dialOption,
		func(client volume_server_pb.VolumeServerClient) error {
			resp, statusErr := client.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{
				VolumeId: req.VolumeId,
			})
			status = resp
			return statusErr
		})
	if err != nil {
		return err
	}
	// Copy .dat then .idx, each bounded by the size reported above.
	for _, part := range []struct {
		ext  string
		size uint64
	}{
		{".dat", status.DatFileSize},
		{".idx", status.IdxFileSize},
	} {
		if copyErr := v.copyRemoteFile(ctx, req.SourceDataNode, req.VolumeId, part.ext, part.size, dialOption); copyErr != nil {
			return copyErr
		}
	}
	processed := int64(status.DatFileSize + status.IdxFileSize)
	if sendErr := stream.Send(&volume_server_pb.VolumeCopyResponse{ProcessedBytes: processed}); sendErr != nil {
		return sendErr
	}
	return stream.Send(&volume_server_pb.VolumeCopyResponse{LastAppendAtNs: uint64(time.Now().UnixNano())})
}
// VolumeMount counts the call and reports success.
func (v *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.VolumeMountRequest) (*volume_server_pb.VolumeMountResponse, error) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.volumeMountCalls++
	return &volume_server_pb.VolumeMountResponse{}, nil
}
// VolumeTailReceiver counts the call and reports success.
func (v *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_server_pb.VolumeTailReceiverRequest) (*volume_server_pb.VolumeTailReceiverResponse, error) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.tailReceiverCalls++
	return &volume_server_pb.VolumeTailReceiverResponse{}, nil
}
// copyRemoteFile pulls one volume file (identified by volumeID + ext) from
// sourceDataNode via the CopyFile streaming RPC and writes it to this
// server's baseDir, truncating any existing file. fileSize is forwarded as
// the CopyFile StopOffset.
func (v *VolumeServer) copyRemoteFile(ctx context.Context, sourceDataNode string, volumeID uint32, ext string, fileSize uint64, dialOption grpc.DialOption) error {
	destPath := v.filePath(volumeID, ext)
	if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil {
		return err
	}
	dest, err := os.Create(destPath)
	if err != nil {
		return err
	}
	defer dest.Close()
	return operation.WithVolumeServerClient(true, pb.ServerAddress(sourceDataNode), dialOption,
		func(client volume_server_pb.VolumeServerClient) error {
			copyStream, copyErr := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
				VolumeId:   volumeID,
				Ext:        ext,
				StopOffset: fileSize,
			})
			if copyErr != nil {
				return copyErr
			}
			// Drain the stream, appending each chunk to the local file.
			for {
				resp, recvErr := copyStream.Recv()
				if recvErr == io.EOF {
					return nil
				}
				if recvErr != nil {
					return recvErr
				}
				if len(resp.FileContent) == 0 {
					continue
				}
				if _, writeErr := dest.Write(resp.FileContent); writeErr != nil {
					return writeErr
				}
			}
		})
}