From 1e13b6879cb476b07255f59dbfa97a3e30fad698 Mon Sep 17 00:00:00 2001 From: dsd <60881537+dsd2077@users.noreply.github.com> Date: Fri, 18 Oct 2024 16:20:50 +0800 Subject: [PATCH] fix(volume): to avoid duplicate write a same needle (#6138) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fix WriteNeedleBlob to avoid duplicate write a same needle Co-authored-by: 邓书东 Co-authored-by: Chris Lu --- weed/storage/volume_write.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/weed/storage/volume_write.go b/weed/storage/volume_write.go index 8b731e028..434e296d4 100644 --- a/weed/storage/volume_write.go +++ b/weed/storage/volume_write.go @@ -4,11 +4,12 @@ import ( "bytes" "errors" "fmt" + "os" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/storage/backend" "github.com/seaweedfs/seaweedfs/weed/storage/needle" . "github.com/seaweedfs/seaweedfs/weed/storage/types" - "os" ) var ErrorNotFound = errors.New("not found") @@ -323,6 +324,19 @@ func (v *Volume) WriteNeedleBlob(needleId NeedleId, needleBlob []byte, size Size return fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize()) } + nv, ok := v.nm.Get(needleId) + if ok && nv.Size == size { + oldNeedle := new(needle.Needle) + err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), nv.Size, v.Version()) + if err == nil { + newNeedle := new(needle.Needle) + err = newNeedle.ReadBytes(needleBlob, nv.Offset.ToActualOffset(), size, v.Version()) + if err == nil && oldNeedle.Cookie == newNeedle.Cookie && oldNeedle.Checksum == newNeedle.Checksum && bytes.Equal(oldNeedle.Data, newNeedle.Data) { + glog.V(0).Infof("needle %v already exists", needleId) + return nil + } + } + } appendAtNs := needle.GetAppendAtNs(v.lastAppendAtNs) offset, err := needle.WriteNeedleBlob(v.DataBackend, needleBlob, size, appendAtNs, v.Version())