refactor
This commit is contained in:
@@ -61,36 +61,17 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Versio
|
|||||||
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorSizeMismatch).Inc()
|
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorSizeMismatch).Inc()
|
||||||
return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
|
return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
|
||||||
}
|
}
|
||||||
switch version {
|
if version == Version1 {
|
||||||
case Version1:
|
|
||||||
n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size]
|
n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size]
|
||||||
// fallthrough to checksum logic below
|
} else {
|
||||||
case Version2:
|
|
||||||
err := n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(size)])
|
err := n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(size)])
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
case Version3:
|
|
||||||
err := n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(size)])
|
|
||||||
if err != nil && err != io.EOF {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
tsOffset := NeedleHeaderSize + size + NeedleChecksumSize
|
|
||||||
n.AppendAtNs = util.BytesToUint64(bytes[tsOffset : tsOffset+TimestampSize])
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("unsupported version %d", version)
|
|
||||||
}
|
}
|
||||||
if size > 0 {
|
err = n.readNeedleTail(bytes[NeedleHeaderSize+size:], version)
|
||||||
checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
|
if err != nil {
|
||||||
newChecksum := NewCRC(n.Data)
|
return err
|
||||||
if checksum != newChecksum.Value() && checksum != uint32(newChecksum) {
|
|
||||||
// the crc.Value() function is to be deprecated. this double checking is for backward compatibility
|
|
||||||
// with seaweed version using crc.Value() instead of uint32(crc), which appears in commit 056c480eb
|
|
||||||
// and switch appeared in version 3.09.
|
|
||||||
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorCRC).Inc()
|
|
||||||
return errors.New("CRC error! Data On Disk Corrupted")
|
|
||||||
}
|
|
||||||
n.Checksum = newChecksum
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -260,14 +241,11 @@ func (n *Needle) ReadNeedleBodyBytes(needleBody []byte, version Version) (err er
|
|||||||
switch version {
|
switch version {
|
||||||
case Version1:
|
case Version1:
|
||||||
n.Data = needleBody[:n.Size]
|
n.Data = needleBody[:n.Size]
|
||||||
n.Checksum = NewCRC(n.Data)
|
err = n.readNeedleTail(needleBody[n.Size:], version)
|
||||||
case Version2, Version3:
|
case Version2, Version3:
|
||||||
err = n.readNeedleDataVersion2(needleBody[0:n.Size])
|
err = n.readNeedleDataVersion2(needleBody[0:n.Size])
|
||||||
n.Checksum = NewCRC(n.Data)
|
if err == nil {
|
||||||
|
err = n.readNeedleTail(needleBody[n.Size:], version)
|
||||||
if version == Version3 {
|
|
||||||
tsOffset := n.Size + NeedleChecksumSize
|
|
||||||
n.AppendAtNs = util.BytesToUint64(needleBody[tsOffset : tsOffset+TimestampSize])
|
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
err = fmt.Errorf("unsupported version %d!", version)
|
err = fmt.Errorf("unsupported version %d!", version)
|
||||||
|
|||||||
@@ -2,11 +2,12 @@ package needle
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
||||||
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
|
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||||
"io"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ReadNeedleData uses a needle without n.Data to read the content
|
// ReadNeedleData uses a needle without n.Data to read the content
|
||||||
@@ -78,10 +79,7 @@ func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size
|
|||||||
index, err = n.readNeedleDataVersion2NonData(metaSlice)
|
index, err = n.readNeedleDataVersion2NonData(metaSlice)
|
||||||
}
|
}
|
||||||
|
|
||||||
n.Checksum = CRC(util.BytesToUint32(metaSlice[index : index+NeedleChecksumSize]))
|
err = n.readNeedleTail(metaSlice[index:], version)
|
||||||
if version == Version3 {
|
|
||||||
n.AppendAtNs = util.BytesToUint64(metaSlice[index+NeedleChecksumSize : index+NeedleChecksumSize+TimestampSize])
|
|
||||||
}
|
|
||||||
return err
|
return err
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
35
weed/storage/needle/needle_read_tail.go
Normal file
35
weed/storage/needle/needle_read_tail.go
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
package needle
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/stats"
|
||||||
|
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (n *Needle) readNeedleTail(needleBody []byte, version Version) error {
|
||||||
|
|
||||||
|
// for all versions, we need to read the checksum
|
||||||
|
if len(n.Data) > 0 {
|
||||||
|
expectedChecksum := CRC(util.BytesToUint32(needleBody[0:NeedleChecksumSize]))
|
||||||
|
dataChecksum := NewCRC(n.Data)
|
||||||
|
if expectedChecksum != dataChecksum {
|
||||||
|
// the crc.Value() function is to be deprecated. this double checking is for backward compatibility
|
||||||
|
// with seaweed version using crc.Value() instead of uint32(crc), which appears in commit 056c480eb
|
||||||
|
// and switch appeared in version 3.09.
|
||||||
|
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorCRC).Inc()
|
||||||
|
return errors.New("CRC error! Data On Disk Corrupted")
|
||||||
|
}
|
||||||
|
n.Checksum = dataChecksum
|
||||||
|
} else {
|
||||||
|
// when data is skipped from reading, just read the checksum
|
||||||
|
n.Checksum = CRC(util.BytesToUint32(needleBody[0:NeedleChecksumSize]))
|
||||||
|
}
|
||||||
|
|
||||||
|
if version == Version3 {
|
||||||
|
tsOffset := NeedleChecksumSize
|
||||||
|
n.AppendAtNs = util.BytesToUint64(needleBody[tsOffset : tsOffset+TimestampSize])
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user