add signatures to messages to avoid double processing
This commit is contained in:
@@ -36,6 +36,7 @@ type Filer struct {
|
||||
metaLogCollection string
|
||||
metaLogReplication string
|
||||
MetaAggregator *MetaAggregator
|
||||
Signature int32
|
||||
}
|
||||
|
||||
func NewFiler(masters []string, grpcDialOption grpc.DialOption,
|
||||
@@ -44,6 +45,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption,
|
||||
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters),
|
||||
fileIdDeletionQueue: util.NewUnboundedQueue(),
|
||||
GrpcDialOption: grpcDialOption,
|
||||
Signature: util.RandomInt32(),
|
||||
}
|
||||
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn)
|
||||
f.metaLogCollection = collection
|
||||
@@ -93,7 +95,7 @@ func (f *Filer) RollbackTransaction(ctx context.Context) error {
|
||||
return f.Store.RollbackTransaction(ctx)
|
||||
}
|
||||
|
||||
func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool) error {
|
||||
func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error {
|
||||
|
||||
if string(entry.FullPath) == "/" {
|
||||
return nil
|
||||
@@ -143,7 +145,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
|
||||
}
|
||||
} else {
|
||||
f.maybeAddBucket(dirEntry)
|
||||
f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster)
|
||||
f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
|
||||
}
|
||||
|
||||
} else if !dirEntry.IsDirectory() {
|
||||
@@ -191,7 +193,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
|
||||
}
|
||||
|
||||
f.maybeAddBucket(entry)
|
||||
f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster)
|
||||
f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures)
|
||||
|
||||
f.deleteChunksIfNotNew(oldEntry, entry)
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool) (err error) {
|
||||
func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) {
|
||||
if p == "/" {
|
||||
return nil
|
||||
}
|
||||
@@ -36,7 +36,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
|
||||
}
|
||||
|
||||
// delete the file or folder
|
||||
err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks, isFromOtherCluster)
|
||||
err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks, isFromOtherCluster, signatures)
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete file %s: %v", p, err)
|
||||
}
|
||||
@@ -76,7 +76,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
|
||||
dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false)
|
||||
chunks = append(chunks, dirChunks...)
|
||||
} else {
|
||||
f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster)
|
||||
f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil)
|
||||
chunks = append(chunks, sub.Chunks...)
|
||||
}
|
||||
if err != nil && !ignoreRecursiveError {
|
||||
@@ -95,12 +95,12 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
|
||||
return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
|
||||
}
|
||||
|
||||
f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster)
|
||||
f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, nil)
|
||||
|
||||
return chunks, nil
|
||||
}
|
||||
|
||||
func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool, isFromOtherCluster bool) (err error) {
|
||||
func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool, isFromOtherCluster bool, signatures []int32) (err error) {
|
||||
|
||||
glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
|
||||
|
||||
@@ -108,7 +108,7 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
|
||||
return fmt.Errorf("filer store delete: %v", storeDeletionErr)
|
||||
}
|
||||
if !entry.IsDirectory() {
|
||||
f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster)
|
||||
f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry, deleteChunks, isFromOtherCluster bool) {
|
||||
func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry, deleteChunks, isFromOtherCluster bool, signatures []int32) {
|
||||
var fullpath string
|
||||
if oldEntry != nil {
|
||||
fullpath = string(oldEntry.FullPath)
|
||||
@@ -41,6 +41,7 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry
|
||||
DeleteChunks: deleteChunks,
|
||||
NewParentPath: newParentPath,
|
||||
IsFromOtherCluster: isFromOtherCluster,
|
||||
Signatures: append(signatures, f.Signature),
|
||||
}
|
||||
|
||||
if notification.Queue != nil {
|
||||
|
||||
@@ -41,7 +41,7 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
|
||||
entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset))
|
||||
|
||||
// update the entry
|
||||
err = f.CreateEntry(context.Background(), entry, false, false)
|
||||
err = f.CreateEntry(context.Background(), entry, false, false, nil)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ func TestCreateAndFind(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := filer.CreateEntry(ctx, entry1, false, false); err != nil {
|
||||
if err := filer.CreateEntry(ctx, entry1, false, false, nil); err != nil {
|
||||
t.Errorf("create entry %v: %v", entry1.FullPath, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ func TestCreateAndFind(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := filer.CreateEntry(ctx, entry1, false, false); err != nil {
|
||||
if err := filer.CreateEntry(ctx, entry1, false, false, nil); err != nil {
|
||||
t.Errorf("create entry %v: %v", entry1.FullPath, err)
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user