test: fix EC integration test needle blob mismatch (#7972)
fix needle blob
This commit is contained in:
@@ -18,6 +18,8 @@ import (
|
|||||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/shell"
|
"github.com/seaweedfs/seaweedfs/weed/shell"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@@ -2363,16 +2365,47 @@ func TestECEncodeReplicatedVolumeSync(t *testing.T) {
|
|||||||
// injectNeedleToOneReplica writes a needle directly to one replica to create divergence
|
// injectNeedleToOneReplica writes a needle directly to one replica to create divergence
|
||||||
func injectNeedleToOneReplica(grpcDialOption grpc.DialOption, vid needle.VolumeId, location wdclient.Location, needleId uint64, data []byte) error {
|
func injectNeedleToOneReplica(grpcDialOption grpc.DialOption, vid needle.VolumeId, location wdclient.Location, needleId uint64, data []byte) error {
|
||||||
return operation.WithVolumeServerClient(false, location.ServerAddress(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
return operation.WithVolumeServerClient(false, location.ServerAddress(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
||||||
|
blob, size := generateNeedleBlobV3(needleId, data)
|
||||||
_, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{
|
_, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{
|
||||||
VolumeId: uint32(vid),
|
VolumeId: uint32(vid),
|
||||||
NeedleId: needleId,
|
NeedleId: needleId,
|
||||||
Size: int32(len(data)),
|
Size: size,
|
||||||
NeedleBlob: data,
|
NeedleBlob: blob,
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func generateNeedleBlobV3(needleId uint64, data []byte) ([]byte, int32) {
|
||||||
|
// Overhead: DataSize(4) + Data(N) + Flags(1)
|
||||||
|
size := 4 + len(data) + 1
|
||||||
|
|
||||||
|
// Create struct to calculate padding
|
||||||
|
// Padding is based on the Size field (DataSize + Overhead)
|
||||||
|
padding := needle.PaddingLength(types.Size(size), needle.Version3)
|
||||||
|
|
||||||
|
blob := make([]byte, 16+size+4+8+int(padding)) // Header(16) + Body(Size) + Checksum(4) + Timestamp(8) + Padding
|
||||||
|
|
||||||
|
// Header
|
||||||
|
util.Uint32toBytes(blob[0:4], 0x12345678) // Cookie
|
||||||
|
util.Uint64toBytes(blob[4:12], needleId)
|
||||||
|
util.Uint32toBytes(blob[12:16], uint32(size))
|
||||||
|
|
||||||
|
// Body
|
||||||
|
util.Uint32toBytes(blob[16:20], uint32(len(data)))
|
||||||
|
copy(blob[20:], data)
|
||||||
|
blob[20+len(data)] = 0 // Flags
|
||||||
|
|
||||||
|
// Checksum
|
||||||
|
crc := needle.NewCRC(data)
|
||||||
|
util.Uint32toBytes(blob[20+len(data)+1:20+len(data)+1+4], crc.Value())
|
||||||
|
|
||||||
|
// Timestamp (space reserved, filled by server)
|
||||||
|
// Padding (zeroes)
|
||||||
|
|
||||||
|
return blob, int32(size)
|
||||||
|
}
|
||||||
|
|
||||||
// uploadTestDataWithReplication uploads test data and returns the volume ID
|
// uploadTestDataWithReplication uploads test data and returns the volume ID
|
||||||
// using the specified collection and replication level. All files are uploaded to the same volume.
|
// using the specified collection and replication level. All files are uploaded to the same volume.
|
||||||
func uploadTestDataWithReplication(t *testing.T, masterAddr string, collection string, replication string) (needle.VolumeId, error) {
|
func uploadTestDataWithReplication(t *testing.T, masterAddr string, collection string, replication string) (needle.VolumeId, error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user