This commit is contained in:
chrislu
2022-05-06 01:51:28 -07:00
31 changed files with 771 additions and 27 deletions

View File

@@ -32,4 +32,5 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis3"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
_ "github.com/chrislusf/seaweedfs/weed/filer/ydb"
)

View File

@@ -295,6 +295,19 @@ password=""
# skip tls cert validation
insecure_skip_verify = true
[ydb] # https://ydb.tech/
enabled = false
dsn = "grpc://localhost:2136?database=/local"
prefix = "seaweedfs"
useBucketPrefix = true # Fast Bucket Deletion
poolSizeLimit = 50
dialTimeOut = 10
# Authenticate produced with one of next environment variables:
# YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS=<path/to/sa_key_file> — used service account key file by path
# YDB_ANONYMOUS_CREDENTIALS="1" — used for authenticate with anonymous access. Anonymous access needs for connect to testing YDB installation
# YDB_METADATA_CREDENTIALS="1" — used metadata service for authenticate to YDB from yandex cloud virtual machine or from yandex function
# YDB_ACCESS_TOKEN_CREDENTIALS=<access_token> — used for authenticate to YDB with short-life access token. For example, access token may be IAM token
##########################
##########################

View File

@@ -156,7 +156,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
if len(entry.Chunks) > 50 {
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
meta = util.MaybeGzipData(meta)
}

View File

