filer: support active<=>active filer replication

This commit is contained in:
Chris Lu
2020-06-30 22:53:53 -07:00
parent 7be57a1504
commit 31e23e9783
21 changed files with 369 additions and 313 deletions

View File

@@ -84,7 +84,7 @@ func (f *Filer) RollbackTransaction(ctx context.Context) error {
return f.store.RollbackTransaction(ctx)
}
func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) error {
func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool) error {
if string(entry.FullPath) == "/" {
return nil
@@ -141,7 +141,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro
}
} else {
f.maybeAddBucket(dirEntry)
f.NotifyUpdateEvent(ctx, nil, dirEntry, false)
f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster)
}
} else if !dirEntry.IsDirectory() {
@@ -192,7 +192,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro
}
f.maybeAddBucket(entry)
f.NotifyUpdateEvent(ctx, oldEntry, entry, true)
f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster)
f.deleteChunksIfNotNew(oldEntry, entry)

View File

@@ -10,7 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) {
func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool) (err error) {
if p == "/" {
return nil
}
@@ -27,7 +27,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
if entry.IsDirectory() {
// delete the folder children, not including the folder itself
var dirChunks []*filer_pb.FileChunk
dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection)
dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster)
if err != nil {
glog.V(0).Infof("delete directory %s: %v", p, err)
return fmt.Errorf("delete directory %s: %v", p, err)
@@ -36,7 +36,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
}
// delete the file or folder
err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks)
err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks, isFromOtherCluster)
if err != nil {
return fmt.Errorf("delete file %s: %v", p, err)
}
@@ -53,7 +53,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
return nil
}
func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (chunks []*filer_pb.FileChunk, err error) {
func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool) (chunks []*filer_pb.FileChunk, err error) {
lastFileName := ""
includeLastFile := false
@@ -72,9 +72,9 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
lastFileName = sub.Name()
var dirChunks []*filer_pb.FileChunk
if sub.IsDirectory() {
dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks)
dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false)
f.cacheDelDirectory(string(sub.FullPath))
f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks)
f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster)
chunks = append(chunks, dirChunks...)
} else {
chunks = append(chunks, sub.Chunks...)
@@ -98,7 +98,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
return chunks, nil
}
func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool) (err error) {
func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool, isFromOtherCluster bool) (err error) {
glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
@@ -108,7 +108,7 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
if entry.IsDirectory() {
f.cacheDelDirectory(string(entry.FullPath))
}
f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks)
f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster)
return nil
}

View File

@@ -15,7 +15,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry, deleteChunks bool) {
func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry, deleteChunks, isFromOtherCluster bool) {
var fullpath string
if oldEntry != nil {
fullpath = string(oldEntry.FullPath)
@@ -43,8 +43,10 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry
}
if notification.Queue != nil {
glog.V(3).Infof("notifying entry update %v", fullpath)
notification.Queue.SendMessage(fullpath, eventNotification)
if !isFromOtherCluster {
glog.V(3).Infof("notifying entry update %v", fullpath)
notification.Queue.SendMessage(fullpath, eventNotification)
}
}
f.logMetaEvent(ctx, fullpath, eventNotification)

View File

@@ -41,7 +41,7 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset))
// update the entry
err = f.CreateEntry(context.Background(), entry, false)
err = f.CreateEntry(context.Background(), entry, false, false)
return err
}

View File

@@ -32,7 +32,7 @@ func TestCreateAndFind(t *testing.T) {
},
}
if err := filer.CreateEntry(ctx, entry1, false); err != nil {
if err := filer.CreateEntry(ctx, entry1, false, false); err != nil {
t.Errorf("create entry %v: %v", entry1.FullPath, err)
return
}

View File

@@ -32,7 +32,7 @@ func TestCreateAndFind(t *testing.T) {
},
}
if err := filer.CreateEntry(ctx, entry1, false); err != nil {
if err := filer.CreateEntry(ctx, entry1, false, false); err != nil {
t.Errorf("create entry %v: %v", entry1.FullPath, err)
return
}