1.split kv in one file.
2.disable query for kv in es index.
This commit is contained in:
@@ -16,9 +16,14 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
indexType = "_doc"
|
indexType = "_doc"
|
||||||
indexPrefix = ".seaweedfs_"
|
indexPrefix = ".seaweedfs_"
|
||||||
indexKV = ".seaweedfs_kv_entries"
|
indexKV = ".seaweedfs_kv_entries"
|
||||||
|
mappingWithoutQuery = ` {
|
||||||
|
"mappings": {
|
||||||
|
"enabled": false
|
||||||
|
}
|
||||||
|
}`
|
||||||
)
|
)
|
||||||
|
|
||||||
type ESEntry struct {
|
type ESEntry struct {
|
||||||
@@ -26,6 +31,10 @@ type ESEntry struct {
|
|||||||
Entry *filer.Entry
|
Entry *filer.Entry
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ESKVEntry struct {
|
||||||
|
Value string `json:Value`
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
filer.Stores = append(filer.Stores, &ElasticStore{})
|
filer.Stores = append(filer.Stores, &ElasticStore{})
|
||||||
}
|
}
|
||||||
@@ -35,11 +44,6 @@ type ElasticStore struct {
|
|||||||
maxPageSize int
|
maxPageSize int
|
||||||
}
|
}
|
||||||
|
|
||||||
type ESKVEntry struct {
|
|
||||||
Key string `json:Key`
|
|
||||||
Value string `json:Value`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (store *ElasticStore) GetName() string {
|
func (store *ElasticStore) GetName() string {
|
||||||
return "elastic7"
|
return "elastic7"
|
||||||
}
|
}
|
||||||
@@ -61,6 +65,12 @@ func (store *ElasticStore) Initialize(configuration weed_util.Configuration, pre
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("init elastic %s: %v.", servers, err)
|
return fmt.Errorf("init elastic %s: %v.", servers, err)
|
||||||
}
|
}
|
||||||
|
if ok, err := store.client.IndexExists(indexKV).Do(context.Background()); err == nil && !ok {
|
||||||
|
_, err = store.client.CreateIndex(indexKV).Body(mappingWithoutQuery).Do(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("create index(%s) %v.", indexKV, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) {
|
func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) {
|
||||||
@@ -72,66 +82,6 @@ func (store *ElasticStore) CommitTransaction(ctx context.Context) error {
|
|||||||
func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
|
func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) {
|
|
||||||
id := fmt.Sprintf("%x", md5.Sum(key))
|
|
||||||
deleteResult, err := store.client.Delete().
|
|
||||||
Index(indexKV).
|
|
||||||
Type(indexType).
|
|
||||||
Id(id).
|
|
||||||
Do(context.Background())
|
|
||||||
if err == nil {
|
|
||||||
if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
glog.Errorf("delete key(id:%s) %v.", string(key), err)
|
|
||||||
return fmt.Errorf("delete key %v.", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
|
|
||||||
id := fmt.Sprintf("%x", md5.Sum(key))
|
|
||||||
searchResult, err := store.client.Get().
|
|
||||||
Index(indexKV).
|
|
||||||
Type(indexType).
|
|
||||||
Id(id).
|
|
||||||
Do(context.Background())
|
|
||||||
if elastic.IsNotFound(err) {
|
|
||||||
return nil, filer_pb.ErrNotFound
|
|
||||||
}
|
|
||||||
if searchResult != nil && searchResult.Found {
|
|
||||||
esEntry := &ESKVEntry{}
|
|
||||||
if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil {
|
|
||||||
return []byte(esEntry.Value), nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
glog.Errorf("find key(%s),%v.", string(key), err)
|
|
||||||
return nil, filer_pb.ErrNotFound
|
|
||||||
}
|
|
||||||
|
|
||||||
func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
|
|
||||||
id := fmt.Sprintf("%x", md5.Sum(key))
|
|
||||||
esEntry := &ESKVEntry{
|
|
||||||
string(key),
|
|
||||||
string(value),
|
|
||||||
}
|
|
||||||
val, err := jsoniter.Marshal(esEntry)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("insert key(%s) %v.", string(key), err)
|
|
||||||
return fmt.Errorf("insert key %v.", err)
|
|
||||||
}
|
|
||||||
_, err = store.client.Index().
|
|
||||||
Index(indexKV).
|
|
||||||
Type(indexType).
|
|
||||||
Id(id).
|
|
||||||
BodyJson(string(val)).
|
|
||||||
Do(context.Background())
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("kv put: %v", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
|
func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
|
||||||
return nil, filer.ErrUnsupportedListDirectoryPrefixed
|
return nil, filer.ErrUnsupportedListDirectoryPrefixed
|
||||||
}
|
}
|
||||||
@@ -154,7 +104,7 @@ func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry)
|
|||||||
Type(indexType).
|
Type(indexType).
|
||||||
Id(id).
|
Id(id).
|
||||||
BodyJson(string(value)).
|
BodyJson(string(value)).
|
||||||
Do(context.Background())
|
Do(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
|
glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
|
||||||
return fmt.Errorf("insert entry %v.", err)
|
return fmt.Errorf("insert entry %v.", err)
|
||||||
@@ -171,7 +121,7 @@ func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
|
|||||||
Index(index).
|
Index(index).
|
||||||
Type(indexType).
|
Type(indexType).
|
||||||
Id(id).
|
Id(id).
|
||||||
Do(context.Background())
|
Do(ctx)
|
||||||
if elastic.IsNotFound(err) {
|
if elastic.IsNotFound(err) {
|
||||||
return nil, filer_pb.ErrNotFound
|
return nil, filer_pb.ErrNotFound
|
||||||
}
|
}
|
||||||
@@ -190,24 +140,24 @@ func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.F
|
|||||||
index := getIndex(fullpath)
|
index := getIndex(fullpath)
|
||||||
id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
|
id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
|
||||||
if strings.Count(string(fullpath), "/") == 1 {
|
if strings.Count(string(fullpath), "/") == 1 {
|
||||||
return store.deleteIndex(index)
|
return store.deleteIndex(ctx, index)
|
||||||
}
|
}
|
||||||
return store.deleteEntry(index, id)
|
return store.deleteEntry(ctx, index, id)
|
||||||
}
|
}
|
||||||
func (store *ElasticStore) deleteIndex(index string) (err error) {
|
func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err error) {
|
||||||
deleteResult, err := store.client.DeleteIndex(index).Do(context.Background())
|
deleteResult, err := store.client.DeleteIndex(index).Do(ctx)
|
||||||
if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) {
|
if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
glog.Errorf("delete index(%s) %v.", index, err)
|
glog.Errorf("delete index(%s) %v.", index, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
func (store *ElasticStore) deleteEntry(index, id string) (err error) {
|
func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (err error) {
|
||||||
deleteResult, err := store.client.Delete().
|
deleteResult, err := store.client.Delete().
|
||||||
Index(index).
|
Index(index).
|
||||||
Type(indexType).
|
Type(indexType).
|
||||||
Id(id).
|
Id(id).
|
||||||
Do(context.Background())
|
Do(ctx)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
|
if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
|
||||||
return nil
|
return nil
|
||||||
@@ -235,7 +185,7 @@ func (store *ElasticStore) ListDirectoryEntries(
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
|
func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
|
||||||
indexResult, err := store.client.CatIndices().Do(context.Background())
|
indexResult, err := store.client.CatIndices().Do(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("list indices %v.", err)
|
glog.Errorf("list indices %v.", err)
|
||||||
return entries, err
|
return entries, err
|
||||||
@@ -266,16 +216,16 @@ func (store *ElasticStore) listDirectoryEntries(
|
|||||||
index := getIndex(fullpath)
|
index := getIndex(fullpath)
|
||||||
nextStart := ""
|
nextStart := ""
|
||||||
parentId := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
|
parentId := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
|
||||||
if _, err := store.client.Refresh(index).Do(context.Background()); err != nil {
|
if _, err := store.client.Refresh(index).Do(ctx); err != nil {
|
||||||
if elastic.IsNotFound(err) {
|
if elastic.IsNotFound(err) {
|
||||||
store.client.CreateIndex(index).Do(context.Background())
|
store.client.CreateIndex(index).Do(ctx)
|
||||||
return entries, nil
|
return entries, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
result := &elastic.SearchResult{}
|
result := &elastic.SearchResult{}
|
||||||
if (startFileName == "" && first) || inclusive {
|
if (startFileName == "" && first) || inclusive {
|
||||||
if result, err = store.search(index, parentId); err != nil {
|
if result, err = store.search(ctx, index, parentId); err != nil {
|
||||||
glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
|
glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
|
||||||
return entries, err
|
return entries, err
|
||||||
}
|
}
|
||||||
@@ -285,7 +235,7 @@ func (store *ElasticStore) listDirectoryEntries(
|
|||||||
fullPath = nextStart
|
fullPath = nextStart
|
||||||
}
|
}
|
||||||
after := fmt.Sprintf("%x", md5.Sum([]byte(fullPath)))
|
after := fmt.Sprintf("%x", md5.Sum([]byte(fullPath)))
|
||||||
if result, err = store.searchAfter(index, parentId, after); err != nil {
|
if result, err = store.searchAfter(ctx, index, parentId, after); err != nil {
|
||||||
glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
|
glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
|
||||||
return entries, err
|
return entries, err
|
||||||
}
|
}
|
||||||
@@ -316,8 +266,8 @@ func (store *ElasticStore) listDirectoryEntries(
|
|||||||
return entries, nil
|
return entries, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store *ElasticStore) search(index, parentId string) (result *elastic.SearchResult, err error) {
|
func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) {
|
||||||
if count, err := store.client.Count(index).Do(context.Background()); err == nil && count == 0 {
|
if count, err := store.client.Count(index).Do(ctx); err == nil && count == 0 {
|
||||||
return &elastic.SearchResult{
|
return &elastic.SearchResult{
|
||||||
Hits: &elastic.SearchHits{
|
Hits: &elastic.SearchHits{
|
||||||
Hits: make([]*elastic.SearchHit, 0)},
|
Hits: make([]*elastic.SearchHit, 0)},
|
||||||
@@ -328,18 +278,18 @@ func (store *ElasticStore) search(index, parentId string) (result *elastic.Searc
|
|||||||
Query(elastic.NewMatchQuery("ParentId", parentId)).
|
Query(elastic.NewMatchQuery("ParentId", parentId)).
|
||||||
Size(store.maxPageSize).
|
Size(store.maxPageSize).
|
||||||
Sort("_id", false).
|
Sort("_id", false).
|
||||||
Do(context.Background())
|
Do(ctx)
|
||||||
return queryResult, err
|
return queryResult, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store *ElasticStore) searchAfter(index, parentId, after string) (result *elastic.SearchResult, err error) {
|
func (store *ElasticStore) searchAfter(ctx context.Context, index, parentId, after string) (result *elastic.SearchResult, err error) {
|
||||||
queryResult, err := store.client.Search().
|
queryResult, err := store.client.Search().
|
||||||
Index(index).
|
Index(index).
|
||||||
Query(elastic.NewMatchQuery("ParentId", parentId)).
|
Query(elastic.NewMatchQuery("ParentId", parentId)).
|
||||||
SearchAfter(after).
|
SearchAfter(after).
|
||||||
Size(store.maxPageSize).
|
Size(store.maxPageSize).
|
||||||
Sort("_id", false).
|
Sort("_id", false).
|
||||||
Do(context.Background())
|
Do(ctx)
|
||||||
return queryResult, err
|
return queryResult, err
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
64
weed/filer/elastic/v7/elastic_store_kv.go
Normal file
64
weed/filer/elastic/v7/elastic_store_kv.go
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
package elastic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
|
jsoniter "github.com/json-iterator/go"
|
||||||
|
elastic "github.com/olivere/elastic/v7"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) {
|
||||||
|
deleteResult, err := store.client.Delete().
|
||||||
|
Index(indexKV).
|
||||||
|
Type(indexType).
|
||||||
|
Id(string(key)).
|
||||||
|
Do(ctx)
|
||||||
|
if err == nil {
|
||||||
|
if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
glog.Errorf("delete key(id:%s) %v.", string(key), err)
|
||||||
|
return fmt.Errorf("delete key %v.", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
|
||||||
|
searchResult, err := store.client.Get().
|
||||||
|
Index(indexKV).
|
||||||
|
Type(indexType).
|
||||||
|
Id(string(key)).
|
||||||
|
Do(ctx)
|
||||||
|
if elastic.IsNotFound(err) {
|
||||||
|
return nil, filer_pb.ErrNotFound
|
||||||
|
}
|
||||||
|
if searchResult != nil && searchResult.Found {
|
||||||
|
esEntry := &ESKVEntry{}
|
||||||
|
if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil {
|
||||||
|
return []byte(esEntry.Value), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
glog.Errorf("find key(%s),%v.", string(key), err)
|
||||||
|
return nil, filer_pb.ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
|
||||||
|
esEntry := &ESKVEntry{string(value)}
|
||||||
|
val, err := jsoniter.Marshal(esEntry)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("insert key(%s) %v.", string(key), err)
|
||||||
|
return fmt.Errorf("insert key %v.", err)
|
||||||
|
}
|
||||||
|
_, err = store.client.Index().
|
||||||
|
Index(indexKV).
|
||||||
|
Type(indexType).
|
||||||
|
Id(string(key)).
|
||||||
|
BodyJson(string(val)).
|
||||||
|
Do(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("kv put: %v", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user