1. no locks for all read operations! Switching to pread for all reads.
2. prevent heartbeat lost when vacuuming, by removing locks on Size() function
This commit is contained in:
@@ -121,11 +121,11 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
|
||||
return 0, fmt.Errorf("Unsupported Version! (%d)", version)
|
||||
}
|
||||
|
||||
func (n *Needle) Read(r io.Reader, size uint32, version Version) (ret int, err error) {
|
||||
func (n *Needle) Read(r *os.File, offset int64, size uint32, version Version) (ret int, err error) {
|
||||
switch version {
|
||||
case Version1:
|
||||
bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize)
|
||||
if ret, err = r.Read(bytes); err != nil {
|
||||
if ret, err = r.ReadAt(bytes, offset); err != nil {
|
||||
return
|
||||
}
|
||||
n.readNeedleHeader(bytes)
|
||||
@@ -142,7 +142,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (ret int, err e
|
||||
return 0, nil
|
||||
}
|
||||
bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize)
|
||||
if ret, err = r.Read(bytes); err != nil {
|
||||
if ret, err = r.ReadAt(bytes, offset); err != nil {
|
||||
return
|
||||
}
|
||||
if ret != int(NeedleHeaderSize+size+NeedleChecksumSize) {
|
||||
@@ -196,12 +196,12 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) {
|
||||
}
|
||||
}
|
||||
|
||||
func ReadNeedleHeader(r *os.File, version Version) (n *Needle, bodyLength uint32, err error) {
|
||||
func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) {
|
||||
n = new(Needle)
|
||||
if version == Version1 || version == Version2 {
|
||||
bytes := make([]byte, NeedleHeaderSize)
|
||||
var count int
|
||||
count, err = r.Read(bytes)
|
||||
count, err = r.ReadAt(bytes, offset)
|
||||
if count <= 0 || err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
@@ -214,21 +214,21 @@ func ReadNeedleHeader(r *os.File, version Version) (n *Needle, bodyLength uint32
|
||||
|
||||
//n should be a needle already read the header
|
||||
//the input stream will read until next file entry
|
||||
func (n *Needle) ReadNeedleBody(r *os.File, version Version, bodyLength uint32) (err error) {
|
||||
func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyLength uint32) (err error) {
|
||||
if bodyLength <= 0 {
|
||||
return nil
|
||||
}
|
||||
switch version {
|
||||
case Version1:
|
||||
bytes := make([]byte, bodyLength)
|
||||
if _, err = r.Read(bytes); err != nil {
|
||||
if _, err = r.ReadAt(bytes, offset); err != nil {
|
||||
return
|
||||
}
|
||||
n.Data = bytes[:n.Size]
|
||||
n.Checksum = NewCRC(n.Data)
|
||||
case Version2:
|
||||
bytes := make([]byte, bodyLength)
|
||||
if _, err = r.Read(bytes); err != nil {
|
||||
if _, err = r.ReadAt(bytes, offset); err != nil {
|
||||
return
|
||||
}
|
||||
n.readNeedleDataVersion2(bytes[0:n.Size])
|
||||
|
||||
Reference in New Issue
Block a user