tail-volume-uses-the-source-volume-version
This commit is contained in:
@@ -54,6 +54,7 @@ func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.Dia
|
|||||||
|
|
||||||
needleHeader := resp.NeedleHeader
|
needleHeader := resp.NeedleHeader
|
||||||
needleBody := resp.NeedleBody
|
needleBody := resp.NeedleBody
|
||||||
|
version := needle.Version(resp.Version)
|
||||||
|
|
||||||
if len(needleHeader) == 0 {
|
if len(needleHeader) == 0 {
|
||||||
continue
|
continue
|
||||||
@@ -73,7 +74,7 @@ func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.Dia
|
|||||||
|
|
||||||
n := new(needle.Needle)
|
n := new(needle.Needle)
|
||||||
n.ParseNeedleHeader(needleHeader)
|
n.ParseNeedleHeader(needleHeader)
|
||||||
err = n.ReadNeedleBodyBytes(needleBody, needle.GetCurrentVersion())
|
err = n.ReadNeedleBodyBytes(needleBody, version)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -341,6 +341,7 @@ message VolumeTailSenderResponse {
|
|||||||
bytes needle_header = 1;
|
bytes needle_header = 1;
|
||||||
bytes needle_body = 2;
|
bytes needle_body = 2;
|
||||||
bool is_last_chunk = 3;
|
bool is_last_chunk = 3;
|
||||||
|
uint32 version = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message VolumeTailReceiverRequest {
|
message VolumeTailReceiverRequest {
|
||||||
|
|||||||
@@ -2535,6 +2535,7 @@ type VolumeTailSenderResponse struct {
|
|||||||
NeedleHeader []byte `protobuf:"bytes,1,opt,name=needle_header,json=needleHeader,proto3" json:"needle_header,omitempty"`
|
NeedleHeader []byte `protobuf:"bytes,1,opt,name=needle_header,json=needleHeader,proto3" json:"needle_header,omitempty"`
|
||||||
NeedleBody []byte `protobuf:"bytes,2,opt,name=needle_body,json=needleBody,proto3" json:"needle_body,omitempty"`
|
NeedleBody []byte `protobuf:"bytes,2,opt,name=needle_body,json=needleBody,proto3" json:"needle_body,omitempty"`
|
||||||
IsLastChunk bool `protobuf:"varint,3,opt,name=is_last_chunk,json=isLastChunk,proto3" json:"is_last_chunk,omitempty"`
|
IsLastChunk bool `protobuf:"varint,3,opt,name=is_last_chunk,json=isLastChunk,proto3" json:"is_last_chunk,omitempty"`
|
||||||
|
Version uint32 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"`
|
||||||
unknownFields protoimpl.UnknownFields
|
unknownFields protoimpl.UnknownFields
|
||||||
sizeCache protoimpl.SizeCache
|
sizeCache protoimpl.SizeCache
|
||||||
}
|
}
|
||||||
@@ -2590,6 +2591,13 @@ func (x *VolumeTailSenderResponse) GetIsLastChunk() bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x *VolumeTailSenderResponse) GetVersion() uint32 {
|
||||||
|
if x != nil {
|
||||||
|
return x.Version
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
type VolumeTailReceiverRequest struct {
|
type VolumeTailReceiverRequest struct {
|
||||||
state protoimpl.MessageState `protogen:"open.v1"`
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId,proto3" json:"volume_id,omitempty"`
|
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId,proto3" json:"volume_id,omitempty"`
|
||||||
@@ -5887,12 +5895,13 @@ const file_volume_server_proto_rawDesc = "" +
|
|||||||
"\x17VolumeTailSenderRequest\x12\x1b\n" +
|
"\x17VolumeTailSenderRequest\x12\x1b\n" +
|
||||||
"\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x19\n" +
|
"\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x19\n" +
|
||||||
"\bsince_ns\x18\x02 \x01(\x04R\asinceNs\x120\n" +
|
"\bsince_ns\x18\x02 \x01(\x04R\asinceNs\x120\n" +
|
||||||
"\x14idle_timeout_seconds\x18\x03 \x01(\rR\x12idleTimeoutSeconds\"\x84\x01\n" +
|
"\x14idle_timeout_seconds\x18\x03 \x01(\rR\x12idleTimeoutSeconds\"\x9e\x01\n" +
|
||||||
"\x18VolumeTailSenderResponse\x12#\n" +
|
"\x18VolumeTailSenderResponse\x12#\n" +
|
||||||
"\rneedle_header\x18\x01 \x01(\fR\fneedleHeader\x12\x1f\n" +
|
"\rneedle_header\x18\x01 \x01(\fR\fneedleHeader\x12\x1f\n" +
|
||||||
"\vneedle_body\x18\x02 \x01(\fR\n" +
|
"\vneedle_body\x18\x02 \x01(\fR\n" +
|
||||||
"needleBody\x12\"\n" +
|
"needleBody\x12\"\n" +
|
||||||
"\ris_last_chunk\x18\x03 \x01(\bR\visLastChunk\"\xb7\x01\n" +
|
"\ris_last_chunk\x18\x03 \x01(\bR\visLastChunk\x12\x18\n" +
|
||||||
|
"\aversion\x18\x04 \x01(\rR\aversion\"\xb7\x01\n" +
|
||||||
"\x19VolumeTailReceiverRequest\x12\x1b\n" +
|
"\x19VolumeTailReceiverRequest\x12\x1b\n" +
|
||||||
"\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x19\n" +
|
"\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x19\n" +
|
||||||
"\bsince_ns\x18\x02 \x01(\x04R\asinceNs\x120\n" +
|
"\bsince_ns\x18\x02 \x01(\x04R\asinceNs\x120\n" +
|
||||||
|
|||||||
@@ -3,9 +3,10 @@ package weed_server
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/operation"
|
"github.com/seaweedfs/seaweedfs/weed/operation"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||||
@@ -65,12 +66,13 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe
|
|||||||
|
|
||||||
if isLastOne {
|
if isLastOne {
|
||||||
// need to heart beat to the client to ensure the connection health
|
// need to heart beat to the client to ensure the connection health
|
||||||
sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true})
|
sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true, Version: uint32(v.Version())})
|
||||||
return lastTimestampNs, sendErr
|
return lastTimestampNs, sendErr
|
||||||
}
|
}
|
||||||
|
|
||||||
scanner := &VolumeFileScanner4Tailing{
|
scanner := &VolumeFileScanner4Tailing{
|
||||||
stream: stream,
|
stream: stream,
|
||||||
|
version: uint32(v.Version()),
|
||||||
}
|
}
|
||||||
|
|
||||||
err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToActualOffset(), scanner)
|
err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToActualOffset(), scanner)
|
||||||
@@ -101,6 +103,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
|
|||||||
type VolumeFileScanner4Tailing struct {
|
type VolumeFileScanner4Tailing struct {
|
||||||
stream volume_server_pb.VolumeServer_VolumeTailSenderServer
|
stream volume_server_pb.VolumeServer_VolumeTailSenderServer
|
||||||
lastProcessedTimestampNs uint64
|
lastProcessedTimestampNs uint64
|
||||||
|
version uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock super_block.SuperBlock) error {
|
func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock super_block.SuperBlock) error {
|
||||||
@@ -126,6 +129,7 @@ func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset i
|
|||||||
NeedleHeader: needleHeader,
|
NeedleHeader: needleHeader,
|
||||||
NeedleBody: needleBody[i:stopOffset],
|
NeedleBody: needleBody[i:stopOffset],
|
||||||
IsLastChunk: isLastChunk,
|
IsLastChunk: isLastChunk,
|
||||||
|
Version: scanner.version,
|
||||||
})
|
})
|
||||||
if sendErr != nil {
|
if sendErr != nil {
|
||||||
return sendErr
|
return sendErr
|
||||||
|
|||||||
Reference in New Issue
Block a user