implement elastic changes

This commit is contained in:
Chris Lu
2021-01-14 22:42:25 -08:00
parent 893cbc8482
commit 5d4568b91f

View File

@@ -96,8 +96,8 @@ func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
return nil return nil
} }
func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*filer.Entry, hasMore bool, err error) {
return nil, filer.ErrUnsupportedListDirectoryPrefixed return nil, false, filer.ErrUnsupportedListDirectoryPrefixed
} }
func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
@@ -187,7 +187,7 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e
} }
func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil { if entries, _, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil {
for _, entry := range entries { for _, entry := range entries {
store.DeleteEntry(ctx, entry.FullPath) store.DeleteEntry(ctx, entry.FullPath)
} }
@@ -195,20 +195,18 @@ func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath we
return nil return nil
} }
func (store *ElasticStore) ListDirectoryEntries( func (store *ElasticStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int) (entries []*filer.Entry, hasMore bool, err error) {
ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, if string(dirPath) == "/" {
) (entries []*filer.Entry, err error) { return store.listRootDirectoryEntries(ctx, startFileName, includeStartFile, limit)
if string(fullpath) == "/" {
return store.listRootDirectoryEntries(ctx, startFileName, inclusive, limit)
} }
return store.listDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit) return store.listDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
} }
func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) { func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, hasMore bool, err error) {
indexResult, err := store.client.CatIndices().Do(ctx) indexResult, err := store.client.CatIndices().Do(ctx)
if err != nil { if err != nil {
glog.Errorf("list indices %v.", err) glog.Errorf("list indices %v.", err)
return entries, err return entries, false, err
} }
for _, index := range indexResult { for _, index := range indexResult {
if index.Index == indexKV { if index.Index == indexKV {
@@ -223,18 +221,19 @@ func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFi
} }
limit-- limit--
if limit < 0 { if limit < 0 {
hasMore = true
break break
} }
entries = append(entries, entry) entries = append(entries, entry)
} }
} }
} }
return entries, nil return entries, hasMore, nil
} }
func (store *ElasticStore) listDirectoryEntries( func (store *ElasticStore) listDirectoryEntries(
ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
) (entries []*filer.Entry, err error) { ) (entries []*filer.Entry, hasMore bool, err error) {
first := true first := true
index := getIndex(fullpath) index := getIndex(fullpath)
nextStart := "" nextStart := ""
@@ -242,7 +241,7 @@ func (store *ElasticStore) listDirectoryEntries(
if _, err := store.client.Refresh(index).Do(ctx); err != nil { if _, err := store.client.Refresh(index).Do(ctx); err != nil {
if elastic.IsNotFound(err) { if elastic.IsNotFound(err) {
store.client.CreateIndex(index).Do(ctx) store.client.CreateIndex(index).Do(ctx)
return entries, nil return entries, hasMore, nil
} }
} }
for { for {
@@ -250,7 +249,7 @@ func (store *ElasticStore) listDirectoryEntries(
if (startFileName == "" && first) || inclusive { if (startFileName == "" && first) || inclusive {
if result, err = store.search(ctx, index, parentId); err != nil { if result, err = store.search(ctx, index, parentId); err != nil {
glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err) glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
return entries, err return entries, hasMore, err
} }
} else { } else {
fullPath := string(fullpath) + "/" + startFileName fullPath := string(fullpath) + "/" + startFileName
@@ -260,7 +259,7 @@ func (store *ElasticStore) listDirectoryEntries(
after := weed_util.Md5String([]byte(fullPath)) after := weed_util.Md5String([]byte(fullPath))
if result, err = store.searchAfter(ctx, index, parentId, after); err != nil { if result, err = store.searchAfter(ctx, index, parentId, after); err != nil {
glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err) glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
return entries, err return entries, hasMore, err
} }
} }
first = false first = false
@@ -272,7 +271,8 @@ func (store *ElasticStore) listDirectoryEntries(
if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil { if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil {
limit-- limit--
if limit < 0 { if limit < 0 {
return entries, nil hasMore = true
return entries, hasMore, nil
} }
nextStart = string(esEntry.Entry.FullPath) nextStart = string(esEntry.Entry.FullPath)
fileName := getFileName(esEntry.Entry.FullPath) fileName := getFileName(esEntry.Entry.FullPath)
@@ -286,7 +286,7 @@ func (store *ElasticStore) listDirectoryEntries(
break break
} }
} }
return entries, nil return entries, hasMore, nil
} }
func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) { func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) {