@@ -18,7 +18,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by
return fmt.Errorf("findDB: %v", err)
}
dirStr, dirHash, name := genDirAndName(key)
dirStr, dirHash, name := GenDirAndName(key)
res, err := db.ExecContext(ctx, store.GetSqlInsert(DEFAULT_TABLE), dirHash, name, dirStr, value)
if err == nil {
@@ -53,7 +53,7 @@ func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []b
return nil, fmt.Errorf("findDB: %v", err)
}
dirStr, dirHash, name := genDirAndName(key)
dirStr, dirHash, name := GenDirAndName(key)
row := db.QueryRowContext(ctx, store.GetSqlFind(DEFAULT_TABLE), dirHash, name, dirStr)
err = row.Scan(&value)
@@ -76,7 +76,7 @@ func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err er
return fmt.Errorf("findDB: %v", err)
}
dirStr, dirHash, name := genDirAndName(key)
dirStr, dirHash, name := GenDirAndName(key)
res, err := db.ExecContext(ctx, store.GetSqlDelete(DEFAULT_TABLE), dirHash, name, dirStr)
if err != nil {
@@ -92,7 +92,7 @@ func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err er
}
func genDirAndName(key []byte) (dirStr string, dirHash int64, name string) {
func GenDirAndName(key []byte) (dirStr string, dirHash int64, name string) {
for len(key) < 8 {
key = append(key, 0)
}

View File

@@ -157,7 +157,7 @@ func (store *ArangodbStore) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
if len(entry.Chunks) > 50 {
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
meta = util.MaybeGzipData(meta)
}
model := &Model{
@@ -196,7 +196,7 @@ func (store *ArangodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
if len(entry.Chunks) > 50 {
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
meta = util.MaybeGzipData(meta)
}
model := &Model{

View File

@@ -100,7 +100,7 @@ func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
if len(entry.Chunks) > 50 {
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
meta = util.MaybeGzipData(meta)
}

View File

@@ -82,7 +82,7 @@ func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer.Entry) (er
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > 50 {
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
meta = weed_util.MaybeGzipData(meta)
}

View File

@@ -7,6 +7,8 @@ import (
"io"
)
const CountEntryChunksForGzip = 50
var (
ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing")

View File

@@ -75,7 +75,7 @@ func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) er
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > 50 {
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}

View File

@@ -86,7 +86,7 @@ func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > 50 {
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = weed_util.MaybeGzipData(value)
}

View File

@@ -88,7 +88,7 @@ func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > 50 {
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = weed_util.MaybeGzipData(value)
}

View File

@@ -177,7 +177,7 @@ func (store *LevelDB3Store) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > 50 {
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = weed_util.MaybeGzipData(value)
}

View File

@@ -107,7 +107,7 @@ func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
if len(entry.Chunks) > 50 {
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
meta = util.MaybeGzipData(meta)
}

View File

@@ -40,7 +40,7 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > 50 {
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}

View File

@@ -52,7 +52,7 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > 50 {
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}

View File

@@ -40,7 +40,7 @@ func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > 50 {
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}

View File

@@ -53,7 +53,7 @@ func (store *UniversalRedisLuaStore) InsertEntry(ctx context.Context, entry *fil
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > 50 {
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}

9
weed/filer/ydb/doc.go Normal file
View File

@@ -0,0 +1,9 @@
/*
Package ydb is for YDB filer store.
The referenced "github.com/ydb-platform/ydb-go-sdk/v3" library is too big when compiled.
So this is only compiled in "make full_install".
*/
package ydb

27
weed/filer/ydb/readme.md Normal file
View File

@@ -0,0 +1,27 @@
## YDB
database: https://github.com/ydb-platform/ydb
go driver: https://github.com/ydb-platform/ydb-go-sdk
options:
```
[ydb]
enabled=true
dsn=grpcs://ydb-ru.yandex.net:2135/?database=/ru/home/username/db
prefix="seaweedfs"
useBucketPrefix=true
poolSizeLimit=50
dialTimeOut = 10
```
Authenticate produced with one of next environment variables:
* `YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS=<path/to/sa_key_file>` — used service account key file by path
* `YDB_ANONYMOUS_CREDENTIALS="1"` — used for authenticate with anonymous access. Anonymous access needs for connect to testing YDB installation
* `YDB_METADATA_CREDENTIALS="1"` — used metadata service for authenticate to YDB from yandex cloud virtual machine or from yandex function
* `YDB_ACCESS_TOKEN_CREDENTIALS=<access_token>` — used for authenticate to YDB with short-life access token. For example, access token may be IAM token
* `YDB_CONNECTION_STRING="grpcs://endpoint/?database=database"`
* i test using this dev database:
`make dev_ydb`

View File

@@ -0,0 +1,85 @@
//go:build ydb
// +build ydb
package ydb
import asql "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
const (
insertQuery = `
PRAGMA TablePathPrefix("%v");
DECLARE $dir_hash AS int64;
DECLARE $directory AS Utf8;
DECLARE $name AS Utf8;
DECLARE $meta AS String;
DECLARE $expire_at AS Optional<uint32>;
UPSERT INTO ` + asql.DEFAULT_TABLE + `
(dir_hash, name, directory, meta, expire_at)
VALUES
($dir_hash, $name, $directory, $meta, $expire_at);`
updateQuery = `
PRAGMA TablePathPrefix("%v");
DECLARE $dir_hash AS int64;
DECLARE $directory AS Utf8;
DECLARE $name AS Utf8;
DECLARE $meta AS String;
DECLARE $expire_at AS Optional<uint32>;
REPLACE INTO ` + asql.DEFAULT_TABLE + `
(dir_hash, name, directory, meta, expire_at)
VALUES
($dir_hash, $name, $directory, $meta, $expire_at);`
deleteQuery = `
PRAGMA TablePathPrefix("%v");
DECLARE $dir_hash AS int64;
DECLARE $name AS Utf8;
DELETE FROM ` + asql.DEFAULT_TABLE + `
WHERE dir_hash = $dir_hash AND name = $name;`
findQuery = `
PRAGMA TablePathPrefix("%v");
DECLARE $dir_hash AS int64;
DECLARE $name AS Utf8;
SELECT meta
FROM ` + asql.DEFAULT_TABLE + `
WHERE dir_hash = $dir_hash AND name = $name;`
deleteFolderChildrenQuery = `
PRAGMA TablePathPrefix("%v");
DECLARE $dir_hash AS int64;
DECLARE $directory AS Utf8;
DELETE FROM ` + asql.DEFAULT_TABLE + `
WHERE dir_hash = $dir_hash AND directory = $directory;`
listDirectoryQuery = `
PRAGMA TablePathPrefix("%v");
DECLARE $dir_hash AS int64;
DECLARE $directory AS Utf8;
DECLARE $start_name AS Utf8;
DECLARE $prefix AS Utf8;
DECLARE $limit AS Uint64;
SELECT name, meta
FROM ` + asql.DEFAULT_TABLE + `
WHERE dir_hash = $dir_hash AND directory = $directory and name > $start_name and name LIKE $prefix
ORDER BY name ASC LIMIT $limit;`
listInclusiveDirectoryQuery = `
PRAGMA TablePathPrefix("%v");
DECLARE $dir_hash AS int64;
DECLARE $directory AS Utf8;
DECLARE $start_name AS Utf8;
DECLARE $prefix AS Utf8;
DECLARE $limit AS Uint64;
SELECT name, meta
FROM ` + asql.DEFAULT_TABLE + `
WHERE dir_hash = $dir_hash AND directory = $directory and name >= $start_name and name LIKE $prefix
ORDER BY name ASC LIMIT $limit;`
)

403
weed/filer/ydb/ydb_store.go Normal file
View File

@@ -0,0 +1,403 @@
//go:build ydb
// +build ydb
package ydb
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
environ "github.com/ydb-platform/ydb-go-sdk-auth-environ"
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/sugar"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"os"
"path"
"strings"
"sync"
"time"
)
const (
defaultDialTimeOut = 10
)
var (
roTX = table.TxControl(
table.BeginTx(table.WithOnlineReadOnly()),
table.CommitTx(),
)
rwTX = table.DefaultTxControl()
)
type YdbStore struct {
DB ydb.Connection
dirBuckets string
tablePathPrefix string
SupportBucketTable bool
dbs map[string]bool
dbsLock sync.Mutex
}
func init() {
filer.Stores = append(filer.Stores, &YdbStore{})
}
func (store *YdbStore) GetName() string {
return "ydb"
}
func (store *YdbStore) Initialize(configuration util.Configuration, prefix string) (err error) {
return store.initialize(
configuration.GetString("filer.options.buckets_folder"),
configuration.GetString(prefix+"dsn"),
configuration.GetString(prefix+"prefix"),
configuration.GetBool(prefix+"useBucketPrefix"),
configuration.GetInt(prefix+"dialTimeOut"),
configuration.GetInt(prefix+"poolSizeLimit"),
)
}
func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix string, useBucketPrefix bool, dialTimeOut int, poolSizeLimit int) (err error) {
store.dirBuckets = dirBuckets
store.SupportBucketTable = useBucketPrefix
if store.SupportBucketTable {
glog.V(0).Infof("enabled BucketPrefix")
}
store.dbs = make(map[string]bool)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if dialTimeOut == 0 {
dialTimeOut = defaultDialTimeOut
}
opts := []ydb.Option{
ydb.WithDialTimeout(time.Duration(dialTimeOut) * time.Second),
environ.WithEnvironCredentials(ctx),
}
if poolSizeLimit > 0 {
opts = append(opts, ydb.WithSessionPoolSizeLimit(poolSizeLimit))
}
if dsn == "" {
dsn = os.Getenv("YDB_CONNECTION_STRING")
}
store.DB, err = ydb.Open(ctx, dsn, opts...)
if err != nil || store.DB == nil {
if store.DB != nil {
_ = store.DB.Close(ctx)
store.DB = nil
}
return fmt.Errorf("can not connect to %s error: %v", dsn, err)
}
store.tablePathPrefix = path.Join(store.DB.Name(), tablePathPrefix)
if err = sugar.MakeRecursive(ctx, store.DB, store.tablePathPrefix); err != nil {
return fmt.Errorf("MakeRecursive %s : %v", store.tablePathPrefix, err)
}
if err = store.createTable(ctx, store.tablePathPrefix); err != nil {
glog.Errorf("createTable %s: %v", store.tablePathPrefix, err)
}
return err
}
func (store *YdbStore) doTxOrDB(ctx context.Context, query *string, params *table.QueryParameters, tc *table.TransactionControl, processResultFunc func(res result.Result) error) (err error) {
var res result.Result
if tx, ok := ctx.Value("tx").(table.Transaction); ok {
res, err = tx.Execute(ctx, *query, params, options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache()))
if err != nil {
return fmt.Errorf("execute transaction: %v", err)
}
} else {
err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) {
_, res, err = s.Execute(ctx, tc, *query,
params, options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache()))
if err != nil {
return fmt.Errorf("execute statement: %v", err)
}
return nil
})
}
if err != nil {
return err
}
if res != nil {
defer func() { _ = res.Close() }()
if processResultFunc != nil {
if err = processResultFunc(res); err != nil {
return fmt.Errorf("process result: %v", err)
}
}
}
return err
}
func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Entry, isUpdate bool) (err error) {
dir, name := entry.FullPath.DirAndName()
meta, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
meta = util.MaybeGzipData(meta)
}
tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
fileMeta := FileMeta{util.HashStringToLong(dir), name, *shortDir, meta}
var query *string
if isUpdate {
query = withPragma(tablePathPrefix, updateQuery)
} else {
query = withPragma(tablePathPrefix, insertQuery)
}
return store.doTxOrDB(ctx, query, fileMeta.queryParameters(entry.TtlSec), rwTX, nil)
}
func (store *YdbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
return store.insertOrUpdateEntry(ctx, entry, false)
}
func (store *YdbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
return store.insertOrUpdateEntry(ctx, entry, true)
}
func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
dir, name := fullpath.DirAndName()
var data []byte
entryFound := false
tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
query := withPragma(tablePathPrefix, findQuery)
queryParams := table.NewQueryParameters(
table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))),
table.ValueParam("$name", types.UTF8Value(name)))
err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error {
for res.NextResultSet(ctx) {
for res.NextRow() {
if err = res.ScanNamed(named.OptionalWithDefault("meta", &data)); err != nil {
return fmt.Errorf("scanNamed %s : %v", fullpath, err)
}
entryFound = true
return nil
}
}
return res.Err()
})
if err != nil {
return nil, err
}
if !entryFound {
return nil, filer_pb.ErrNotFound
}
entry = &filer.Entry{
FullPath: fullpath,
}
if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
return nil, fmt.Errorf("decode %s : %v", fullpath, err)
}
return entry, nil
}
func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
dir, name := fullpath.DirAndName()
tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
query := withPragma(tablePathPrefix, deleteQuery)
queryParams := table.NewQueryParameters(
table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))),
table.ValueParam("$name", types.UTF8Value(name)))
return store.doTxOrDB(ctx, query, queryParams, rwTX, nil)
}
func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
dir, _ := fullpath.DirAndName()
tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
query := withPragma(tablePathPrefix, deleteFolderChildrenQuery)
queryParams := table.NewQueryParameters(
table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))),
table.ValueParam("$directory", types.UTF8Value(*shortDir)))
return store.doTxOrDB(ctx, query, queryParams, rwTX, nil)
}
func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil)
}
func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
dir := string(dirPath)
tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
var query *string
if includeStartFile {
query = withPragma(tablePathPrefix, listInclusiveDirectoryQuery)
} else {
query = withPragma(tablePathPrefix, listDirectoryQuery)
}
queryParams := table.NewQueryParameters(
table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))),
table.ValueParam("$directory", types.UTF8Value(*shortDir)),
table.ValueParam("$start_name", types.UTF8Value(startFileName)),
table.ValueParam("$prefix", types.UTF8Value(prefix+"%")),
table.ValueParam("$limit", types.Uint64Value(uint64(limit))),
)
err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error {
var name string
var data []byte
for res.NextResultSet(ctx) {
for res.NextRow() {
if err := res.ScanNamed(
named.OptionalWithDefault("name", &name),
named.OptionalWithDefault("meta", &data)); err != nil {
return fmt.Errorf("list scanNamed %s : %v", dir, err)
}
lastFileName = name
entry := &filer.Entry{
FullPath: util.NewFullPath(dir, name),
}
if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
return fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
}
if !eachEntryFunc(entry) {
break
}
}
}
return res.Err()
})
if err != nil {
return lastFileName, err
}
return lastFileName, nil
}
func (store *YdbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
session, err := store.DB.Table().CreateSession(ctx)
if err != nil {
return ctx, err
}
tx, err := session.BeginTransaction(ctx, table.TxSettings(table.WithSerializableReadWrite()))
if err != nil {
return ctx, err
}
return context.WithValue(ctx, "tx", tx), nil
}
func (store *YdbStore) CommitTransaction(ctx context.Context) error {
if tx, ok := ctx.Value("tx").(table.Transaction); ok {
_, err := tx.CommitTx(ctx)
return err
}
return nil
}
func (store *YdbStore) RollbackTransaction(ctx context.Context) error {
if tx, ok := ctx.Value("tx").(table.Transaction); ok {
return tx.Rollback(ctx)
}
return nil
}
func (store *YdbStore) Shutdown() {
_ = store.DB.Close(context.Background())
}
func (store *YdbStore) CanDropWholeBucket() bool {
return store.SupportBucketTable
}
func (store *YdbStore) OnBucketCreation(bucket string) {
store.dbsLock.Lock()
defer store.dbsLock.Unlock()
if err := store.createTable(context.Background(),
path.Join(store.tablePathPrefix, bucket)); err != nil {
glog.Errorf("createTable %s: %v", bucket, err)
}
if store.dbs == nil {
return
}
store.dbs[bucket] = true
}
func (store *YdbStore) OnBucketDeletion(bucket string) {
store.dbsLock.Lock()
defer store.dbsLock.Unlock()
if err := store.deleteTable(context.Background(),
path.Join(store.tablePathPrefix, bucket)); err != nil {
glog.Errorf("deleteTable %s: %v", bucket, err)
}
if store.dbs == nil {
return
}
delete(store.dbs, bucket)
}
func (store *YdbStore) createTable(ctx context.Context, prefix string) error {
return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
return s.CreateTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE), createTableOptions()...)
})
}
func (store *YdbStore) deleteTable(ctx context.Context, prefix string) error {
if !store.SupportBucketTable {
return nil
}
if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
return s.DropTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE))
}); err != nil {
return err
}
glog.V(4).Infof("deleted table %s", prefix)
return nil
}
func (store *YdbStore) getPrefix(ctx context.Context, dir *string) (tablePathPrefix *string, shortDir *string) {
tablePathPrefix = &store.tablePathPrefix
shortDir = dir
if !store.SupportBucketTable {
return
}
prefixBuckets := store.dirBuckets + "/"
if strings.HasPrefix(*dir, prefixBuckets) {
// detect bucket
bucketAndDir := (*dir)[len(prefixBuckets):]
var bucket string
if t := strings.Index(bucketAndDir, "/"); t > 0 {
bucket = bucketAndDir[:t]
} else if t < 0 {
bucket = bucketAndDir
}
if bucket == "" {
return
}
store.dbsLock.Lock()
defer store.dbsLock.Unlock()
tablePathPrefixWithBucket := path.Join(store.tablePathPrefix, bucket)
if _, found := store.dbs[bucket]; !found {
if err := store.createTable(ctx, tablePathPrefixWithBucket); err == nil {
store.dbs[bucket] = true
glog.V(4).Infof("created table %s", tablePathPrefixWithBucket)
} else {
glog.Errorf("createTable %s: %v", tablePathPrefixWithBucket, err)
}
}
tablePathPrefix = &tablePathPrefixWithBucket
}
return
}

