volume: atomic copying file, adds version and stopOffset
This commit is contained in:
@@ -52,9 +52,13 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
||||
return fmt.Errorf("read volume file status failed, %v", err)
|
||||
}
|
||||
|
||||
// println("source:", volFileInfoResp.String())
|
||||
|
||||
copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
|
||||
VolumeId: req.VolumeId,
|
||||
IsIdxFile: true,
|
||||
VolumeId: req.VolumeId,
|
||||
IsIdxFile: true,
|
||||
CompactionRevision: volFileInfoResp.CompactionRevision,
|
||||
StopOffset: volFileInfoResp.IdxFileSize,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start copying volume %d idx file: %v", req.VolumeId, err)
|
||||
@@ -66,8 +70,10 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
||||
}
|
||||
|
||||
copyFileClient, err = client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
|
||||
VolumeId: req.VolumeId,
|
||||
IsDatFile: true,
|
||||
VolumeId: req.VolumeId,
|
||||
IsDatFile: true,
|
||||
CompactionRevision: volFileInfoResp.CompactionRevision,
|
||||
StopOffset: volFileInfoResp.DatFileSize,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start copying volume %d dat file: %v", req.VolumeId, err)
|
||||
@@ -97,7 +103,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
||||
}
|
||||
|
||||
return &volume_server_pb.VolumeCopyResponse{
|
||||
LastAppendAtNs:volFileInfoResp.DatFileTimestampSeconds*uint64(time.Second),
|
||||
LastAppendAtNs: volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second),
|
||||
}, err
|
||||
}
|
||||
|
||||
@@ -161,6 +167,7 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se
|
||||
resp.DatFileTimestampSeconds = uint64(modTime.Unix())
|
||||
resp.IdxFileTimestampSeconds = uint64(modTime.Unix())
|
||||
resp.FileCount = v.FileCount()
|
||||
resp.CompactionRevision = uint32(v.CompactionRevision)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@@ -171,7 +178,13 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
|
||||
return fmt.Errorf("not found volume id %d", req.VolumeId)
|
||||
}
|
||||
|
||||
const BufferSize = 1024 * 16
|
||||
if uint32(v.CompactionRevision) != req.CompactionRevision {
|
||||
return fmt.Errorf("volume %d is compacted", req.VolumeId)
|
||||
}
|
||||
|
||||
bytesToRead := int64(req.StopOffset)
|
||||
|
||||
const BufferSize = 1024 * 1024 * 2
|
||||
var fileName = v.FileName()
|
||||
if req.IsDatFile {
|
||||
fileName += ".dat"
|
||||
@@ -186,19 +199,31 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
|
||||
|
||||
buffer := make([]byte, BufferSize)
|
||||
|
||||
for {
|
||||
for bytesToRead > 0 {
|
||||
bytesread, err := file.Read(buffer)
|
||||
|
||||
// println(fileName, "read", bytesread, "bytes, with target", bytesToRead)
|
||||
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
return err
|
||||
}
|
||||
// println(fileName, "read", bytesread, "bytes, with target", bytesToRead, "err", err.Error())
|
||||
break
|
||||
}
|
||||
|
||||
stream.Send(&volume_server_pb.CopyFileResponse{
|
||||
if int64(bytesread) > bytesToRead {
|
||||
bytesread = int(bytesToRead)
|
||||
}
|
||||
err = stream.Send(&volume_server_pb.CopyFileResponse{
|
||||
FileContent: buffer[:bytesread],
|
||||
})
|
||||
if err != nil {
|
||||
// println("sending", bytesread, "bytes err", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
bytesToRead -= int64(bytesread)
|
||||
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user