View File

@@ -0,0 +1,79 @@
//go:build ydb
// +build ydb
package ydb
import (
"context"
"fmt"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
)
func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
fileMeta := FileMeta{dirHash, name, dirStr, value}
return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) {
_, _, err = s.Execute(ctx, rwTX, *withPragma(&store.tablePathPrefix, insertQuery),
fileMeta.queryParameters(0),
options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache()))
if err != nil {
return fmt.Errorf("kv put execute %s: %v", util.NewFullPath(dirStr, name).Name(), err)
}
return nil
})
}
func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
valueFound := false
err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
_, res, err := s.Execute(ctx, roTX, *withPragma(&store.tablePathPrefix, findQuery),
table.NewQueryParameters(
table.ValueParam("$dir_hash", types.Int64Value(dirHash)),
table.ValueParam("$name", types.UTF8Value(name))),
options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache()))
if err != nil {
return fmt.Errorf("kv get execute %s: %v", util.NewFullPath(dirStr, name).Name(), err)
}
defer func() { _ = res.Close() }()
for res.NextResultSet(ctx) {
for res.NextRow() {
if err := res.ScanNamed(named.OptionalWithDefault("meta", &value)); err != nil {
return fmt.Errorf("scanNamed %s : %v", util.NewFullPath(dirStr, name).Name(), err)
}
valueFound = true
return nil
}
}
return res.Err()
})
if !valueFound {
return nil, filer.ErrKvNotFound
}
return value, nil
}
func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) {
dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) {
_, _, err = s.Execute(ctx, rwTX, *withPragma(&store.tablePathPrefix, insertQuery),
table.NewQueryParameters(
table.ValueParam("$dir_hash", types.Int64Value(dirHash)),
table.ValueParam("$name", types.UTF8Value(name))),
options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache()))
if err != nil {
return fmt.Errorf("kv delete %s: %v", util.NewFullPath(dirStr, name).Name(), err)
}
return nil
})
}

View File

@@ -0,0 +1,60 @@
//go:build ydb
// +build ydb
package ydb
import (
"fmt"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
)
//go:generate ydbgen
//ydb:gen
type FileMeta struct {
DirHash int64 `ydb:"type:int64"`
Name string `ydb:"type:utf8"`
Directory string `ydb:"type:utf8"`
Meta []byte `ydb:"type:string"`
}
//ydb:gen scan,value
type FileMetas []FileMeta
func (fm *FileMeta) queryParameters(ttlSec int32) *table.QueryParameters {
var expireAtValue types.Value
if ttlSec > 0 {
expireAtValue = types.Uint32Value(uint32(ttlSec))
} else {
expireAtValue = types.NullValue(types.TypeUint32)
}
return table.NewQueryParameters(
table.ValueParam("$dir_hash", types.Int64Value(fm.DirHash)),
table.ValueParam("$directory", types.UTF8Value(fm.Directory)),
table.ValueParam("$name", types.UTF8Value(fm.Name)),
table.ValueParam("$meta", types.StringValue(fm.Meta)),
table.ValueParam("$expire_at", expireAtValue))
}
func createTableOptions() []options.CreateTableOption {
columnUnit := options.TimeToLiveUnitSeconds
return []options.CreateTableOption{
options.WithColumn("dir_hash", types.Optional(types.TypeInt64)),
options.WithColumn("directory", types.Optional(types.TypeUTF8)),
options.WithColumn("name", types.Optional(types.TypeUTF8)),
options.WithColumn("meta", types.Optional(types.TypeString)),
options.WithColumn("expire_at", types.Optional(types.TypeUint32)),
options.WithPrimaryKeyColumn("dir_hash", "name"),
options.WithTimeToLiveSettings(options.TimeToLiveSettings{
ColumnName: "expire_at",
ColumnUnit: &columnUnit,
Mode: options.TimeToLiveModeValueSinceUnixEpoch},
),
}
}
func withPragma(prefix *string, query string) *string {
queryWithPragma := fmt.Sprintf(query, *prefix)
return &queryWithPragma
}

View File

@@ -38,6 +38,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis3"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
_ "github.com/chrislusf/seaweedfs/weed/filer/ydb"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"